# Respond to a user message

This guide covers how to implement and run a ChatKit server that responds to user messages, including thread loading, inference, event streaming, and persistence.

## Install ChatKit

Install the SDK from PyPI:

```bash
pip install openai-chatkit
```

## Build and run your ChatKit server

Your ChatKit server does three main things:

1. Accept HTTP requests from your client.
2. Construct a request context (user id, auth, feature flags, etc.).
3. Pass the request to `ChatKitServer.process`, which calls your `respond` method to produce streamed events.

### Define a request context

First, define a small context object that is created per request and passed through your server, store, and agents:

```python
from dataclasses import dataclass


@dataclass
class MyRequestContext:
    user_id: str
```

### Implement your `ChatKitServer`

Subclass `ChatKitServer` and implement `respond`. It runs once per user turn and should yield the events that make up your response. We'll keep this example simple for now and fill in history loading and model calls in later sections.

```python
from collections.abc import AsyncIterator
from datetime import datetime

from chatkit.server import ChatKitServer
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)


class MyChatKitServer(ChatKitServer[MyRequestContext]):
    async def respond(
        self,
        thread: ThreadMetadata,
        input: UserMessageItem | None,
        context: MyRequestContext,
    ) -> AsyncIterator[ThreadStreamEvent]:
        # Replace this with your inference pipeline.
        yield ThreadItemDoneEvent(
            item=AssistantMessageItem(
                thread_id=thread.id,
                id=self.store.generate_item_id("message", thread, context),
                created_at=datetime.now(),
                content=[AssistantMessageContent(text="Hi there!")],
            )
        )
```

### Wire ChatKit to your web framework

Expose a single `/chatkit` endpoint that forwards requests to your `MyChatKitServer` instance. For example, with FastAPI:

```python
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

from chatkit.server import StreamingResult

app = FastAPI()
# `conn_info` is your Postgres connection string; `MyPostgresStore` is
# implemented in the data store section below.
store = MyPostgresStore(conn_info)
server = MyChatKitServer(store)


@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    # Build a per-request context from the incoming HTTP request
    # (for example, from your auth session). The user id here is a placeholder.
    context = MyRequestContext(user_id="abc123")

    # Let ChatKit handle the request and return either a streaming or JSON result.
    result = await server.process(await request.body(), context)
    if isinstance(result, StreamingResult):
        return StreamingResponse(result, media_type="text/event-stream")
    return Response(content=result.json, media_type="application/json")
```
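The `text/event-stream` media type means the streaming response body is a sequence of server-sent events (SSE). Because the endpoint hands `result` straight to `StreamingResponse`, ChatKit produces the encoded chunks and you should not need to frame them yourself. The stdlib-only sketch below just illustrates what SSE framing looks like on the wire; `sse_frame`, `sse_stream`, and the JSON payloads are hypothetical, not ChatKit's event format.

```python
import json
from collections.abc import Iterable, Iterator


def sse_frame(payload: dict) -> bytes:
    """Encode one JSON payload as a server-sent event (hypothetical helper)."""
    # An SSE event is one or more "data:" lines terminated by a blank line.
    # json.dumps never emits raw newlines by default, so one "data:" line is enough.
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    return b"data: " + body + b"\n\n"


def sse_stream(events: Iterable[dict]) -> Iterator[bytes]:
    # Frame each event in order; a streaming HTTP response sends them as they are produced.
    for event in events:
        yield sse_frame(event)


chunks = list(sse_stream([{"type": "progress", "text": "Thinking..."}, {"type": "done"}]))
print(b"".join(chunks).decode())
```

The blank line after each event is what tells the client that one event has ended and the next can begin.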

### How request context flows into ChatKit

`ChatKitServer[TContext]` and `Store[TContext]` are generic over a request context type you choose (user id, org, auth scopes, feature flags). Construct it per request and pass it to `server.process`; it flows into `respond` and your store methods.

```python
context = MyRequestContext(user_id="abc123")
result = await server.process(await request.body(), context)
```

Because you build the context before calling `process`, request metadata (headers, cookies, auth tokens, and any metadata in the payload) is already available at that point. Copy what you need into the context for auth, tracing, or feature flags.

## Implement your ChatKit data store

Implement the `Store` interface to control how threads, messages, tool calls, and widgets are stored. Prefer serializing thread items as JSON (for example, in a `JSONB` column) so that schema changes do not break storage. Example Postgres store using psycopg 3:

```python
import psycopg
from psycopg.rows import tuple_row
from psycopg.types.json import Jsonb

from chatkit.store import NotFoundError, Store
from chatkit.types import ThreadMetadata


class MyPostgresStore(Store[MyRequestContext]):
    def __init__(self, conninfo: str) -> None:
        self._conninfo = conninfo
        self._init_schema()

    def _connection(self) -> psycopg.Connection:
        # Opens a new blocking connection per call. In production, prefer a pool
        # or psycopg.AsyncConnection so queries don't block the event loop.
        return psycopg.connect(self._conninfo)

    def _init_schema(self) -> None:
        with self._connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )

            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS items (
                    id TEXT PRIMARY KEY,
                    thread_id TEXT NOT NULL
                        REFERENCES threads (id)
                        ON DELETE CASCADE,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )

            conn.commit()

    async def load_thread(
        self, thread_id: str, context: MyRequestContext
    ) -> ThreadMetadata:
        with self._connection() as conn, conn.cursor(row_factory=tuple_row) as cur:
            # Scope every query to the requesting user.
            cur.execute(
                "SELECT data FROM threads WHERE id = %s AND user_id = %s",
                (thread_id, context.user_id),
            )
            row = cur.fetchone()
            if row is None:
                raise NotFoundError(f"Thread {thread_id} not found")
            return ThreadMetadata.model_validate(row[0])

    async def save_thread(
        self, thread: ThreadMetadata, context: MyRequestContext
    ) -> None:
        payload = thread.model_dump(mode="json")
        with self._connection() as conn, conn.cursor() as cur:
            # Upsert: save_thread is also called to update existing threads
            # (for example, after changing thread.metadata). Jsonb() adapts the dict.
            cur.execute(
                """
                INSERT INTO threads (id, user_id, created_at, data)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data
                WHERE threads.user_id = EXCLUDED.user_id
                """,
                (thread.id, context.user_id, thread.created_at, Jsonb(payload)),
            )
            conn.commit()

    # Implement the remaining Store methods following the same pattern.
```
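To see why a single JSON column tolerates schema changes, here is a stdlib-only sketch that uses an in-memory `sqlite3` database instead of Postgres so it runs anywhere. The table layout, `save_item`, and `load_items` are hypothetical, and plain dicts stand in for the output of `model_dump(mode="json")`.

```python
import json
import sqlite3

# The whole serialized item lives in one JSON text column, so adding fields to
# the item model later needs no ALTER TABLE. Names here are illustrative.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (id TEXT PRIMARY KEY, thread_id TEXT NOT NULL,"
    " user_id TEXT NOT NULL, data TEXT NOT NULL)"
)


def save_item(item: dict, user_id: str) -> None:
    # Upsert so re-saving an item replaces its JSON, but only for the same owner.
    conn.execute(
        "INSERT INTO items (id, thread_id, user_id, data) VALUES (?, ?, ?, ?)"
        " ON CONFLICT(id) DO UPDATE SET data = excluded.data"
        " WHERE items.user_id = excluded.user_id",
        (item["id"], item["thread_id"], user_id, json.dumps(item)),
    )


def load_items(thread_id: str, user_id: str) -> list[dict]:
    # Scope reads to the requesting user, mirroring the Postgres example.
    rows = conn.execute(
        "SELECT data FROM items WHERE thread_id = ? AND user_id = ? ORDER BY rowid",
        (thread_id, user_id),
    )
    return [json.loads(data) for (data,) in rows]


save_item({"id": "msg_1", "thread_id": "thr_1", "text": "hi"}, user_id="u1")
# A newer item shape with an extra field is stored without any migration.
save_item({"id": "msg_2", "thread_id": "thr_1", "text": "hello", "annotations": []}, user_id="u1")
print([item["id"] for item in load_items("thr_1", "u1")])  # ['msg_1', 'msg_2']
print(load_items("thr_1", "someone-else"))  # []
```

The `ON CONFLICT ... DO UPDATE` upsert needs SQLite 3.24 or newer.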

Customize ID generation by overriding `generate_thread_id` and `generate_item_id` if you need external or deterministic IDs. To drive your inference pipeline, keep integration-specific state, such as the model's `last_response_id`, in the free-form `ThreadMetadata.metadata` dict; ChatKit does not define a dedicated field for it.
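For deterministic IDs (for example, when importing messages from another system where retries must not create duplicates), one option is a name-based UUID. The helper and namespace below are hypothetical. You would call something like this from your `generate_item_id` override, which the guide invokes as `generate_item_id("message", thread, context)`. How you obtain the external key depends on your integration.

```python
import uuid

# Hypothetical, fixed namespace for this app; any constant UUID works.
ITEM_ID_NAMESPACE = uuid.UUID("6f1c2b0e-6d3a-4c1e-9a55-2f0f7e9d1a11")


def deterministic_item_id(item_type: str, thread_id: str, external_key: str) -> str:
    """Derive a stable, prefixed ID from an external key (hypothetical helper).

    uuid5 hashes the namespace and name with SHA-1, so the same inputs always
    produce the same ID, which makes retries and re-imports idempotent.
    """
    name = f"{thread_id}:{item_type}:{external_key}"
    return f"{item_type}_{uuid.uuid5(ITEM_ID_NAMESPACE, name).hex}"


a = deterministic_item_id("message", "thr_123", "crm-event-42")
b = deterministic_item_id("message", "thr_123", "crm-event-42")
c = deterministic_item_id("message", "thr_123", "crm-event-43")
print(a == b, a == c)  # True False
```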

## Generate a response using your model

Inside `respond`, you'll usually:

1. Load recent thread history.
2. Prepare model input for your agent.
3. Run inference and stream events back to the client.

### Prepare model input

Before you call your model, decide what conversation context you want the model to see.

#### Load thread history (recommended default)

Fetch recent items so the model sees the conversation state before you build the next turn:

```python
items_page = await self.store.load_thread_items(
    thread.id,
    after=None,
    limit=20,  # Tune this limit based on your model/context budget.
    order="desc",
    context=context,
)
# The page is newest-first; reverse it so the model sees items in chronological order.
items = list(reversed(items_page.data))
```
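The history query uses `order="desc"` and then reverses the result because asking for the first `limit` items in ascending order would return the *oldest* messages. The stdlib sketch below reproduces that windowing, with a hypothetical `Item` class standing in for ChatKit thread items.

```python
from dataclasses import dataclass


@dataclass
class Item:  # Hypothetical stand-in for a stored thread item.
    id: str
    created_at: int  # e.g. a Unix timestamp


def load_recent(items: list[Item], limit: int) -> list[Item]:
    """Take the newest `limit` items, then restore chronological order."""
    newest_first = sorted(items, key=lambda i: i.created_at, reverse=True)[:limit]
    return list(reversed(newest_first))


history = [Item(f"msg_{n}", created_at=n) for n in range(1, 6)]
print([i.id for i in load_recent(history, limit=3)])  # ['msg_3', 'msg_4', 'msg_5']
```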

- Start with the defaults: `simple_to_agent_input(items)` is a convenience wrapper around the **default** `ThreadItemConverter` (it calls `ThreadItemConverter().to_agent_input(items)` under the hood).
- Customize for your integration: some item types require app-specific translation into model input (for example, **attachments**, **tags**, and some **hidden context items**). In those cases, subclass `ThreadItemConverter` and call your converter directly instead of `simple_to_agent_input`. If your thread includes attachments or tags and you haven't implemented the corresponding converter hooks, conversion raises `NotImplementedError`.

```python
from agents import Runner

from chatkit.agents import AgentContext, ThreadItemConverter, simple_to_agent_input


# Option A (defaults):
input_items = await simple_to_agent_input(items)

# Option B (your integration-specific ThreadItemConverter subclass):
# input_items = await MyThreadItemConverter().to_agent_input(items)

# AgentContext carries the thread, store, and your request context into
# tools and into stream_agent_response.
agent_context = AgentContext(
    thread=thread,
    store=self.store,
    request_context=context,
)
```
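To illustrate the subclass-and-override pattern without depending on ChatKit's exact hook names, here is a stdlib-only analogue. `BaseConverter`, `MyConverter`, `TextItem`, `AttachmentItem`, and `attachment_to_input` are all hypothetical. Check `ThreadItemConverter` in your installed version for the real hook methods and their signatures.

```python
from dataclasses import dataclass


@dataclass
class TextItem:  # Hypothetical stand-ins for thread item types.
    text: str


@dataclass
class AttachmentItem:
    file_id: str


class BaseConverter:
    """Analogue of a converter whose app-specific hook is left unimplemented."""

    def to_model_input(self, items: list) -> list[dict]:
        out = []
        for item in items:
            if isinstance(item, TextItem):
                out.append({"role": "user", "content": item.text})
            elif isinstance(item, AttachmentItem):
                # The base class cannot know how your app stores or reads files.
                out.append(self.attachment_to_input(item))
            # Other item types are skipped in this sketch.
        return out

    def attachment_to_input(self, item: AttachmentItem) -> dict:
        raise NotImplementedError("override to describe attachments to the model")


class MyConverter(BaseConverter):
    def attachment_to_input(self, item: AttachmentItem) -> dict:
        # Hypothetical choice: reference the file by ID instead of inlining its bytes.
        return {"role": "user", "content": f"[attached file {item.file_id}]"}


items = [TextItem("Summarize this"), AttachmentItem("file_1")]
print(MyConverter().to_model_input(items))
```

Calling `BaseConverter().to_model_input(items)` on the same list raises `NotImplementedError`, which mirrors the behavior the guide describes for unimplemented attachment or tag hooks.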

Respect any `input.inference_options` the client sends (such as model and tool choice) when you build the model request. Note that `input` can be `None`, so handle that case too.

#### Using `previous_response_id` (OpenAI Responses API only)

If you are using OpenAI models through the **Responses API**, you can pass `previous_response_id` to `Runner.run_streamed(...)` and (often) send only the *new* user message as model input. This can simplify input construction, because the provider retrieves the prior context server-side.

Terminology note: Agents exposes the ID of the most recent model response as `result.last_response_id`. On the *next* turn, you pass that saved value as the `previous_response_id` parameter.

**Important restrictions:**

- **OpenAI Responses API only.** Other model providers can't follow a `previous_response_id`, so you must send thread history yourself.
- **Only includes model-visible history.** If your integration streams ChatKit-only items (e.g. widgets/workflows emitted directly to the client), the model won't know about them unless you also include them in `input_items`.
- **Works only while the referenced response is retrievable.** Persist `result.last_response_id` and ensure responses are stored (`store=True` / `ModelSettings(store=True)`); otherwise fall back to rebuilding input from thread items.

Example:

```python
# `ThreadMetadata.metadata` is a free-form dict for integration-specific state.
# ChatKit does not define a first-class `last_response_id` field on `ThreadMetadata`;
# your integration can store it under a key and reuse it on the next turn.
last_response_id = thread.metadata.get("last_response_id")
last_response_id = last_response_id if isinstance(last_response_id, str) else None

if last_response_id is not None:
    # Chaining on the server: send only the new message as input.
    input_items = await simple_to_agent_input(input)
else:
    # No saved ID (first turn or lost state): send the history loaded earlier.
    input_items = await simple_to_agent_input(items)

result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
    previous_response_id=last_response_id,
    auto_previous_response_id=True,
)

# Stream the run first: `result.last_response_id` is only populated once the
# model has produced a response, not right after `run_streamed` returns.
async for event in stream_agent_response(agent_context, result):
    yield event

# Persist the new response ID so the next turn can chain again.
if result.last_response_id:
    thread.metadata["last_response_id"] = result.last_response_id
    await self.store.save_thread(thread, context=context)
```

### Run inference and stream events

Run your agent and stream events back to the client. `stream_agent_response` converts an Agents run into ChatKit events; you can also yield events manually.

```python
from agents import (
    InputGuardrailTripwireTriggered,
    OutputGuardrailTripwireTriggered,
    Runner,
)
from chatkit.agents import stream_agent_response
from chatkit.types import ErrorEvent

result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
)

try:
    async for event in stream_agent_response(agent_context, result):
        yield event
except InputGuardrailTripwireTriggered:
    yield ErrorEvent(message="We blocked that message for safety.")
except OutputGuardrailTripwireTriggered:
    yield ErrorEvent(
        message="The assistant response was blocked.",
        allow_retry=False,
    )
```
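The `try`/`except` wraps the `async for` loop because a tripwire exception is raised while the stream is being consumed, possibly after some events have already gone to the client. The stdlib `asyncio` sketch below shows that ordering. `GuardrailTripped`, `model_events`, and the string events are hypothetical stand-ins for the Agents exceptions and ChatKit events.

```python
import asyncio
from collections.abc import AsyncIterator


class GuardrailTripped(Exception):
    """Hypothetical stand-in for a guardrail tripwire exception."""


async def model_events(trip: bool) -> AsyncIterator[str]:
    # Stand-in for an upstream stream that can fail partway through.
    yield "delta: Hel"
    if trip:
        raise GuardrailTripped()
    yield "delta: lo"


async def respond(trip: bool) -> AsyncIterator[str]:
    try:
        async for event in model_events(trip):
            yield event
    except GuardrailTripped:
        # Events already yielded have been sent; finish the turn with an error event.
        yield "error: blocked"


async def collect(trip: bool) -> list[str]:
    return [e async for e in respond(trip)]


print(asyncio.run(collect(trip=False)))  # ['delta: Hel', 'delta: lo']
print(asyncio.run(collect(trip=True)))  # ['delta: Hel', 'error: blocked']
```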

To stream events from a server tool during the same turn, call `ctx.context.stream(...)` inside the tool:

```python
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
from chatkit.types import ProgressUpdateEvent


@function_tool()
async def load_document(ctx: RunContextWrapper[AgentContext], document_id: str):
    """Load a document by its ID."""
    # Show progress in the client while the tool runs.
    await ctx.context.stream(ProgressUpdateEvent(icon="document", text="Loading document..."))
    # `get_document_by_id` is your own data-access function.
    return await get_document_by_id(document_id)
```

`stream_agent_response` forwards these events alongside any assistant text or tool call updates. Client tool calls are also supported via `ctx.context.client_tool_call` when you register the tool on both the client and the server.
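One way to picture how tool-emitted events reach the client in order is a single shared queue. Both the running agent and its tools push into the queue, and the streaming loop drains it until a sentinel marks the end of the run. This is a conceptual stdlib analogue, not ChatKit's actual implementation; `fake_tool`, `fake_run`, `stream_run`, and the string events are hypothetical.

```python
import asyncio
from collections.abc import AsyncIterator

DONE = object()  # Sentinel marking the end of the run.


async def fake_tool(events: asyncio.Queue) -> str:
    # Like ctx.context.stream(...): push a progress event while the tool works.
    await events.put("progress: Loading document...")
    return "document text"


async def fake_run(events: asyncio.Queue) -> None:
    # Stand-in for an agent run that calls a tool, then emits assistant text.
    try:
        doc = await fake_tool(events)
        await events.put(f"assistant: summarized {len(doc)} chars")
    finally:
        await events.put(DONE)  # Always unblock the consumer, even on failure.


async def stream_run() -> AsyncIterator[str]:
    events: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(fake_run(events))
    while (event := await events.get()) is not DONE:
        yield event
    await task  # Re-raise any exception from the run.


async def main() -> list[str]:
    return [e async for e in stream_run()]


print(asyncio.run(main()))
# ['progress: Loading document...', 'assistant: summarized 13 chars']
```

The `finally` block matters: without it, an exception in the run would leave the consumer waiting on `events.get()` forever.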

## Next: add features

- [Let users browse past threads](browse-past-threads.md)
- [Accept rich user input](accept-rich-user-input.md)
- [Let users pick tools and models](let-users-pick-tools-and-models.md)
- [Pass extra app context to your model](pass-extra-app-context-to-your-model.md)
- [Update the client during a response](update-client-during-response.md)
- [Build interactive responses with widgets](build-interactive-responses-with-widgets.md)
- [Add annotations in assistant messages](add-annotations.md)
- [Stream generated images](stream-generated-images.md)
- [Keep your app in sync with ChatKit](keep-your-app-in-sync-with-chatkit.md)
- [Let your app draft and send messages](let-your-app-draft-and-send-messages.md)
- [Handle feedback](handle-feedback.md)
- [Prepare your app for production](prepare-your-app-for-production.md)