# ESORM: Python ElasticSearch ORM based on Pydantic
ESORM is an ElasticSearch Object Relational Mapper, or Object Document Mapper (ODM) if you like, for Python, based on Pydantic. It is a high-level library for managing ElasticSearch documents in Python. It is fully async and uses annotations and type hints for type checking and IDE autocompletion.
- 💾 Installation
- 🚀 Features
- 📖 Usage
- 🔬 Advanced usage
- 🧪 Testing
- 🛡 License
- 📃 Citation
## 💾 Installation

```bash
pip install pyesorm
```
## 🚀 Features

- Pydantic model representation of ElasticSearch documents
- Automatic mapping and index creation
- CRUD operations
- Full async support (no sync version at all)
- Mapping to and from ElasticSearch types
- Support for nested documents
- Automatic optimistic concurrency control
- Custom id field
- Context for bulk operations
- Supported IDE autocompletion and type checking (PyCharm tested)
- Everything in the source code is documented and annotated
- `TypedDict`s for ElasticSearch queries and aggregations
- Docstring support for fields
- Shard routing support
- Lazy properties
- Supports Python >= 3.8 (tested with 3.8 through 3.12)
- Support for ElasticSearch 8.x and 7.x
- Watcher support (you may need an ElasticSearch subscription license for this)
- Pagination and sorting
- FastAPI integration
Not all ElasticSearch features are supported yet; pull requests are welcome.
It is tested with ElasticSearch 7.x and 8.x.
Tested with Python 3.8 through 3.12.
## 📖 Usage

You can use all Pydantic model features, because `ESModel` is a subclass of `pydantic.BaseModel`. (Actually, it is a subclass of `ESBaseModel`; see more below...)

`ESModel` extends the Pydantic `BaseModel` with ElasticSearch specific features. It serializes and deserializes documents to and from ElasticSearch types and handles ElasticSearch operations in the background.
```python
from esorm import ESModel


class User(ESModel):
    name: str
    age: int
```
This is how Python types are converted to ES types:
Python type | ES type | Comment |
---|---|---|
str | text | |
int | long | |
float | double | |
bool | boolean | |
datetime.datetime | date | |
datetime.date | date | |
datetime.time | date | Stored as 1970-01-01 + time |
typing.Literal | keyword | |
UUID | keyword | |
Path | keyword | |
IntEnum | integer | |
Enum | keyword | also StrEnum |
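The `datetime.time` row deserves a note: an ES `date` field always carries a full date, so a bare time of day has to be anchored to some date, and the table says that date is 1970-01-01. A minimal plain-Python sketch of that conversion (`time_to_es_date` is a made-up helper for illustration, not an ESORM function):

```python
from datetime import date, datetime, time


def time_to_es_date(t: time) -> datetime:
    """Anchor a bare time-of-day to the epoch date 1970-01-01, mirroring
    how a `datetime.time` value can be represented in an ES date field."""
    return datetime.combine(date(1970, 1, 1), t)


stored = time_to_es_date(time(12, 30, 15))
print(stored.isoformat())  # 1970-01-01T12:30:15
```

When such a value is read back, only the time component is meaningful; the 1970-01-01 part is just the storage anchor.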
Some special Pydantic types are also supported:
Pydantic type | ES type | Comment |
---|---|---|
URL | keyword | |
IPvAnyAddress | ip | |
You can specify ElasticSearch special fields using the `esorm.fields` module.
```python
from esorm import ESModel
from esorm.fields import keyword, text, byte, geo_point


class User(ESModel):
    name: text
    email: keyword
    age: byte
    location: geo_point
    ...
```
The supported fields are:
Field name | ES type |
---|---|
keyword | keyword |
text | text |
binary | binary |
byte | byte |
short | short |
integer or int32 | integer |
long or int64 | long |
unsigned_long or uint64 | unsigned_long |
float16 or half_float | half_float |
float32 | float |
double | double |
boolean | boolean |
geo_point | geo_point |
The `binary` field accepts base64 encoded strings. However, if you provide `bytes` to it, they will be automatically converted to a base64 string during serialization. When you retrieve the field, it will always be a base64 encoded string. You can easily convert it back to bytes using the `bytes()` method: `binary_field.bytes()`.
You can also use `Annotated` types to specify the ES type, like Pydantic's `PositiveInt` and `NegativeInt` and similar.
You can use the `geo_point` field type for location data:
```python
from esorm import ESModel
from esorm.fields import geo_point


class Place(ESModel):
    name: str
    location: geo_point


async def create_place():
    place = Place(name='Budapest', location=geo_point(lat=47.4979, long=19.0402))
    await place.save()
```
Nested documents are also supported:

```python
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # User is a nested document
```
You can use lists of primitive fields:
```python
from typing import List

from esorm import ESModel


class User(ESModel):
    emails: List[str]
    favorite_ids: List[int]
    ...
```
`ESBaseModel` is the base of `ESModel`.
```python
from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


# This way the `BaseUser` model won't be in the index
class BaseUser(ESBaseModel):  # <---------------
    # This config will be inherited by UserExtended
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword


# This will be in the index because it is a subclass of ESModel
class UserExtended(BaseUser, ESModel):
    age: byte = 18


async def create_user():
    user = UserExtended(
        name='John Doe',
        email="john@example.com",
        age=25
    )
    await user.save()
```
It is useful for nested documents, because models derived from it will not be included in the ElasticSearch index.
```python
from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


# This way the `User` model won't be in the index
class User(ESBaseModel):  # <---------------
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # User is a nested document
```
You can specify the id field in model settings:
```python
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword
    age: byte = 18
```
This way the field specified in `id_field` will be removed from the document and used as the document `_id` in the index.
If you specify a field named `id` in your model, it will be used as the document `_id` in the index (it will automatically override the `id_field` setting):
```python
from esorm import ESModel


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
```
You can also create an `__id__` property in your model to return a custom id:
```python
from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18

    @property
    def __id__(self) -> str:
        return self.email
```
NOTE: the annotation of the `__id__` method is important, and it must be declared as a property.
You can specify model settings using the `ESConfig` child class.
```python
from typing import Optional, List, Dict, Any

from esorm import ESModel


class User(ESModel):
    class ESConfig:
        """ ESModel Config """
        # The index name
        index: Optional[str] = None
        # The name of the 'id' field
        id_field: Optional[str] = None
        # Default sort
        default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None
        # ElasticSearch index settings (https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
        settings: Optional[Dict[str, Any]] = None
        # Maximum recursion depth of lazy properties
        lazy_property_max_recursion_depth: int = 1
```
You can use the `ESModelTimestamp` class to add `created_at` and `updated_at` fields to your model:
```python
from esorm import ESModelTimestamp


class User(ESModelTimestamp):
    name: str
    age: int
```
These fields will be automatically updated to the actual `datetime` when you create or update a document. The `created_at` field will be set only when you create a document. The `updated_at` field will be set when you create or update a document.
You can use the usual Pydantic field description, but you can also use docstrings like this:
```python
from esorm import ESModel
from esorm.fields import TextField


class User(ESModel):
    name: str = 'John Doe'
    """ The name of the user """

    age: int = 18
    """ The age of the user """

    # This is the usual Pydantic way, but I think docstrings are more intuitive and readable
    address: str = TextField(description="The address of the user")
```
The documentation is useful if you create an API and want to generate documentation from the model. It can be used in FastAPI, for example.
You can specify aliases for fields:
```python
from esorm import ESModel
from esorm.fields import keyword, Field


class User(ESModel):
    full_name: keyword = Field(alias='fullName')  # In ES `fullName` will be the field name
```
This is good for renaming fields in the model without changing the ElasticSearch field name.
You can connect with a simple connection string:
```python
from esorm import connect


async def es_init():
    await connect('localhost:9200')
```
You can also connect to multiple hosts if you have a cluster:
```python
from esorm import connect


async def es_init():
    await connect(['localhost:9200', 'localhost:9201'])
```
You can wait for the node or cluster to be ready (recommended):
```python
from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True)
```
This will ping the node at 2-second intervals until it is ready. It can take a long time.
You can pass any arguments that `AsyncElasticsearch` supports:
```python
from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True, sniff_on_start=True, sniff_on_connection_fail=True)
```
The `connect` function is a wrapper for the `AsyncElasticsearch` constructor. It creates and stores a global instance of a proxy to an `AsyncElasticsearch` instance. The model operations use this instance to communicate with ElasticSearch. You can retrieve the proxy client instance and use it the same way as an `AsyncElasticsearch` instance:
```python
from esorm import es


async def es_init():
    await es.ping()
```
You can create index templates easily:
```python
from esorm import model as esorm_model


# Create index template
async def prepare_es():
    await esorm_model.create_index_template('default_template',
                                            prefix_name='esorm_',
                                            shards=3,
                                            auto_expand_replicas='1-5')
```
This will be applied to all `esorm_` prefixed (default) indices.
All indices created by ESORM have a prefix, which you can modify globally if you want:
```python
from esorm.model import set_default_index_prefix

set_default_index_prefix('custom_prefix_')
```
The default prefix is `esorm_`.
You can create indices and mappings automatically from your models:
```python
from esorm import setup_mappings


# Create indices and mappings
async def prepare_es():
    import models  # Import your models
    # Here the models argument is not needed, but you can pass it to prevent unused import warnings
    await setup_mappings(models)
```
First you must create (import) all model classes. Model classes will be registered in a global registry. Then you can call the `setup_mappings` function to create indices and mappings for all registered models.
IMPORTANT: This method will ignore mapping errors if you already have an index with the same name. It can update indices with new fields, but cannot modify or delete fields! For that you need to reindex your ES database. It is an ElasticSearch limitation.
When you get a model instance from ElasticSearch by the `search` or `get` methods, the following private attributes are filled automatically:
Attribute | Description |
---|---|
_id | The ES id of the document |
_routing | The routing value of the document |
_version | Version of the document |
_primary_term | The primary term of the document |
_seq_no | The sequence number of the document |
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def create_user():
    # Create a new user
    user = User(name='John Doe', age=25)
    # Save the user to ElasticSearch
    new_user_id = await user.save()
    print(new_user_id)
```
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def get_user(user_id: str):
    user = await User.get(user_id)
    print(user.name)
```
On update, race conditions are checked automatically (with the help of the `_primary_term` and `_seq_no` fields). This way an optimistic locking mechanism is implemented.
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def update_user(user_id: str):
    user = await User.get(user_id)
    user.name = 'Jane Doe'
    await user.save()
```
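To make the optimistic locking idea concrete, here is a minimal sketch in plain Python (no ESORM or ElasticSearch involved; `TinyStore` and its methods are made up for illustration). ElasticSearch rejects a write when the document's `_seq_no` / `_primary_term` no longer match the values the client read, and ESORM performs this check for you on `save()`:

```python
# Plain-Python sketch of optimistic concurrency control, not ESORM's actual code.
class ConflictError(Exception):
    pass


class TinyStore:
    """In-memory stand-in for an ES index that tracks sequence numbers."""

    def __init__(self):
        self._docs = {}  # id -> (seq_no, data)

    def get(self, doc_id):
        return self._docs[doc_id]

    def save(self, doc_id, data, if_seq_no=None):
        current = self._docs.get(doc_id)
        if current is not None and if_seq_no is not None and current[0] != if_seq_no:
            # Someone else updated the document since we read it
            raise ConflictError(f'seq_no mismatch for {doc_id}')
        new_seq = 0 if current is None else current[0] + 1
        self._docs[doc_id] = (new_seq, data)
        return new_seq


store = TinyStore()
store.save('u1', {'name': 'John Doe'})                  # stored with seq_no 0
seq, _ = store.get('u1')                                # client A reads seq_no 0
store.save('u1', {'name': 'Jane Doe'}, if_seq_no=seq)   # A updates -> seq_no 1
try:
    # Client B still holds the stale seq_no 0, so its write is rejected
    store.save('u1', {'name': 'Stale'}, if_seq_no=seq)
except ConflictError as e:
    print('conflict:', e)
```

The second writer has to re-fetch the document (getting the new sequence number) and retry, which is exactly what you would do after a conflict error from `save()`.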
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def delete_user(user_id: str):
    user = await User.get(user_id)
    await user.delete()
```
Bulk operations can be much faster than single operations if you have a lot of documents to create, update, or delete.
You can use context for bulk operations:
```python
from typing import List

from esorm import ESModel, ESBulk


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def bulk_create_users():
    async with ESBulk() as bulk:
        # Creating or modifying models
        for i in range(10):
            user = User(name=f'User{i}', age=i)
            await bulk.save(user)


async def bulk_delete_users(users: List[User]):
    async with ESBulk(wait_for=True) as bulk:  # Here we wait for the bulk operation to finish
        # Deleting models
        for user in users:
            await bulk.delete(user)
```
The `wait_for` argument is optional. If it is `True`, the context will wait for the bulk operation to finish.
You can search for documents using the `search` method, where an ES query can be specified as a dictionary. You can use the `res_dict=True` argument to get the result as a dictionary instead of a list; the keys will be the `id`s of the documents: `await User.search(query, res_dict=True)`.
If you only need one result, you can use the `search_one` method.
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users at least 18 years old
    users = await User.search(
        query={
            'bool': {
                'must': [{
                    'range': {
                        'age': {
                            'gte': 18
                        }
                    }
                }]
            }
        }
    )
    for user in users:
        print(user.name)


async def search_one_user():
    # Search for a user named John Doe
    user = await User.search_one(
        query={
            'bool': {
                'must': [{
                    'match': {
                        'name': {
                            'query': 'John Doe'
                        }
                    }
                }]
            }
        }
    )
    print(user.name)
```
Queries are type checked, because they are annotated as `TypedDict`s. You can use IDE autocompletion and type checking.
You can search for documents using the `search_by_fields` method, where you can specify a field and a value. It also has a `res_dict` argument and a `search_one_by_fields` variant.
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users whose age is 18
    users = await User.search_by_fields({'age': 18})
    for user in users:
        print(user.name)
```
You can use the `aggregate` method to get aggregations. You can specify an ES aggregation query as a dictionary. It also accepts normal ES queries, to be able to filter which documents you want to aggregate. Both the `aggs` parameter and the `query` parameter are type checked, because they are annotated as `TypedDict`s. You can use IDE autocompletion and type checking.
```python
from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int
    country: str


async def aggregate_avg():
    # Get the average age of users
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    print(aggs['avg_age']['value'])


async def aggregate_avg_by_country(country='Hungary'):
    # Get the average age of users by country
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    query = {
        'bool': {
            'must': [{
                'match': {
                    'country': {
                        'query': country
                    }
                }
            }]
        }
    }
    aggs = await User.aggregate(aggs_def, query)
    print(aggs['avg_age']['value'])


async def aggregate_terms():
    # Get the number of users by country
    aggs_def = {
        'countries': {
            'terms': {
                'field': 'country'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    for bucket in aggs['countries']['buckets']:
        print(bucket['key'], bucket['doc_count'])
```
You can use the `Pagination` and `Sort` classes to decorate your models. They simply wrap your models and add pagination and sorting functionality to them.
You can add a callback parameter to the `Pagination` class, which will be invoked after the search with the total number of documents found.
```python
from esorm.model import ESModel, Pagination


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int


def get_users(page=1, page_size=10):
    def pagination_callback(total: int):
        # You may set a header value or something else here
        print(f'Total users: {total}')

    # 1st create the decorator itself
    pagination = Pagination(page=page, page_size=page_size)

    # Then decorate your model
    res = pagination(User).search_by_fields(age=18)

    # Here the result has a maximum of 10 items
    return res
```
It is similar to pagination:
```python
from esorm.model import ESModel, Sort


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int


def get_users():
    # 1st create the decorator itself
    sort = Sort(sort=[
        {'age': {'order': 'desc'}},
        {'name': {'order': 'asc'}}
    ])

    # Then decorate your model
    res = sort(User).search_by_fields(age=18)

    # Here the result is sorted by age descending, then by name ascending
    return res


def get_user_sorted_by_name():
    # You can also use this simplified syntax
    sort = Sort(sort='name')

    # Then decorate your model
    res = sort(User).all()

    # Here the result is sorted by name ascending
    return res
```
## 🧪 Testing

For testing, you can use the `test.sh` script in the root directory. It runs the tests on multiple Python interpreters in virtual environments. At the top of the file you can specify which Python interpreters you want to test. The ES versions are specified in the `tests/docker-compose.yml` file.
If you already have a virtual environment, simply use `pytest` to run the tests.
## 🛡 License

This project is licensed under the terms of the Mozilla Public License 2.0 (MPL 2.0) license.
## 📃 Citation

If you use this project in your research, please cite it using the following BibTeX entry:
```bibtex
@misc{esorm,
  author = {Adam Wallner},
  title = {ESORM: ElasticSearch Object Relational Mapper},
  year = {2023},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/wallneradam/esorm}},
}
```