"""This module implements tables, the central place for accessing and manipulatingdata in TinyDB."""fromtypingimport(Callable,Dict,Iterable,Iterator,List,Mapping,Optional,Union,cast,Tuple)from.queriesimportQueryLikefrom.storagesimportStoragefrom.utilsimportLRUCache__all__=('Document','Table')[docs]classDocument(dict):""" A document stored in the database. This class provides a way to access both a document's content and its ID using ``doc.doc_id``. """[docs]def__init__(self,value:Mapping,doc_id:int):super().__init__(value)self.doc_id=doc_id [docs]classTable:""" Represents a single TinyDB table. It provides methods for accessing and manipulating documents. .. admonition:: Query Cache As an optimization, a query cache is implemented using a :class:`~tinydb.utils.LRUCache`. This class mimics the interface of a normal ``dict``, but starts to remove the least-recently used entries once a threshold is reached. The query cache is updated on every search operation. When writing data, the whole cache is discarded as the query results may have changed. .. admonition:: Customization For customization, the following class variables can be set: - ``document_class`` defines the class that is used to represent documents, - ``document_id_class`` defines the class that is used to represent document IDs, - ``query_cache_class`` defines the class that is used for the query cache - ``default_query_cache_capacity`` defines the default capacity of the query cache .. versionadded:: 4.0 :param storage: The storage instance to use for this table :param name: The table name :param cache_size: Maximum capacity of query cache """#: The class used to represent documents#:#: .. versionadded:: 4.0document_class=Document#: The class used to represent a document ID#:#: .. versionadded:: 4.0document_id_class=int#: The class used for caching query results#:#: .. versionadded:: 4.0query_cache_class=LRUCache#: The default capacity of the query cache#:#: .. versionadded:: 4.0default_query_cache_capacity=10[docs]def__init__(self,storage:Storage,name:str,cache_size:int=default_query_cache_capacity):""" Create a table instance. """self._storage=storageself._name=nameself._query_cache:LRUCache[QueryLike,List[Document]] \
=self.query_cache_class(capacity=cache_size)self._next_id=None [docs]def__repr__(self):args=['name={!r}'.format(self.name),'total={}'.format(len(self)),'storage={}'.format(self._storage),]return'<{}{}>'.format(type(self).__name__,', '.join(args)) @propertydefname(self)->str:""" Get the table name. """returnself._name@propertydefstorage(self)->Storage:""" Get the table storage instance. """returnself._storage[docs]definsert(self,document:Mapping)->int:""" Insert a new document into the table. :param document: the document to insert :returns: the inserted document's ID """# Make sure the document implements the ``Mapping`` interfaceifnotisinstance(document,Mapping):raiseValueError('Document is not a Mapping')# First, we get the document ID for the new documentifisinstance(document,Document):# For a `Document` object we use the specified IDdoc_id=document.doc_id# We also reset the stored next ID so the next insert won't# re-use document IDs by accident when storing an old valueself._next_id=Noneelse:# In all other cases we use the next free IDdoc_id=self._get_next_id()# Now, we update the table and add the documentdefupdater(table:dict):ifdoc_idintable:raiseValueError(f'Document with ID{str(doc_id)} 'f'already exists')# By calling ``dict(document)`` we convert the data we got to a# ``dict`` instance even if it was a different class that# implemented the ``Mapping`` interfacetable[doc_id]=dict(document)# See below for details on ``Table._update``self._update_table(updater)returndoc_id [docs]definsert_multiple(self,documents:Iterable[Mapping])->List[int]:""" Insert multiple documents into the table. :param documents: an Iterable of documents to insert :returns: a list containing the inserted documents' IDs """doc_ids=[]defupdater(table:dict):fordocumentindocuments:# Make sure the document implements the ``Mapping`` interfaceifnotisinstance(document,Mapping):raiseValueError('Document is not a Mapping')ifisinstance(document,Document):# Check if document does not override an existing documentifdocument.doc_idintable:raiseValueError(f'Document with ID{str(document.doc_id)} 'f'already exists')# Store the doc_id, so we can return all document IDs# later. Then save the document with its doc_id and# skip the rest of the current loopdoc_id=document.doc_iddoc_ids.append(doc_id)table[doc_id]=dict(document)continue# Generate new document ID for this document# Store the doc_id, so we can return all document IDs# later, then save the document with the new doc_iddoc_id=self._get_next_id()doc_ids.append(doc_id)table[doc_id]=dict(document)# See below for details on ``Table._update``self._update_table(updater)returndoc_ids [docs]defall(self)->List[Document]:""" Get all documents stored in the table. :returns: a list with all documents. """# iter(self) (implemented in Table.__iter__ provides an iterator# that returns all documents in this table. We use it to get a list# of all documents by using the ``list`` constructor to perform the# conversion.returnlist(iter(self)) [docs]defsearch(self,cond:QueryLike)->List[Document]:""" Search for all documents matching a 'where' cond. :param cond: the condition to check against :returns: list of matching documents """# First, we check the query cache to see if it has results for this# querycached_results=self._query_cache.get(cond)ifcached_resultsisnotNone:returncached_results[:]# Perform the search by applying the query to all documents.# Then, only if the document matches the query, convert it# to the document class and document ID class.docs=[self.document_class(doc,self.document_id_class(doc_id))fordoc_id,docinself._read_table().items()ifcond(doc)]# Only cache cacheable queries.## This weird `getattr` dance is needed to make MyPy happy as# it doesn't know that a query might have a `is_cacheable` method# that is not declared in the `QueryLike` protocol due to it being# optional.# See: https://github.com/python/mypy/issues/1424## Note also that by default we expect custom query objects to be# cacheable (which means they need to have a stable hash value).# This is to keep consistency with TinyDB's behavior before# `is_cacheable` was introduced which assumed that all queries# are cacheable.is_cacheable:Callable[[],bool]=getattr(cond,'is_cacheable',lambda:True)ifis_cacheable():# Update the query cacheself._query_cache[cond]=docs[:]returndocs [docs]defget(self,cond:Optional[QueryLike]=None,doc_id:Optional[int]=None,doc_ids:Optional[List]=None)->Optional[Union[Document,List[Document]]]:""" Get exactly one document specified by a query or a document ID. However, if multiple document IDs are given then returns all documents in a list. Returns ``None`` if the document doesn't exist. :param cond: the condition to check against :param doc_id: the document's ID :param doc_ids: the document's IDs(multiple) :returns: the document(s) or ``None`` """table=self._read_table()ifdoc_idisnotNone:# Retrieve a document specified by its IDraw_doc=table.get(str(doc_id),None)ifraw_docisNone:returnNone# Convert the raw data to the document classreturnself.document_class(raw_doc,doc_id)elifdoc_idsisnotNone:# Filter the table by extracting out all those documents which# have doc id specified in the doc_id list.# Since document IDs will be unique, we make it a set to ensure# constant time lookupdoc_ids_set=set(str(doc_id)fordoc_idindoc_ids)# Now return the filtered documents in form of listreturn[self.document_class(doc,self.document_id_class(doc_id))fordoc_id,docintable.items()ifdoc_idindoc_ids_set]elifcondisnotNone:# Find a document specified by a query# The trailing underscore in doc_id_ is needed so MyPy# doesn't think that `doc_id_` (which is a string) needs# to have the same type as `doc_id` which is this function's# parameter and is an optional `int`.fordoc_id_,docinself._read_table().items():ifcond(doc):returnself.document_class(doc,self.document_id_class(doc_id_))returnNoneraiseRuntimeError('You have to pass either cond or doc_id or doc_ids') [docs]defcontains(self,cond:Optional[QueryLike]=None,doc_id:Optional[int]=None)->bool:""" Check whether the database contains a document matching a query or an ID. If ``doc_id`` is set, it checks if the db contains the specified ID. :param cond: the condition use :param doc_id: the document ID to look for """ifdoc_idisnotNone:# Documents specified by IDreturnself.get(doc_id=doc_id)isnotNoneelifcondisnotNone:# Document specified by conditionreturnself.get(cond)isnotNoneraiseRuntimeError('You have to pass either cond or doc_id') [docs]defupdate(self,fields:Union[Mapping,Callable[[Mapping],None]],cond:Optional[QueryLike]=None,doc_ids:Optional[Iterable[int]]=None,)->List[int]:""" Update all matching documents to have a given set of fields. :param fields: the fields that the matching documents will have or a method that will update the documents :param cond: which documents to update :param doc_ids: a list of document IDs :returns: a list containing the updated document's ID """# Define the function that will perform the updateifcallable(fields):defperform_update(table,doc_id):# Update documents by calling the update function provided by# the userfields(table[doc_id])else:defperform_update(table,doc_id):# Update documents by setting all fields from the provided datatable[doc_id].update(fields)ifdoc_idsisnotNone:# Perform the update operation for documents specified by a list# of document IDsupdated_ids=list(doc_ids)defupdater(table:dict):# Call the processing callback with all document IDsfordoc_idinupdated_ids:perform_update(table,doc_id)# Perform the update operation (see _update_table for details)self._update_table(updater)returnupdated_idselifcondisnotNone:# Perform the update operation for documents specified by a query# Collect affected doc_idsupdated_ids=[]defupdater(table:dict):_cond=cast(QueryLike,cond)# We need to convert the keys iterator to a list because# we may remove entries from the ``table`` dict during# iteration and doing this without the list conversion would# result in an exception (RuntimeError: dictionary changed size# during iteration)fordoc_idinlist(table.keys()):# Pass through all documents to find documents matching the# query. Call the processing callback with the document IDif_cond(table[doc_id]):# Add ID to list of updated documentsupdated_ids.append(doc_id)# Perform the update (see above)perform_update(table,doc_id)# Perform the update operation (see _update_table for details)self._update_table(updater)returnupdated_idselse:# Update all documents unconditionallyupdated_ids=[]defupdater(table:dict):# Process all documentsfordoc_idinlist(table.keys()):# Add ID to list of updated documentsupdated_ids.append(doc_id)# Perform the update (see above)perform_update(table,doc_id)# Perform the update operation (see _update_table for details)self._update_table(updater)returnupdated_ids [docs]defupdate_multiple(self,updates:Iterable[Tuple[Union[Mapping,Callable[[Mapping],None]],QueryLike]],)->List[int]:""" Update all matching documents to have a given set of fields. :returns: a list containing the updated document's ID """# Define the function that will perform the updatedefperform_update(fields,table,doc_id):ifcallable(fields):# Update documents by calling the update function provided# by the userfields(table[doc_id])else:# Update documents by setting all fields from the provided# datatable[doc_id].update(fields)# Perform the update operation for documents specified by a query# Collect affected doc_idsupdated_ids=[]defupdater(table:dict):# We need to convert the keys iterator to a list because# we may remove entries from the ``table`` dict during# iteration and doing this without the list conversion would# result in an exception (RuntimeError: dictionary changed size# during iteration)fordoc_idinlist(table.keys()):forfields,condinupdates:_cond=cast(QueryLike,cond)# Pass through all documents to find documents matching the# query. Call the processing callback with the document IDif_cond(table[doc_id]):# Add ID to list of updated documentsupdated_ids.append(doc_id)# Perform the update (see above)perform_update(fields,table,doc_id)# Perform the update operation (see _update_table for details)self._update_table(updater)returnupdated_ids [docs]defupsert(self,document:Mapping,cond:Optional[QueryLike]=None)->List[int]:""" Update documents, if they exist, insert them otherwise. Note: This will update *all* documents matching the query. Document argument can be a tinydb.table.Document object if you want to specify a doc_id. :param document: the document to insert or the fields to update :param cond: which document to look for, optional if you've passed a Document with a doc_id :returns: a list containing the updated documents' IDs """# Extract doc_idifisinstance(document,Document)andhasattr(document,'doc_id'):doc_ids:Optional[List[int]]=[document.doc_id]else:doc_ids=None# Make sure we can actually find a matching documentifdoc_idsisNoneandcondisNone:raiseValueError("If you don't specify a search query, you must ""specify a doc_id. Hint: use a table.Document ""object.")# Perform the update operationtry:updated_docs:Optional[List[int]]=self.update(document,cond,doc_ids)exceptKeyError:# This happens when a doc_id is specified, but it's missingupdated_docs=None# If documents have been updated: return their IDsifupdated_docs:returnupdated_docs# There are no documents that match the specified query -> insert the# data as a new documentreturn[self.insert(document)] [docs]defremove(self,cond:Optional[QueryLike]=None,doc_ids:Optional[Iterable[int]]=None,)->List[int]:""" Remove all matching documents. :param cond: the condition to check against :param doc_ids: a list of document IDs :returns: a list containing the removed documents' ID """ifdoc_idsisnotNone:# This function returns the list of IDs for the documents that have# been removed. When removing documents identified by a set of# document IDs, it's this list of document IDs we need to return# later.# We convert the document ID iterator into a list, so we can both# use the document IDs to remove the specified documents and# to return the list of affected document IDsremoved_ids=list(doc_ids)defupdater(table:dict):fordoc_idinremoved_ids:table.pop(doc_id)# Perform the remove operationself._update_table(updater)returnremoved_idsifcondisnotNone:removed_ids=[]# This updater function will be called with the table data# as its first argument. See ``Table._update`` for details on this# operationdefupdater(table:dict):# We need to convince MyPy (the static type checker) that# the ``cond is not None`` invariant still holds true when# the updater function is called_cond=cast(QueryLike,cond)# We need to convert the keys iterator to a list because# we may remove entries from the ``table`` dict during# iteration and doing this without the list conversion would# result in an exception (RuntimeError: dictionary changed size# during iteration)fordoc_idinlist(table.keys()):if_cond(table[doc_id]):# Add document ID to list of removed document IDsremoved_ids.append(doc_id)# Remove document from the tabletable.pop(doc_id)# Perform the remove operationself._update_table(updater)returnremoved_idsraiseRuntimeError('Use truncate() to remove all documents') [docs]deftruncate(self)->None:""" Truncate the table by removing all documents. """# Update the table by resetting all dataself._update_table(lambdatable:table.clear())# Reset document ID counterself._next_id=None [docs]defcount(self,cond:QueryLike)->int:""" Count the documents matching a query. :param cond: the condition use """returnlen(self.search(cond)) [docs]defclear_cache(self)->None:""" Clear the query cache. """self._query_cache.clear() [docs]def__len__(self):""" Count the total number of documents in this table. """returnlen(self._read_table()) [docs]def__iter__(self)->Iterator[Document]:""" Iterate over all documents stored in the table. :returns: an iterator over all documents. """# Iterate all documents and their IDsfordoc_id,docinself._read_table().items():# Convert documents to the document classyieldself.document_class(doc,self.document_id_class(doc_id)) def_get_next_id(self):""" Return the ID for a newly inserted document. """# If we already know the next IDifself._next_idisnotNone:next_id=self._next_idself._next_id=next_id+1returnnext_id# Determine the next document ID by finding out the max ID value# of the current table documents# Read the table documentstable=self._read_table()# If the table is empty, set the initial IDifnottable:next_id=1self._next_id=next_id+1returnnext_id# Determine the next ID based on the maximum ID that's currently in usemax_id=max(self.document_id_class(i)foriintable.keys())next_id=max_id+1# The next ID we will return AFTER this call needs to be larger than# the current next ID we calculatedself._next_id=next_id+1returnnext_iddef_read_table(self)->Dict[str,Mapping]:""" Read the table data from the underlying storage. Documents and doc_ids are NOT yet transformed, as we may not want to convert *all* documents when returning only one document for example. """# Retrieve the tables from the storagetables=self._storage.read()iftablesisNone:# The database is emptyreturn{}# Retrieve the current table's datatry:table=tables[self.name]exceptKeyError:# The table does not exist yet, so it is emptyreturn{}returntabledef_update_table(self,updater:Callable[[Dict[int,Mapping]],None]):""" Perform a table update operation. The storage interface used by TinyDB only allows to read/write the complete database data, but not modifying only portions of it. Thus, to only update portions of the table data, we first perform a read operation, perform the update on the table data and then write the updated data back to the storage. As a further optimization, we don't convert the documents into the document class, as the table data will *not* be returned to the user. """tables=self._storage.read()iftablesisNone:# The database is emptytables={}try:raw_table=tables[self.name]exceptKeyError:# The table does not exist yet, so it is emptyraw_table={}# Convert the document IDs to the document ID class.# This is required as the rest of TinyDB expects the document IDs# to be an instance of ``self.document_id_class`` but the storage# might convert dict keys to strings.table={self.document_id_class(doc_id):docfordoc_id,docinraw_table.items()}# Perform the table update operationupdater(table)# Convert the document IDs back to strings.# This is required as some storages (most notably the JSON file format)# don't support IDs other than strings.tables[self.name]={str(doc_id):docfordoc_id,docintable.items()}# Write the newly updated data back to the storageself._storage.write(tables)# Clear the query cache, as the table contents have changedself.clear_cache()