Trace using the LangSmith REST API
It is HIGHLY recommended to use our Python or TypeScript SDKs to send traces to LangSmith. We have designed these SDKs with optimizations like batching and backgrounding to ensure that your application's performance is not impacted by sending traces to LangSmith. However, if you are unable to use our SDKs, you can use the LangSmith REST API to send traces. Performance may be impacted if you send traces synchronously in your application. This guide will show you how to trace a request using the LangSmith REST API. Please view our API documentation here for a full list of endpoints and request/response schemas.
Basic tracing
The simplest way to log runs is via the POST and PATCH /runs
endpoints. These routes expect minimal contextual information about the tree structure of the trace.
When using the LangSmith REST API, you will need to provide your API key in the request headers as "x-api-key".
In the simple example, you do not need to set the dotted_order
or trace_id
fields in the request body. These fields will be automatically generated by the system.
Though this is simpler, it is slower and has a lower rate limit in LangSmith.
The following example shows how you might leverage our API directly in Python. The same principles apply to other languages.
import os
from datetime import datetime, timezone
from uuid import uuid4

import openai
import requests
# LangSmith authenticates each request through the "x-api-key" header
headers = {
    "x-api-key": os.environ["LANGSMITH_API_KEY"],
}
def post_run(run_id, name, run_type, inputs, parent_id=None):
    """Create a new run through the LangSmith REST API.

    Args:
        run_id: UUID identifying the run; sent as its hex form.
        name: Name of the run.
        run_type: Run type string (this example uses "chain" and "llm").
        inputs: JSON-serializable dict holding the run's inputs.
        parent_id: UUID of the parent run, or None for a root run.
    """
    data = {
        "id": run_id.hex,
        "name": name,
        "run_type": run_type,
        "inputs": inputs,
        # Timezone-aware UTC timestamp, matching the end_time sent by
        # patch_run; datetime.utcnow() is deprecated and returns a naive
        # value that carries no UTC offset.
        "start_time": datetime.now(timezone.utc).isoformat(),
    }
    if parent_id:
        data["parent_run_id"] = parent_id.hex
    requests.post(
        "https://api.smith.langchain.com/runs",  # Update appropriately for self-hosted installations or the EU region
        json=data,
        headers=headers,
    )
def patch_run(run_id, outputs):
    """Update an existing run with its outputs and an end time.

    The end time sent with the update is the current UTC time.
    """
    run_url = f"https://api.smith.langchain.com/runs/{run_id}"
    finished_at = datetime.now(timezone.utc).isoformat()
    payload = {"outputs": outputs, "end_time": finished_at}
    requests.patch(run_url, json=payload, headers=headers)
# This can be a user input to your app
question = "Can you summarize this morning's meetings?"
# This can be retrieved in a retrieval step
context = "During this morning's meeting, we solved all world conflict."
messages = [
    {"role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context."},
    # "\n" is a real newline separating the question from the context
    {"role": "user", "content": f"Question: {question}\nContext: {context}"},
]

# Create parent run
parent_run_id = uuid4()
post_run(parent_run_id, "Chat Pipeline", "chain", {"question": question})

# Create child run, linked to the parent through its ID
child_run_id = uuid4()
post_run(child_run_id, "OpenAI Call", "llm", {"messages": messages}, parent_run_id)

# Generate a completion
client = openai.Client()
chat_completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)

# End runs: the child is patched with the full completion payload as its
# outputs, the parent with just the answer text.
# model_dump() is used instead of the deprecated .dict() to get a plain dict.
patch_run(child_run_id, chat_completion.model_dump())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
See the doc on the Run (span) data format for more information.
Batch Ingestion
For faster ingestion of runs and higher rate limits, you can use the POST /runs/multipart
endpoint.
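Below is an example. It requires requests and requests_toolbelt
to run. (The example serializes with the standard-library json module; orjson
can be substituted for faster JSON serialization.)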
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional
import requests
from requests_toolbelt import MultipartEncoder
def create_dotted_order(
    start_time: Optional[datetime] = None, run_id: Optional[uuid.UUID] = None
) -> str:
    """Create a dotted order string for run ordering and hierarchy.

    The dotted order is used to establish the sequence and relationships
    between runs. It combines a timestamp with a unique identifier to ensure
    proper ordering and tracing.

    Args:
        start_time: Start time of the run; defaults to the current UTC time.
            Timezone-aware values are converted to UTC before formatting.
            Naive values are formatted as given (assumed to already be UTC).
        run_id: ID of the run; a random UUID is generated when omitted.

    Returns:
        The timestamp formatted as ``%Y%m%dT%H%M%S%fZ`` immediately followed
        by the run ID.
    """
    st = start_time or datetime.now(timezone.utc)
    if st.utcoffset() is not None:
        # The trailing "Z" denotes UTC, so an aware datetime in another zone
        # must be normalized or the timestamp would be mislabeled.
        st = st.astimezone(timezone.utc)
    id_ = run_id or uuid.uuid4()
    return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"
def create_run_base(
    name: str, run_type: str, inputs: dict, start_time: datetime
) -> dict:
    """Build the fields shared by every run.

    A fresh UUID is generated and used as both the run's ``id`` and its
    ``trace_id``; ``start_time`` is stored in ISO 8601 form.
    """
    new_id = str(uuid.uuid4())
    base = dict(id=new_id, trace_id=new_id, name=name)
    base["start_time"] = start_time.isoformat()
    base["inputs"] = inputs
    base["run_type"] = run_type
    return base
def construct_run(
    name: str,
    run_type: str,
    inputs: dict,
    parent_dotted_order: Optional[str] = None,
) -> dict:
    """Build a complete run dictionary, optionally nested under a parent.

    The run gets a new ID, the current UTC time as its start time, and a
    ``dotted_order``. When ``parent_dotted_order`` is given, the run's
    ``trace_id`` and ``parent_run_id`` are read from the first and last
    segments of the parent's dotted order, and the run's own segment is
    appended to it.
    """
    started = datetime.now(timezone.utc)
    run = create_run_base(name, run_type, inputs, started)
    own_segment = create_dotted_order(started, uuid.UUID(run["id"]))

    # Root run: its dotted order is just its own segment.
    if not parent_dotted_order:
        run["dotted_order"] = own_segment
        return run

    # Each segment is "<timestamp>Z<run id>", so the text after "Z" is an ID.
    segments = parent_dotted_order.split(".")
    run["trace_id"] = segments[0].split("Z")[1]
    run["parent_run_id"] = segments[-1].split("Z")[1]
    run["dotted_order"] = f"{parent_dotted_order}.{own_segment}"
    return run
def serialize_run(operation: str, run_data: dict) -> List[tuple]:
    """Serialize a run into parts for the multipart request.

    The main run data and the optional ``inputs``, ``outputs`` and ``events``
    fields are serialized as separate JSON parts. ``run_data`` itself is left
    unmodified, so the caller can keep using the same dict afterwards.

    Args:
        operation: Prefix for the part names ("post" or "patch" in this file).
        run_data: The run dictionary. If it has no "id" key, a random UUID is
            used in the part names.

    Returns:
        A list of ``(part_name, (None, body, "application/json", headers))``
        tuples: the main run part first, then one part per non-empty
        optional field, in the order inputs, outputs, events.
    """
    # Work on a shallow copy so that popping the optional fields does not
    # strip them from the caller's dict.
    remaining = dict(run_data)
    run_id = remaining.get("id", str(uuid.uuid4()))
    optional_fields = {
        key: remaining.pop(key, None) for key in ("inputs", "outputs", "events")
    }

    def _json_part(part_name, payload):
        # One multipart field: JSON body plus an explicit Content-Length.
        body = json.dumps(payload).encode("utf-8")
        return (
            part_name,
            (None, body, "application/json", {"Content-Length": str(len(body))}),
        )

    parts = [_json_part(f"{operation}.{run_id}", remaining)]
    parts.extend(
        _json_part(f"{operation}.{run_id}.{key}", value)
        for key, value in optional_fields.items()
        if value  # absent or empty optional fields get no part
    )
    return parts
def batch_ingest_runs(
    api_url: str,
    api_key: str,
    posts: Optional[List[dict]] = None,
    patches: Optional[List[dict]] = None,
) -> None:
    """Send run creations and updates to LangSmith in one multipart request.

    Runs in ``posts`` are serialized as "post" parts and runs in ``patches``
    as "patch" parts, then everything is sent to ``{api_url}/runs/multipart``.
    Request failures are caught and printed rather than raised.
    """
    boundary = uuid.uuid4().hex

    all_parts = []
    for operation, runs in (("post", posts), ("patch", patches)):
        for run in runs or ():
            all_parts.extend(serialize_run(operation, run))

    encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
    request_headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}
    endpoint = f"{api_url}/runs/multipart"
    try:
        response = requests.post(endpoint, data=encoder, headers=request_headers)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error ingesting runs: {e}")
        # In a production environment, you might want to log this error or handle it more robustly
    else:
        print("Successfully ingested runs.")
# Configure API URL and key
# For production use, consider using a configuration file or environment variables
api_url = "https://api.smith.langchain.com"
api_key = os.environ.get("LANGSMITH_API_KEY")
if not api_key:
    raise ValueError("LANGSMITH_API_KEY environment variable is not set")

# Build the root run of the trace
parent_run = construct_run(
    "Parent Run",
    "chain",
    {"main_question": "Tell me about France"},
)

# Build a child run nested under the parent via the parent's dotted order
child_run = construct_run(
    "Child Run",
    "llm",
    {"question": "What is the capital of France?"},
    parent_dotted_order=parent_run["dotted_order"],
)

# Step 1: create both runs with a single batch request
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)

# Step 2: send updates carrying each run's end time and outputs
child_run_update = dict(
    child_run,
    end_time=datetime.now(timezone.utc).isoformat(),
    outputs={"answer": "Paris is the capital of France."},
)
parent_run_update = dict(
    parent_run,
    end_time=datetime.now(timezone.utc).isoformat(),
    outputs={"summary": "Discussion about France, including its capital."},
)
patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)
# Note: This example requires the `requests` and `requests_toolbelt` libraries.
# You can install them using pip:
# pip install requests requests_toolbelt