AI Agents Masterclass #7 - LangGraph Guide

This commit is contained in:
Cole Medin
2024-08-04 17:24:41 -05:00
parent bf8d518c82
commit f0aef8ff6e
5 changed files with 442 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
# Rename this file to .env once you have filled in the below environment variables!
# Get your Open AI API Key by following these instructions -
# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# You only need this environment variable set if you set LLM_MODEL to a GPT model
OPENAI_API_KEY=
# Get your Anthropic API Key in your account settings -
# https://console.anthropic.com/settings/keys
# You only need this environment variable set if you set LLM_MODEL to a Claude model
ANTHROPIC_API_KEY=
# See all Open AI models you can use here -
# https://platform.openai.com/docs/models
# And all Anthropic models you can use here -
# https://docs.anthropic.com/en/docs/about-claude/models
# A good default to go with here is gpt-4o or claude-3-5-sonnet-20240620
LLM_MODEL=gpt-4o
# Get your personal Asana access token through the developer console in Asana.
# Feel free to follow these instructions -
# https://developers.asana.com/docs/personal-access-token
ASANA_ACCESS_TOKEN=
# The Asana workspace ID is in the URL when you visit your Asana Admin Console (when logged in).
# Go to the URL "https://app.asana.com/admin" and then your workspace ID
# will appear in the URL as a slew of digits once the site loads.
# If your URL is https://app.asana.com/admin/987654321/insights, then your
# Asana workspace ID is 987654321
ASANA_WORKPLACE_ID=

View File

@@ -0,0 +1,88 @@
from datetime import datetime
import streamlit as st
import asyncio
import json
import uuid
import os
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage, ToolMessage
from runnable import get_runnable
@st.cache_resource
def create_chatbot_instance():
return get_runnable()
chatbot = create_chatbot_instance()
@st.cache_resource
def get_thread_id():
return str(uuid.uuid4())
thread_id = get_thread_id()
system_message = f"""
You are a personal assistant who helps manage tasks in Asana.
You never give IDs to the user since those are just for you to keep track of.
When a user asks to create a task and you don't know the project to add it to for sure, clarify with the user.
The current date is: {datetime.now().date()}
"""
async def prompt_ai(messages):
config = {
"configurable": {
"thread_id": thread_id
}
}
async for event in chatbot.astream_events(
{"messages": messages}, config, version="v2"
):
if event["event"] == "on_chat_model_stream":
yield event["data"]["chunk"].content
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~ Main Function with UI Creation ~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
async def main():
st.title("Asana Chatbot with LangGraph")
# Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = [
SystemMessage(content=system_message)
]
# Display chat messages from history on app rerun
for message in st.session_state.messages:
message_json = json.loads(message.json())
message_type = message_json["type"]
if message_type in ["human", "ai", "system"]:
with st.chat_message(message_type):
st.markdown(message_json["content"])
# React to user input
if prompt := st.chat_input("What would you like to do today?"):
# Display user message in chat message container
st.chat_message("user").markdown(prompt)
# Add user message to chat history
st.session_state.messages.append(HumanMessage(content=prompt))
# Display assistant response in chat message container
response_content = ""
with st.chat_message("assistant"):
message_placeholder = st.empty() # Placeholder for updating the message
# Run the async generator to fetch responses
async for chunk in prompt_ai(st.session_state.messages):
response_content += chunk
# Update the placeholder with the current response content
message_placeholder.markdown(response_content)
st.session_state.messages.append(AIMessage(content=response_content))
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,10 @@
asana==5.0.7
python-dotenv==0.13.0
langchain==0.2.12
langchain-anthropic==0.1.22
langchain-community==0.2.11
langchain-core==0.2.28
langchain-openai==0.1.20
streamlit==1.36.0
langgraph==0.1.19
aiosqlite==0.20.0

View File

@@ -0,0 +1,123 @@
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
from typing import Annotated, Literal, Dict
from dotenv import load_dotenv
import os
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import ToolMessage
from tools import available_functions
load_dotenv()
model = os.getenv('LLM_MODEL', 'gpt-4o')
tools = [tool for _, tool in available_functions.items()]
chatbot = ChatOpenAI(model=model, streaming=True) if "gpt" in model.lower() else ChatAnthropic(model=model, streaming=True)
chatbot_with_tools = chatbot.bind_tools(tools)
### State
class GraphState(TypedDict):
"""
Represents the state of our graph.
Attributes:
messages: List of chat messages.
"""
messages: Annotated[list[AnyMessage], add_messages]
async def call_model(state: GraphState, config: RunnableConfig) -> Dict[str, AnyMessage]:
"""
Function that calls the model to generate a response.
Args:
state (GraphState): The current graph state
Returns:
dict: The updated state with a new AI message
"""
print("---CALL MODEL---")
messages = state["messages"]
# Invoke the chatbot with the binded tools
response = await chatbot_with_tools.ainvoke(messages, config)
print("Response from model:", response)
# We return an object because this will get added to the existing list
return {"messages": response}
def tool_node(state: GraphState) -> Dict[str, AnyMessage]:
"""
Function that handles all tool calls.
Args:
state (GraphState): The current graph state
Returns:
dict: The updated state with tool messages
"""
print("---TOOL NODE---")
messages = state["messages"]
last_message = messages[-1] if messages else None
outputs = []
if last_message and last_message.tool_calls:
for call in last_message.tool_calls:
tool = available_functions.get(call['name'], None)
if tool is None:
raise Exception(f"Tool '{call['name']}' not found.")
output = tool.invoke(call['args'])
outputs.append(ToolMessage(
output if isinstance(output, str) else json.dumps(output),
tool_call_id=call['id']
))
return {'messages': outputs}
def should_continue(state: GraphState) -> Literal["__end__", "tools"]:
"""
Determine whether to continue or end the workflow based on if there are tool calls to make.
Args:
state (GraphState): The current graph state
Returns:
str: The next node to execute or END
"""
print("---SHOULD CONTINUE---")
messages = state["messages"]
last_message = messages[-1] if messages else None
# If there is no function call, then we finish
if not last_message or not last_message.tool_calls:
return END
else:
return "tools"
def get_runnable():
workflow = StateGraph(GraphState)
# Define the nodes and how they connect
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue
)
workflow.add_edge("tools", "agent")
# Compile the LangGraph graph into a runnable
memory = AsyncSqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)
return app

191
7-langgraph-agent/tools.py Normal file
View File

@@ -0,0 +1,191 @@
import asana
from asana.rest import ApiException
from dotenv import load_dotenv
import json
import os
from langchain_core.tools import tool
load_dotenv()
configuration = asana.Configuration()
configuration.access_token = os.getenv('ASANA_ACCESS_TOKEN', '')
api_client = asana.ApiClient(configuration)
# create an instance of the different Asana API classes
projects_api_instance = asana.ProjectsApi(api_client)
tasks_api_instance = asana.TasksApi(api_client)
workspace_gid = os.getenv("ASANA_WORKPLACE_ID", "")
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~ AI Agent Tool Functions ~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@tool
def create_asana_task(task_name, project_gid, due_on="today"):
"""
Creates a task in Asana given the name of the task and when it is due
Example call:
create_asana_task("Test Task", "2024-06-24")
Args:
task_name (str): The name of the task in Asana
project_gid (str): The ID of the project to add the task to
due_on (str): The date the task is due in the format YYYY-MM-DD. If not given, the current day is used
Returns:
str: The API response of adding the task to Asana or an error message if the API call threw an error
"""
if due_on == "today":
due_on = str(datetime.now().date())
task_body = {
"data": {
"name": task_name,
"due_on": due_on,
"projects": [project_gid]
}
}
try:
api_response = tasks_api_instance.create_task(task_body, {})
return json.dumps(api_response, indent=2)
except ApiException as e:
return f"Exception when calling TasksApi->create_task: {e}"
@tool
def get_asana_projects():
"""
Gets all of the projects in the user's Asana workspace
Returns:
str: The API response from getting the projects or an error message if the projects couldn't be fetched.
The API response is an array of project objects, where each project object looks like:
{'gid': '1207789085525921', 'name': 'Project Name', 'resource_type': 'project'}
"""
opts = {
'limit': 50, # int | Results per page. The number of objects to return per page. The value must be between 1 and 100.
'workspace': workspace_gid, # str | The workspace or organization to filter projects on.
'archived': False # bool | Only return projects whose `archived` field takes on the value of this parameter.
}
try:
api_response = projects_api_instance.get_projects(opts)
return json.dumps(list(api_response), indent=2)
except ApiException as e:
return "Exception when calling ProjectsApi->create_project: %s\n" % e
@tool
def create_asana_project(project_name, due_on=None):
"""
Creates a project in Asana given the name of the project and optionally when it is due
Example call:
create_asana_project("Test Project", "2024-06-24")
Args:
project_name (str): The name of the project in Asana
due_on (str): The date the project is due in the format YYYY-MM-DD. If not supplied, the project is not given a due date
Returns:
str: The API response of adding the project to Asana or an error message if the API call threw an error
"""
body = {
"data": {
"name": project_name, "due_on": due_on, "workspace": workspace_gid
}
} # dict | The project to create.
try:
# Create a project
api_response = projects_api_instance.create_project(body, {})
return json.dumps(api_response, indent=2)
except ApiException as e:
return "Exception when calling ProjectsApi->create_project: %s\n" % e
@tool
def get_asana_tasks(project_gid):
"""
Gets all the Asana tasks in a project
Example call:
get_asana_tasks("1207789085525921")
Args:
project_gid (str): The ID of the project in Asana to fetch the tasks for
Returns:
str: The API response from fetching the tasks for the project in Asana or an error message if the API call threw an error
The API response is an array of tasks objects where each task object is in the format:
{'gid': '1207780961742158', 'created_at': '2024-07-11T16:25:46.380Z', 'due_on': None or date in format "YYYY-MM-DD", 'name': 'Test Task'}
"""
opts = {
'limit': 50, # int | Results per page. The number of objects to return per page. The value must be between 1 and 100.
'project': project_gid, # str | The project to filter tasks on.
'opt_fields': "created_at,name,due_on", # list[str] | This endpoint returns a compact resource, which excludes some properties by default. To include those optional properties, set this query parameter to a comma-separated list of the properties you wish to include.
}
try:
# Get multiple tasks
api_response = tasks_api_instance.get_tasks(opts)
return json.dumps(list(api_response), indent=2)
except ApiException as e:
return "Exception when calling TasksApi->get_tasks: %s\n" % e
@tool
def update_asana_task(task_gid, data):
"""
Updates a task in Asana by updating one or both of completed and/or the due date
Example call:
update_asana_task("1207780961742158", {"completed": True, "due_on": "2024-07-13"})
Args:
task_gid (str): The ID of the task to update
data (dict): A dictionary with either one or both of the keys 'completed' and/or 'due_on'
If given, completed needs to be either True or False.
If given, the due date needs to be in the format 'YYYY-MM-DD'.
Returns:
str: The API response of updating the task or an error message if the API call threw an error
"""
# Data: {"completed": True or False, "due_on": "YYYY-MM-DD"}
body = {"data": data} # dict | The task to update.
try:
# Update a task
api_response = tasks_api_instance.update_task(body, task_gid, {})
return json.dumps(api_response, indent=2)
except ApiException as e:
return "Exception when calling TasksApi->update_task: %s\n" % e
@tool
def delete_task(task_gid):
"""
Deletes a task in Asana
Example call:
delete_task("1207780961742158")
Args:
task_gid (str): The ID of the task to delete
Returns:
str: The API response of deleting the task or an error message if the API call threw an error
"""
try:
# Delete a task
api_response = tasks_api_instance.delete_task(task_gid)
return json.dumps(api_response, indent=2)
except ApiException as e:
return "Exception when calling TasksApi->delete_task: %s\n" % e
# Maps the function names to the actual function object in the script
# This mapping will also be used to create the list of tools to bind to the agent
available_functions = {
"create_asana_task": create_asana_task,
"get_asana_projects": get_asana_projects,
"create_asana_project": create_asana_project,
"get_asana_tasks": get_asana_tasks,
"update_asana_task": update_asana_task,
"delete_task": delete_task
}