Graph Definitions¶
StateGraph¶
Bases: Generic[StateT, InputT, OutputT]
A graph whose nodes communicate by reading and writing to a shared state. The signature of each node is State -> Partial<State>.
Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.
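To make the reducer contract concrete, the sketch below folds several updates for one key into the current value. It is a plain-Python illustration of the (Value, Value) -> Value signature, not LangGraph's internal implementation; `append_reducer` and `apply_updates` are hypothetical names introduced only for this example.

```python
from functools import reduce
from typing import Callable, Optional


def append_reducer(current: list, update: Optional[float]) -> list:
    """Reducer with signature (Value, Value) -> Value: append non-None updates."""
    if update is not None:
        return current + [update]
    return current


def apply_updates(current: list, updates: list, reducer: Callable) -> list:
    """Fold every update written to the same key into the current value."""
    return reduce(reducer, updates, current)


# Three writes to the same key arrive in one step; one of them is None.
merged = apply_updates([0.5], [0.75, None, 0.1], append_reducer)
print(merged)  # [0.5, 0.75, 0.1]
```

Because the reducer receives the current value first and the incoming value second, a key without new writes keeps its current value unchanged.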
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state_schema | type[StateT] | The schema class that defines the state. | required |
config_schema | type[Any] | None | The schema class that defines the configuration. Use this to expose configurable parameters in your API. | None |
Example

    from langchain_core.runnables import RunnableConfig
    from typing_extensions import Annotated, TypedDict
    from langgraph.checkpoint.memory import MemorySaver
    from langgraph.graph import StateGraph

    def reducer(a: list, b: int | None) -> list:
        if b is not None:
            return a + [b]
        return a

    class State(TypedDict):
        x: Annotated[list, reducer]

    class ConfigSchema(TypedDict):
        r: float

    graph = StateGraph(State, config_schema=ConfigSchema)

    def node(state: State, config: RunnableConfig) -> dict:
        r = config["configurable"].get("r", 1.0)
        x = state["x"][-1]
        next_value = x * r * (1 - x)
        return {"x": next_value}

    graph.add_node("A", node)
    graph.set_entry_point("A")
    graph.set_finish_point("A")
    compiled = graph.compile()

    print(compiled.config_specs)
    # [ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]

    step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
    # {'x': [0.5, 0.75]}

Methods:
Name | Description |
---|---|
add_node | Add a new node to the state graph. |
add_edge | Add a directed edge from the start node (or list of start nodes) to the end node. |
add_conditional_edges | Add a conditional edge from the starting node to any number of destination nodes. |
add_sequence | Add a sequence of nodes that will be executed in the provided order. |
compile | Compiles the state graph into a CompiledStateGraph object. |
add_node¶
add_node(node: str | StateNode[StateT], action: StateNode[StateT] | None = None, *, defer: bool = False, metadata: dict[str, Any] | None = None, input_schema: type[Any] | None = None, retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None, cache_policy: CachePolicy | None = None, destinations: dict[str, str] | tuple[str, ...] | None = None, **kwargs: Unpack[DeprecatedKwargs]) -> Self
Add a new node to the state graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node | str | StateNode[StateT] | The function or runnable this node will run. If a string is provided, it will be used as the node name, and action will be used as the function or runnable. | required |
action | StateNode[StateT] | None | The action associated with the node. (default: None) Will be used as the node function or runnable if node is a string (the node name). | None |
defer | bool | Whether to defer the execution of the node until the run is about to end. | False |
metadata | dict[str, Any] | None | The metadata associated with the node. (default: None) | None |
input_schema | type[Any] | None | The input schema for the node. (default: the graph's state schema) | None |
retry_policy | RetryPolicy | Sequence[RetryPolicy] | None | The retry policy for the node. (default: None) If a sequence is provided, the first matching policy will be applied. | None |
cache_policy | CachePolicy | None | The cache policy for the node. (default: None) | None |
destinations | dict[str, str] | tuple[str, ...] | None | Destinations that indicate where a node can route to. This is useful for edgeless graphs with nodes that return Command objects. | None |
Example

    from typing_extensions import TypedDict
    from langchain_core.runnables import RunnableConfig
    from langgraph.graph import START, StateGraph

    class State(TypedDict):
        x: int

    def my_node(state: State, config: RunnableConfig) -> State:
        return {"x": state["x"] + 1}

    builder = StateGraph(State)
    builder.add_node(my_node)  # node name will be 'my_node'
    builder.add_edge(START, "my_node")
    graph = builder.compile()
    graph.invoke({"x": 1})
    # {'x': 2}

Customize the name by passing a string as the first argument and the function as the second, for example builder.add_node("custom_name", my_node).
Returns:
Name | Type | Description |
---|---|---|
Self | Self | The instance of the state graph, allowing for method chaining. |
add_edge¶
Add a directed edge from the start node (or list of start nodes) to the end node.
When a single start node is provided, the graph will wait for that node to complete before executing the end node. When multiple start nodes are provided, the graph will wait for ALL of the start nodes to complete before executing the end node.
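The waiting rule for multiple start nodes can be pictured with a small plain-Python check. This is only a model of the behavior described above, for an edge such as one added from the list ["fetch_a", "fetch_b"] to "merge"; the node names and the `ready_to_run` helper are hypothetical and are not part of the library.

```python
def ready_to_run(end_key: str, waits_on: dict, completed: set) -> bool:
    """Return True once every start node feeding end_key has completed."""
    return waits_on[end_key] <= completed


# "merge" has a single edge whose start is the list ["fetch_a", "fetch_b"].
waits_on = {"merge": {"fetch_a", "fetch_b"}}

print(ready_to_run("merge", waits_on, {"fetch_a"}))             # False
print(ready_to_run("merge", waits_on, {"fetch_a", "fetch_b"}))  # True
```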
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start_key | str |list[str] | The key(s) of the start node(s) of the edge. | required |
end_key | str | The key of the end node of the edge. | required |
Raises:
Type | Description |
---|---|
ValueError | If the start key is 'END' or if the start key or end key is not present in the graph. |
Returns:
Name | Type | Description |
---|---|---|
Self | Self | The instance of the state graph, allowing for method chaining. |
add_conditional_edges¶
add_conditional_edges(source: str, path: Callable[..., Hashable | list[Hashable]] | Callable[..., Awaitable[Hashable | list[Hashable]]] | Runnable[Any, Hashable | list[Hashable]], path_map: dict[Hashable, str] | list[str] | None = None) -> Self
Add a conditional edge from the starting node to any number of destination nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | str | The starting node. This conditional edge will run when exiting this node. | required |
path | Callable[..., Hashable | list[Hashable]] | Callable[..., Awaitable[Hashable | list[Hashable]]] | Runnable[Any, Hashable | list[Hashable]] | The callable that determines the next node or nodes. If not specifying path_map, it should return one or more node names. | required |
path_map | dict[Hashable, str] | list[str] | None | Optional mapping of paths to node names. If omitted, the paths returned by path should be node names. | None |
Returns:
Name | Type | Description |
---|---|---|
Self | Self | The instance of the graph, allowing for method chaining. |
Without type hints on the path function's return value (e.g., -> Literal["foo", "__end__"]:) or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
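The relationship between the path callable and path_map can be sketched in plain Python. The `route` function, the `resolve` helper, and the node name "call_model" are hypothetical; the sketch only models how a returned value is translated into destination node names and does not call the library.

```python
from typing import Hashable, Literal, Optional, Union


def route(state: dict) -> Literal["retry", "done"]:
    """Path callable: inspect the state and return a routing key."""
    return "retry" if state["attempts"] < 3 else "done"


def resolve(path_result: Union[Hashable, list], path_map: Optional[dict] = None) -> list:
    """Translate what the path callable returned into destination node names."""
    results = path_result if isinstance(path_result, list) else [path_result]
    if path_map is None:
        # Without a path_map, the returned values are taken to be node names.
        return list(results)
    return [path_map[r] for r in results]


path_map = {"retry": "call_model", "done": "__end__"}
print(resolve(route({"attempts": 1}), path_map))  # ['call_model']
print(resolve(route({"attempts": 3}), path_map))  # ['__end__']
```

The Literal return annotation on `route` is the kind of type hint the note above refers to: it lists the possible outcomes up front.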
add_sequence¶
Add a sequence of nodes that will be executed in the provided order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | Sequence[StateNode[StateT] | tuple[str, StateNode[StateT]]] | A sequence of StateNodes (callables that accept a state arg) or (name, StateNode) tuples. If no names are provided, the name will be inferred from the node object (e.g. a runnable or a callable name). Each node will be executed in the order provided. | required |
Raises:
Type | Description |
---|---|
ValueError | If the sequence is empty. |
ValueError | If the sequence contains duplicate node names. |
Returns:
Name | Type | Description |
---|---|---|
Self | Self | The instance of the state graph, allowing for method chaining. |
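
The naming and validation rules for a sequence can be modeled in a few lines of plain Python. This is a sketch under the assumption that names are inferred from a callable's `__name__`; `plan_sequence` is a hypothetical helper, not the library's implementation.

```python
from typing import Callable, Sequence, Union

NodeSpec = Union[Callable, tuple]


def plan_sequence(nodes: Sequence[NodeSpec]) -> tuple:
    """Return (names, edges) for nodes that should run in the given order."""
    if not nodes:
        raise ValueError("Sequence requires at least one node.")
    names = []
    for node in nodes:
        # A (name, callable) tuple carries an explicit name; otherwise infer it.
        name = node[0] if isinstance(node, tuple) else node.__name__
        if name in names:
            raise ValueError(f"Node names must be unique: {name!r} appears twice.")
        names.append(name)
    # Chain each node to the one that follows it.
    edges = list(zip(names, names[1:]))
    return names, edges


def load(state: dict) -> dict:
    return state


def clean(state: dict) -> dict:
    return state


names, edges = plan_sequence([load, ("tidy", clean)])
print(names)  # ['load', 'tidy']
print(edges)  # [('load', 'tidy')]
```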
compile¶
compile(checkpointer: Checkpointer = None, *, cache: BaseCache | None = None, store: BaseStore | None = None, interrupt_before: All | list[str] | None = None, interrupt_after: All | list[str] | None = None, debug: bool = False, name: str | None = None) -> CompiledStateGraph[StateT, InputT, OutputT]
Compiles the state graph into a CompiledStateGraph object.
The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
checkpointer | Checkpointer | A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If None, it may inherit the parent graph's checkpointer when used as a subgraph. If False, it will not use or inherit any checkpointer. | None |
interrupt_before | All |list[str] | None | An optional list of node names to interrupt before. | None |
interrupt_after | All |list[str] | None | An optional list of node names to interrupt after. | None |
debug | bool | A flag indicating whether to enable debug mode. | False |
name | str | None | The name to use for the compiled graph. | None |
Returns:
Name | Type | Description |
---|---|---|
CompiledStateGraph | CompiledStateGraph[StateT,InputT,OutputT] | The compiled state graph. |
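
The three checkpointer cases described in the table (a saver object, None, and False) can be summarized as a small decision function. This is a plain-Python model of the documented behavior only; `effective_checkpointer` is a hypothetical name, and the `object()` placeholders stand in for real checkpoint savers.

```python
from typing import Any, Optional


def effective_checkpointer(own: Any, parent: Optional[Any] = None) -> Optional[Any]:
    """Pick the checkpointer a graph ends up using, per the three cases above."""
    if own is False:
        return None    # explicitly opted out: never use or inherit one
    if own is None:
        return parent  # nothing set: may inherit from the parent graph
    return own         # a saver object was provided: use it


parent_saver = object()    # placeholder for a real checkpoint saver
subgraph_saver = object()  # placeholder for a second saver

print(effective_checkpointer(None, parent_saver) is parent_saver)            # True
print(effective_checkpointer(False, parent_saver) is None)                   # True
print(effective_checkpointer(subgraph_saver, parent_saver) is subgraph_saver)  # True
```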
CompiledStateGraph¶
Bases: Pregel[StateT, InputT, OutputT], Generic[StateT, InputT, OutputT]
Methods:
Name | Description |
---|---|
stream | Stream graph steps for a single input. |
astream | Asynchronously stream graph steps for a single input. |
invoke | Run the graph with a single input and config. |
ainvoke | Asynchronously invoke the graph on a single input. |
get_state | Get the current state of the graph. |
aget_state | Get the current state of the graph. |
get_state_history | Get the history of the state of the graph. |
aget_state_history | Asynchronously get the history of the state of the graph. |
update_state | Update the state of the graph with the given values, as if they came from node as_node. |
aupdate_state | Asynchronously update the state of the graph with the given values, as if they came from node as_node. |
bulk_update_state | Apply updates to the graph state in bulk. Requires a checkpointer to be set. |
abulk_update_state | Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set. |
get_graph | Return a drawable representation of the computation graph. |
aget_graph | Return a drawable representation of the computation graph. |
get_subgraphs | Get the subgraphs of the graph. |
aget_subgraphs | Get the subgraphs of the graph. |
with_config | Create a copy of the Pregel object with an updated config. |
stream¶
stream(input: InputT | Command | None, config: RunnableConfig | None = None, *, stream_mode: StreamMode | Sequence[StreamMode] | None = None, print_mode: StreamMode | Sequence[StreamMode] = (), output_keys: str | Sequence[str] | None = None, interrupt_before: All | Sequence[str] | None = None, interrupt_after: All | Sequence[str] | None = None, checkpoint_during: bool | None = None, debug: bool | None = None, subgraphs: bool = False) -> Iterator[dict[str, Any] | Any]
Stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | InputT |Command | None | The input to the graph. | required |
config | RunnableConfig | None | The configuration to use for the run. | None |
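stream_mode | StreamMode | Sequence[StreamMode] | None | The mode to stream output, defaults to self.stream_mode. You can pass a list as the stream_mode parameter to stream multiple modes at once. See the LangGraph streaming guide for more details. | None |
print_mode | StreamMode | Sequence[StreamMode] | Accepts the same values as stream_mode, but only prints the output to the console, for debugging purposes. | () |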
output_keys | str |Sequence[str] | None | The keys to stream, defaults to all non-context channels. | None |
interrupt_before | All |Sequence[str] | None | Nodes to interrupt before, defaults to all nodes in the graph. | None |
interrupt_after | All |Sequence[str] | None | Nodes to interrupt after, defaults to all nodes in the graph. | None |
checkpoint_during | bool | None | Whether to checkpoint intermediate steps, defaults to False. If False, only the final checkpoint is saved. | None |
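subgraphs | bool | Whether to stream events from inside subgraphs, defaults to False. If True, the events will be emitted as tuples (namespace, data), where namespace is a tuple with the path to the node where a subgraph is invoked. See the LangGraph streaming guide for more details. | False |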
Yields:
Type | Description |
---|---|
dict[str,Any] |Any | The output of each step in the graph. The output shape depends on the stream_mode. |
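
As a rough picture of how the output shape can differ by mode, the toy generator below contrasts two modes: "values" (the full state after each step) and "updates" (only what each node wrote, keyed by node name). It is a simplified plain-Python model, not the library's streaming code; `toy_stream` and the node names are hypothetical, and the real stream may emit additional chunks.

```python
from typing import Iterator


def toy_stream(state: dict, nodes: list, stream_mode: str) -> Iterator[dict]:
    """Run (name, fn) pairs in order and yield one chunk per step."""
    for name, fn in nodes:
        update = fn(state)
        state = {**state, **update}
        if stream_mode == "values":
            yield dict(state)     # full state after the step
        elif stream_mode == "updates":
            yield {name: update}  # only what this node wrote
        else:
            raise ValueError(f"mode not covered by this sketch: {stream_mode}")


nodes = [
    ("increment", lambda s: {"x": s["x"] + 1}),
    ("double", lambda s: {"x": s["x"] * 2}),
]

print(list(toy_stream({"x": 1}, nodes, "values")))
# [{'x': 2}, {'x': 4}]
print(list(toy_stream({"x": 1}, nodes, "updates")))
# [{'increment': {'x': 2}}, {'double': {'x': 4}}]
```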
astream async¶
astream(input: InputT | Command | None, config: RunnableConfig | None = None, *, stream_mode: StreamMode | Sequence[StreamMode] | None = None, print_mode: StreamMode | Sequence[StreamMode] = (), output_keys: str | Sequence[str] | None = None, interrupt_before: All | Sequence[str] | None = None, interrupt_after: All | Sequence[str] | None = None, checkpoint_during: bool | None = None, debug: bool | None = None, subgraphs: bool = False) -> AsyncIterator[dict[str, Any] | Any]
Asynchronously stream graph steps for a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | InputT |Command | None | The input to the graph. | required |
config | RunnableConfig | None | The configuration to use for the run. | None |
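stream_mode | StreamMode | Sequence[StreamMode] | None | The mode to stream output, defaults to self.stream_mode. You can pass a list as the stream_mode parameter to stream multiple modes at once. See the LangGraph streaming guide for more details. | None |
print_mode | StreamMode | Sequence[StreamMode] | Accepts the same values as stream_mode, but only prints the output to the console, for debugging purposes. | () |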
output_keys | str |Sequence[str] | None | The keys to stream, defaults to all non-context channels. | None |
interrupt_before | All |Sequence[str] | None | Nodes to interrupt before, defaults to all nodes in the graph. | None |
interrupt_after | All |Sequence[str] | None | Nodes to interrupt after, defaults to all nodes in the graph. | None |
checkpoint_during | bool | None | Whether to checkpoint intermediate steps, defaults to False. If False, only the final checkpoint is saved. | None |
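subgraphs | bool | Whether to stream events from inside subgraphs, defaults to False. If True, the events will be emitted as tuples (namespace, data), where namespace is a tuple with the path to the node where a subgraph is invoked. See the LangGraph streaming guide for more details. | False |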
Yields:
Type | Description |
---|---|
AsyncIterator[dict[str,Any] |Any] | The output of each step in the graph. The output shape depends on the stream_mode. |
invoke¶
invoke(input: InputT | Command | None, config: RunnableConfig | None = None, *, stream_mode: StreamMode = "values", print_mode: StreamMode | Sequence[StreamMode] = (), output_keys: str | Sequence[str] | None = None, interrupt_before: All | Sequence[str] | None = None, interrupt_after: All | Sequence[str] | None = None, **kwargs: Any) -> dict[str, Any] | Any
Run the graph with a single input and config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | InputT |Command | None | The input data for the graph. It can be a dictionary or any other type. | required |
config | RunnableConfig | None | Optional. The configuration for the graph run. | None |
stream_mode | StreamMode | Optional[str]. The stream mode for the graph run. Default is "values". | 'values' |
print_mode | StreamMode | Sequence[StreamMode] | Accepts the same values as stream_mode, but only prints the output to the console, for debugging purposes. | () |
output_keys | str |Sequence[str] | None | Optional. The output keys to retrieve from the graph run. | None |
interrupt_before | All |Sequence[str] | None | Optional. The nodes to interrupt the graph run before. | None |
interrupt_after | All |Sequence[str] | None | Optional. The nodes to interrupt the graph run after. | None |
**kwargs | Any | Additional keyword arguments to pass to the graph run. | {} |
Returns:
Type | Description |
---|---|
dict[str,Any] |Any | The output of the graph run. If stream_mode is "values", it returns the latest output. |
dict[str,Any] |Any | If stream_mode is not "values", it returns a list of output chunks. |
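
The two return cases can be read as a reduction over streamed chunks: keep the latest one for "values", otherwise return all of them. The helper below is a plain-Python sketch of that rule; `collect` is a hypothetical name and the chunk contents are made up for illustration.

```python
from typing import Any, Iterable


def collect(chunks: Iterable[Any], stream_mode: str = "values") -> Any:
    """Reduce streamed chunks to a single return value, as described above."""
    collected = list(chunks)
    if stream_mode == "values":
        # Keep only the most recent output.
        return collected[-1] if collected else None
    # Any other mode: hand back every chunk in order.
    return collected


print(collect([{"x": 2}, {"x": 4}]))
# {'x': 4}
print(collect([{"a": {"x": 2}}, {"b": {"x": 4}}], stream_mode="updates"))
# [{'a': {'x': 2}}, {'b': {'x': 4}}]
```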
ainvoke async¶
ainvoke(input: InputT | Command | None, config: RunnableConfig | None = None, *, stream_mode: StreamMode = "values", print_mode: StreamMode | Sequence[StreamMode] = (), output_keys: str | Sequence[str] | None = None, interrupt_before: All | Sequence[str] | None = None, interrupt_after: All | Sequence[str] | None = None, **kwargs: Any) -> dict[str, Any] | Any
Asynchronously invoke the graph on a single input.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | InputT |Command | None | The input data for the computation. It can be a dictionary or any other type. | required |
config | RunnableConfig | None | Optional. The configuration for the computation. | None |
stream_mode | StreamMode | Optional. The stream mode for the computation. Default is "values". | 'values' |
print_mode | StreamMode | Sequence[StreamMode] | Accepts the same values as stream_mode, but only prints the output to the console, for debugging purposes. | () |
output_keys | str |Sequence[str] | None | Optional. The output keys to include in the result. Default is None. | None |
interrupt_before | All |Sequence[str] | None | Optional. The nodes to interrupt before. Default is None. | None |
interrupt_after | All |Sequence[str] | None | Optional. The nodes to interrupt after. Default is None. | None |
**kwargs | Any | Additional keyword arguments. | {} |
Returns:
Type | Description |
---|---|
dict[str,Any] |Any | The result of the computation. If stream_mode is "values", it returns the latest value. |
dict[str, Any] | Any | If stream_mode is not "values", it returns a list of chunks. |
get_state¶
get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
Get the current state of the graph.
aget_state async¶
aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
Get the current state of the graph.
get_state_history¶
get_state_history(config: RunnableConfig, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) -> Iterator[StateSnapshot]
Get the history of the state of the graph.
aget_state_history async¶
aget_state_history(config: RunnableConfig, *, filter: dict[str, Any] | None = None, before: RunnableConfig | None = None, limit: int | None = None) -> AsyncIterator[StateSnapshot]
Asynchronously get the history of the state of the graph.
update_state¶
update_state(config: RunnableConfig, values: dict[str, Any] | Any | None, as_node: str | None = None, task_id: str | None = None) -> RunnableConfig
Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
aupdate_state async¶
aupdate_state(config: RunnableConfig, values: dict[str, Any] | Any, as_node: str | None = None, task_id: str | None = None) -> RunnableConfig
Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
bulk_update_state¶
bulk_update_state(config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]) -> RunnableConfig
Apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | RunnableConfig | The config to apply the updates to. | required |
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node, task_id), where task_id is optional. | required |
Raises:
Type | Description |
---|---|
ValueError | If no checkpointer is set or no updates are provided. |
InvalidUpdateError | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
RunnableConfig | RunnableConfig | The updated config. |
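
The nested shape of supersteps (a list of supersteps, each a list of updates applied in order) is easier to see with a small model. The sketch below is an assumption-laden illustration: the `Update` tuple is a hypothetical stand-in whose fields mirror the values, as_node, and task_id parameters of update_state, and `apply_supersteps` simply overwrites keys in a dict rather than going through a checkpointer or reducers.

```python
from typing import NamedTuple, Optional, Sequence


class Update(NamedTuple):
    """Hypothetical stand-in for one state update."""
    values: Optional[dict]
    as_node: Optional[str] = None
    task_id: Optional[str] = None


def apply_supersteps(state: dict, supersteps: Sequence[Sequence[Update]]) -> dict:
    """Apply supersteps in order; updates within a superstep apply sequentially."""
    if not supersteps:
        raise ValueError("No supersteps provided.")
    state = dict(state)
    for superstep in supersteps:
        for update in superstep:
            if update.values:
                state.update(update.values)
    return state


supersteps = [
    [Update({"x": 1}, as_node="a")],
    [Update({"y": 2}, as_node="b"), Update({"x": 3}, as_node="c")],
]
print(apply_supersteps({}, supersteps))  # {'x': 3, 'y': 2}
```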
abulk_update_state async¶
abulk_update_state(config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]) -> RunnableConfig
Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | RunnableConfig | The config to apply the updates to. | required |
supersteps | Sequence[Sequence[StateUpdate]] | A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node, task_id), where task_id is optional. | required |
Raises:
Type | Description |
---|---|
ValueError | If no checkpointer is set or no updates are provided. |
InvalidUpdateError | If an invalid update is provided. |
Returns:
Name | Type | Description |
---|---|---|
RunnableConfig | RunnableConfig | The updated config. |
get_graph¶
get_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
Return a drawable representation of the computation graph.
aget_graph async¶
aget_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
Return a drawable representation of the computation graph.
get_subgraphs¶
Get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace | str | None | The namespace to filter the subgraphs by. | None |
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False |
Returns:
Type | Description |
---|---|
Iterator[tuple[str, PregelProtocol]] | An iterator of the (namespace, subgraph) pairs. |
aget_subgraphs async¶
aget_subgraphs(*, namespace: str | None = None, recurse: bool = False) -> AsyncIterator[tuple[str, PregelProtocol]]
Get the subgraphs of the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace | str | None | The namespace to filter the subgraphs by. | None |
recurse | bool | Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned. | False |
Returns:
Type | Description |
---|---|
AsyncIterator[tuple[str, PregelProtocol]] | An async iterator of the (namespace, subgraph) pairs. |
with_config¶
with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self
Create a copy of the Pregel object with an updated config.
Functions:
Name | Description |
---|---|
add_messages | Merges two lists of messages, updating existing messages by ID. |
add_messages¶
add_messages(left: Messages, right: Messages, *, format: Literal["langchain-openai"] | None = None) -> Messages
Merges two lists of messages, updating existing messages by ID.
By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.
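The append-unless-same-ID rule can be sketched with plain dictionaries standing in for message objects. This is an illustration of the merge rule only, not the library's implementation; `merge_by_id` is a hypothetical helper and it assumes every message already carries an "id".

```python
def merge_by_id(left: list, right: list) -> list:
    """Append messages from right, replacing any left message with the same id."""
    merged = list(left)
    index_of = {message["id"]: i for i, message in enumerate(merged)}
    for message in right:
        position = index_of.get(message["id"])
        if position is None:
            # New ID: append, keeping the list append-only.
            index_of[message["id"]] = len(merged)
            merged.append(message)
        else:
            # Existing ID: replace the earlier message in place.
            merged[position] = message
    return merged


history = [{"id": "1", "content": "Hello"}]
print(merge_by_id(history, [{"id": "2", "content": "Hi there!"}]))
# [{'id': '1', 'content': 'Hello'}, {'id': '2', 'content': 'Hi there!'}]
print(merge_by_id(history, [{"id": "1", "content": "Hello again"}]))
# [{'id': '1', 'content': 'Hello again'}]
```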
Parameters:
Name | Type | Description | Default |
---|---|---|---|
left | Messages | The base list of messages. | required |
right | Messages | The list of messages (or single message) to merge into the base list. | required |
format | Literal['langchain-openai'] | None | The format to return messages in. If None then messages will be returned as is. If 'langchain-openai' then messages will be returned as BaseMessage objects with their contents formatted to match OpenAI message format, meaning contents can be string, 'text' blocks, or 'image_url' blocks and tool responses are returned as their own ToolMessages. Requirement Must have | None |
Returns:
Type | Description |
---|---|
Messages | A new list of messages with the messages from right merged into left. If a message in right has the same ID as a message in left, the message from right will replace the message from left. |
Example
Basic usage:

    from langchain_core.messages import AIMessage, HumanMessage
    from langgraph.graph.message import add_messages

    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [AIMessage(content="Hi there!", id="2")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]

Overwrite an existing message:

    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [HumanMessage(content="Hello again", id="1")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content='Hello again', id='1')]

Use in a StateGraph:

    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, add_messages

    class State(TypedDict):
        messages: Annotated[list, add_messages]

    builder = StateGraph(State)
    builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
    builder.set_entry_point("chatbot")
    builder.set_finish_point("chatbot")
    graph = builder.compile()
    graph.invoke({})
    # {'messages': [AIMessage(content='Hello', id=...)]}

Use the OpenAI message format:

    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, add_messages

    class State(TypedDict):
        messages: Annotated[list, add_messages(format='langchain-openai')]

    def chatbot_node(state: State) -> dict:
        return {"messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Here's an image:",
                        "cache_control": {"type": "ephemeral"},
                    },
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": "1234",
                        },
                    },
                ]
            },
        ]}

    builder = StateGraph(State)
    builder.add_node("chatbot", chatbot_node)
    builder.set_entry_point("chatbot")
    builder.set_finish_point("chatbot")
    graph = builder.compile()
    graph.invoke({"messages": []})
    # {
    #     'messages': [
    #         HumanMessage(
    #             content=[
    #                 {"type": "text", "text": "Here's an image:"},
    #                 {
    #                     "type": "image_url",
    #                     "image_url": {"url": "data:image/jpeg;base64,1234"},
    #                 },
    #             ],
    #         ),
    #     ]
    # }