
pydantic_graph.persistence

SnapshotStatus module-attribute

SnapshotStatus = Literal["created", "pending", "running", "success", "error"]

The status of a snapshot.

  • 'created': The snapshot has been created but not yet run.
  • 'pending': The snapshot has been retrieved with load_next but not yet run.
  • 'running': The snapshot is currently running.
  • 'success': The snapshot has been run successfully.
  • 'error': The snapshot has been run but an error occurred.
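
The statuses above imply a simple lifecycle. The sketch below is an illustration only, not library code: `ALLOWED` and `advance` are hypothetical names, and the transition table is an assumption inferred from the descriptions of `load_next` and `record_run` on this page.

```python
from typing import Literal

SnapshotStatus = Literal["created", "pending", "running", "success", "error"]

# Hypothetical transition table inferred from the status descriptions:
# load_next moves 'created' -> 'pending'; record_run accepts 'created' or
# 'pending', sets 'running', then finishes with 'success' or 'error'.
ALLOWED: dict[str, set[str]] = {
    "created": {"pending", "running"},
    "pending": {"running"},
    "running": {"success", "error"},
    "success": set(),
    "error": set(),
}


def advance(current: SnapshotStatus, new: SnapshotStatus) -> SnapshotStatus:
    """Return `new` if the transition is allowed, otherwise raise ValueError."""
    if new not in ALLOWED[current]:
        raise ValueError(f"cannot move from {current!r} to {new!r}")
    return new


status: SnapshotStatus = "created"
for step in ("pending", "running", "success"):
    status = advance(status, step)
print(status)
```

A terminal status (`'success'` or `'error'`) has no outgoing transitions in this sketch, so trying to run the same snapshot twice raises.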

NodeSnapshot dataclass

Bases: Generic[StateT, RunEndT]

History step describing the execution of a node in a graph.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@dataclass
class NodeSnapshot(Generic[StateT, RunEndT]):
    """History step describing the execution of a node in a graph."""

    state: StateT
    """The state of the graph before the node is run."""
    node: Annotated[BaseNode[StateT, Any, RunEndT], _utils.CustomNodeSchema()]
    """The node to run next."""
    start_ts: datetime | None = None
    """The timestamp when the node started running, `None` until the run starts."""
    duration: float | None = None
    """The duration of the node run in seconds, if the node has been run."""
    status: SnapshotStatus = 'created'
    """The status of the snapshot."""
    kind: Literal['node'] = 'node'
    """The kind of history step, can be used as a discriminator when deserializing history."""

    id: str = UNSET_SNAPSHOT_ID
    """Unique ID of the snapshot."""

    def __post_init__(self) -> None:
        if self.id == UNSET_SNAPSHOT_ID:
            self.id = self.node.get_snapshot_id()

state instance-attribute

state: StateT

The state of the graph before the node is run.

node instance-attribute

node: Annotated[BaseNode[StateT, Any, RunEndT], CustomNodeSchema()]

The node to run next.

start_ts class-attribute instance-attribute

start_ts: datetime | None = None

The timestamp when the node started running, None until the run starts.

duration class-attribute instance-attribute

duration: float | None = None

The duration of the node run in seconds, if the node has been run.

status class-attribute instance-attribute

status: SnapshotStatus = 'created'

The status of the snapshot.

kind class-attribute instance-attribute

kind: Literal['node'] = 'node'

The kind of history step, can be used as a discriminator when deserializing history.

id class-attribute instance-attribute

id: str = UNSET_SNAPSHOT_ID

Unique ID of the snapshot.

EndSnapshot dataclass

Bases: Generic[StateT, RunEndT]

History step describing the end of a graph run.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@dataclass
class EndSnapshot(Generic[StateT, RunEndT]):
    """History step describing the end of a graph run."""

    state: StateT
    """The state of the graph at the end of the run."""
    result: End[RunEndT]
    """The result of the graph run."""
    ts: datetime = field(default_factory=_utils.now_utc)
    """The timestamp when the graph run ended."""
    kind: Literal['end'] = 'end'
    """The kind of history step, can be used as a discriminator when deserializing history."""

    id: str = UNSET_SNAPSHOT_ID
    """Unique ID of the snapshot."""

    def __post_init__(self) -> None:
        if self.id == UNSET_SNAPSHOT_ID:
            self.id = self.node.get_snapshot_id()

    @property
    def node(self) -> End[RunEndT]:
        """Shim to get the [`result`][pydantic_graph.persistence.EndSnapshot.result].

        Useful to allow `[snapshot.node for snapshot in persistence.history]`.
        """
        return self.result

state instance-attribute

state: StateT

The state of the graph at the end of the run.

result instance-attribute

result: End[RunEndT]

The result of the graph run.

ts class-attribute instance-attribute

ts: datetime = field(default_factory=now_utc)

The timestamp when the graph run ended.

kind class-attribute instance-attribute

kind: Literal['end'] = 'end'

The kind of history step, can be used as a discriminator when deserializing history.

id class-attribute instance-attribute

id: str = UNSET_SNAPSHOT_ID

Unique ID of the snapshot.

node property

node: End[RunEndT]

Shim to get the result.

Useful to allow [snapshot.node for snapshot in persistence.history].

Snapshot module-attribute

Snapshot = Union[NodeSnapshot[StateT, RunEndT], EndSnapshot[StateT, RunEndT]]

A step in the history of a graph run.

Graph.run returns a list of these steps describing the execution of the graph, together with the run return value.
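
Because both snapshot types carry a `kind` field, a loader can choose which class to build from that field alone. The sketch below shows the idea with the standard library only; `NodeStep`, `EndStep` and `load_step` are hypothetical stand-ins, not the library's classes, and the real package does this with a Pydantic discriminator instead.

```python
from dataclasses import dataclass
from typing import Any, Literal, Union


@dataclass
class NodeStep:  # hypothetical stand-in for NodeSnapshot
    state: Any
    node: str
    kind: Literal["node"] = "node"


@dataclass
class EndStep:  # hypothetical stand-in for EndSnapshot
    state: Any
    result: Any
    kind: Literal["end"] = "end"


Step = Union[NodeStep, EndStep]


def load_step(raw: dict[str, Any]) -> Step:
    """Pick the class to build from the `kind` field of a decoded JSON object."""
    if raw["kind"] == "node":
        return NodeStep(state=raw["state"], node=raw["node"])
    if raw["kind"] == "end":
        return EndStep(state=raw["state"], result=raw["result"])
    raise ValueError(f"unknown kind: {raw['kind']!r}")


raw_history = [
    {"kind": "node", "state": {"n": 0}, "node": "Increment"},
    {"kind": "end", "state": {"n": 1}, "result": 1},
]
history = [load_step(raw) for raw in raw_history]
print([type(step).__name__ for step in history])
```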

BaseStatePersistence

Bases: ABC, Generic[StateT, RunEndT]

Abstract base class for storing the state of a graph run.

Each instance of a BaseStatePersistence subclass should be used for a single graph run.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

class BaseStatePersistence(ABC, Generic[StateT, RunEndT]):
    """Abstract base class for storing the state of a graph run.

    Each instance of a `BaseStatePersistence` subclass should be used for a single graph run.
    """

    @abstractmethod
    async def snapshot_node(self, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None:
        """Snapshot the state of a graph, when the next step is to run a node.

        This method should add a [`NodeSnapshot`][pydantic_graph.persistence.NodeSnapshot] to persistence.

        Args:
            state: The state of the graph.
            next_node: The next node to run.
        """
        raise NotImplementedError

    @abstractmethod
    async def snapshot_node_if_new(
        self, snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]
    ) -> None:
        """Snapshot the state of a graph if the snapshot ID doesn't already exist in persistence.

        This method will generally call [`snapshot_node`][pydantic_graph.persistence.BaseStatePersistence.snapshot_node]
        but should do so in an atomic way.

        Args:
            snapshot_id: The ID of the snapshot to check.
            state: The state of the graph.
            next_node: The next node to run.
        """
        raise NotImplementedError

    @abstractmethod
    async def snapshot_end(self, state: StateT, end: End[RunEndT]) -> None:
        """Snapshot the state of a graph when the graph has ended.

        This method should add an [`EndSnapshot`][pydantic_graph.persistence.EndSnapshot] to persistence.

        Args:
            state: The state of the graph.
            end: data from the end of the run.
        """
        raise NotImplementedError

    @abstractmethod
    def record_run(self, snapshot_id: str) -> AbstractAsyncContextManager[None]:
        """Record the run of the node, or error if the node is already running.

        Args:
            snapshot_id: The ID of the snapshot to record.

        Raises:
            GraphNodeRunningError: if the node status is not `'created'` or `'pending'`.
            LookupError: if the snapshot ID is not found in persistence.

        Returns:
            An async context manager that records the run of the node.

        In particular this should set:

        - [`NodeSnapshot.status`][pydantic_graph.persistence.NodeSnapshot.status] to `'running'` and
          [`NodeSnapshot.start_ts`][pydantic_graph.persistence.NodeSnapshot.start_ts] when the run starts.
        - [`NodeSnapshot.status`][pydantic_graph.persistence.NodeSnapshot.status] to `'success'` or `'error'` and
          [`NodeSnapshot.duration`][pydantic_graph.persistence.NodeSnapshot.duration] when the run finishes.
        """
        raise NotImplementedError

    @abstractmethod
    async def load_next(self) -> NodeSnapshot[StateT, RunEndT] | None:
        """Retrieve a node snapshot with status `'created'` and set its status to `'pending'`.

        This is used by [`Graph.iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence]
        to get the next node to run.

        Returns: The snapshot, or `None` if no snapshot with status `'created'` exists.
        """
        raise NotImplementedError

    @abstractmethod
    async def load_all(self) -> list[Snapshot[StateT, RunEndT]]:
        """Load the entire history of snapshots.

        `load_all` is not used by pydantic-graph itself, instead it's provided to make it convenient to
        get all [snapshots][pydantic_graph.persistence.Snapshot] from persistence.

        Returns: The list of snapshots.
        """
        raise NotImplementedError

    def set_graph_types(self, graph: Graph[StateT, Any, RunEndT]) -> None:
        """Set the types of the state and run end from a graph.

        You generally won't need to customise this method, instead implement
        [`set_types`][pydantic_graph.persistence.BaseStatePersistence.set_types] and
        [`should_set_types`][pydantic_graph.persistence.BaseStatePersistence.should_set_types].
        """
        if self.should_set_types():
            with _utils.set_nodes_type_context(graph.get_nodes()):
                self.set_types(*graph.inferred_types)

    def should_set_types(self) -> bool:
        """Whether types need to be set.

        Implementations should override this method to return `True` when types have not been set if they are needed.
        """
        return False

    def set_types(self, state_type: type[StateT], run_end_type: type[RunEndT]) -> None:
        """Set the types of the state and run end.

        This can be used to create [type adapters][pydantic.TypeAdapter] for serializing and deserializing snapshots,
        e.g. with [`build_snapshot_list_type_adapter`][pydantic_graph.persistence.build_snapshot_list_type_adapter].

        Args:
            state_type: The state type.
            run_end_type: The run end type.
        """
        pass

snapshot_node abstractmethod async

snapshot_node(state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None

Snapshot the state of a graph, when the next step is to run a node.

This method should add a NodeSnapshot to persistence.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| state | StateT | The state of the graph. | required |
| next_node | BaseNode[StateT, Any, RunEndT] | The next node to run. | required |

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
async def snapshot_node(self, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None:
    """Snapshot the state of a graph, when the next step is to run a node.

    This method should add a [`NodeSnapshot`][pydantic_graph.persistence.NodeSnapshot] to persistence.

    Args:
        state: The state of the graph.
        next_node: The next node to run.
    """
    raise NotImplementedError

snapshot_node_if_new abstractmethod async

snapshot_node_if_new(snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None

Snapshot the state of a graph if the snapshot ID doesn't already exist in persistence.

This method will generally call snapshot_node but should do so in an atomic way.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| snapshot_id | str | The ID of the snapshot to check. | required |
| state | StateT | The state of the graph. | required |
| next_node | BaseNode[StateT, Any, RunEndT] | The next node to run. | required |

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
async def snapshot_node_if_new(
    self, snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]
) -> None:
    """Snapshot the state of a graph if the snapshot ID doesn't already exist in persistence.

    This method will generally call [`snapshot_node`][pydantic_graph.persistence.BaseStatePersistence.snapshot_node]
    but should do so in an atomic way.

    Args:
        snapshot_id: The ID of the snapshot to check.
        state: The state of the graph.
        next_node: The next node to run.
    """
    raise NotImplementedError
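
One way to read "in an atomic way" is that the existence check and the write must happen under the same lock, so two concurrent callers cannot both conclude the ID is missing. The sketch below illustrates that with an `asyncio.Lock`; `TinyStore` and `add_if_new` are hypothetical names, and a real backend might rely on a database constraint or a file lock instead.

```python
import asyncio


class TinyStore:
    """Hypothetical in-memory store, not part of pydantic-graph."""

    def __init__(self) -> None:
        self.snapshots: dict[str, str] = {}
        self._lock = asyncio.Lock()

    async def add_if_new(self, snapshot_id: str, payload: str) -> bool:
        # Hold the lock across both the membership check and the write so that
        # two concurrent callers cannot both decide the ID is missing.
        async with self._lock:
            if snapshot_id in self.snapshots:
                return False
            await asyncio.sleep(0)  # simulate an await point such as I/O
            self.snapshots[snapshot_id] = payload
            return True


async def main() -> list[bool]:
    store = TinyStore()
    return await asyncio.gather(
        store.add_if_new("Node:1", "first"),
        store.add_if_new("Node:1", "second"),
    )


results = asyncio.run(main())
print(results)
```

Without the lock, the `await` between the check and the write would let both callers insert the same ID.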

snapshot_end abstractmethod async

snapshot_end(state: StateT, end: End[RunEndT]) -> None

Snapshot the state of a graph when the graph has ended.

This method should add an EndSnapshot to persistence.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| state | StateT | The state of the graph. | required |
| end | End[RunEndT] | Data from the end of the run. | required |

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
async def snapshot_end(self, state: StateT, end: End[RunEndT]) -> None:
    """Snapshot the state of a graph when the graph has ended.

    This method should add an [`EndSnapshot`][pydantic_graph.persistence.EndSnapshot] to persistence.

    Args:
        state: The state of the graph.
        end: data from the end of the run.
    """
    raise NotImplementedError

record_run abstractmethod

record_run(snapshot_id: str) -> AbstractAsyncContextManager[None]

Record the run of the node, or error if the node is already running.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| snapshot_id | str | The ID of the snapshot to record. | required |

Raises:

| Type | Description |
|---|---|
| GraphNodeRunningError | if the node status is not 'created' or 'pending'. |
| LookupError | if the snapshot ID is not found in persistence. |

Returns:

| Type | Description |
|---|---|
| AbstractAsyncContextManager[None] | An async context manager that records the run of the node. |

In particular this should set:

  • NodeSnapshot.status to 'running' and NodeSnapshot.start_ts when the run starts.
  • NodeSnapshot.status to 'success' or 'error' and NodeSnapshot.duration when the run finishes.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
def record_run(self, snapshot_id: str) -> AbstractAsyncContextManager[None]:
    """Record the run of the node, or error if the node is already running.

    Args:
        snapshot_id: The ID of the snapshot to record.

    Raises:
        GraphNodeRunningError: if the node status is not `'created'` or `'pending'`.
        LookupError: if the snapshot ID is not found in persistence.

    Returns:
        An async context manager that records the run of the node.

    In particular this should set:

    - [`NodeSnapshot.status`][pydantic_graph.persistence.NodeSnapshot.status] to `'running'` and
      [`NodeSnapshot.start_ts`][pydantic_graph.persistence.NodeSnapshot.start_ts] when the run starts.
    - [`NodeSnapshot.status`][pydantic_graph.persistence.NodeSnapshot.status] to `'success'` or `'error'` and
      [`NodeSnapshot.duration`][pydantic_graph.persistence.NodeSnapshot.duration] when the run finishes.
    """
    raise NotImplementedError
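
The contract described for `record_run` can be sketched as a small async context manager. This is a minimal illustration with the standard library only: `Step` is a hypothetical stand-in for `NodeSnapshot`, and `RuntimeError` stands in for the library's own status error.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from time import perf_counter
from typing import AsyncIterator, Optional


@dataclass
class Step:  # hypothetical stand-in for NodeSnapshot
    status: str = "created"
    start_ts: Optional[datetime] = None
    duration: Optional[float] = None


@asynccontextmanager
async def record_run(step: Step) -> AsyncIterator[None]:
    # Refuse to start a step that is already running or finished.
    if step.status not in ("created", "pending"):
        raise RuntimeError(f"step has status {step.status!r}")
    step.status = "running"
    step.start_ts = datetime.now(tz=timezone.utc)
    start = perf_counter()
    try:
        yield
    except Exception:
        # The body raised: record the duration, mark the error, re-raise.
        step.duration = perf_counter() - start
        step.status = "error"
        raise
    else:
        step.duration = perf_counter() - start
        step.status = "success"


async def main() -> tuple[Step, Step]:
    ok, failed = Step(), Step()
    async with record_run(ok):
        await asyncio.sleep(0)
    try:
        async with record_run(failed):
            raise ValueError("boom")
    except ValueError:
        pass
    return ok, failed


ok, failed = asyncio.run(main())
print(ok.status, failed.status)
```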

load_next abstractmethod async

load_next() -> NodeSnapshot[StateT, RunEndT] | None

Retrieve a node snapshot with status 'created' and set its status to 'pending'.

This is used by Graph.iter_from_persistence to get the next node to run.

Returns: The snapshot, or None if no snapshot with status 'created' exists.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
async def load_next(self) -> NodeSnapshot[StateT, RunEndT] | None:
    """Retrieve a node snapshot with status `'created'` and set its status to `'pending'`.

    This is used by [`Graph.iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence]
    to get the next node to run.

    Returns: The snapshot, or `None` if no snapshot with status `'created'` exists.
    """
    raise NotImplementedError
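
The selection rule for `load_next` can be sketched in a few lines: find the first snapshot still in `'created'`, mark it `'pending'`, and return it. `Step` is a hypothetical stand-in for `NodeSnapshot`, and the function is synchronous here only to keep the example short.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Step:  # hypothetical stand-in for NodeSnapshot
    id: str
    status: str = "created"


def load_next(history: list[Step]) -> Optional[Step]:
    """Return the first 'created' step after marking it 'pending', else None."""
    step = next((s for s in history if s.status == "created"), None)
    if step is not None:
        step.status = "pending"
    return step


history = [Step("A:1", status="success"), Step("B:2"), Step("C:3")]
first = load_next(history)
second = load_next(history)
third = load_next(history)
print(first, second, third)
```

Marking the snapshot `'pending'` before returning it is what stops a second caller from picking up the same step.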

load_all abstractmethod async

load_all() -> list[Snapshot[StateT, RunEndT]]

Load the entire history of snapshots.

load_all is not used by pydantic-graph itself; it is provided as a convenient way to get all snapshots from persistence.

Returns: The list of snapshots.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

@abstractmethod
async def load_all(self) -> list[Snapshot[StateT, RunEndT]]:
    """Load the entire history of snapshots.

    `load_all` is not used by pydantic-graph itself, instead it's provided to make it convenient to
    get all [snapshots][pydantic_graph.persistence.Snapshot] from persistence.

    Returns: The list of snapshots.
    """
    raise NotImplementedError

set_graph_types

set_graph_types(graph: Graph[StateT, Any, RunEndT]) -> None

Set the types of the state and run end from a graph.

You generally won't need to customise this method, instead implement set_types and should_set_types.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

def set_graph_types(self, graph: Graph[StateT, Any, RunEndT]) -> None:
    """Set the types of the state and run end from a graph.

    You generally won't need to customise this method, instead implement
    [`set_types`][pydantic_graph.persistence.BaseStatePersistence.set_types] and
    [`should_set_types`][pydantic_graph.persistence.BaseStatePersistence.should_set_types].
    """
    if self.should_set_types():
        with _utils.set_nodes_type_context(graph.get_nodes()):
            self.set_types(*graph.inferred_types)

should_set_types

should_set_types() -> bool

Whether types need to be set.

Implementations should override this method to return True when types have not been set if they are needed.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

def should_set_types(self) -> bool:
    """Whether types need to be set.

    Implementations should override this method to return `True` when types have not been set if they are needed.
    """
    return False

set_types

set_types(state_type: type[StateT], run_end_type: type[RunEndT]) -> None

Set the types of the state and run end.

This can be used to create type adapters for serializing and deserializing snapshots, e.g. with build_snapshot_list_type_adapter.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| state_type | type[StateT] | The state type. | required |
| run_end_type | type[RunEndT] | The run end type. | required |

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

def set_types(self, state_type: type[StateT], run_end_type: type[RunEndT]) -> None:
    """Set the types of the state and run end.

    This can be used to create [type adapters][pydantic.TypeAdapter] for serializing and deserializing snapshots,
    e.g. with [`build_snapshot_list_type_adapter`][pydantic_graph.persistence.build_snapshot_list_type_adapter].

    Args:
        state_type: The state type.
        run_end_type: The run end type.
    """
    pass
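
The interplay of `should_set_types`, `set_types` and `set_graph_types` amounts to lazy, one-time initialisation. The sketch below shows that pattern in isolation; `LazyTypedStore` is a hypothetical class, its `set_graph_types` takes the two types directly rather than a graph, and the lambda is a stand-in for a real type adapter.

```python
from typing import Any, Callable, Optional


class LazyTypedStore:
    """Hypothetical store that builds its serializer once the types are known."""

    def __init__(self) -> None:
        self._serializer: Optional[Callable[[Any], str]] = None
        self.set_types_calls = 0

    def should_set_types(self) -> bool:
        # Types are needed only until the serializer has been built.
        return self._serializer is None

    def set_types(self, state_type: type, run_end_type: type) -> None:
        self.set_types_calls += 1
        # Stand-in for building a type adapter from the two types.
        self._serializer = lambda value: (
            f"{state_type.__name__}/{run_end_type.__name__}: {value!r}"
        )

    def set_graph_types(self, state_type: type, run_end_type: type) -> None:
        # Same guard as in the source above: call set_types only when needed.
        if self.should_set_types():
            self.set_types(state_type, run_end_type)


store = LazyTypedStore()
store.set_graph_types(dict, int)
store.set_graph_types(dict, int)  # second call is a no-op
print(store.set_types_calls)
```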

build_snapshot_list_type_adapter

build_snapshot_list_type_adapter(state_t: type[StateT], run_end_t: type[RunEndT]) -> TypeAdapter[list[Snapshot[StateT, RunEndT]]]

Build a type adapter for a list of snapshots.

This method should be called from within set_types where context variables will be set such that Pydantic can create a schema for NodeSnapshot.node.

Source code in pydantic_graph/pydantic_graph/persistence/__init__.py

def build_snapshot_list_type_adapter(
    state_t: type[StateT], run_end_t: type[RunEndT]
) -> pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]]:
    """Build a type adapter for a list of snapshots.

    This method should be called from within
    [`set_types`][pydantic_graph.persistence.BaseStatePersistence.set_types]
    where context variables will be set such that Pydantic can create a schema for
    [`NodeSnapshot.node`][pydantic_graph.persistence.NodeSnapshot.node].
    """
    return pydantic.TypeAdapter(list[Annotated[Snapshot[state_t, run_end_t], pydantic.Discriminator('kind')]])

In-memory state persistence.

This module provides simple in-memory state persistence for graphs.

SimpleStatePersistence dataclass

Bases: BaseStatePersistence[StateT, RunEndT]

Simple in-memory state persistence that just holds the latest snapshot.

If no state persistence implementation is provided when running a graph, this is used by default.

Source code in pydantic_graph/pydantic_graph/persistence/in_mem.py

@dataclass
class SimpleStatePersistence(BaseStatePersistence[StateT, RunEndT]):
    """Simple in memory state persistence that just holds the latest snapshot.

    If no state persistence implementation is provided when running a graph, this is used by default.
    """

    last_snapshot: Snapshot[StateT, RunEndT] | None = None
    """The last snapshot."""

    async def snapshot_node(self, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None:
        self.last_snapshot = NodeSnapshot(state=state, node=next_node)

    async def snapshot_node_if_new(
        self, snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]
    ) -> None:
        if self.last_snapshot and self.last_snapshot.id == snapshot_id:
            return  # pragma: no cover
        else:
            await self.snapshot_node(state, next_node)

    async def snapshot_end(self, state: StateT, end: End[RunEndT]) -> None:
        self.last_snapshot = EndSnapshot(state=state, result=end)

    @asynccontextmanager
    async def record_run(self, snapshot_id: str) -> AsyncIterator[None]:
        if self.last_snapshot is None or snapshot_id != self.last_snapshot.id:
            raise LookupError(f'No snapshot found with id={snapshot_id!r}')

        assert isinstance(self.last_snapshot, NodeSnapshot), 'Only NodeSnapshot can be recorded'
        exceptions.GraphNodeStatusError.check(self.last_snapshot.status)
        self.last_snapshot.status = 'running'
        self.last_snapshot.start_ts = _utils.now_utc()

        start = perf_counter()
        try:
            yield
        except Exception:
            self.last_snapshot.duration = perf_counter() - start
            self.last_snapshot.status = 'error'
            raise
        else:
            self.last_snapshot.duration = perf_counter() - start
            self.last_snapshot.status = 'success'

    async def load_next(self) -> NodeSnapshot[StateT, RunEndT] | None:
        if isinstance(self.last_snapshot, NodeSnapshot) and self.last_snapshot.status == 'created':
            self.last_snapshot.status = 'pending'
            return self.last_snapshot

    async def load_all(self) -> list[Snapshot[StateT, RunEndT]]:
        raise NotImplementedError('load is not supported for SimpleStatePersistence')

last_snapshot class-attribute instance-attribute

last_snapshot: Snapshot[StateT, RunEndT] | None = None

The last snapshot.

FullStatePersistence dataclass

Bases: BaseStatePersistence[StateT, RunEndT]

In-memory state persistence that holds a list of snapshots.

Source code in pydantic_graph/pydantic_graph/persistence/in_mem.py

@dataclass
class FullStatePersistence(BaseStatePersistence[StateT, RunEndT]):
    """In memory state persistence that holds a list of snapshots."""

    deep_copy: bool = True
    """Whether to deep copy the state and nodes when storing them.

    Defaults to `True` so even if nodes or state are modified after the snapshot is taken,
    the persistence history will record the value at the time of the snapshot.
    """
    history: list[Snapshot[StateT, RunEndT]] = field(default_factory=list)
    """List of snapshots taken during the graph run."""
    _snapshots_type_adapter: pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]] | None = field(
        default=None, init=False, repr=False
    )

    async def snapshot_node(self, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None:
        snapshot = NodeSnapshot(
            state=self._prep_state(state),
            node=next_node.deep_copy() if self.deep_copy else next_node,
        )
        self.history.append(snapshot)

    async def snapshot_node_if_new(
        self, snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]
    ) -> None:
        if not any(s.id == snapshot_id for s in self.history):
            await self.snapshot_node(state, next_node)

    async def snapshot_end(self, state: StateT, end: End[RunEndT]) -> None:
        snapshot = EndSnapshot(
            state=self._prep_state(state),
            result=end.deep_copy_data() if self.deep_copy else end,
        )
        self.history.append(snapshot)

    @asynccontextmanager
    async def record_run(self, snapshot_id: str) -> AsyncIterator[None]:
        try:
            snapshot = next(s for s in self.history if s.id == snapshot_id)
        except StopIteration as e:
            raise LookupError(f'No snapshot found with id={snapshot_id!r}') from e

        assert isinstance(snapshot, NodeSnapshot), 'Only NodeSnapshot can be recorded'
        exceptions.GraphNodeStatusError.check(snapshot.status)
        snapshot.status = 'running'
        snapshot.start_ts = _utils.now_utc()

        start = perf_counter()
        try:
            yield
        except Exception:
            snapshot.duration = perf_counter() - start
            snapshot.status = 'error'
            raise
        else:
            snapshot.duration = perf_counter() - start
            snapshot.status = 'success'

    async def load_next(self) -> NodeSnapshot[StateT, RunEndT] | None:
        if snapshot := next((s for s in self.history if isinstance(s, NodeSnapshot) and s.status == 'created'), None):
            snapshot.status = 'pending'
            return snapshot

    async def load_all(self) -> list[Snapshot[StateT, RunEndT]]:
        return self.history

    def should_set_types(self) -> bool:
        return self._snapshots_type_adapter is None

    def set_types(self, state_type: type[StateT], run_end_type: type[RunEndT]) -> None:
        self._snapshots_type_adapter = build_snapshot_list_type_adapter(state_type, run_end_type)

    def dump_json(self, *, indent: int | None = None) -> bytes:
        """Dump the history to JSON bytes."""
        assert self._snapshots_type_adapter is not None, 'type adapter must be set to use `dump_json`'
        return self._snapshots_type_adapter.dump_json(self.history, indent=indent)

    def load_json(self, json_data: str | bytes | bytearray) -> None:
        """Load the history from JSON."""
        assert self._snapshots_type_adapter is not None, 'type adapter must be set to use `load_json`'
        self.history = self._snapshots_type_adapter.validate_json(json_data)

    def _prep_state(self, state: StateT) -> StateT:
        """Prepare state for snapshot, uses [`copy.deepcopy`][copy.deepcopy] by default."""
        if not self.deep_copy or state is None:
            return state
        else:
            return copy.deepcopy(state)

deep_copy class-attribute instance-attribute

deep_copy: bool = True

Whether to deep copy the state and nodes when storing them.

Defaults to True so even if nodes or state are modified after the snapshot is taken, the persistence history will record the value at the time of the snapshot.
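
The difference the copy makes is easiest to see with a mutable state object. The sketch below uses only the standard library; `CounterState` is a hypothetical state class, and the two lists mimic a history recorded without and with deep copying.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class CounterState:  # hypothetical mutable graph state
    values: list[int] = field(default_factory=list)


state = CounterState()
shared_history: list[CounterState] = []
copied_history: list[CounterState] = []

for i in range(3):
    state.values.append(i)
    shared_history.append(state)  # like deep_copy=False: same object each time
    copied_history.append(copy.deepcopy(state))  # like deep_copy=True

print([s.values for s in shared_history])
print([s.values for s in copied_history])
```

Without copying, every history entry points at the same object and therefore shows only the final state; with copying, each entry keeps the value it had when it was recorded.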

history class-attribute instance-attribute

history: list[Snapshot[StateT, RunEndT]] = field(default_factory=list)

List of snapshots taken during the graph run.

dump_json

dump_json(*, indent: int | None = None) -> bytes

Dump the history to JSON bytes.

Source code in pydantic_graph/pydantic_graph/persistence/in_mem.py

def dump_json(self, *, indent: int | None = None) -> bytes:
    """Dump the history to JSON bytes."""
    assert self._snapshots_type_adapter is not None, 'type adapter must be set to use `dump_json`'
    return self._snapshots_type_adapter.dump_json(self.history, indent=indent)

load_json

load_json(json_data: str | bytes | bytearray) -> None

Load the history from JSON.

Source code in pydantic_graph/pydantic_graph/persistence/in_mem.py

def load_json(self, json_data: str | bytes | bytearray) -> None:
    """Load the history from JSON."""
    assert self._snapshots_type_adapter is not None, 'type adapter must be set to use `load_json`'
    self.history = self._snapshots_type_adapter.validate_json(json_data)

FileStatePersistence dataclass

Bases: BaseStatePersistence[StateT, RunEndT]

File-based state persistence that holds graph run state in a JSON file.

Source code in pydantic_graph/pydantic_graph/persistence/file.py

@dataclass
class FileStatePersistence(BaseStatePersistence[StateT, RunEndT]):
    """File based state persistence that holds graph run state in a JSON file."""

    json_file: Path
    """Path to the JSON file where the snapshots are stored.

    You should use a different file for each graph run, but a single file should be reused for multiple
    steps of the same run.

    For example if you have a run ID of the form `run_123abc`, you might create a `FileStatePersistence` thus:

    ```py
    from pathlib import Path

    from pydantic_graph.persistence.file import FileStatePersistence

    run_id = 'run_123abc'
    persistence = FileStatePersistence(Path('runs') / f'{run_id}.json')
    ```
    """
    _snapshots_type_adapter: pydantic.TypeAdapter[list[Snapshot[StateT, RunEndT]]] | None = field(
        default=None, init=False, repr=False
    )

    async def snapshot_node(self, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]) -> None:
        await self._append_save(NodeSnapshot(state=state, node=next_node))

    async def snapshot_node_if_new(
        self, snapshot_id: str, state: StateT, next_node: BaseNode[StateT, Any, RunEndT]
    ) -> None:
        async with self._lock():
            snapshots = await self.load_all()
            if not any(s.id == snapshot_id for s in snapshots):  # pragma: no branch
                await self._append_save(NodeSnapshot(state=state, node=next_node), lock=False)

    async def snapshot_end(self, state: StateT, end: End[RunEndT]) -> None:
        await self._append_save(EndSnapshot(state=state, result=end))

    @asynccontextmanager
    async def record_run(self, snapshot_id: str) -> AsyncIterator[None]:
        async with self._lock():
            snapshots = await self.load_all()
            try:
                snapshot = next(s for s in snapshots if s.id == snapshot_id)
            except StopIteration as e:
                raise LookupError(f'No snapshot found with id={snapshot_id!r}') from e

            assert isinstance(snapshot, NodeSnapshot), 'Only NodeSnapshot can be recorded'
            exceptions.GraphNodeStatusError.check(snapshot.status)
            snapshot.status = 'running'
            snapshot.start_ts = _utils.now_utc()
            await self._save(snapshots)

        start = perf_counter()
        try:
            yield
        except Exception:
            duration = perf_counter() - start
            async with self._lock():
                await _graph_utils.run_in_executor(self._after_run_sync, snapshot_id, duration, 'error')
            raise
        else:
            snapshot.duration = perf_counter() - start
            async with self._lock():
                await _graph_utils.run_in_executor(self._after_run_sync, snapshot_id, snapshot.duration, 'success')

    async def load_next(self) -> NodeSnapshot[StateT, RunEndT] | None:
        async with self._lock():
            snapshots = await self.load_all()
            if snapshot := next((s for s in snapshots if isinstance(s, NodeSnapshot) and s.status == 'created'), None):
                snapshot.status = 'pending'
                await self._save(snapshots)
                return snapshot

    def should_set_types(self) -> bool:
        """Whether types need to be set."""
        return self._snapshots_type_adapter is None

    def set_types(self, state_type: type[StateT], run_end_type: type[RunEndT]) -> None:
        self._snapshots_type_adapter = build_snapshot_list_type_adapter(state_type, run_end_type)

    async def load_all(self) -> list[Snapshot[StateT, RunEndT]]:
        return await _graph_utils.run_in_executor(self._load_sync)

    def _load_sync(self) -> list[Snapshot[StateT, RunEndT]]:
        assert self._snapshots_type_adapter is not None, 'snapshots type adapter must be set'
        try:
            content = self.json_file.read_bytes()
        except FileNotFoundError:
            return []
        else:
            return self._snapshots_type_adapter.validate_json(content)

    def _after_run_sync(self, snapshot_id: str, duration: float, status: SnapshotStatus) -> None:
        snapshots = self._load_sync()
        snapshot = next(s for s in snapshots if s.id == snapshot_id)
        assert isinstance(snapshot, NodeSnapshot), 'Only NodeSnapshot can be recorded'
        snapshot.duration = duration
        snapshot.status = status
        self._save_sync(snapshots)

    async def _save(self, snapshots: list[Snapshot[StateT, RunEndT]]) -> None:
        await _graph_utils.run_in_executor(self._save_sync, snapshots)

    def _save_sync(self, snapshots: list[Snapshot[StateT, RunEndT]]) -> None:
        assert self._snapshots_type_adapter is not None, 'snapshots type adapter must be set'
        self.json_file.write_bytes(self._snapshots_type_adapter.dump_json(snapshots, indent=2))

    async def _append_save(self, snapshot: Snapshot[StateT, RunEndT], *, lock: bool = True) -> None:
        assert self._snapshots_type_adapter is not None, 'snapshots type adapter must be set'
        async with AsyncExitStack() as stack:
            if lock:
                await stack.enter_async_context(self._lock())
            snapshots = await self.load_all()
            snapshots.append(snapshot)
            await self._save(snapshots)

    @asynccontextmanager
    async def _lock(self, *, timeout: float = 1.0) -> AsyncIterator[None]:
        """Lock a file by checking and writing a `.pydantic-graph-persistence-lock` to it.

        Args:
            timeout: how long to wait for the lock

        Returns: an async context manager that holds the lock
        """
        lock_file = self.json_file.parent / f'{self.json_file.name}.pydantic-graph-persistence-lock'
        lock_id = secrets.token_urlsafe().encode()
        await asyncio.wait_for(_get_lock(lock_file, lock_id), timeout=timeout)
        try:
            yield
        finally:
            await _graph_utils.run_in_executor(lock_file.unlink, missing_ok=True)

json_file instance-attribute

json_file: Path

Path to the JSON file where the snapshots are stored.

You should use a different file for each graph run, but a single file should be reused for multiple steps of the same run.

For example, if you have a run ID of the form run_123abc, you might create a FileStatePersistence thus:

from pathlib import Path

from pydantic_graph.persistence.file import FileStatePersistence

run_id = 'run_123abc'
persistence = FileStatePersistence(Path('runs') / f'{run_id}.json')
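
The "one file per run, reused across steps" advice follows from a load, append, save cycle on a single JSON list. The sketch below reproduces that cycle with the standard library only; `append_snapshot` is a hypothetical helper, the snapshots are plain dicts, and the file locking used by the real class is deliberately left out.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def append_snapshot(json_file: Path, snapshot: dict[str, Any]) -> None:
    """Load the existing list (empty if the file is missing), append, and save."""
    try:
        snapshots = json.loads(json_file.read_bytes())
    except FileNotFoundError:
        snapshots = []
    snapshots.append(snapshot)
    json_file.write_text(json.dumps(snapshots, indent=2))


with tempfile.TemporaryDirectory() as tmp:
    runs_dir = Path(tmp) / "runs"
    runs_dir.mkdir()
    file_a = runs_dir / "run_123abc.json"
    file_b = runs_dir / "run_456def.json"

    # Two steps of the same run share one file; a second run gets its own.
    append_snapshot(file_a, {"kind": "node", "id": "A:1"})
    append_snapshot(file_a, {"kind": "end", "id": "end:1"})
    append_snapshot(file_b, {"kind": "node", "id": "A:1"})

    run_a = json.loads(file_a.read_text())
    run_b = json.loads(file_b.read_text())

print(len(run_a), len(run_b))
```

If two runs shared a file, their snapshots would interleave in one list and a snapshot ID from one run could be mistaken for the other's.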

should_set_types

should_set_types() -> bool

Whether types need to be set.

Source code in pydantic_graph/pydantic_graph/persistence/file.py

def should_set_types(self) -> bool:
    """Whether types need to be set."""
    return self._snapshots_type_adapter is None
