Memory

Session

Bases: Protocol

Protocol for session implementations.

Session stores conversation history for a specific session, allowing agents to maintain context without requiring explicit manual memory management.

Source code in src/agents/memory/session.py
@runtime_checkable
class Session(Protocol):
    """Protocol for session implementations.

    Session stores conversation history for a specific session, allowing
    agents to maintain context without requiring explicit manual memory management.
    """

    session_id: str
    session_settings: SessionSettings | None = None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Retrieve the conversation history for this session.

        Args:
            limit: Maximum number of items to retrieve. If None, retrieves all items.
                   When specified, returns the latest N items in chronological order.

        Returns:
            List of input items representing the conversation history
        """
        ...

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Add new items to the conversation history.

        Args:
            items: List of input items to add to the history
        """
        ...

    async def pop_item(self) -> TResponseInputItem | None:
        """Remove and return the most recent item from the session.

        Returns:
            The most recent item if it exists, None if the session is empty
        """
        ...

    async def clear_session(self) -> None:
        """Clear all items for this session."""
        ...
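
To make the protocol's shape concrete, here is a minimal sketch of a class that provides the same attributes and methods using a plain Python list. `ListSession` and the `Item` alias are hypothetical names for this illustration; `Item` stands in for the SDK's `TResponseInputItem`, and nothing here is part of the library itself.

```python
from __future__ import annotations

import asyncio
from typing import Any

# Assumption: a plain dict stands in for the SDK's TResponseInputItem.
Item = dict[str, Any]


class ListSession:
    """Hypothetical in-memory session that mirrors the Session protocol's members."""

    def __init__(self, session_id: str) -> None:
        self.session_id = session_id
        self.session_settings = None
        self._items: list[Item] = []

    async def get_items(self, limit: int | None = None) -> list[Item]:
        # Return everything when limit is None, otherwise the latest
        # `limit` items, still ordered oldest-first.
        if limit is None:
            return list(self._items)
        return self._items[-limit:] if limit > 0 else []

    async def add_items(self, items: list[Item]) -> None:
        self._items.extend(items)

    async def pop_item(self) -> Item | None:
        # The most recent item, or None when the session is empty.
        return self._items.pop() if self._items else None

    async def clear_session(self) -> None:
        self._items.clear()


async def demo() -> list[Item]:
    session = ListSession("demo")
    await session.add_items(
        [{"role": "user", "content": f"msg {i}"} for i in range(3)]
    )
    return await session.get_items(limit=2)


latest_two = asyncio.run(demo())
```

Because the protocol is structural, a class like this does not need to inherit from `Session`; it only needs matching members.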

get_items async

get_items(limit: int | None = None) -> list[TResponseInputItem]

Retrieve the conversation history for this session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| limit | int \| None | Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order. | None |

Returns:

| Type | Description |
| --- | --- |
| list[TResponseInputItem] | List of input items representing the conversation history |


add_items async

add_items(items: list[TResponseInputItem]) -> None

Add new items to the conversation history.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | list[TResponseInputItem] | List of input items to add to the history | required |

pop_item async

pop_item() -> TResponseInputItem | None

Remove and return the most recent item from the session.

Returns:

| Type | Description |
| --- | --- |
| TResponseInputItem \| None | The most recent item if it exists, None if the session is empty |


clear_session async

clear_session() -> None

Clear all items for this session.


SQLiteSession

Bases: SessionABC

SQLite-based implementation of session storage.

This implementation stores conversation history in a SQLite database. By default, it uses an in-memory database that is lost when the process ends. For persistent storage, provide a file path.
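
The difference between the default in-memory database and a file path can be seen with the standard `sqlite3` module alone. The sketch below is illustrative and does not use `SQLiteSession`; the `notes` table and the `write_then_reopen` helper are hypothetical names chosen for this example.

```python
import sqlite3
import tempfile
from pathlib import Path


def write_then_reopen(db_path: str) -> int:
    """Insert one row, close the connection, reopen, and count the rows."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS notes (body TEXT)")
    conn.execute("INSERT INTO notes (body) VALUES ('hello')")
    conn.commit()
    conn.close()

    reopened = sqlite3.connect(db_path)
    try:
        # A fresh ':memory:' connection starts empty, so the table is
        # created again here if it does not exist.
        reopened.execute("CREATE TABLE IF NOT EXISTS notes (body TEXT)")
        return reopened.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
    finally:
        reopened.close()


with tempfile.TemporaryDirectory() as tmp:
    # File-backed database: the row survives closing and reopening.
    file_rows = write_then_reopen(str(Path(tmp) / "history.db"))

# In-memory database: every new connection gets a new, empty database.
memory_rows = write_then_reopen(":memory:")
```

The same trade-off applies to the session class: pass a `db_path` when history has to outlive the process.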

Source code in src/agents/memory/sqlite_session.py

class SQLiteSession(SessionABC):
    """SQLite-based implementation of session storage.

    This implementation stores conversation history in a SQLite database.
    By default, uses an in-memory database that is lost when the process ends.
    For persistent storage, provide a file path.
    """

    def __init__(
        self,
        session_id: str,
        db_path: str | Path = ":memory:",
        sessions_table: str = "agent_sessions",
        messages_table: str = "agent_messages",
        session_settings: SessionSettings | None = None,
    ):
        """Initialize the SQLite session.

        Args:
            session_id: Unique identifier for the conversation session
            db_path: Path to the SQLite database file. Defaults to ':memory:' (in-memory database)
            sessions_table: Name of the table to store session metadata. Defaults to
                'agent_sessions'
            messages_table: Name of the table to store message data. Defaults to 'agent_messages'
            session_settings: Session configuration settings including default limit for
                retrieving items. If None, uses default SessionSettings().
        """
        self.session_id = session_id
        self.session_settings = session_settings or SessionSettings()
        self.db_path = db_path
        self.sessions_table = sessions_table
        self.messages_table = messages_table
        self._local = threading.local()
        self._lock = threading.Lock()

        # For in-memory databases, we need a shared connection to avoid thread isolation
        # For file databases, we use thread-local connections for better concurrency
        self._is_memory_db = str(db_path) == ":memory:"
        if self._is_memory_db:
            self._shared_connection = sqlite3.connect(":memory:", check_same_thread=False)
            self._shared_connection.execute("PRAGMA journal_mode=WAL")
            self._init_db_for_connection(self._shared_connection)
        else:
            # For file databases, initialize the schema once since it persists
            init_conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
            init_conn.execute("PRAGMA journal_mode=WAL")
            self._init_db_for_connection(init_conn)
            init_conn.close()

    def _get_connection(self) -> sqlite3.Connection:
        """Get a database connection."""
        if self._is_memory_db:
            # Use shared connection for in-memory database to avoid thread isolation
            return self._shared_connection
        else:
            # Use thread-local connections for file databases
            if not hasattr(self._local, "connection"):
                self._local.connection = sqlite3.connect(
                    str(self.db_path),
                    check_same_thread=False,
                )
                self._local.connection.execute("PRAGMA journal_mode=WAL")
            assert isinstance(self._local.connection, sqlite3.Connection), (
                f"Expected sqlite3.Connection, got {type(self._local.connection)}"
            )
            return self._local.connection

    def _init_db_for_connection(self, conn: sqlite3.Connection) -> None:
        """Initialize the database schema for a specific connection."""
        conn.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.sessions_table} (
                session_id TEXT PRIMARY KEY,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
        )

        conn.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.messages_table} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT NOT NULL,
                message_data TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (session_id) REFERENCES {self.sessions_table} (session_id)
                    ON DELETE CASCADE
            )
        """
        )

        conn.execute(
            f"""
            CREATE INDEX IF NOT EXISTS idx_{self.messages_table}_session_id
            ON {self.messages_table} (session_id, id)
        """
        )

        conn.commit()

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Retrieve the conversation history for this session.

        Args:
            limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
                   When specified, returns the latest N items in chronological order.

        Returns:
            List of input items representing the conversation history
        """
        session_limit = resolve_session_limit(limit, self.session_settings)

        def _get_items_sync():
            conn = self._get_connection()
            with self._lock if self._is_memory_db else threading.Lock():
                if session_limit is None:
                    # Fetch all items in chronological order
                    cursor = conn.execute(
                        f"""
                        SELECT message_data FROM {self.messages_table}
                        WHERE session_id = ?
                        ORDER BY id ASC
                    """,
                        (self.session_id,),
                    )
                else:
                    # Fetch the latest N items in chronological order
                    cursor = conn.execute(
                        f"""
                        SELECT message_data FROM {self.messages_table}
                        WHERE session_id = ?
                        ORDER BY id DESC
                        LIMIT ?
                        """,
                        (self.session_id, session_limit),
                    )

                rows = cursor.fetchall()

                # Reverse to get chronological order when using DESC
                if session_limit is not None:
                    rows = list(reversed(rows))

                items = []
                for (message_data,) in rows:
                    try:
                        item = json.loads(message_data)
                        items.append(item)
                    except json.JSONDecodeError:
                        # Skip invalid JSON entries
                        continue

                return items

        return await asyncio.to_thread(_get_items_sync)

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Add new items to the conversation history.

        Args:
            items: List of input items to add to the history
        """
        if not items:
            return

        def _add_items_sync():
            conn = self._get_connection()

            with self._lock if self._is_memory_db else threading.Lock():
                # Ensure session exists
                conn.execute(
                    f"""
                    INSERT OR IGNORE INTO {self.sessions_table} (session_id) VALUES (?)
                """,
                    (self.session_id,),
                )

                # Add items
                message_data = [(self.session_id, json.dumps(item)) for item in items]
                conn.executemany(
                    f"""
                    INSERT INTO {self.messages_table} (session_id, message_data) VALUES (?, ?)
                """,
                    message_data,
                )

                # Update session timestamp
                conn.execute(
                    f"""
                    UPDATE {self.sessions_table}
                    SET updated_at = CURRENT_TIMESTAMP
                    WHERE session_id = ?
                """,
                    (self.session_id,),
                )

                conn.commit()

        await asyncio.to_thread(_add_items_sync)

    async def pop_item(self) -> TResponseInputItem | None:
        """Remove and return the most recent item from the session.

        Returns:
            The most recent item if it exists, None if the session is empty
        """

        def _pop_item_sync():
            conn = self._get_connection()
            with self._lock if self._is_memory_db else threading.Lock():
                # Use DELETE with RETURNING to atomically delete and return the most recent item
                cursor = conn.execute(
                    f"""
                    DELETE FROM {self.messages_table}
                    WHERE id = (
                        SELECT id FROM {self.messages_table}
                        WHERE session_id = ?
                        ORDER BY id DESC
                        LIMIT 1
                    )
                    RETURNING message_data
                    """,
                    (self.session_id,),
                )

                result = cursor.fetchone()
                conn.commit()

                if result:
                    message_data = result[0]
                    try:
                        item = json.loads(message_data)
                        return item
                    except json.JSONDecodeError:
                        # Return None for corrupted JSON entries (already deleted)
                        return None

                return None

        return await asyncio.to_thread(_pop_item_sync)

    async def clear_session(self) -> None:
        """Clear all items for this session."""

        def _clear_session_sync():
            conn = self._get_connection()
            with self._lock if self._is_memory_db else threading.Lock():
                conn.execute(
                    f"DELETE FROM {self.messages_table} WHERE session_id = ?",
                    (self.session_id,),
                )
                conn.execute(
                    f"DELETE FROM {self.sessions_table} WHERE session_id = ?",
                    (self.session_id,),
                )
                conn.commit()

        await asyncio.to_thread(_clear_session_sync)

    def close(self) -> None:
        """Close the database connection."""
        if self._is_memory_db:
            if hasattr(self, "_shared_connection"):
                self._shared_connection.close()
        else:
            if hasattr(self._local, "connection"):
                self._local.connection.close()

__init__

__init__(session_id: str, db_path: str | Path = ":memory:", sessions_table: str = "agent_sessions", messages_table: str = "agent_messages", session_settings: SessionSettings | None = None)

Initialize the SQLite session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | Unique identifier for the conversation session | required |
| db_path | str \| Path | Path to the SQLite database file. Defaults to ':memory:' (in-memory database) | ':memory:' |
| sessions_table | str | Name of the table to store session metadata. Defaults to 'agent_sessions' | 'agent_sessions' |
| messages_table | str | Name of the table to store message data. Defaults to 'agent_messages' | 'agent_messages' |
| session_settings | SessionSettings \| None | Session configuration settings including default limit for retrieving items. If None, uses default SessionSettings(). | None |
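
The class source notes that an in-memory database needs one shared connection, because each new `:memory:` connection would be a separate database. A minimal sketch of that pattern with the standard library follows; the `events` table and the `record` helper are hypothetical, and the lock reflects the general need to serialize access to one connection rather than the library's exact locking code.

```python
import sqlite3
import threading

# One connection shared by all threads; check_same_thread=False lets
# threads other than the creator use it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE events (worker INTEGER)")
lock = threading.Lock()


def record(worker: int) -> None:
    # Serialize access so only one thread uses the connection at a time.
    with lock:
        conn.execute("INSERT INTO events (worker) VALUES (?)", (worker,))
        conn.commit()


threads = [threading.Thread(target=record, args=(i,)) for i in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

All four workers write to the same database because they share the connection object, which is the behavior the constructor arranges for the `:memory:` case.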

get_items async

get_items(limit: int | None = None) -> list[TResponseInputItem]

Retrieve the conversation history for this session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| limit | int \| None | Maximum number of items to retrieve. If None, uses session_settings.limit. When specified, returns the latest N items in chronological order. | None |

Returns:

| Type | Description |
| --- | --- |
| list[TResponseInputItem] | List of input items representing the conversation history |
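
"Latest N items in chronological order" means selecting newest-first with a LIMIT and then reversing the rows. The sketch below reproduces that query shape with plain `sqlite3` against a simplified `agent_messages` table; the `latest_items` helper and the sample rows are assumptions made for illustration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_messages ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "session_id TEXT NOT NULL, "
    "message_data TEXT NOT NULL)"
)
rows = [("s1", json.dumps({"content": f"msg {i}"})) for i in range(5)]
rows.append(("s1", "{not valid json"))  # a corrupted row, stored last
conn.executemany(
    "INSERT INTO agent_messages (session_id, message_data) VALUES (?, ?)", rows
)
conn.commit()


def latest_items(session_id: str, limit: int) -> list[dict]:
    """Return up to `limit` of the newest items, ordered oldest-first."""
    cursor = conn.execute(
        "SELECT message_data FROM agent_messages "
        "WHERE session_id = ? ORDER BY id DESC LIMIT ?",
        (session_id, limit),
    )
    items = []
    # The query returns newest-first; reversing restores chronological order.
    for (message_data,) in reversed(cursor.fetchall()):
        try:
            items.append(json.loads(message_data))
        except json.JSONDecodeError:
            continue  # skip rows that do not hold valid JSON
    return items


latest = latest_items("s1", 3)
```

Rows that fail to parse are skipped after the LIMIT has been applied, so a call can return fewer than `limit` items, as it does here.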


add_items async

add_items(items: list[TResponseInputItem]) -> None

Add new items to the conversation history.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| items | list[TResponseInputItem] | List of input items to add to the history | required |
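
The async methods of this class run blocking `sqlite3` calls in a worker thread through `asyncio.to_thread`. A simplified sketch of that pattern for the add path is shown below; the module-level `add_items` function and the reduced two-table schema are assumptions for this example, not the class's actual code.

```python
import asyncio
import json
import sqlite3

# check_same_thread=False because the worker thread, not the creating
# thread, will run the statements.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE agent_sessions (session_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE agent_messages ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "session_id TEXT NOT NULL, "
    "message_data TEXT NOT NULL)"
)


async def add_items(session_id: str, items: list[dict]) -> None:
    if not items:
        return

    def _add_sync() -> None:
        # INSERT OR IGNORE keeps a single session row across repeated calls.
        conn.execute(
            "INSERT OR IGNORE INTO agent_sessions (session_id) VALUES (?)",
            (session_id,),
        )
        conn.executemany(
            "INSERT INTO agent_messages (session_id, message_data) VALUES (?, ?)",
            [(session_id, json.dumps(item)) for item in items],
        )
        conn.commit()

    # Run the blocking database work off the event loop thread.
    await asyncio.to_thread(_add_sync)


async def demo() -> tuple[int, int]:
    await add_items("s1", [{"role": "user", "content": "hi"}])
    await add_items("s1", [{"role": "assistant", "content": "hello"}])
    sessions = conn.execute("SELECT COUNT(*) FROM agent_sessions").fetchone()[0]
    messages = conn.execute("SELECT COUNT(*) FROM agent_messages").fetchone()[0]
    return sessions, messages


counts = asyncio.run(demo())
```

The calls here are awaited one after another, so no lock is needed in the sketch; concurrent callers sharing one connection would need one.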

pop_item async

pop_item() -> TResponseInputItem | None

Remove and return the most recent item from the session.

Returns:

| Type | Description |
| --- | --- |
| TResponseInputItem \| None | The most recent item if it exists, None if the session is empty |
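
The class source removes the newest row with a single `DELETE ... RETURNING` statement. The RETURNING clause was added in SQLite 3.35.0, so the sketch below checks the linked SQLite version and falls back to a select-then-delete pair on older builds. The fallback branch and the `pop_latest` helper are additions for this illustration and are not part of the library.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_messages ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "session_id TEXT NOT NULL, "
    "message_data TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO agent_messages (session_id, message_data) VALUES (?, ?)",
    [("s1", json.dumps({"n": i})) for i in range(3)],
)
conn.commit()


def pop_latest(session_id: str):
    """Delete the newest row for the session and return its decoded payload."""
    if sqlite3.sqlite_version_info >= (3, 35, 0):
        # Single statement: delete the newest row and hand back its data.
        row = conn.execute(
            """
            DELETE FROM agent_messages
            WHERE id = (
                SELECT id FROM agent_messages
                WHERE session_id = ?
                ORDER BY id DESC
                LIMIT 1
            )
            RETURNING message_data
            """,
            (session_id,),
        ).fetchone()
    else:
        # Hypothetical fallback for older SQLite: look up, then delete.
        found = conn.execute(
            "SELECT id, message_data FROM agent_messages "
            "WHERE session_id = ? ORDER BY id DESC LIMIT 1",
            (session_id,),
        ).fetchone()
        row = None
        if found is not None:
            conn.execute("DELETE FROM agent_messages WHERE id = ?", (found[0],))
            row = (found[1],)
    conn.commit()
    return json.loads(row[0]) if row else None


popped = pop_latest("s1")
remaining = conn.execute("SELECT COUNT(*) FROM agent_messages").fetchone()[0]
```

When the session has no rows, the subquery yields NULL, nothing is deleted, and the function returns None.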


clear_session async

clear_session() -> None

Clear all items for this session.


close

close() -> None

Close the database connection.


OpenAIConversationsSession

Bases: SessionABC

Source code in src/agents/memory/openai_conversations_session.py

class OpenAIConversationsSession(SessionABC):
    def __init__(
        self,
        *,
        conversation_id: str | None = None,
        openai_client: AsyncOpenAI | None = None,
        session_settings: SessionSettings | None = None,
    ):
        self._session_id: str | None = conversation_id
        self.session_settings = session_settings or SessionSettings()
        _openai_client = openai_client
        if _openai_client is None:
            _openai_client = get_default_openai_client() or AsyncOpenAI()
        # this should never be None here
        self._openai_client: AsyncOpenAI = _openai_client

    @property
    def session_id(self) -> str:
        """Get the session ID (conversation ID).

        Returns:
            The conversation ID for this session.

        Raises:
            ValueError: If the session has not been initialized yet.
                Call any session method (get_items, add_items, etc.) first
                to trigger lazy initialization.
        """
        if self._session_id is None:
            raise ValueError(
                "Session ID not yet available. The session is lazily initialized "
                "on first API call. Call get_items(), add_items(), or similar first."
            )
        return self._session_id

    @session_id.setter
    def session_id(self, value: str) -> None:
        """Set the session ID (conversation ID)."""
        self._session_id = value

    async def _get_session_id(self) -> str:
        if self._session_id is None:
            self._session_id = await start_openai_conversations_session(self._openai_client)
        return self._session_id

    async def _clear_session_id(self) -> None:
        self._session_id = None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        session_id = await self._get_session_id()

        session_limit = resolve_session_limit(limit, self.session_settings)

        all_items = []
        if session_limit is None:
            async for item in self._openai_client.conversations.items.list(
                conversation_id=session_id,
                order="asc",
            ):
                # calling model_dump() to make this serializable
                all_items.append(item.model_dump(exclude_unset=True))
        else:
            async for item in self._openai_client.conversations.items.list(
                conversation_id=session_id,
                limit=session_limit,
                order="desc",
            ):
                # calling model_dump() to make this serializable
                all_items.append(item.model_dump(exclude_unset=True))
                if session_limit is not None and len(all_items) >= session_limit:
                    break
            all_items.reverse()

        return all_items  # type: ignore

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        session_id = await self._get_session_id()
        if not items:
            return

        await self._openai_client.conversations.items.create(
            conversation_id=session_id,
            items=items,
        )

    async def pop_item(self) -> TResponseInputItem | None:
        session_id = await self._get_session_id()
        items = await self.get_items(limit=1)
        if not items:
            return None
        item_id: str = str(items[0]["id"])  # type: ignore [typeddict-item]
        await self._openai_client.conversations.items.delete(
            conversation_id=session_id, item_id=item_id
        )
        return items[0]

    async def clear_session(self) -> None:
        session_id = await self._get_session_id()
        await self._openai_client.conversations.delete(
            conversation_id=session_id,
        )
        await self._clear_session_id()

session_id property writable

session_id: str

Get the session ID (conversation ID).

Returns:

| Type | Description |
| --- | --- |
| str | The conversation ID for this session. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the session has not been initialized yet. Call any session method (get_items, add_items, etc.) first to trigger lazy initialization. |
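
The lazy initialization described above can be sketched without any network access. In this illustration `LazySession` and `_create_conversation` are hypothetical stand-ins: the real class obtains its ID from the OpenAI Conversations API, while the stub here just returns a fixed string.

```python
from __future__ import annotations

import asyncio


async def _create_conversation() -> str:
    # Hypothetical stand-in for the API call that creates a conversation.
    return "conv_demo"


class LazySession:
    """Illustrative session whose ID is only resolved on first use."""

    def __init__(self, conversation_id: str | None = None) -> None:
        self._session_id = conversation_id

    @property
    def session_id(self) -> str:
        if self._session_id is None:
            raise ValueError(
                "Session ID not yet available; call a session method first."
            )
        return self._session_id

    async def _get_session_id(self) -> str:
        # Create the conversation once, then reuse the stored ID.
        if self._session_id is None:
            self._session_id = await _create_conversation()
        return self._session_id

    async def get_items(self) -> list[dict]:
        await self._get_session_id()
        return []  # no history in this sketch


session = LazySession()
try:
    session.session_id
    raised_before_init = False
except ValueError:
    raised_before_init = True

asyncio.run(session.get_items())
resolved_id = session.session_id
```

Reading `session_id` before any async method has run raises `ValueError`; after the first call the property returns the stored ID. Passing an existing `conversation_id` to the constructor avoids the error entirely.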
