Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Python ElasticSearch ORM based on Pydantic

License

NotificationsYou must be signed in to change notification settings

wallneradam/esorm

Repository files navigation

Logo

ESORM - Python ElasticSearch ORM based on Pydantic

ESORM is an ElasticSearch Object Relational Mapper or Object Document Mapper (ODM) if you like,for Python based on Pydantic. It is a high-level library for managing ElasticSearch documentsin Python. It is fully async and uses annotations and type hints for type checking and IDE autocompletion.

☰ Table of Contents

💾 Installation

pip install pyesorm

🚀 Features

  • Pydantic model representation of ElasticSearch documents
  • Automatic mapping and index creation
  • CRUD operations
  • Full async support (no sync version at all)
  • Mapping to and from ElasticSearch types
  • Support for nested documents
  • Automatic optimistic concurrency control
  • Custom id field
  • Context for bulk operations
  • Supported IDE autocompletion and type checking (PyCharm tested)
  • Everything in the source code is documented and annotated
  • TypedDicts for ElasticSearch queries and aggregations
  • Docstring support for fields
  • Shard routing support
  • Lazy properties
  • Support >= Python 3.8 (tested with 3.8 through 3.12)
  • Support for ElasticSearch 8.x and 7.x
  • Watcher support (You may need ElasticSearch subscription license for this)
  • Pagination and sorting
  • FastAPI integration

Not all ElasticSearch features are supported yet, pull requests are welcome.

Supported ElasticSearch versions

It is tested with ElasticSearch 7.x and 8.x.

Supported Python versions

Tested with Python 3.8 through 3.12.

📖 Usage

Define a model

You can use allPydantic model features, becauseESModel is a subclass ofpydantic.BaseModel.(Actually it is a subclass ofESBaseModel, see morebelow...)

ESModel extends pydanticBaseModel with ElasticSearch specific features. It serializes and deserializesdocuments to and from ElasticSearch types and handle ElasticSearch operations in the background.

Python basic types

fromesormimportESModelclassUser(ESModel):name:strage:int

This is how the python types are converted to ES types:

Python typeES typeComment
strtext
intlong
floatdouble
boolboolean
datetime.datetimedate
datetime.datedate
datetime.timedateStored as 1970-01-01 + time
typing.Literalkeyword
UUIDkeyword
Pathkeyword
IntEnuminteger
Enumkeywordalso StrEnum

Some special pydanctic types are also supported:

Pydantic typeES typeComment
URLkeyword
IPvAddressAnyip

ESORM field types

You can specify ElasticSearch special fields usingesorm.fields module.

fromesormimportESModelfromesorm.fieldsimportkeyword,text,byte,geo_pointclassUser(ESModel):name:textemail:keywordage:bytelocation:geo_point    ...

The supported fields are:

Field nameES type
keywordkeyword
texttext
binarybinary
bytebyte
shortshort
integer orint32integer
long orint64long
unsigned_long oruint64unsigned_long
float16 orhalf_floathalf_float
float32float
doubledouble
booleanboolean
geo_pointgeo_point

Thebinary field acceptsbase64 encoded strings. However, if you providebytes to it, theywill be automatically converted to abase64 string during serialization. When you retrieve thefield, it will always be abase64 encoded string. You can easily convert it back to bytes usingthebytes() method:binary_field.bytes().

You can also useAnnotated types to specify the ES type, like PydanticPositiveInt andNegativeInt and similar.

geo_point

You can use geo_point field type for location data:

fromesormimportESModelfromesorm.fieldsimportgeo_pointclassPlace(ESModel):name:strlocation:geo_pointdefcreate_place():place=Place(name='Budapest',location=geo_point(lat=47.4979,long=19.0402))place.save()

Nested documents

fromesormimportESModelfromesorm.fieldsimportkeyword,text,byteclassUser(ESModel):name:textemail:keywordage:byte=18classPost(ESModel):title:textcontent:textwriter:User# User is a nested document

List primitive fields

You can use list of primitive fields:

fromtypingimportListfromesormimportESModelclassUser(ESModel):emails:List[str]favorite_ids:List[int]     ...

ESBaseModel

ESBaseModel is the base ofESModel.

Use it for abstract models
fromesormimportESModel,ESBaseModelfromesorm.fieldsimportkeyword,text,byte# This way `User` model won't be in the indexclassBaseUser(ESBaseModel):# <---------------# This config will be inherited by UserclassESConfig:id_field='email'name:textemail:keyword# This will be in the index because it is a subclass of ESModelclassUserExtended(BaseUser,ESModel):age:byte=18asyncdefcreate_user():user=UserExtended(name='John Doe',email="john@example.com",age=25    )awaituser.save()
Use it for nested documents

It is useful to use it for nested documents, because by using it will not be included in theElasticSearch index.

fromesormimportESModel,ESBaseModelfromesorm.fieldsimportkeyword,text,byte# This way `User` model won't be in the indexclassUser(ESBaseModel):# <---------------name:textemail:keywordage:byte=18classPost(ESModel):title:textcontent:textwriter:User# User is a nested document

Id field

You can specify id field inmodel settings:

fromesormimportESModelfromesorm.fieldsimportkeyword,text,byteclassUser(ESModel):classESConfig:id_field='email'name:textemail:keywordage:byte=18

This way the field specified inid_field will be removed from the document and used as the document_id in theindex.

If you specify a field namedid in your model, it will be used as the document_id in the index(it will automatically override theid_field setting):

fromesormimportESModelclassUser(ESModel):id:int# This will be used as the document _id in the indexname:str

You can also create an__id__ property in your model to return a custom id:

fromesormimportESModelfromesorm.fieldsimportkeyword,text,byteclassUser(ESModel):name:textemail:keywordage:byte=18@propertydef__id__(self)->str:returnself.email

NOTE: annotation of__id__ method is important, and it must be declared as a property.

Model Settings

You can specify model settings usingESConfig child class.

fromtypingimportOptional,List,Dict,AnyfromesormimportESModelclassUser(ESModel):classESConfig:""" ESModel Config """# The index nameindex:Optional[str]=None# The name of the 'id' fieldid_field:Optional[str]=None# Default sortdefault_sort:Optional[List[Dict[str,Dict[str,str]]]]=None# ElasticSearch index settings (https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)settings:Optional[Dict[str,Any]]=None# Maximum recursion depth of lazy propertieslazy_property_max_recursion_depth:int=1

ESModelTimestamp

You can useESModelTimestamp class to addcreated_at andupdated_at fields to your model:

fromesormimportESModelTimestampclassUser(ESModelTimestamp):name:strage:int

These fields will be automatically updated to the actualdatetime when you create or update a document.Thecreated_at field will be set only when you create a document. Theupdated_at field will be setwhen you create or update a document.

Describe fields

You can use the usualPydantic field description, but you can also use docstrings like this:

fromesormimportESModelfromesorm.fieldsimportTextFieldclassUser(ESModel):name:str='John Doe'""" The name of the user """age:int=18""" The age of the user """# This is the usual Pydantic way, but I think docstrings are more intuitive and readableaddress:str=TextField(description="The address of the user")

The documentation is usseful if you create an API and you want to generate documentation from the model.It can be used inFastAPI for example.

Aliases

You can specify aliases for fields:

fromesormimportESModelfromesorm.fieldsimportkeyword,FieldclassUser(ESModel):full_name:keyword=Field(alias='fullName')# In ES `fullName` will be the field name

This is good for renaming fields in the model without changing the ElasticSearch field name.

Connecting to ElasticSearch

You can connect with a simple connection string:

fromesormimportconnectasyncdefes_init():awaitconnect('localhost:9200')

Also you can connect to multiple hosts if you have a cluster:

fromesormimportconnectasyncdefes_init():awaitconnect(['localhost:9200','localhost:9201'])

You can wait for node or cluster to be ready (recommended):

fromesormimportconnectasyncdefes_init():awaitconnect('localhost:9200',wait=True)

This will ping the node in 2 seconds intervals until it is ready. It can be a long time.

You can pass any arguments thatAsyncElasticsearch supports:

fromesormimportconnectasyncdefes_init():awaitconnect('localhost:9200',wait=True,sniff_on_start=True,sniff_on_connection_fail=True)

Client

Theconnect function is a wrapper for theAsyncElasticsearch constructor. It creates and storesa global instance of a proxy to anAsyncElasticsearch instance. The model operations will use thisinstance to communicate with ElasticSearch. You can retrieve the proxy client instance and you canuse the same way asAsyncElasticsearch instance:

fromesormimportesasyncdefes_init():awaites.ping()

Create index templates

You can create index templates easily:

fromesormimportmodelasesorm_model# Create index templateasyncdefprepare_es():awaitesorm_model.create_index_template('default_template',prefix_name='esorm_',shards=3,auto_expand_replicas='1-5')

Here this will be applied allesorm_ prefixed (default) indices.

All indices created by ESORM have a prefix, which you can modify globally if you want:

fromesorm.modelimportset_default_index_prefixset_default_index_prefix('custom_prefix_')

The default prefix isesorm_.

Create indices and mappings

You can create indices and mappings automatically from your models:

fromesormimportsetup_mappings# Create indices and mappingsasyncdefprepare_es():importmodels# Import your models# Here models argument is not needed, but you can pass it to prevent unused import warningawaitsetup_mappings(models)

First you must create (import) all model classes. Model classes will be registered into a global registry.Then you can callsetup_mappings function to create indices and mappings for all registered models.

IMPORTANT: This method will ignore mapping errors if you already have an index with the same name. It can update theindicesby new fields, but cannot modify or delete fields! For that you need to reindex your ES database. It is an ElasticSearchlimitation.

Model instances

When you get a model instance from elasticsearch bysearch orget methods, you will get the following privateattributes filled automatically:

AttributeDescription
_idThe ES id of the document
_routingThe routing value of the document
_versionVersion of the document
_primary_termThe primary term of the document
_seq_noThe sequence number of the document

CRUD: Create

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefcreate_user():# Create a new useruser=User(name='John Doe',age=25)# Save the user to ElasticSearchnew_user_id=awaituser.save()print(new_user_id)

CRUD: Read

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefget_user(user_id:str):user=awaitUser.get(user_id)print(user.name)

CRUD: Update

On update race conditions are checked automatically (with the help of _primary_term and _seq_no fields).This way an optimistic locking mechanism is implemented.

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefupdate_user(user_id:str):user=awaitUser.get(user_id)user.name='Jane Doe'awaituser.save()

CRUD: Delete

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefdelete_user(user_id:str):user=awaitUser.get(user_id)awaituser.delete()

Bulk operations

Bulk operations could be much faster than single operations, if you have lot of documents tocreate, update or delete.

You can use context for bulk operations:

fromtypingimportListfromesormimportESModel,ESBulk# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefbulk_create_users():asyncwithESBulk()asbulk:# Creating or modifiying modelsforiinrange(10):user=User(name=f'User{i}',age=i)awaitbulk.save(user)asyncdefbulk_delete_users(users:List[User]):asyncwithESBulk(wait_for=True)asbulk:# Here we wait for the bulk operation to finish# Deleting modelsforuserinusers:awaitbulk.delete(user)

Thewait_for argument is optional. If it isTrue, the context will wait for the bulk operation to finish.

Search

General search

You can search for documents usingsearch method, where an ES query can be specified as a dictionary.You can useres_dict=True argument to get the result as a dictionary instead of a list. The key will be theid of the document:await User.search(query, res_dict=True).

If you only need one result, you can usesearch_one method.

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefsearch_users():# Search for users at least 18 years oldusers=awaitUser.search(query={'bool': {'must': [{'range': {'age': {'gte':18                        }                    }                }]            }        }    )foruserinusers:print(user.name)asyncdefsearch_one_user():# Search a user named John Doeuser=awaitUser.search_one(query={'bool': {'must': [{'match': {'name': {'query':'John Doe'                        }                    }                }]            }        }    )print(user.name)

Queries are type checked, because they are annotated asTypedDicts. You can use IDE autocompletion and type checking.

Search with field value terms (dictionary search)

You can search for documents usingsearch_by_fields method, where you can specify a field and a value.It also has ares_dict argument andsearch_one_by_fields variant.

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intasyncdefsearch_users():# Search users age is 18users=awaitUser.search_by_fields({'age':18})foruserinusers:print(user.name)

Aggregations

You can useaggregate method to get aggregations.You can specify an ES aggregation query as a dictionary. It also accepts normal ES queries,to be able to fiter which documents you want to aggregate.Both the aggs parameter and the query parameter are type checked, because they are annotated asTypedDicts.You can use IDE autocompletion and type checking.

fromesormimportESModel# Here the model have automatically generated idclassUser(ESModel):name:strage:intcountry:strasyncdefaggregate_avg():# Get average age of usersaggs_def= {'avg_age': {'avg': {'field':'age'            }        }    }aggs=awaitUser.aggregate(aggs_def)print(aggs['avg_age']['value'])asyncdefaggregate_avg_by_country(country='Hungary'):# Get average age of users by countryaggs_def= {'avg_age': {'avg': {'field':'age'            }        }    }query= {'bool': {'must': [{'match': {'country': {'query':country                    }                }            }]        }    }aggs=awaitUser.aggregate(aggs_def,query)print(aggs['avg_age']['value'])asyncdefaggregate_terms():# Get number of users by countryaggs_def= {'countries': {'terms': {'field':'country'            }        }    }aggs=awaitUser.aggregate(aggs_def)forbucketinaggs['countries']['buckets']:print(bucket['key'],bucket['doc_count'])

Pagination and sorting

You can usePagination andSort classes to decorate your models. They simply wrap your modelsand add pagination and sorting functionality to them.

Pagination

You can add a callback parameter to thePagination class which will be invoked after the search withthe total number of documents found.

fromesorm.modelimportESModel,PaginationclassUser(ESModel):id:int# This will be used as the document _id in the indexname:strage:intdefget_users(page=1,page_size=10):defpagination_callback(total:int):# You may set a header value or something else hereprint(f'Total users:{total}')# 1st create the decorator itselfpagination=Pagination(page=page,page_size=page_size)# Then decorate your modelres=pagination(User).search_by_fields(age=18)# Here the result has maximum 10 itemsreturnres

Sorting

It is similar to pagination:

fromesorm.modelimportESModel,SortclassUser(ESModel):id:int# This will be used as the document _id in the indexname:strage:intdefget_users():# 1st create the decorator itselfsort=Sort(sort=[        {'age': {'order':'desc'}},        {'name': {'order':'asc'}}    ])# Then decorate your modelres=sort(User).search_by_fields(age=18)# Here the result is sorted by age ascendingreturnresdefget_user_sorted_by_name():# You can also use this simplified syntaxsort=Sort(sort='name')# Then decorate your modelres=sort(User).all()# Here the result is sorted by age descendingreturnres

🧪 Testing

For testing you can use thetest.sh in the root directory. It is a script to runningtests on multiple python interpreters in virtual environments. At the top of the file you can specifywhich python interpreters you want to test. The ES versions are specified intests/docker-compose.yml file.

If you already have a virtual environment, simply usepytest to run the tests.

🛡 License

This project is licensed under the terms of theMozilla Public License 2.0 (MPL 2.0) license.

📃 Citation

If you use this project in your research, please cite it using the following BibTeX entry:

@misc{esorm,author ={Adam Wallner},title ={ESORM: ElasticSearch Object Relational Mapper},year ={2023},publisher ={GitHub},journal ={GitHub repository},howpublished ={\url{https://github.com/wallneradam/esorm}},}

[8]ページ先頭

©2009-2025 Movatter.jp