
How to run custom functions

Prerequisites

This guide assumes familiarity with the following concepts:

You can use arbitrary functions as Runnables. This is useful for formatting or when you need functionality not provided by other LangChain components, and custom functions used as Runnables are called RunnableLambdas.

Note that these functions accept a single argument. If you have a function that accepts multiple arguments, you should write a wrapper that accepts a single dict input and unpacks it into multiple arguments, as in the sketch below.
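For example, a minimal sketch of such a wrapper (the function names here are hypothetical):

def _multiply(a, b):
    # existing function that takes two arguments
    return a * b


def multiply(inputs: dict):
    # wrapper that accepts a single dict and unpacks it into multiple arguments
    return _multiply(inputs["a"], inputs["b"])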

This guide will cover:

  • How to explicitly create a runnable from a custom function using the RunnableLambda constructor and the convenience @chain decorator
  • Coercion of custom functions into runnables when used in chains
  • How to accept and use run metadata in your custom function
  • How to stream with custom functions by having them return generators

Using the constructor

Below, we explicitly wrap our custom logic using the RunnableLambda constructor:

%pip install -qU langchain langchain_openai

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass()

from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


model = ChatOpenAI()

prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

chain.invoke({"foo": "bar", "bar": "gah"})
AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')

Here, {a} resolves to len("bar") = 3 and {b} to len("bar") * len("gah") = 3 * 3 = 9, so the prompt sent to the model is "what is 3 + 9".

The convenience @chain decorator

You can also turn an arbitrary function into a chain by adding a @chain decorator. This is functionally equivalent to wrapping the function in a RunnableLambda constructor as shown above. Here's an example:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")
API Reference: StrOutputParser | chain
'The subject of the joke is the bear and his girlfriend.'

Above, the @chain decorator is used to convert custom_chain into a runnable, which we invoke with the .invoke() method.
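Because the decorator and the constructor are equivalent, the same runnable could also be built explicitly. A minimal sketch, assuming the undecorated function body above is available under the hypothetical name custom_chain_fn:

custom_chain = RunnableLambda(custom_chain_fn)  # behaves the same as the @chain version
custom_chain.invoke("bears")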

If you are using tracing with LangSmith, you should see a custom_chain trace there, with the calls to OpenAI nested underneath.

Automatic coercion in chains

When using custom functions in chains with the pipe operator (|), you can omit the RunnableLambda or @chain constructor and rely on coercion. Here's a simple example with a function that takes the output from the model and returns the first five letters of it:

prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")

model = ChatOpenAI()

chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

chain_with_coerced_function.invoke({"topic": "bears"})
'Once '

Note that we didn't need to wrap the custom function (lambda x: x.content[:5]) in a RunnableLambda constructor because the model on the left of the pipe operator is already a Runnable. The custom function is coerced into a runnable. See this section for more information.
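Coercion only happens inside a chain, however; a bare lambda has no .invoke() method of its own. A minimal sketch (the names are illustrative) of wrapping a function so it can be called as a standalone runnable:

first_five = RunnableLambda(lambda s: s[:5])
first_five.invoke("hello world")  # -> 'hello'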

Passing run metadata

Runnable lambdas can optionally accept a RunnableConfig parameter, which they can use to pass callbacks, tags, and other configuration information to nested runs.

import json

from langchain_core.runnables import RunnableConfig


def parse_or_fix(text: str, config: RunnableConfig):
    fixing_chain = (
        ChatPromptTemplate.from_template(
            "Fix the following text:\n\n```text\n{input}\n```\nError: {error}"
            " Don't narrate, just respond with the fixed data."
        )
        | model
        | StrOutputParser()
    )
    for _ in range(3):
        try:
            return json.loads(text)
        except Exception as e:
            text = fixing_chain.invoke({"input": text, "error": e}, config)
    return "Failed to parse"


from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    output = RunnableLambda(parse_or_fix).invoke(
        "{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
    )
    print(output)
    print(cb)
{'foo': 'bar'}
Tokens Used: 62
Prompt Tokens: 56
Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05
API Reference: get_openai_callback

Streaming

note

RunnableLambda is best suited for code that does not need to support streaming. If you need to support streaming (i.e., be able to operate on chunks of inputs and yield chunks of outputs), use RunnableGenerator instead, as in the example below.

You can use generator functions (i.e., functions that use the yield keyword and behave like iterators) in a chain.

The signature of these generators should be Iterator[Input] -> Iterator[Output]; for async generators, AsyncIterator[Input] -> AsyncIterator[Output].
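As a minimal sketch of the explicit RunnableGenerator form mentioned in the note above (the upper function and its names are illustrative, not part of the original example), wrapping such a generator produces a runnable that transforms each chunk as it streams through:

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def upper(chunks: Iterator[str]) -> Iterator[str]:
    # transform each streamed chunk as it arrives
    for chunk in chunks:
        yield chunk.upper()


upper_runnable = RunnableGenerator(upper)
upper_runnable.invoke("hello")  # -> 'HELLO'; with .stream(), chunks are transformed one at a time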

These are useful for:

  • implementing a custom output parser
  • modifying the output of a previous step, while preserving streaming capabilities

Here's an example of a custom output parser for comma-separated lists. First, we create a chain that generates such a list as text:

from typing import Iterator, List

prompt = ChatPromptTemplate.from_template(
    "Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers"
)

str_chain = prompt | model | StrOutputParser()

for chunk in str_chain.stream({"animal": "bear"}):
    print(chunk, end="", flush=True)
lion, tiger, wolf, gorilla, panda

Next, we define a custom function that will aggregate the currently streamed output and yield it when the model generates the next comma in the list:

# This is a custom parser that splits an iterator of llm tokens
# into a list of strings separated by commas
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # hold partial input until we get a comma
    buffer = ""
    for chunk in input:
        # add current chunk to buffer
        buffer += chunk
        # while there are commas in the buffer
        while "," in buffer:
            # split buffer on comma
            comma_index = buffer.index(",")
            # yield everything before the comma
            yield [buffer[:comma_index].strip()]
            # save the rest for the next iteration
            buffer = buffer[comma_index + 1 :]
    # yield the last chunk
    yield [buffer.strip()]


list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "bear"}):
    print(chunk, flush=True)
['lion']
['tiger']
['wolf']
['gorilla']
['raccoon']

Invoking it gives a full array of values:

list_chain.invoke({"animal": "bear"})
['lion', 'tiger', 'wolf', 'gorilla', 'raccoon']
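Under invoke, the chunks yielded by the generator are combined with the + operator (here, list concatenation), which is why the result is a single flat list rather than a stream of one-element lists. Roughly, as a sketch of the aggregation (not LangChain's actual code):

chunks = [["lion"], ["tiger"], ["wolf"], ["gorilla"], ["raccoon"]]
final = chunks[0]
for chunk in chunks[1:]:
    final += chunk  # list concatenation
# final == ['lion', 'tiger', 'wolf', 'gorilla', 'raccoon']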

Async version

If you are working in an async environment, here is an async version of the above example:

from typing import AsyncIterator


async def asplit_into_list(
    input: AsyncIterator[str],
) -> AsyncIterator[List[str]]:  # async def
    buffer = ""
    async for chunk in input:  # `input` is an `async_generator` object, so use `async for`
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]


list_chain = str_chain | asplit_into_list

async for chunk in list_chain.astream({"animal": "bear"}):
    print(chunk, flush=True)
['lion']
['tiger']
['wolf']
['gorilla']
['panda']
await list_chain.ainvoke({"animal": "bear"})
['lion', 'tiger', 'wolf', 'gorilla', 'panda']

Next steps

Now you've learned a few different ways to use custom logic within your chains, and how to implement streaming.

To learn more, see the other how-to guides on runnables in this section.

