Movatterモバイル変換


[0]ホーム

URL:


Skip to content

SQLAlchemySession

Bases:SessionABC

SQLAlchemy implementation of :pyclass:agents.memory.session.Session.

Source code insrc/agents/extensions/memory/sqlalchemy_session.py
 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
classSQLAlchemySession(SessionABC):"""SQLAlchemy implementation of :pyclass:`agents.memory.session.Session`."""_metadata:MetaData_sessions:Table_messages:Tabledef__init__(self,session_id:str,*,engine:AsyncEngine,create_tables:bool=False,sessions_table:str="agent_sessions",messages_table:str="agent_messages",):"""Initializes a new SQLAlchemySession.        Args:            session_id (str): Unique identifier for the conversation.            engine (AsyncEngine): A pre-configured SQLAlchemy async engine. The engine                must be created with an async driver (e.g., 'postgresql+asyncpg://',                'mysql+aiomysql://', or 'sqlite+aiosqlite://').            create_tables (bool, optional): Whether to automatically create the required                tables and indexes. Defaults to False for production use. Set to True for                development and testing when migrations aren't used.            sessions_table (str, optional): Override the default table name for sessions if needed.            messages_table (str, optional): Override the default table name for messages if needed.        """self.session_id=session_idself._engine=engineself._lock=asyncio.Lock()self._metadata=MetaData()self._sessions=Table(sessions_table,self._metadata,Column("session_id",String,primary_key=True),Column("created_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),nullable=False,),Column("updated_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),onupdate=sql_text("CURRENT_TIMESTAMP"),nullable=False,),)self._messages=Table(messages_table,self._metadata,Column("id",Integer,primary_key=True,autoincrement=True),Column("session_id",String,ForeignKey(f"{sessions_table}.session_id",ondelete="CASCADE"),nullable=False,),Column("message_data",Text,nullable=False),Column("created_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),nullable=False,),Index(f"idx_{messages_table}_session_time","session_id","created_at",),sqlite_autoincrement=True,)# Async session factoryself._session_factory=async_sessionmaker(self._engine,expire_on_commit=False)self._create_tables=create_tables# ---------------------------------------------------------------------# Convenience constructors# ---------------------------------------------------------------------@classmethoddeffrom_url(cls,session_id:str,*,url:str,engine_kwargs:dict[str,Any]|None=None,**kwargs:Any,)->SQLAlchemySession:"""Create a session from a database URL string.        Args:            session_id (str): Conversation ID.            url (str): Any SQLAlchemy async URL, e.g. "postgresql+asyncpg://user:pass@host/db".            engine_kwargs (dict[str, Any] | None): Additional keyword arguments forwarded to                sqlalchemy.ext.asyncio.create_async_engine.            **kwargs: Additional keyword arguments forwarded to the main constructor                (e.g., create_tables, custom table names, etc.).        Returns:            SQLAlchemySession: An instance of SQLAlchemySession connected to the specified database.        """engine_kwargs=engine_kwargsor{}engine=create_async_engine(url,**engine_kwargs)returncls(session_id,engine=engine,**kwargs)asyncdef_serialize_item(self,item:TResponseInputItem)->str:"""Serialize an item to JSON string. Can be overridden by subclasses."""returnjson.dumps(item,separators=(",",":"))asyncdef_deserialize_item(self,item:str)->TResponseInputItem:"""Deserialize a JSON string to an item. Can be overridden by subclasses."""returnjson.loads(item)# type: ignore[no-any-return]# ------------------------------------------------------------------# Session protocol implementation# ------------------------------------------------------------------asyncdef_ensure_tables(self)->None:"""Ensure tables are created before any database operations."""ifself._create_tables:asyncwithself._engine.begin()asconn:awaitconn.run_sync(self._metadata.create_all)self._create_tables=False# Only create onceasyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.        Args:            limit: Maximum number of items to retrieve. If None, retrieves all items.                   When specified, returns the latest N items in chronological order.        Returns:            List of input items representing the conversation history        """awaitself._ensure_tables()asyncwithself._session_factory()assess:iflimitisNone:stmt=(select(self._messages.c.message_data).where(self._messages.c.session_id==self.session_id).order_by(self._messages.c.created_at.asc(),self._messages.c.id.asc(),))else:stmt=(select(self._messages.c.message_data).where(self._messages.c.session_id==self.session_id)# Use DESC + LIMIT to get the latest N# then reverse later for chronological order..order_by(self._messages.c.created_at.desc(),self._messages.c.id.desc(),).limit(limit))result=awaitsess.execute(stmt)rows:list[str]=[row[0]forrowinresult.all()]iflimitisnotNone:rows.reverse()items:list[TResponseInputItem]=[]forrawinrows:try:items.append(awaitself._deserialize_item(raw))exceptjson.JSONDecodeError:# Skip corrupted rowscontinuereturnitemsasyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.        Args:            items: List of input items to add to the history        """ifnotitems:returnawaitself._ensure_tables()payload=[{"session_id":self.session_id,"message_data":awaitself._serialize_item(item),}foriteminitems]asyncwithself._session_factory()assess:asyncwithsess.begin():# Ensure the parent session row exists - use merge for cross-DB compatibility# Check if session existsexisting=awaitsess.execute(select(self._sessions.c.session_id).where(self._sessions.c.session_id==self.session_id))ifnotexisting.scalar_one_or_none():# Session doesn't exist, create itawaitsess.execute(insert(self._sessions).values({"session_id":self.session_id}))# Insert messages in bulkawaitsess.execute(insert(self._messages),payload)# Touch updated_at columnawaitsess.execute(update(self._sessions).where(self._sessions.c.session_id==self.session_id).values(updated_at=sql_text("CURRENT_TIMESTAMP")))asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.        Returns:            The most recent item if it exists, None if the session is empty        """awaitself._ensure_tables()asyncwithself._session_factory()assess:asyncwithsess.begin():# Fallback for all dialects - get ID first, then deletesubq=(select(self._messages.c.id).where(self._messages.c.session_id==self.session_id).order_by(self._messages.c.created_at.desc(),self._messages.c.id.desc(),).limit(1))res=awaitsess.execute(subq)row_id=res.scalar_one_or_none()ifrow_idisNone:returnNone# Fetch data before deletingres_data=awaitsess.execute(select(self._messages.c.message_data).where(self._messages.c.id==row_id))row=res_data.scalar_one_or_none()awaitsess.execute(delete(self._messages).where(self._messages.c.id==row_id))ifrowisNone:returnNonetry:returnawaitself._deserialize_item(row)exceptjson.JSONDecodeError:returnNoneasyncdefclear_session(self)->None:"""Clear all items for this session."""awaitself._ensure_tables()asyncwithself._session_factory()assess:asyncwithsess.begin():awaitsess.execute(delete(self._messages).where(self._messages.c.session_id==self.session_id))awaitsess.execute(delete(self._sessions).where(self._sessions.c.session_id==self.session_id))@propertydefengine(self)->AsyncEngine:"""Access the underlying SQLAlchemy AsyncEngine.        This property provides direct access to the engine for advanced use cases,        such as checking connection pool status, configuring engine settings,        or manually disposing the engine when needed.        Returns:            AsyncEngine: The SQLAlchemy async engine instance.        """returnself._engine

engineproperty

engine:AsyncEngine

Access the underlying SQLAlchemy AsyncEngine.

This property provides direct access to the engine for advanced use cases,such as checking connection pool status, configuring engine settings,or manually disposing the engine when needed.

Returns:

NameTypeDescription
AsyncEngineAsyncEngine

The SQLAlchemy async engine instance.

__init__

__init__(session_id:str,*,engine:AsyncEngine,create_tables:bool=False,sessions_table:str="agent_sessions",messages_table:str="agent_messages",)

Initializes a new SQLAlchemySession.

Parameters:

NameTypeDescriptionDefault
session_idstr

Unique identifier for the conversation.

required
engineAsyncEngine

A pre-configured SQLAlchemy async engine. The enginemust be created with an async driver (e.g., 'postgresql+asyncpg://','mysql+aiomysql://', or 'sqlite+aiosqlite://').

required
create_tablesbool

Whether to automatically create the requiredtables and indexes. Defaults to False for production use. Set to True fordevelopment and testing when migrations aren't used.

False
sessions_tablestr

Override the default table name for sessions if needed.

'agent_sessions'
messages_tablestr

Override the default table name for messages if needed.

'agent_messages'
Source code insrc/agents/extensions/memory/sqlalchemy_session.py
def__init__(self,session_id:str,*,engine:AsyncEngine,create_tables:bool=False,sessions_table:str="agent_sessions",messages_table:str="agent_messages",):"""Initializes a new SQLAlchemySession.    Args:        session_id (str): Unique identifier for the conversation.        engine (AsyncEngine): A pre-configured SQLAlchemy async engine. The engine            must be created with an async driver (e.g., 'postgresql+asyncpg://',            'mysql+aiomysql://', or 'sqlite+aiosqlite://').        create_tables (bool, optional): Whether to automatically create the required            tables and indexes. Defaults to False for production use. Set to True for            development and testing when migrations aren't used.        sessions_table (str, optional): Override the default table name for sessions if needed.        messages_table (str, optional): Override the default table name for messages if needed.    """self.session_id=session_idself._engine=engineself._lock=asyncio.Lock()self._metadata=MetaData()self._sessions=Table(sessions_table,self._metadata,Column("session_id",String,primary_key=True),Column("created_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),nullable=False,),Column("updated_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),onupdate=sql_text("CURRENT_TIMESTAMP"),nullable=False,),)self._messages=Table(messages_table,self._metadata,Column("id",Integer,primary_key=True,autoincrement=True),Column("session_id",String,ForeignKey(f"{sessions_table}.session_id",ondelete="CASCADE"),nullable=False,),Column("message_data",Text,nullable=False),Column("created_at",TIMESTAMP(timezone=False),server_default=sql_text("CURRENT_TIMESTAMP"),nullable=False,),Index(f"idx_{messages_table}_session_time","session_id","created_at",),sqlite_autoincrement=True,)# Async session factoryself._session_factory=async_sessionmaker(self._engine,expire_on_commit=False)self._create_tables=create_tables

from_urlclassmethod

from_url(session_id:str,*,url:str,engine_kwargs:dict[str,Any]|None=None,**kwargs:Any,)->SQLAlchemySession

Create a session from a database URL string.

Parameters:

NameTypeDescriptionDefault
session_idstr

Conversation ID.

required
urlstr

Any SQLAlchemy async URL, e.g. "postgresql+asyncpg://user:pass@host/db".

required
engine_kwargsdict[str,Any] | None

Additional keyword arguments forwarded tosqlalchemy.ext.asyncio.create_async_engine.

None
**kwargsAny

Additional keyword arguments forwarded to the main constructor(e.g., create_tables, custom table names, etc.).

{}

Returns:

NameTypeDescription
SQLAlchemySessionSQLAlchemySession

An instance of SQLAlchemySession connected to the specified database.

Source code insrc/agents/extensions/memory/sqlalchemy_session.py
@classmethoddeffrom_url(cls,session_id:str,*,url:str,engine_kwargs:dict[str,Any]|None=None,**kwargs:Any,)->SQLAlchemySession:"""Create a session from a database URL string.    Args:        session_id (str): Conversation ID.        url (str): Any SQLAlchemy async URL, e.g. "postgresql+asyncpg://user:pass@host/db".        engine_kwargs (dict[str, Any] | None): Additional keyword arguments forwarded to            sqlalchemy.ext.asyncio.create_async_engine.        **kwargs: Additional keyword arguments forwarded to the main constructor            (e.g., create_tables, custom table names, etc.).    Returns:        SQLAlchemySession: An instance of SQLAlchemySession connected to the specified database.    """engine_kwargs=engine_kwargsor{}engine=create_async_engine(url,**engine_kwargs)returncls(session_id,engine=engine,**kwargs)

get_itemsasync

get_items(limit:int|None=None,)->list[TResponseInputItem]

Retrieve the conversation history for this session.

Parameters:

NameTypeDescriptionDefault
limitint | None

Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order.

None

Returns:

TypeDescription
list[TResponseInputItem]

List of input items representing the conversation history

Source code insrc/agents/extensions/memory/sqlalchemy_session.py
asyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.    Args:        limit: Maximum number of items to retrieve. If None, retrieves all items.               When specified, returns the latest N items in chronological order.    Returns:        List of input items representing the conversation history    """awaitself._ensure_tables()asyncwithself._session_factory()assess:iflimitisNone:stmt=(select(self._messages.c.message_data).where(self._messages.c.session_id==self.session_id).order_by(self._messages.c.created_at.asc(),self._messages.c.id.asc(),))else:stmt=(select(self._messages.c.message_data).where(self._messages.c.session_id==self.session_id)# Use DESC + LIMIT to get the latest N# then reverse later for chronological order..order_by(self._messages.c.created_at.desc(),self._messages.c.id.desc(),).limit(limit))result=awaitsess.execute(stmt)rows:list[str]=[row[0]forrowinresult.all()]iflimitisnotNone:rows.reverse()items:list[TResponseInputItem]=[]forrawinrows:try:items.append(awaitself._deserialize_item(raw))exceptjson.JSONDecodeError:# Skip corrupted rowscontinuereturnitems

add_itemsasync

add_items(items:list[TResponseInputItem])->None

Add new items to the conversation history.

Parameters:

NameTypeDescriptionDefault
itemslist[TResponseInputItem]

List of input items to add to the history

required
Source code insrc/agents/extensions/memory/sqlalchemy_session.py
asyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.    Args:        items: List of input items to add to the history    """ifnotitems:returnawaitself._ensure_tables()payload=[{"session_id":self.session_id,"message_data":awaitself._serialize_item(item),}foriteminitems]asyncwithself._session_factory()assess:asyncwithsess.begin():# Ensure the parent session row exists - use merge for cross-DB compatibility# Check if session existsexisting=awaitsess.execute(select(self._sessions.c.session_id).where(self._sessions.c.session_id==self.session_id))ifnotexisting.scalar_one_or_none():# Session doesn't exist, create itawaitsess.execute(insert(self._sessions).values({"session_id":self.session_id}))# Insert messages in bulkawaitsess.execute(insert(self._messages),payload)# Touch updated_at columnawaitsess.execute(update(self._sessions).where(self._sessions.c.session_id==self.session_id).values(updated_at=sql_text("CURRENT_TIMESTAMP")))

pop_itemasync

pop_item()->TResponseInputItem|None

Remove and return the most recent item from the session.

Returns:

TypeDescription
TResponseInputItem | None

The most recent item if it exists, None if the session is empty

Source code insrc/agents/extensions/memory/sqlalchemy_session.py
asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.    Returns:        The most recent item if it exists, None if the session is empty    """awaitself._ensure_tables()asyncwithself._session_factory()assess:asyncwithsess.begin():# Fallback for all dialects - get ID first, then deletesubq=(select(self._messages.c.id).where(self._messages.c.session_id==self.session_id).order_by(self._messages.c.created_at.desc(),self._messages.c.id.desc(),).limit(1))res=awaitsess.execute(subq)row_id=res.scalar_one_or_none()ifrow_idisNone:returnNone# Fetch data before deletingres_data=awaitsess.execute(select(self._messages.c.message_data).where(self._messages.c.id==row_id))row=res_data.scalar_one_or_none()awaitsess.execute(delete(self._messages).where(self._messages.c.id==row_id))ifrowisNone:returnNonetry:returnawaitself._deserialize_item(row)exceptjson.JSONDecodeError:returnNone

clear_sessionasync

clear_session()->None

Clear all items for this session.

Source code insrc/agents/extensions/memory/sqlalchemy_session.py
asyncdefclear_session(self)->None:"""Clear all items for this session."""awaitself._ensure_tables()asyncwithself._session_factory()assess:asyncwithsess.begin():awaitsess.execute(delete(self._messages).where(self._messages.c.session_id==self.session_id))awaitsess.execute(delete(self._sessions).where(self._sessions.c.session_id==self.session_id))

[8]ページ先頭

©2009-2025 Movatter.jp