- Notifications
You must be signed in to change notification settings - Fork0
rush-db/rushdb-python
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
RushDB is an instant database for modern apps and DS/ML ops built on top of Neo4j. It automates data normalization, manages relationships, and infers data types.
pip install rushdb
fromrushdbimportRushDB# Initialize the clientdb=RushDB("RUSHDB_API_KEY")# Create a recorduser=db.records.create(label="USER",data={"name":"John Doe","email":"john@example.com","age":30 })# Find recordsresult=db.records.find({"where": {"age": {"$gte":18},"name": {"$startsWith":"J"} },"limit":10})# Work with SearchResultprint(f"Found{len(result)} records out of{result.total} total")# Iterate over resultsforrecordinresult:print(f"User:{record.get('name')} (Age:{record.get('age')})")# Check if there are more resultsifresult.has_more:print("There are more records available")# Access specific recordsfirst_user=result[0]ifresultelseNone# Create relationshipscompany=db.records.create(label="COMPANY",data={"name":"Acme Inc."})# Attach records with a relationshipuser.attach(target=company,options={"type":"WORKS_AT","direction":"out"})# Run a raw Cypher query (cloud-only)raw=db.query.raw({"query":"MATCH (u:USER) RETURN u LIMIT $limit","params": {"limit":5}})print(raw.get("data"))
RushDB automatically normalizes nested objects into a graph structure:
# Push nested JSON with automatic relationship creationdb.records.create_many("COMPANY", {"name":"Google LLC","rating":4.9,"DEPARTMENT": [{"name":"Research & Development","PROJECT": [{"name":"Bard AI","EMPLOYEE": [{"name":"Jeff Dean","position":"Head of AI Research" }] }] }]})
csv_data="""name,email,age\nJohn,john@example.com,30\nJane,jane@example.com,25"""response=db.records.import_csv(label="USER",data=csv_data,options={"returnResult":True,"suggestTypes":True},parse_config={"header":True,"skipEmptyLines":True,"dynamicTyping":True})print(response.get("data"))
The RushDB Python SDK uses a modern `SearchResult` container that follows Python SDK best practices similar to boto3, google-cloud libraries, and other popular SDKs.
- Generic type support: Uses Python's typing generics (`SearchResult[T]`) with `RecordSearchResult` as a type alias for `SearchResult[Record]`
- List-like access: Index, slice, and iterate like a regular list
- Search context: Access total count, pagination info, and the original search query
- Boolean conversion: Use in if statements naturally (returns `True` if the result contains any items)
- Pagination support: Built-in pagination information and `has_more` property
# Perform a searchresult=db.records.find({"where": {"status":"active"},"limit":10,"skip":20})# Check if we have resultsifresult:print(f"Found{len(result)} records")# Access search result informationprint(f"Total matching records:{result.total}")print(f"Has more results:{result.has_more}")print(f"Search query:{result.search_query}")# Get detailed pagination infopage_info=result.get_page_info()print(f"Page info:{page_info}")# Iterate over resultsforrecordinresult:print(f"Record:{record.get('name')}")# List comprehensions worknames= [r.get('name')forrinresult]# Indexing and slicingfirst_record=result[0]ifresultelseNonefirst_five=result[:5]# String representationprint(repr(result))# SearchResult(count=10, total=42)
def__init__(self,data:List[T],total:Optional[int]=None,search_query:Optional[SearchQuery]=None,):""" Initialize search result. Args: data: List of result items total: Total number of matching records (defaults to len(data) if not provided) search_query: The search query used to generate this result (defaults to {}) """
| Property | Type | Description |
|---|---|---|
data | List[T] | The list of result items (generic type) |
total | int | Total number of matching records |
has_more | bool | Whether there are more records available |
search_query | SearchQuery | The search query used to generate result |
| Method | Return Type | Description |
|---|---|---|
to_dict() | dict | Returns standardized dict with total, data, search_query |
get_page_info() | dict | Returns pagination info including total, loaded, has_more |
Implementation Notes:
- If `search_query` is not provided during initialization, it defaults to an empty dictionary `{}`
- The `has_more` property is calculated by comparing total with loaded records
- The `__bool__` method returns `True` if the result contains any items (`len(data) > 0`)
- `get_page_info()` provides detailed pagination metadata for advanced use cases
# Paginated search using skip/limit in querydefpaginate_results(query_base,page_size=10):current_skip=0whileTrue:# Add pagination to queryquery= {**query_base,"limit":page_size,"skip":current_skip}result=db.records.find(query)ifnotresult:breakprint(f"Processing{len(result)} records (skip:{current_skip})")forrecordinresult:process_record(record)ifnotresult.has_more:breakcurrent_skip+=len(result)# Usagepaginate_results({"where": {"category":"electronics"},"orderBy": {"created_at":"desc"}})
The SDK provides a specialized type alias for search results containing Record objects:
# Type alias for record search resultsRecordSearchResult=SearchResult[Record]
This type is what's returned by methods like `db.records.find()`, providing type safety and specialized handling for Record objects while leveraging all the functionality of the generic `SearchResult` class.
The Record class has been enhanced with better data access patterns and utility methods.
# Create a recorduser=db.records.create("User", {"name":"John Doe","email":"john@example.com","age":30,"department":"Engineering"})# Safe field access with defaultsname=user.get("name")# "John Doe"phone=user.get("phone","Not provided")# "Not provided"# Get clean user data (excludes internal fields like __id, __label)user_data=user.get_data()# Returns: {"name": "John Doe", "email": "john@example.com", "age": 30, "department": "Engineering"}# Get all data including internal fieldsfull_data=user.get_data(exclude_internal=False)# Includes: __id, __label, __proptypes, etc.# Convenient fields propertyfields=user.fields# Same as user.get_data()# Dictionary conversionuser_dict=user.to_dict()# Clean user datafull_dict=user.to_dict(exclude_internal=False)# All data# Direct field accessuser_name=user["name"]# Direct accessuser_id=user["__id"]# Internal field access
# Safe existence checking (no exceptions)ifuser.exists():print("Record is valid and accessible")user.update({"status":"active"})else:print("Record doesn't exist or is not accessible")# Perfect for validation workflowsdefprocess_record_safely(record):ifnotrecord.exists():returnNonereturnrecord.get_data()# Conditional operationsrecords=db.records.find({"where": {"status":"pending"}})forrecordinrecords:ifrecord.exists():record.update({"processed_at":datetime.now()})
user=db.records.create("User", {"name":"Alice Johnson"})print(repr(user))# Record(id='abc-123', label='User')print(str(user))# User: Alice Johnson# For records without namesproduct=db.records.create("Product", {"sku":"ABC123"})print(str(product))# Product (product-id-here)
For comprehensive documentation, tutorials, and examples, please visit:
Documentation includes:
- Complete Records API reference
- Relationship management
- Complex query examples
- Transaction usage
- Vector search capabilities
- Data import tools
- GitHub Issues - Bug reports and feature requests
- Discord Community - Get help from the community
- Email Support - Direct support from the RushDB team
Updates a record by ID, replacing all data.
Signature:
defset(self,record_id:str,data:Dict[str,Any],transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
record_id(str): ID of the record to updatedata(Dict[str, Any]): New record datatransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Update entire record datanew_data= {"name":"Updated Company Name","rating":5.0}response=db.records.set(record_id="record-123",data=new_data)
Updates specific fields of a record by ID.
Signature:
defupdate(self,record_id:str,data:Dict[str,Any],transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
record_id(str): ID of the record to updatedata(Dict[str, Any]): Partial record data to updatetransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Update specific fieldsupdates= {"rating":4.8,"status":"active"}response=db.records.update(record_id="record-123",data=updates)
Searches for records matching specified criteria.
Signature:
deffind(self,search_query:Optional[SearchQuery]=None,record_id:Optional[str]=None,transaction:Optional[Transaction]=None)->RecordSearchResult
Arguments:
search_query(Optional[SearchQuery]): Search query parametersrecord_id(Optional[str]): Optional record ID to search fromtransaction(Optional[Transaction]): Optional transaction object
Returns:
RecordSearchResult: SearchResult container with matching records and metadata
Example:
# Search for records with complex criteriasearch_query= {"where": {"$and": [ {"age": {"$gte":18}}, {"status":"active"}, {"department":"Engineering"} ] },"orderBy": {"created_at":"desc"},"limit":10}result=db.records.find(search_query=search_query)# Work with SearchResultprint(f"Found{len(result)} out of{result.total} total records")# Iterate over resultsforrecordinresult:print(f"Employee:{record.get('name')} -{record.get('department')}")# Check paginationifresult.has_more:print("More results available")# Access specific recordsfirst_employee=result[0]ifresultelseNone# List operationssenior_employees= [rforrinresultifr.get('age',0)>30]
Deletes records matching a query.
Signature:
defdelete(self,search_query:SearchQuery,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
search_query(SearchQuery): Query to match records for deletiontransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Delete records matching criteriasearch_query= {"where": {"status":"inactive","lastActive": {"$lt":"2023-01-01"} }}response=db.records.delete(search_query)
Deletes one or more records by ID.
Signature:
defdelete_by_id(self,id_or_ids:Union[str,List[str]],transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
id_or_ids(Union[str, List[str]]): Single ID or list of IDs to deletetransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Delete single recordresponse=db.records.delete_by_id("record-123")# Delete multiple recordsresponse=db.records.delete_by_id(["record-123","record-456","record-789"])
Creates relationships between records.
Signature:
defattach(self,source:Union[str,Dict[str,Any]],target:Union[str,List[str],Dict[str,Any],List[Dict[str,Any]],Record,List[Record]],options:Optional[RelationshipOptions]=None,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
- `source` (Union[str, Dict[str, Any]]): Source record ID or data
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipOptions]): Relationship options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `type` (Optional[str]): Relationship type
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Create relationship between recordsoptions=RelationshipOptions(type="HAS_EMPLOYEE",direction="out")response=db.records.attach(source="company-123",target=["employee-456","employee-789"],options=options)
Removes relationships between records.
Signature:
defdetach(self,source:Union[str,Dict[str,Any]],target:Union[str,List[str],Dict[str,Any],List[Dict[str,Any]],Record,List[Record]],options:Optional[RelationshipDetachOptions]=None,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
- `source` (Union[str, Dict[str, Any]]): Source record ID or data
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipDetachOptions]): Detach options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `typeOrTypes` (Optional[Union[str, List[str]]]): Relationship type(s)
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Remove relationships between recordsoptions=RelationshipDetachOptions(typeOrTypes=["HAS_EMPLOYEE","MANAGES"],direction="out")response=db.records.detach(source="company-123",target="employee-456",options=options)
Imports records from CSV data.
Signature:
defimport_csv(self,label:str,data:str,options:Optional[Dict[str,bool]]=None,transaction:Optional[Transaction]=None)->List[Dict[str,Any]]
Arguments:
label(str): Label for imported recordsdata(Union[str, bytes]): CSV data to importoptions(Optional[Dict[str, bool]]): Import optionstransaction(Optional[Transaction]): Optional transaction object
Returns:
List[Dict[str, Any]]: Imported records data
Example:
# Import records from CSVdata="""name,age,department,roleJohn Doe,30,Engineering,Senior EngineerJane Smith,28,Product,Product ManagerBob Wilson,35,Engineering,Tech Lead"""records=db.records.import_csv(label="EMPLOYEE",data=data,options={"returnResult":True,"suggestTypes":True})
The `Record` class represents a record in RushDB and provides methods for manipulating individual records, including updates, relationships, and deletions.
classRecord:def__init__(self,client:"RushDB",data:Union[Dict[str,Any],None]=None)
Gets the record's unique identifier.
Type:str
Example:
record=db.records.create("USER", {"name":"John"})print(record.id)# e.g., "1234abcd-5678-..."
Gets the record's property types.
Type:str
Example:
record=db.records.create("USER", {"name":"John","age":25})print(record.proptypes)# Returns property type definitions
Gets the record's label.
Type:str
Example:
record=db.records.create("USER", {"name":"John"})print(record.label)# "USER"
Gets the record's creation timestamp from its ID.
Type:int
Example:
record=db.records.create("USER", {"name":"John"})print(record.timestamp)# Unix timestamp in milliseconds
Gets the record's creation date.
Type:datetime
Example:
record=db.records.create("USER", {"name":"John"})print(record.date)# datetime object
Updates all data for the record.
Signature:
defset(self,data:Dict[str,Any],transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
data(Dict[str, Any]): New record datatransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
record=db.records.create("USER", {"name":"John"})response=record.set({"name":"John Doe","email":"john@example.com","age":30})
Updates specific fields of the record.
Signature:
defupdate(self,data:Dict[str,Any],transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
data(Dict[str, Any]): Partial record data to updatetransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
record=db.records.create("USER", {"name":"John","email":"john@example.com"})response=record.update({"email":"john.doe@example.com"})
Creates relationships with other records.
Signature:
defattach(self,target:Union[str,List[str],Dict[str,Any],List[Dict[str,Any]],"Record",List["Record"]],options:Optional[RelationshipOptions]=None,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipOptions]): Relationship options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `type` (Optional[str]): Relationship type
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Create two recordsuser=db.records.create("USER", {"name":"John"})group=db.records.create("GROUP", {"name":"Admins"})# Attach user to groupresponse=user.attach(target=group,options=RelationshipOptions(type="BELONGS_TO",direction="out" ))
Removes relationships with other records.
Signature:
defdetach(self,target:Union[str,List[str],Dict[str,Any],List[Dict[str,Any]],"Record",List["Record"]],options:Optional[RelationshipDetachOptions]=None,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
- `target` (Union[str, List[str], Dict[str, Any], List[Dict[str, Any]], Record, List[Record]]): Target record(s)
- `options` (Optional[RelationshipDetachOptions]): Detach options
  - `direction` (Optional[Literal["in", "out"]]): Relationship direction
  - `typeOrTypes` (Optional[Union[str, List[str]]]): Relationship type(s)
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
# Detach user from groupresponse=user.detach(target=group,options=RelationshipDetachOptions(typeOrTypes="BELONGS_TO",direction="out" ))
Deletes the record.
Signature:
defdelete(self,transaction:Optional[Transaction]=None)->Dict[str,str]
Arguments:
transaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, str]: Response data
Example:
user=db.records.create("USER", {"name":"John"})response=user.delete()
Here's a comprehensive example demonstrating various Record operations:
# Create a new recorduser=db.records.create("USER", {"name":"John Doe","email":"john@example.com","age":30})# Access propertiesprint(f"Record ID:{user.id}")print(f"Label:{user.label}")print(f"Created at:{user.date}")# Update record datauser.update({"age":31,"title":"Senior Developer"})# Create related recordsdepartment=db.records.create("DEPARTMENT", {"name":"Engineering"})project=db.records.create("PROJECT", {"name":"Secret Project"})# Create relationshipsuser.attach(target=department,options=RelationshipOptions(type="BELONGS_TO",direction="out" ))user.attach(target=project,options=RelationshipOptions(type="WORKS_ON",direction="out" ))# Remove relationshipuser.detach(target=project,options=RelationshipDetachOptions(typeOrTypes="WORKS_ON",direction="out" ))# Delete recorduser.delete()
Records can be manipulated within transactions for atomic operations:
# Start a transactiontransaction=db.transactions.begin()try:# Create useruser=db.records.create("USER", {"name":"John Doe"},transaction=transaction )# Update useruser.update( {"status":"active"},transaction=transaction )# Create and attach departmentdept=db.records.create("DEPARTMENT", {"name":"Engineering"},transaction=transaction )user.attach(target=dept,options=RelationshipOptions(type="BELONGS_TO"),transaction=transaction )# Explicitly commit the transaction to make changes permanenttransaction.commit()exceptExceptionase:# Rollback if any error occurstransaction.rollback()raisee# Alternative: Using context managerwithdb.transactions.begin()astransaction:# Perform operations...user=db.records.create("USER", {"name":"John Doe"},transaction=transaction )# Must explicitly commit - transactions are NOT automatically committedtransaction.commit()
The `PropertiesAPI` class provides methods for managing and querying properties in RushDB.
classPropertiesAPI(BaseAPI):
Retrieves a list of properties based on optional search criteria.
Signature:
deffind(self,search_query:Optional[SearchQuery]=None,transaction:Optional[Transaction]=None)->List[Property]
Arguments:
- `search_query` (Optional[SearchQuery]): Search query parameters for filtering properties
- `transaction` (Optional[Transaction]): Optional transaction object
Returns:
List[Property]: List of properties matching the search criteria
Example:
# Find all propertiesproperties=db.properties.find()# Find properties with specific criteriaquery= {"where": {"name": {"$startsWith":"user_"},# Properties starting with 'user_'"type":"string"# Only string type properties },"limit":10# Limit to 10 results}filtered_properties=db.properties.find(query)
Retrieves a specific property by its ID.
Signature:
deffind_by_id(self,property_id:str,transaction:Optional[Transaction]=None)->Property
Arguments:
property_id(str): Unique identifier of the propertytransaction(Optional[Transaction]): Optional transaction object
Returns:
Property: Property details
Example:
# Retrieve a specific property by IDproperty_details=db.properties.find_by_id("prop_123456")
Deletes a property by its ID.
Signature:
defdelete(self,property_id:str,transaction:Optional[Transaction]=None)->None
Arguments:
property_id(str): Unique identifier of the property to deletetransaction(Optional[Transaction]): Optional transaction object
Returns:
None
Example:
# Delete a propertydb.properties.delete("prop_123456")
Retrieves values for a specific property with optional sorting and pagination.
Signature:
defvalues(self,property_id:str,sort:Optional[Literal["asc","desc"]]=None,skip:Optional[int]=None,limit:Optional[int]=None,transaction:Optional[Transaction]=None)->PropertyValuesData
Arguments:
property_id(str): Unique identifier of the propertysort(Optional[Literal["asc", "desc"]]): Sort order of valuesskip(Optional[int]): Number of values to skip (for pagination)limit(Optional[int]): Maximum number of values to returntransaction(Optional[Transaction]): Optional transaction object
Returns:
PropertyValuesData: Property values data, including optional min/max and list of values
Example:
# Get property valuesvalues_data=db.properties.values(property_id="prop_age",sort="desc",# Sort values in descending orderskip=0,# Start from the first valuelimit=100# Return up to 100 values)# Access valuesprint(values_data.get('values', []))# List of property valuesprint(values_data.get('min'))# Minimum value (for numeric properties)print(values_data.get('max'))# Maximum value (for numeric properties)
# Find all propertiesall_properties=db.properties.find()forpropinall_properties:print(f"Property ID:{prop['id']}")print(f"Name:{prop['name']}")print(f"Type:{prop['type']}")print(f"Metadata:{prop.get('metadata','No metadata')}")print("---")# Detailed property searchquery= {"where": {"type":"number",# Only numeric properties"name": {"$contains":"score"}# Properties with 'score' in name },"limit":5# Limit to 5 results}numeric_score_properties=db.properties.find(query)# Get values for a specific propertyifnumeric_score_properties:first_prop=numeric_score_properties[0]prop_values=db.properties.values(property_id=first_prop['id'],sort="desc",limit=50 )print(f"Values for{first_prop['name']}:")print(f"Min:{prop_values.get('min')}")print(f"Max:{prop_values.get('max')}")# Detailed property examinationdetailed_prop=db.properties.find_by_id(first_prop['id'])print("Detailed Property Info:",detailed_prop)
RushDB supports the following property types:
"boolean": True/False values"datetime": Date and time values"null": Null/empty values"number": Numeric values"string": Text values
property= {"id":"prop_unique_id","name":"user_score","type":"number","metadata":Optional[str]# Optional additional information}property_with_value= {"id":"prop_unique_id","name":"user_score","type":"number","value":95.5# Actual property value}
Properties API methods support optional transactions for atomic operations:
# Using a transaction with explicit committransaction=db.transactions.begin()try:# Perform multiple property-related operationsproperty_to_delete=db.properties.find( {"where": {"name":"temp_property"}},transaction=transaction )[0]db.properties.delete(property_id=property_to_delete['id'],transaction=transaction )# Explicitly commit the transactiontransaction.commit()exceptExceptionase:# Rollback if any error occurstransaction.rollback()raisee# Alternative: Using context manager (auto-rollback on error)withdb.transactions.begin()astransaction:# Perform operationsproperty_to_delete=db.properties.find( {"where": {"name":"temp_property"}},transaction=transaction )[0]db.properties.delete(property_id=property_to_delete['id'],transaction=transaction )# Must explicitly commit - transactions are NOT automatically committedtransaction.commit()
When working with the PropertiesAPI, be prepared to handle potential errors:
try:# Attempt to find or delete a propertyproperty_details=db.properties.find_by_id("non_existent_prop")exceptRushDBErrorase:print(f"Error:{e}")print(f"Error Details:{e.details}")
The `LabelsAPI` class provides methods for discovering and working with record labels in RushDB. Labels are used to categorize and type records, similar to table names in relational databases.
classLabelsAPI(BaseAPI):
Discovers labels (record types) that exist in the database and can optionally filter them based on search criteria.
Signature:
deffind(self,search_query:Optional[SearchQuery]=None,transaction:Optional[Transaction]=None)->Dict[str,int]
Arguments:
search_query(Optional[SearchQuery]): Search criteria to filter labelstransaction(Optional[Transaction]): Optional transaction object
Returns:
Dict[str, int]: Dictionary mapping label names to their record counts
Example:
# Get all labels in the databaseall_labels=db.labels.find()print("Available labels:",all_labels)# Output: {'USER': 150, 'DEPARTMENT': 12, 'PROJECT': 45, 'COMPANY': 3}# Search for labels amongst records matching a patternfromrushdb.models.search_queryimportSearchQueryquery=SearchQuery(where={"name": {"$contains":"alice"}})user_labels=db.labels.find(query)print("Labels for records containing 'alice':",user_labels)# Output: {'USER': 2, 'EMPLOYEE': 1}
# Discover all record types in the databaseall_labels=db.labels.find()print(f"Database contains{len(all_labels)} record types:")forlabel,countinall_labels.items():print(f" -{label}:{count} records")# Find labels for records with specific criteriaquery=SearchQuery(where={"status":"active","created_date": {"$gte":"2023-01-01"}})active_labels=db.labels.find(query)print("Labels for active records:")forlabel,countinactive_labels.items():print(f" -{label}:{count} active records")# Use with transactiontransaction=db.transactions.begin()try:labels_in_tx=db.labels.find(transaction=transaction)# Process labels...transaction.commit()exceptExceptionase:transaction.rollback()raisee
The `RelationshipsAPI` class provides functionality for querying and analyzing relationships between records in RushDB. Relationships represent connections or associations between different records.
classRelationshipsAPI(BaseAPI):
Search for and retrieve relationships matching the specified criteria with support for pagination and transactions.
Signature:
asyncdeffind(self,search_query:Optional[SearchQuery]=None,pagination:Optional[PaginationParams]=None,transaction:Optional[Union[Transaction,str]]=None)->List[Relationship]
Arguments:
- `search_query` (Optional[SearchQuery]): Search criteria to filter relationships
- `pagination` (Optional[PaginationParams]): Pagination options with `limit` and `skip`
- `transaction` (Optional[Union[Transaction, str]]): Optional transaction object or ID
Returns:
List[Relationship]: List of relationships matching the search criteria
Example:
importasynciofromrushdb.models.search_queryimportSearchQueryasyncdefmain():# Find all relationshipsall_relationships=awaitdb.relationships.find()print(f"Total relationships:{len(all_relationships)}")# Find relationships with paginationpagination= {"limit":50,"skip":0}first_page=awaitdb.relationships.find(pagination=pagination)# Find specific relationship typesquery=SearchQuery(where={"type":"BELONGS_TO"})belongs_to_rels=awaitdb.relationships.find(search_query=query)# Find relationships involving specific recordsuser_query=SearchQuery(where={"$or": [ {"source_id":"user-123"}, {"target_id":"user-123"} ] })user_relationships=awaitdb.relationships.find(search_query=user_query)# Run the async functionasyncio.run(main())
The `PaginationParams` TypedDict defines pagination options:
classPaginationParams(TypedDict,total=False):limit:int# Maximum number of relationships to returnskip:int# Number of relationships to skip
importasynciofromrushdb.models.search_queryimportSearchQueryasyncdefexplore_relationships():# Get overview of all relationshipsall_rels=awaitdb.relationships.find()print(f"Database contains{len(all_rels)} relationships")# Paginate through relationshipspage_size=25page=0whileTrue:pagination= {"limit":page_size,"skip":page*page_size}relationships=awaitdb.relationships.find(pagination=pagination)ifnotrelationships:breakprint(f"Page{page+1}:{len(relationships)} relationships")forrelinrelationships:print(f"{rel['source_id']} --[{rel['type']}]-->{rel['target_id']}")page+=1iflen(relationships)<page_size:break# Find relationships by typequery=SearchQuery(where={"type":"WORKS_ON"})work_relationships=awaitdb.relationships.find(search_query=query)print(f"Found{len(work_relationships)} 'WORKS_ON' relationships")# Find relationships within a transactiontransaction=db.transactions.begin()try:tx_rels=awaitdb.relationships.find(transaction=transaction)# Process relationships...transaction.commit()exceptExceptionase:transaction.rollback()raisee# Run the exampleasyncio.run(explore_relationships())
Both LabelsAPI and RelationshipsAPI support transactions:
importasyncioasyncdeftransaction_example():transaction=db.transactions.begin()try:# Find labels within transactionlabels=db.labels.find(transaction=transaction)# Find relationships within transactionrelationships=awaitdb.relationships.find(transaction=transaction)# Perform operations based on discovered data...# Explicitly commit the transactiontransaction.commit()exceptExceptionase:# Rollback on any errortransaction.rollback()raiseeasyncio.run(transaction_example())
Note: The `RelationshipsAPI` methods are async and require the use of `await` and `asyncio` for proper execution.
About
RushDB Python SDK
Topics
Resources
License
Code of conduct
Contributing
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors2
Uh oh!
There was an error while loading.Please reload this page.