Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Memory

Session

Bases:Protocol

Protocol for session implementations.

Session stores conversation history for a specific session, allowingagents to maintain context without requiring explicit manual memory management.

Source code insrc/agents/memory/session.py
@runtime_checkableclassSession(Protocol):"""Protocol for session implementations.    Session stores conversation history for a specific session, allowing    agents to maintain context without requiring explicit manual memory management.    """session_id:strasyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.        Args:            limit: Maximum number of items to retrieve. If None, retrieves all items.                   When specified, returns the latest N items in chronological order.        Returns:            List of input items representing the conversation history        """...asyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.        Args:            items: List of input items to add to the history        """...asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.        Returns:            The most recent item if it exists, None if the session is empty        """...asyncdefclear_session(self)->None:"""Clear all items for this session."""...

get_itemsasync

get_items(limit:int|None=None,)->list[TResponseInputItem]

Retrieve the conversation history for this session.

Parameters:

NameTypeDescriptionDefault
limitint | None

Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order.

None

Returns:

TypeDescription
list[TResponseInputItem]

List of input items representing the conversation history

Source code insrc/agents/memory/session.py
asyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.    Args:        limit: Maximum number of items to retrieve. If None, retrieves all items.               When specified, returns the latest N items in chronological order.    Returns:        List of input items representing the conversation history    """...

add_itemsasync

add_items(items:list[TResponseInputItem])->None

Add new items to the conversation history.

Parameters:

NameTypeDescriptionDefault
itemslist[TResponseInputItem]

List of input items to add to the history

required
Source code insrc/agents/memory/session.py
asyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.    Args:        items: List of input items to add to the history    """...

pop_itemasync

pop_item()->TResponseInputItem|None

Remove and return the most recent item from the session.

Returns:

TypeDescription
TResponseInputItem | None

The most recent item if it exists, None if the session is empty

Source code insrc/agents/memory/session.py
asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.    Returns:        The most recent item if it exists, None if the session is empty    """...

clear_sessionasync

clear_session()->None

Clear all items for this session.

Source code insrc/agents/memory/session.py
asyncdefclear_session(self)->None:"""Clear all items for this session."""...

SQLiteSession

Bases:SessionABC

SQLite-based implementation of session storage.

This implementation stores conversation history in a SQLite database.By default, uses an in-memory database that is lost when the process ends.For persistent storage, provide a file path.

Source code insrc/agents/memory/sqlite_session.py
classSQLiteSession(SessionABC):"""SQLite-based implementation of session storage.    This implementation stores conversation history in a SQLite database.    By default, uses an in-memory database that is lost when the process ends.    For persistent storage, provide a file path.    """def__init__(self,session_id:str,db_path:str|Path=":memory:",sessions_table:str="agent_sessions",messages_table:str="agent_messages",):"""Initialize the SQLite session.        Args:            session_id: Unique identifier for the conversation session            db_path: Path to the SQLite database file. Defaults to ':memory:' (in-memory database)            sessions_table: Name of the table to store session metadata. Defaults to                'agent_sessions'            messages_table: Name of the table to store message data. Defaults to 'agent_messages'        """self.session_id=session_idself.db_path=db_pathself.sessions_table=sessions_tableself.messages_table=messages_tableself._local=threading.local()self._lock=threading.Lock()# For in-memory databases, we need a shared connection to avoid thread isolation# For file databases, we use thread-local connections for better concurrencyself._is_memory_db=str(db_path)==":memory:"ifself._is_memory_db:self._shared_connection=sqlite3.connect(":memory:",check_same_thread=False)self._shared_connection.execute("PRAGMA journal_mode=WAL")self._init_db_for_connection(self._shared_connection)else:# For file databases, initialize the schema once since it persistsinit_conn=sqlite3.connect(str(self.db_path),check_same_thread=False)init_conn.execute("PRAGMA journal_mode=WAL")self._init_db_for_connection(init_conn)init_conn.close()def_get_connection(self)->sqlite3.Connection:"""Get a database connection."""ifself._is_memory_db:# Use shared connection for in-memory database to avoid thread isolationreturnself._shared_connectionelse:# Use thread-local connections for file databasesifnothasattr(self._local,"connection"):self._local.connection=sqlite3.connect(str(self.db_path),check_same_thread=False,)self._local.connection.execute("PRAGMA journal_mode=WAL")assertisinstance(self._local.connection,sqlite3.Connection),(f"Expected sqlite3.Connection, got{type(self._local.connection)}")returnself._local.connectiondef_init_db_for_connection(self,conn:sqlite3.Connection)->None:"""Initialize the database schema for a specific connection."""conn.execute(f"""            CREATE TABLE IF NOT EXISTS{self.sessions_table} (                session_id TEXT PRIMARY KEY,                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP            )        """)conn.execute(f"""            CREATE TABLE IF NOT EXISTS{self.messages_table} (                id INTEGER PRIMARY KEY AUTOINCREMENT,                session_id TEXT NOT NULL,                message_data TEXT NOT NULL,                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,                FOREIGN KEY (session_id) REFERENCES{self.sessions_table} (session_id)                    ON DELETE CASCADE            )        """)conn.execute(f"""            CREATE INDEX IF NOT EXISTS idx_{self.messages_table}_session_id            ON{self.messages_table} (session_id, created_at)        """)conn.commit()asyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.        Args:            limit: Maximum number of items to retrieve. If None, retrieves all items.                   When specified, returns the latest N items in chronological order.        Returns:            List of input items representing the conversation history        """def_get_items_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():iflimitisNone:# Fetch all items in chronological ordercursor=conn.execute(f"""                        SELECT message_data FROM{self.messages_table}                        WHERE session_id = ?                        ORDER BY created_at ASC                    """,(self.session_id,),)else:# Fetch the latest N items in chronological ordercursor=conn.execute(f"""                        SELECT message_data FROM{self.messages_table}                        WHERE session_id = ?                        ORDER BY created_at DESC                        LIMIT ?                        """,(self.session_id,limit),)rows=cursor.fetchall()# Reverse to get chronological order when using DESCiflimitisnotNone:rows=list(reversed(rows))items=[]for(message_data,)inrows:try:item=json.loads(message_data)items.append(item)exceptjson.JSONDecodeError:# Skip invalid JSON entriescontinuereturnitemsreturnawaitasyncio.to_thread(_get_items_sync)asyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.        Args:            items: List of input items to add to the history        """ifnotitems:returndef_add_items_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():# Ensure session existsconn.execute(f"""                    INSERT OR IGNORE INTO{self.sessions_table} (session_id) VALUES (?)                """,(self.session_id,),)# Add itemsmessage_data=[(self.session_id,json.dumps(item))foriteminitems]conn.executemany(f"""                    INSERT INTO{self.messages_table} (session_id, message_data) VALUES (?, ?)                """,message_data,)# Update session timestampconn.execute(f"""                    UPDATE{self.sessions_table}                    SET updated_at = CURRENT_TIMESTAMP                    WHERE session_id = ?                """,(self.session_id,),)conn.commit()awaitasyncio.to_thread(_add_items_sync)asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.        Returns:            The most recent item if it exists, None if the session is empty        """def_pop_item_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():# Use DELETE with RETURNING to atomically delete and return the most recent itemcursor=conn.execute(f"""                    DELETE FROM{self.messages_table}                    WHERE id = (                        SELECT id FROM{self.messages_table}                        WHERE session_id = ?                        ORDER BY created_at DESC                        LIMIT 1                    )                    RETURNING message_data                    """,(self.session_id,),)result=cursor.fetchone()conn.commit()ifresult:message_data=result[0]try:item=json.loads(message_data)returnitemexceptjson.JSONDecodeError:# Return None for corrupted JSON entries (already deleted)returnNonereturnNonereturnawaitasyncio.to_thread(_pop_item_sync)asyncdefclear_session(self)->None:"""Clear all items for this session."""def_clear_session_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():conn.execute(f"DELETE FROM{self.messages_table} WHERE session_id = ?",(self.session_id,),)conn.execute(f"DELETE FROM{self.sessions_table} WHERE session_id = ?",(self.session_id,),)conn.commit()awaitasyncio.to_thread(_clear_session_sync)defclose(self)->None:"""Close the database connection."""ifself._is_memory_db:ifhasattr(self,"_shared_connection"):self._shared_connection.close()else:ifhasattr(self._local,"connection"):self._local.connection.close()

__init__

__init__(session_id:str,db_path:str|Path=":memory:",sessions_table:str="agent_sessions",messages_table:str="agent_messages",)

Initialize the SQLite session.

Parameters:

NameTypeDescriptionDefault
session_idstr

Unique identifier for the conversation session

required
db_pathstr |Path

Path to the SQLite database file. Defaults to ':memory:' (in-memory database)

':memory:'
sessions_tablestr

Name of the table to store session metadata. Defaults to'agent_sessions'

'agent_sessions'
messages_tablestr

Name of the table to store message data. Defaults to 'agent_messages'

'agent_messages'
Source code insrc/agents/memory/sqlite_session.py
def__init__(self,session_id:str,db_path:str|Path=":memory:",sessions_table:str="agent_sessions",messages_table:str="agent_messages",):"""Initialize the SQLite session.    Args:        session_id: Unique identifier for the conversation session        db_path: Path to the SQLite database file. Defaults to ':memory:' (in-memory database)        sessions_table: Name of the table to store session metadata. Defaults to            'agent_sessions'        messages_table: Name of the table to store message data. Defaults to 'agent_messages'    """self.session_id=session_idself.db_path=db_pathself.sessions_table=sessions_tableself.messages_table=messages_tableself._local=threading.local()self._lock=threading.Lock()# For in-memory databases, we need a shared connection to avoid thread isolation# For file databases, we use thread-local connections for better concurrencyself._is_memory_db=str(db_path)==":memory:"ifself._is_memory_db:self._shared_connection=sqlite3.connect(":memory:",check_same_thread=False)self._shared_connection.execute("PRAGMA journal_mode=WAL")self._init_db_for_connection(self._shared_connection)else:# For file databases, initialize the schema once since it persistsinit_conn=sqlite3.connect(str(self.db_path),check_same_thread=False)init_conn.execute("PRAGMA journal_mode=WAL")self._init_db_for_connection(init_conn)init_conn.close()

get_itemsasync

get_items(limit:int|None=None,)->list[TResponseInputItem]

Retrieve the conversation history for this session.

Parameters:

NameTypeDescriptionDefault
limitint | None

Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order.

None

Returns:

TypeDescription
list[TResponseInputItem]

List of input items representing the conversation history

Source code insrc/agents/memory/sqlite_session.py
asyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:"""Retrieve the conversation history for this session.    Args:        limit: Maximum number of items to retrieve. If None, retrieves all items.               When specified, returns the latest N items in chronological order.    Returns:        List of input items representing the conversation history    """def_get_items_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():iflimitisNone:# Fetch all items in chronological ordercursor=conn.execute(f"""                    SELECT message_data FROM{self.messages_table}                    WHERE session_id = ?                    ORDER BY created_at ASC                """,(self.session_id,),)else:# Fetch the latest N items in chronological ordercursor=conn.execute(f"""                    SELECT message_data FROM{self.messages_table}                    WHERE session_id = ?                    ORDER BY created_at DESC                    LIMIT ?                    """,(self.session_id,limit),)rows=cursor.fetchall()# Reverse to get chronological order when using DESCiflimitisnotNone:rows=list(reversed(rows))items=[]for(message_data,)inrows:try:item=json.loads(message_data)items.append(item)exceptjson.JSONDecodeError:# Skip invalid JSON entriescontinuereturnitemsreturnawaitasyncio.to_thread(_get_items_sync)

add_itemsasync

add_items(items:list[TResponseInputItem])->None

Add new items to the conversation history.

Parameters:

NameTypeDescriptionDefault
itemslist[TResponseInputItem]

List of input items to add to the history

required
Source code insrc/agents/memory/sqlite_session.py
asyncdefadd_items(self,items:list[TResponseInputItem])->None:"""Add new items to the conversation history.    Args:        items: List of input items to add to the history    """ifnotitems:returndef_add_items_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():# Ensure session existsconn.execute(f"""                INSERT OR IGNORE INTO{self.sessions_table} (session_id) VALUES (?)            """,(self.session_id,),)# Add itemsmessage_data=[(self.session_id,json.dumps(item))foriteminitems]conn.executemany(f"""                INSERT INTO{self.messages_table} (session_id, message_data) VALUES (?, ?)            """,message_data,)# Update session timestampconn.execute(f"""                UPDATE{self.sessions_table}                SET updated_at = CURRENT_TIMESTAMP                WHERE session_id = ?            """,(self.session_id,),)conn.commit()awaitasyncio.to_thread(_add_items_sync)

pop_itemasync

pop_item()->TResponseInputItem|None

Remove and return the most recent item from the session.

Returns:

TypeDescription
TResponseInputItem | None

The most recent item if it exists, None if the session is empty

Source code insrc/agents/memory/sqlite_session.py
asyncdefpop_item(self)->TResponseInputItem|None:"""Remove and return the most recent item from the session.    Returns:        The most recent item if it exists, None if the session is empty    """def_pop_item_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():# Use DELETE with RETURNING to atomically delete and return the most recent itemcursor=conn.execute(f"""                DELETE FROM{self.messages_table}                WHERE id = (                    SELECT id FROM{self.messages_table}                    WHERE session_id = ?                    ORDER BY created_at DESC                    LIMIT 1                )                RETURNING message_data                """,(self.session_id,),)result=cursor.fetchone()conn.commit()ifresult:message_data=result[0]try:item=json.loads(message_data)returnitemexceptjson.JSONDecodeError:# Return None for corrupted JSON entries (already deleted)returnNonereturnNonereturnawaitasyncio.to_thread(_pop_item_sync)

clear_sessionasync

clear_session()->None

Clear all items for this session.

Source code insrc/agents/memory/sqlite_session.py
asyncdefclear_session(self)->None:"""Clear all items for this session."""def_clear_session_sync():conn=self._get_connection()withself._lockifself._is_memory_dbelsethreading.Lock():conn.execute(f"DELETE FROM{self.messages_table} WHERE session_id = ?",(self.session_id,),)conn.execute(f"DELETE FROM{self.sessions_table} WHERE session_id = ?",(self.session_id,),)conn.commit()awaitasyncio.to_thread(_clear_session_sync)

close

close()->None

Close the database connection.

Source code insrc/agents/memory/sqlite_session.py
defclose(self)->None:"""Close the database connection."""ifself._is_memory_db:ifhasattr(self,"_shared_connection"):self._shared_connection.close()else:ifhasattr(self._local,"connection"):self._local.connection.close()

OpenAIConversationsSession

Bases:SessionABC

Source code insrc/agents/memory/openai_conversations_session.py
classOpenAIConversationsSession(SessionABC):def__init__(self,*,conversation_id:str|None=None,openai_client:AsyncOpenAI|None=None,):self._session_id:str|None=conversation_id_openai_client=openai_clientif_openai_clientisNone:_openai_client=get_default_openai_client()orAsyncOpenAI()# this never be None hereself._openai_client:AsyncOpenAI=_openai_clientasyncdef_get_session_id(self)->str:ifself._session_idisNone:self._session_id=awaitstart_openai_conversations_session(self._openai_client)returnself._session_idasyncdef_clear_session_id(self)->None:self._session_id=Noneasyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:session_id=awaitself._get_session_id()all_items=[]iflimitisNone:asyncforiteminself._openai_client.conversations.items.list(conversation_id=session_id,order="asc",):# calling model_dump() to make this serializableall_items.append(item.model_dump(exclude_unset=True))else:asyncforiteminself._openai_client.conversations.items.list(conversation_id=session_id,limit=limit,order="desc",):# calling model_dump() to make this serializableall_items.append(item.model_dump(exclude_unset=True))iflimitisnotNoneandlen(all_items)>=limit:breakall_items.reverse()returnall_items# type: ignoreasyncdefadd_items(self,items:list[TResponseInputItem])->None:session_id=awaitself._get_session_id()awaitself._openai_client.conversations.items.create(conversation_id=session_id,items=items,)asyncdefpop_item(self)->TResponseInputItem|None:session_id=awaitself._get_session_id()items=awaitself.get_items(limit=1)ifnotitems:returnNoneitem_id:str=str(items[0]["id"])# type: ignore [typeddict-item]awaitself._openai_client.conversations.items.delete(conversation_id=session_id,item_id=item_id)returnitems[0]asyncdefclear_session(self)->None:session_id=awaitself._get_session_id()awaitself._openai_client.conversations.delete(conversation_id=session_id,)awaitself._clear_session_id()

[8]ページ先頭

©2009-2025 Movatter.jp