import { Steps, Tabs, Callout } from "nextra/components";

# Setup Arcade with LangChain

Learn how to integrate Arcade tools using LangChain primitives.

LangChain is a popular agentic framework that abstracts a lot of the complexity of building AI agents. It is built on top of LangGraph, a lower-level orchestration framework that offers more control over the inner flow of the agent.

<GuideOverview>
<GuideOverview.Outcomes>

## LangChain primitives you will use in this guide

LangChain offers multiple abstractions for building AI agents, and it's useful to internalize how some of these primitives work, so you can understand and extend the different agentic patterns LangChain supports.

- [_Agents_](https://docs.langchain.com/oss/python/langchain/agents): Most agentic frameworks, including LangChain, offer an abstraction for a ReAct agent.
- [_Interrupts_](https://docs.langchain.com/oss/python/langgraph/interrupts): Interrupts in LangChain are a way to control the flow of the agentic loop when you need to do something outside of the normal ReAct flow. For example, if a tool requires authorization, you can interrupt the agent and ask the user to authorize the tool before continuing.
- [_Checkpointers_](https://docs.langchain.com/oss/python/langgraph/persistence): Checkpointers are how LangChain implements persistence. A checkpointer stores the agent's state in a "checkpoint" that you can resume later. You save those checkpoints to a _thread_, which you can access after the agent's execution, making it straightforward for long-running agents and for handling interruptions and more sophisticated flows such as branching, time travel, and more.
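The checkpointer/thread relationship is the piece that makes interrupts practical. As a rough mental model (plain Python, not the LangGraph API; `save_checkpoint`, `resume`, and the `pending` key are hypothetical), every step saves a checkpoint to a thread, and resuming a thread picks up from the latest checkpoint, even when the last step was an interrupt:

```python
# Hypothetical sketch of the checkpointer/thread idea in plain Python.
# LangGraph's real MemorySaver and interrupt() are far richer than this.
checkpoints: dict[str, list[dict]] = {}  # thread_id -> list of checkpoints

def save_checkpoint(thread_id: str, state: dict) -> None:
    # Each step of the agent persists its state to the thread
    checkpoints.setdefault(thread_id, []).append(state)

def resume(thread_id: str) -> dict:
    # Resuming a thread picks up from the latest checkpoint
    return checkpoints[thread_id][-1]

save_checkpoint("conversation_thread", {"messages": ["hi"], "pending": None})
save_checkpoint("conversation_thread", {"messages": ["hi"], "pending": "authorization"})
state = resume("conversation_thread")
print(state["pending"])  # → authorization
```

Because the thread outlives a single run, the interrupted step survives a restart, which is exactly what the authorization flow below relies on.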

## Integrate Arcade tools into a LangChain agent

```python filename="main.py"
from langgraph.types import Command, interrupt
from pydantic import BaseModel, Field, create_model
```

This is quite a number of imports, so let's break them down:

- Arcade imports:
- `AsyncArcade`: The Arcade client, used to interact with the Arcade API.
- `interrupt`: Interrupts the ReAct flow and asks the user for input.
- Other imports:
- `load_dotenv`: Loads the environment variables from the `.env` file.
- `os`: The operating system module, used here to read environment variables.
- `typing` imports: Used for type hints, which are not required but recommended for type safety.
- `pydantic` imports: Used for data validation and model creation when converting Arcade tools to LangChain tools.

### Configure the agent

```python filename="main.py"
MODEL = "gpt-5-nano"
```

Here you convert the Arcade tools to LangChain tools. You use the `arcade_schema_to_pydantic` function to convert the Arcade tool definition to a Pydantic model, and then use the model to define a `StructuredTool` and create a LangChain tool.

The `arcade_to_langchain` function wraps the Arcade client and dynamically creates a `tool_function` that executes the tool and handles the authorization flow using the `interrupt` function. Once the tool authorization completes, the `tool_function` uses the Arcade client to execute the tool with the provided arguments and handles any errors that may occur.

```python filename="main.py"
TYPE_MAPPING = {
def arcade_schema_to_pydantic(tool_def: ToolDefinition) -> type[BaseModel]:
        if param_type is list and param.value_schema.inner_val_type:
            inner_type: type[Any] = get_python_type(param.value_schema.inner_val_type)
            param_type = list[inner_type]
        param_description = param.description or "No description provided."
        default = ... if param.required else None
        fields[param.name] = (
            param_type,
async def arcade_to_langchain(
    async def tool_function(config: RunnableConfig, **kwargs: Any) -> Any:
        user_id = config.get("configurable", {}).get("user_id") if config else None
        if not user_id:
            raise ValueError("User ID is required to execute Arcade tools")

        auth_response = await arcade_client.tools.authorize(
            tool_name=arcade_tool.qualified_name,
```

### Write a helper function to get Arcade tools in LangChain format

In this helper function you use the Arcade client to retrieve the tools you configured at the beginning of the `main.py` file. You will use a dictionary to store the tools and avoid possible duplicates that may occur if you retrieve the same tool in the `TOOLS` and `MCP_SERVERS` variables. After retrieving all the tools, you will call the `arcade_to_langchain` function to convert the Arcade tools to LangChain tools.
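The dedup step can be sketched in plain Python (the `dedupe_tools` helper and the tool names are hypothetical; the real helper keys the dictionary by the tool's qualified name and stores the tool definition as the value):

```python
# Hypothetical sketch of the dedup step: a dict keyed by qualified tool
# name keeps one entry even if a tool shows up in both TOOLS and MCP_SERVERS.
def dedupe_tools(tool_names: list[str]) -> list[str]:
    unique: dict[str, str] = {}
    for name in tool_names:
        unique[name] = name  # later duplicates overwrite, order is preserved
    return list(unique)

print(dedupe_tools(["Gmail.SendEmail", "Gmail.SendEmail", "Slack.SendMessage"]))
# → ['Gmail.SendEmail', 'Slack.SendMessage']
```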

```python filename="main.py"
async def get_arcade_tools(
```

### Write the interrupt handler

In LangChain, you need to "resolve" each interrupt for the flow to continue. In response to an interrupt, you need to return a decision object with the information needed to resolve the interrupt. In this case, the decision is whether the authorization was successful: if it was, the tool call is retried; if it failed, the flow stops with an error and the agent decides what to do next.

This helper function receives an interrupt and returns a decision object. Decision objects can be of any serializable type (convertible to JSON). In this case, you return a dictionary with a boolean flag indicating if the authorization was successful.
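For instance, a minimal decision object could look like the following (the `authorized` key is an assumed name for illustration; the actual flag is whatever `handle_authorization_interrupt` returns). The roundtrip check matters because LangGraph persists the resume value with the thread's checkpoint:

```python
import json

# Hypothetical decision object for the authorization interrupt: any
# JSON-serializable value works; here it's a dict with a boolean flag.
decision = {"authorized": True}

# The decision must survive serialization, since it is persisted alongside
# the thread's checkpoint before the flow resumes.
assert json.loads(json.dumps(decision)) == decision
print(decision["authorized"])  # → True
```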


### Write the invoke helper

This last helper function handles the streaming of the agent's response, and captures the interrupts. When the system detects an interrupt, it adds the interrupt to the `interrupts` array, and the flow stops. If there are no interrupts, it will just stream the agent's response to your console.

```python filename="main.py"
async def stream_agent_response(agent, input_data, config) -> List[Any]:
```

Finally, write the main function that will create the agent, initialize the conversation, and handle the user input.

Here the `config` object configures the `thread_id`, which tells the agent to store the state of the conversation in that specific thread. In the main function you also initialize the checkpointer and route the interrupts to the handlers you wrote earlier. Notice how a single turn of the agentic loop may have multiple interrupts, and you need to handle them all before continuing to the next turn.

```python filename="main.py"
async def main():

            # Handle interrupts if any occurred
            if interrupts:
                print(f"\n⚠️ Detected {len(interrupts)} interrupt(s)\n")

                # Process each interrupt
                for interrupt_obj in interrupts:
```

You should see the agent responding to your prompts like any model.

## Key takeaways

- You can integrate Arcade tools into any agentic framework like LangChain; all you need to do is transform the Arcade tools into LangChain tools and handle the authorization flow.
- Context isolation: By handling the authorization flow outside of the agent's context, you remove the risk of the LLM replacing the authorization URL or leaking it, and you keep the context free from any authorization-related traces, which reduces the risk of hallucinations.
- You can leverage the interrupts mechanism to handle human intervention in the agent's flow, useful for authorization flows, policy enforcement, or anything else that requires input from the user.

## Next steps

1. Try adding additional tools to the agent or modifying the tools in the catalog for a different use case by modifying the `MCP_SERVERS` and `TOOLS` variables.
2. Try refactoring the `handle_authorization_interrupt` function to handle more complex flows, such as human-in-the-loop.
3. Try implementing a fully deterministic flow before the agentic loop; use this deterministic phase to prepare the context for the agent, adding things like the current date, time, or any other information that is relevant to the task at hand.
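As a sketch of the third idea (plain Python; `BASE_PROMPT` and `build_system_prompt` are hypothetical names, not part of the walkthrough), a deterministic pre-step can stamp the current date into the system prompt before the agentic loop starts:

```python
from datetime import datetime, timezone

# Hypothetical deterministic pre-step: build the system prompt before the
# agentic loop starts, so the model never has to guess the current date.
BASE_PROMPT = "You are a helpful assistant."  # stands in for SYSTEM_PROMPT

def build_system_prompt(base: str) -> str:
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return f"{base}\n\nToday's date (UTC) is {today}."

print(build_system_prompt(BASE_PROMPT))
```

Because this runs outside the LLM loop, the injected context is fully deterministic and never depends on the model getting it right.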

## Example code

<details>
<summary>Full example code</summary>

```python filename="main.py"
def arcade_schema_to_pydantic(tool_def: ToolDefinition) -> type[BaseModel]:
        if param_type is list and param.value_schema.inner_val_type:
            inner_type: type[Any] = get_python_type(param.value_schema.inner_val_type)
            param_type = list[inner_type]
        param_description = param.description or "No description provided."
        default = ... if param.required else None
        fields[param.name] = (
            param_type,
async def arcade_to_langchain(
    async def tool_function(config: RunnableConfig, **kwargs: Any) -> Any:
        user_id = config.get("configurable", {}).get("user_id") if config else None
        if not user_id:
            raise ValueError("User ID is required to execute Arcade tools")

        auth_response = await arcade_client.tools.authorize(
            tool_name=arcade_tool.qualified_name,
async def stream_agent_response(agent, input_data, config) -> List[Any]:
    for msg in node_output["messages"]:
        # Tool calls from the AI
        if isinstance(msg, AIMessage) and msg.tool_calls:
            for tool_call in msg.tool_calls:
                print(f"🔧 Calling tool: {tool_call['name']}")

        # Tool response - just acknowledge it, don't dump the content
        elif isinstance(msg, ToolMessage):
            print(f" ✓ {msg.name} completed, processing output...")

        # Final AI response text
        elif isinstance(msg, AIMessage) and msg.content:
            print(f"\n🤖 Assistant:\n{msg.content}")

    return interrupts


async def main():
    # Initialize Arcade client
    arcade = AsyncArcade()

    # Get tools
    all_tools = await get_arcade_tools(
        arcade_client=arcade, mcp_servers=MCP_SERVERS, tools=TOOLS
    )

    # Initialize LLM
    model = ChatOpenAI(
        model=MODEL,
        api_key=os.getenv("OPENAI_API_KEY")
    )

    # Create agent with memory checkpointer
    memory = MemorySaver()
    agent = create_agent(
        system_prompt=SYSTEM_PROMPT,
        model=model,
        tools=all_tools,
        checkpointer=memory
    )

    print(f"\n🤖 Agent created with {len(all_tools)} tools")
    print("Type 'quit' or 'exit' to end the conversation.\n")
    print("="*70)

    # Configuration for agent execution
    config = {
        "configurable": {
            "thread_id": "conversation_thread",
            "user_id": ARCADE_USER_ID
        }
    }

    # Interactive conversation loop
    while True:
        # Get user input
        try:
            user_message = input("\n💬 You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n\n👋 Goodbye!")
            break

        # Check for exit commands
        if not user_message:
            continue
        if user_message.lower() in ("quit", "exit", "q"):
            print("\n👋 Goodbye!")
            break

        print("="*70)

        # Start with user message
        current_input = {"messages": [{"role": "user", "content": user_message}]}

        # Agent execution loop with interrupt handling
        while True:
            print("\n🔄 Running agent...\n")

            interrupts = await stream_agent_response(agent, current_input, config)

            # Handle interrupts if any occurred
            if interrupts:
                print(f"\n⚠️ Detected {len(interrupts)} interrupt(s)\n")

                # Process each interrupt
                for interrupt_obj in interrupts:
                    interrupt_type = interrupt_obj.value.get("type")

                    if interrupt_type == "authorization_required":
                        # Handle authorization interrupt
                        decision = await handle_authorization_interrupt(
                            interrupt_obj.value,
                            arcade
                        )

                        # Resume agent with authorization decision
                        current_input = Command(resume=decision)
                        break  # Continue to next iteration
                    else:
                        print(f"❌ Unknown interrupt type: {interrupt_type}")
                        break
                else:
                    # All interrupts processed without break
                    break
            else:
                # No interrupts - agent completed successfully
                print("\n✅ Response complete!")
                break

        print("\n" + "="*70)


if __name__ == "__main__":
    asyncio.run(main())
```

</details>