# Respond to a user message

This guide covers how to implement and run a ChatKit server that responds to user messages, including thread loading, inference, event streaming, and persistence.

## Install ChatKit

Install the SDK from PyPI:

```bash
pip install openai-chatkit
```

## Build and run your ChatKit server

Your ChatKit server does three main things:

1. Accept HTTP requests from your client.
2. Construct a request context (user id, auth, feature flags, etc.).
3. Pass the request to `ChatKitServer.process`, which calls your `respond` method to produce streamed events.

### Define a request context

First, define a small context object. You create one per request, and it is passed through your server, store, and agents:

```python
from dataclasses import dataclass


@dataclass
class MyRequestContext:
    user_id: str
```

### Implement your `ChatKitServer`

Subclass `ChatKitServer` and implement `respond`. It runs once per user turn and should yield the events that make up your response. We'll keep this example simple for now and fill in history loading and model calls in later sections.

```python
from collections.abc import AsyncIterator
from datetime import datetime

from chatkit.server import ChatKitServer
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)


class MyChatKitServer(ChatKitServer[MyRequestContext]):
    async def respond(
        self,
        thread: ThreadMetadata,
        input: UserMessageItem | None,
        context: MyRequestContext,
    ) -> AsyncIterator[ThreadStreamEvent]:
        # Replace this with your inference pipeline.
        yield ThreadItemDoneEvent(
            item=AssistantMessageItem(
                thread_id=thread.id,
                id=self.store.generate_item_id("message", thread, context),
                created_at=datetime.now(),
                content=[AssistantMessageContent(text="Hi there!")],
            )
        )
```

### Wire ChatKit to your web framework

Expose a single `/chatkit` endpoint that forwards requests to your `MyChatKitServer` instance. For example, with FastAPI:

```python
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

from chatkit.server import StreamingResult

# Placeholder connection string; replace it with your own.
CONNINFO = "postgresql://localhost/chatkit"

app = FastAPI()
store = MyPostgresStore(CONNINFO)
server = MyChatKitServer(store)


@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    # Build a per-request context from the incoming HTTP request.
    # The hardcoded user id is a placeholder; derive it from your auth layer.
    context = MyRequestContext(user_id="abc123")

    # Let ChatKit handle the request and return either a streaming or JSON result.
    result = await server.process(await request.body(), context)
    if isinstance(result, StreamingResult):
        return StreamingResponse(result, media_type="text/event-stream")
    return Response(content=result.json, media_type="application/json")
```

### How request context flows into ChatKit

`ChatKitServer[TContext]` and `Store[TContext]` are generic over a request context type you choose (user id, org, auth scopes, feature flags). Construct it per request and pass it to `server.process`. From there it flows into `respond` and into your store methods.

```python
context = MyRequestContext(user_id="abc123")
result = await server.process(await request.body(), context)
```

You build the context before calling `process`, so anything on the incoming request (headers, session data, or metadata in the payload) can go into it for auth, tracing, or feature flags.

## Implement your ChatKit data store

Implement the `Store` interface to control how threads, messages, tool calls, and widgets are stored. Prefer serializing thread items as JSON so schema changes do not break storage. Example Postgres store (using psycopg 3):

```python
from collections.abc import Iterator
from contextlib import contextmanager

import psycopg
from psycopg.rows import tuple_row
from psycopg.types.json import Jsonb

from chatkit.store import NotFoundError, Store
from chatkit.types import ThreadMetadata


class MyPostgresStore(Store[MyRequestContext]):
    def __init__(self, conninfo: str) -> None:
        self._conninfo = conninfo
        self._init_schema()

    @contextmanager
    def _connection(self) -> Iterator[psycopg.Connection]:
        # One short-lived connection per operation. This uses the synchronous
        # driver for brevity; inside async methods, a pool or async driver fits better.
        with psycopg.connect(self._conninfo) as conn:
            yield conn

    def _init_schema(self) -> None:
        with self._connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )

            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS items (
                    id TEXT PRIMARY KEY,
                    thread_id TEXT NOT NULL
                        REFERENCES threads (id)
                        ON DELETE CASCADE,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )

            conn.commit()

    async def load_thread(
        self, thread_id: str, context: MyRequestContext
    ) -> ThreadMetadata:
        with self._connection() as conn, conn.cursor(row_factory=tuple_row) as cur:
            cur.execute(
                "SELECT data FROM threads WHERE id = %s AND user_id = %s",
                (thread_id, context.user_id),
            )
            row = cur.fetchone()
            if row is None:
                raise NotFoundError(f"Thread {thread_id} not found")
            # psycopg decodes JSONB columns into Python dicts.
            return ThreadMetadata.model_validate(row[0])

    async def save_thread(
        self, thread: ThreadMetadata, context: MyRequestContext
    ) -> None:
        payload = thread.model_dump(mode="json")
        with self._connection() as conn, conn.cursor() as cur:
            # Upsert: save_thread runs for new threads and for updates to existing
            # ones. The WHERE clause stops one user overwriting another's thread.
            cur.execute(
                """
                INSERT INTO threads (id, user_id, created_at, data)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data
                WHERE threads.user_id = EXCLUDED.user_id
                """,
                (thread.id, context.user_id, thread.created_at, Jsonb(payload)),
            )
            conn.commit()

    # Implement the remaining Store methods following the same pattern.
```
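
The JSON payload in the `data JSONB` columns above is what lets storage survive schema changes. If a model later gains a field with a default, no table migration is needed: older rows simply lack the field, and the default is used when they are loaded. The standard-library sketch below mimics that read path with a dataclass instead of ChatKit's own models. `StoredMessage`, its fields, and `load_message` are hypothetical.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class StoredMessage:
    id: str
    text: str
    # Field added in a later release; rows written earlier don't have it.
    annotations: list[str] = field(default_factory=list)


def load_message(raw: str) -> StoredMessage:
    """Decode a JSON row, ignoring unknown keys and defaulting missing ones."""
    data = json.loads(raw)
    known = {f.name for f in fields(StoredMessage)}
    return StoredMessage(**{k: v for k, v in data.items() if k in known})


old_row = '{"id": "msg_1", "text": "Hi"}'  # written before `annotations` existed
new_row = '{"id": "msg_2", "text": "Yo", "annotations": ["a"], "extra": 1}'
old, new = load_message(old_row), load_message(new_row)
```

Because `load_message` drops keys it does not know, older code can also read rows written by newer code.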

Customize ID generation by overriding `generate_thread_id` and `generate_item_id` if you need external or deterministic IDs. Store integration-specific state that drives your inference pipeline, such as the model's `last_response_id`, in the free-form `ThreadMetadata.metadata` dict.
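
For deterministic IDs, one approach is to derive them with `uuid.uuid5` from values your app already has, so the same inputs always produce the same ID. The helper below is a hypothetical sketch and not part of ChatKit. Your `generate_item_id` override could call it with the thread ID and a key from your request context. The namespace UUID and the `{prefix}_{hex}` format are arbitrary choices.

```python
import uuid

# Hypothetical fixed namespace for your app; generate one once and never change it.
APP_NAMESPACE = uuid.UUID("6f1c2a8e-1d2b-4c3e-9f4a-0b1c2d3e4f50")


def deterministic_id(prefix: str, *parts: str) -> str:
    """Return a stable ID: the same prefix and parts always give the same result."""
    # Join with a separator unlikely to appear in IDs, so ("a:b", "c") and
    # ("a", "b:c") hash to different names.
    name = "\x1f".join(parts)
    return f"{prefix}_{uuid.uuid5(APP_NAMESPACE, name).hex}"


a = deterministic_id("msg", "thread_1", "external-row-42")
b = deterministic_id("msg", "thread_1", "external-row-42")
```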

## Generate a response using your model

Inside `respond`, you'll usually:

1. Load recent thread history.
2. Prepare model input for your agent.
3. Run inference and stream events back to the client.

### Prepare model input

Before you call your model, decide what conversation context you want the model to see.

#### Load thread history (recommended default)

Fetch recent items so the model sees the conversation state before you build the next turn:

```python
items_page = await self.store.load_thread_items(
    thread.id,
    after=None,
    limit=20,  # Tune this limit based on your model/context budget.
    order="desc",
    context=context,
)
items = list(reversed(items_page.data))
```
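
Loading with `order="desc"` and then reversing gives you the *most recent* `limit` items in chronological order. The toy in-memory version below shows the same windowing with plain lists. `Item` and `recent_window` are hypothetical stand-ins, not ChatKit APIs.

```python
from dataclasses import dataclass


@dataclass
class Item:
    id: str
    created_at: int  # stand-in for a timestamp


def recent_window(all_items: list[Item], limit: int) -> list[Item]:
    """Return the newest `limit` items, oldest first."""
    # Like load_thread_items(order="desc", limit=limit): newest items first.
    newest_first = sorted(all_items, key=lambda i: i.created_at, reverse=True)[:limit]
    # Reverse back to chronological order before converting to model input.
    return list(reversed(newest_first))


history = [Item(id=f"msg_{n}", created_at=n) for n in range(1, 6)]
window = recent_window(history, limit=3)  # msg_3, msg_4, msg_5
```

Loading with `order="asc"` and the same `limit` would instead return the *oldest* items and drop the latest turns.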

- Start with the defaults: `simple_to_agent_input(items)` is a convenience wrapper around the **default** `ThreadItemConverter` (it calls `ThreadItemConverter().to_agent_input(items)` under the hood).
- Customize for your integration: some item types require app-specific translation into model input (for example: **attachments**, **tags**, and some **hidden context items**). In those cases, subclass `ThreadItemConverter` and call your converter directly instead of `simple_to_agent_input`. (If your thread includes attachments or tags and you haven't implemented the converter hooks, conversion will raise `NotImplementedError`.)

```python
from chatkit.agents import AgentContext, ThreadItemConverter, simple_to_agent_input

# Inside `respond`, after loading `items`:

# Option A (defaults):
input_items = await simple_to_agent_input(items)

# Option B (your integration-specific converter), where MyThreadItemConverter
# is your own subclass of ThreadItemConverter:
# input_items = await MyThreadItemConverter().to_agent_input(items)

agent_context = AgentContext(
    thread=thread,
    store=self.store,
    request_context=context,
)
```

Apply any `input.inference_options` the client sends (such as model or tool choice) when you build the model request. `input` can be `None`, so check it before reading its options.

#### Using `previous_response_id` (OpenAI Responses API only)

If you are using OpenAI models through the **Responses API**, you can pass `previous_response_id` to `Runner.run_streamed(...)` and often send only the *new* user message as model input. This can simplify input construction when the provider can retrieve prior context server-side.

Terminology note: the Agents SDK exposes the ID of the most recent model response as `result.last_response_id`. On the *next* turn, you pass that saved value as the `previous_response_id` parameter. Read it only after you have consumed the stream, so that it refers to the final model response of the turn.

**Important restrictions:**

- **OpenAI Responses API only.** Other model providers won't be able to follow a `previous_response_id`, so you must send thread history yourself.
- **Only includes model-visible history.** If your integration streams ChatKit-only items (e.g. widgets/workflows emitted directly to the client), the model won't know about them unless you also include them in `input_items`.
- **Works only while the referenced response is retrievable.** Persist `result.last_response_id` and ensure responses are stored (`store=True` / `ModelSettings(store=True)`); otherwise fall back to rebuilding input from thread items.

Example:

```python
# `ThreadMetadata.metadata` is a free-form dict for integration-specific state.
# ChatKit does not define a first-class `last_response_id` field on `ThreadMetadata`;
# your integration can store it under a key and reuse it on the next turn.
last_response_id = thread.metadata.get("last_response_id")
last_response_id = last_response_id if isinstance(last_response_id, str) else None

# Often: send only the new message as input when chaining on the server.
input_items = await simple_to_agent_input(input)

result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
    previous_response_id=last_response_id,
    auto_previous_response_id=True,
)

# Stream the run first: `result.last_response_id` only refers to the final
# model response once the stream has been consumed.
async for event in stream_agent_response(agent_context, result):
    yield event

# Persist the new response ID so the next turn can chain again.
if result.last_response_id:
    thread.metadata["last_response_id"] = result.last_response_id
    await self.store.save_thread(thread, context=context)
```

### Run inference and stream events

Run your agent and stream events back to the client. `stream_agent_response` converts an Agents SDK streamed run into ChatKit events; you can also yield events manually.

```python
from agents import (
    InputGuardrailTripwireTriggered,
    OutputGuardrailTripwireTriggered,
    Runner,
)
from chatkit.agents import stream_agent_response
from chatkit.types import ErrorEvent

# Inside `respond`: `assistant_agent` is your Agent, and `input_items` and
# `agent_context` come from the previous steps.
result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
)

try:
    async for event in stream_agent_response(agent_context, result):
        yield event
except InputGuardrailTripwireTriggered:
    yield ErrorEvent(message="We blocked that message for safety.")
except OutputGuardrailTripwireTriggered:
    yield ErrorEvent(
        message="The assistant response was blocked.",
        allow_retry=False,
    )
```
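
The `try`/`except` above turns guardrail exceptions into a final error event, so the stream ends cleanly instead of breaking. Below is the same pattern reduced to standard-library `asyncio`, with plain dicts standing in for ChatKit events. `GuardrailTripped`, `fake_agent_stream`, and `with_error_event` are hypothetical stand-ins.

```python
import asyncio
from collections.abc import AsyncIterator


class GuardrailTripped(Exception):
    """Stand-in for the Agents SDK guardrail tripwire exceptions."""


async def fake_agent_stream() -> AsyncIterator[dict]:
    # Emits two text deltas, then a guardrail fires mid-stream.
    yield {"type": "text", "delta": "Hello"}
    yield {"type": "text", "delta": ", wor"}
    raise GuardrailTripped()


async def with_error_event(events: AsyncIterator[dict]) -> AsyncIterator[dict]:
    """Forward events; convert a guardrail failure into a terminal error event."""
    try:
        async for event in events:
            yield event
    except GuardrailTripped:
        yield {"type": "error", "message": "The assistant response was blocked.", "allow_retry": False}


async def collect() -> list[dict]:
    return [event async for event in with_error_event(fake_agent_stream())]


received = asyncio.run(collect())
```

Events forwarded before the failure have already reached the client. The error event therefore arrives after any partial output rather than replacing it.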

To stream events from a server tool during the same turn, use `ctx.context.stream(...)` inside the tool:

```python
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
from chatkit.types import ProgressUpdateEvent


@function_tool()
async def load_document(ctx: RunContextWrapper[AgentContext], document_id: str):
    """Load a document by its ID."""
    # Show progress in the client while the tool runs.
    await ctx.context.stream(ProgressUpdateEvent(icon="document", text="Loading document..."))
    # `get_document_by_id` is a placeholder for your own data-access function.
    return await get_document_by_id(document_id)
```

`stream_agent_response` will forward these events alongside any assistant text or tool call updates. Client tool calls are also supported via `ctx.context.client_tool_call` when you register the tool on both client and server.

## Next: add features

- [Let users browse past threads](browse-past-threads.md)
- [Accept rich user input](accept-rich-user-input.md)
- [Let users pick tools and models](let-users-pick-tools-and-models.md)
- [Pass extra app context to your model](pass-extra-app-context-to-your-model.md)
- [Update the client during a response](update-client-during-response.md)
- [Build interactive responses with widgets](build-interactive-responses-with-widgets.md)
- [Add annotations in assistant messages](add-annotations.md)
- [Stream generated images](stream-generated-images.md)
- [Keep your app in sync with ChatKit](keep-your-app-in-sync-with-chatkit.md)
- [Let your app draft and send messages](let-your-app-draft-and-send-messages.md)
- [Handle feedback](handle-feedback.md)
- [Prepare your app for production](prepare-your-app-for-production.md)