openai/chatkit-python

Public

mirrored fromhttps://github.com/openai/chatkit-pythonAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
d1110ab3a327620fefc20e494131239dde94851f

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

docs/server.md

524lines · modecode

1# ChatKit server integration
2
3ChatKit's server integration offers a flexible and framework-agnostic approach for building realtime chat experiences. By implementing the `ChatKitServer` base class and its `respond` method, you can configure how your workflow responds to user inputs, from using tools to returning rich display widgets. The ChatKit server integration exposes a single endpoint and supports JSON and server‑sent events (SSE) to stream real-time updates.
4
5## Installation
6
7Install the `openai-chatkit` package with the following command:
8
9```bash
10pip install openai-chatkit
11```
12
13## Defining a server class
14
15The `ChatKitServer` base class is the main building block of the ChatKit server implementation.
16
17The `respond` method is executed each time a user sends a message. It is responsible for providing an answer by streaming a set of events. The `respond` method can return assistant messages, tool status messages, workflows, tasks, and widgets.
18
19ChatKit also provides helpers to implement `respond` using Agents SDK. The main one is `stream_agent_response`, which converts a streamed Agents SDK run into ChatKit events.
20
21If you've enabled model or tool options in the composer, they'll appear in `respond` under `input_user_message.inference_options`. Your integration is responsible for handling these values when performing inference.
22
23Example server implementation that calls the Agent SDK runner and streams the result to the ChatKit UI:
24
25```python
26class MyChatKitServer(ChatKitServer):
27 def __init__(
28 self, data_store: Store, attachment_store: AttachmentStore | None = None
29 ):
30 super().__init__(data_store, attachment_store)
31
32 assistant_agent = Agent[AgentContext](
33 model="gpt-4.1",
34 name="Assistant",
35 instructions="You are a helpful assistant"
36 )
37
38 async def respond(
39 self,
40 thread: ThreadMetadata,
41 input: UserMessageItem | None,
42 context: Any,
43 ) -> AsyncIterator[ThreadStreamEvent]:
44 context = AgentContext(
45 thread=thread,
46 store=self.store,
47 request_context=context,
48 )
49 result = Runner.run_streamed(
50 self.assistant_agent,
51 await simple_to_agent_input(input) if input else [],
52 context=context,
53 )
54 async for event in stream_agent_response(
55 context,
56 result,
57 ):
58 yield event
59
60 # ...
61```
62
63## Setting up the endpoint
64
65ChatKit is server-agnostic. All communication happens through a single POST endpoint that returns either JSON directly or streams SSE JSON events.
66
67You are responsible for defining the endpoint using the web server framework of your choice.
68
69Example using ChatKit with FastAPI:
70
71```python
72app = FastAPI()
73data_store = PostgresStore()
74attachment_store = BlobStorageStore(data_store)
75server = MyChatKitServer(data_store, attachment_store)
76
77@app.post("/chatkit")
78async def chatkit_endpoint(request: Request):
79 result = await server.process(await request.body(), {})
80 if isinstance(result, StreamingResult):
81 return StreamingResponse(result, media_type="text/event-stream")
82 else:
83 return Response(content=result.json, media_type="application/json")
84```
85
86## Data store
87
88ChatKit needs to store information about threads, messages, and attachments. The examples above use a provided development-only data store implementation using SQLite (`SQLiteStore`).
89
90You are responsible for implementing the `chatkit.store.Store` class using the data store of your choice. When implementing the store, you must allow for the Thread/Attachment/ThreadItem type shapes changing between library versions. The recommended approach for relational databases is to serialize models into JSON-typed columns instead of separating model fields across multiple columns.
91
92```python
93class Store(ABC, Generic[TContext]):
94 def generate_thread_id(self, context: TContext) -> str: ...
95
96 def generate_item_id(
97 self,
98 item_type: Literal["message", "tool_call", "task", "workflow", "attachment"],
99 thread: ThreadMetadata,
100 context: TContext,
101 ) -> str: ...
102
103 async def load_thread(self, thread_id: str, context: TContext) -> ThreadMetadata: ...
104
105 async def save_thread(self, thread: ThreadMetadata, context: TContext) -> None: ...
106
107 async def load_thread_items(
108 self,
109 thread_id: str,
110 after: str | None,
111 limit: int,
112 order: str,
113 context: TContext,
114 ) -> Page[ThreadItem]: ...
115
116 async def save_attachment(self, attachment: Attachment, context: TContext) -> None: ...
117
118 async def load_attachment(self, attachment_id: str, context: TContext) -> Attachment: ...
119
120 async def delete_attachment(self, attachment_id: str, context: TContext) -> None: ...
121
122 async def load_threads(
123 self,
124 limit: int,
125 after: str | None,
126 order: str,
127 context: TContext,
128 ) -> Page[ThreadMetadata]: ...
129
130 async def add_thread_item(
131 self, thread_id: str, item: ThreadItem, context: TContext
132 ) -> None: ...
133
134 async def save_item(self, thread_id: str, item: ThreadItem, context: TContext) -> None: ...
135
136 async def load_item(self, thread_id: str, item_id: str, context: TContext) -> ThreadItem: ...
137
138 async def delete_thread(self, thread_id: str, context: TContext) -> None: ...
139```
140
141The default implementation prefixes identifiers (for example `msg_4f62d6a7f2c34bd084f57cfb3df9f6bd`) using UUID4 strings. Override `generate_thread_id` and/or `generate_item_id` if your
142integration needs deterministic or pre-allocated identifiers; they will be used whenever ChatKit needs to create a new thread id or a new thread item id.
143
144## Attachment store
145
146Users can upload attachments (files and images) to include with chat messages. You are responsible for providing a storage implementation and handling uploads. The `attachment_store` argument to `ChatKitServer` should implement the `AttachmentStore` interface. If not provided, operations on attachments will raise an error.
147
148ChatKit supports both direct uploads and two‑phase upload, configurable client-side via `ChatKitOptions.composer.attachments.uploadStrategy`.
149
150### Access control
151
152Attachment metadata and file bytes are not protected by ChatKit. Each `AttachmentStore` method receives your request context so you can enforce thread- and user-level authorization before handing out attachment IDs, bytes, or signed URLs. Deny access when the caller does not own the attachment, and generate download URLs that expire quickly. Skipping these checks can leak customer data.
153
154### Direct upload
155
156The direct upload URL is provided client-side as a create option.
157
158The client will POST `multipart/form-data` with a `file` field to that URL. The server should:
159
1601. persist the attachment metadata (`FileAttachment | ImageAttachment`) to the data store and the file bytes to your storage.
1612. respond with JSON representation of `FileAttachment | ImageAttachment`.
162
163### Two‑phase upload
164
165- **Phase 1 (registration and upload URL provisioning)**: The client calls `attachments.create`. ChatKit persists a `FileAttachment | ImageAttachment` sets the `upload_url` and returns it. It's recommended to include the `id` of the `Attachment` in the `upload_url` so that you can associate the file bytes with the `Attachment`.
166- **Phase 2 (upload)**: The client POSTs the bytes to the returned `upload_url` with `multipart/form-data` field `file`.
167
168### Previews
169
170To render thumbnails of an image attached to a user message, set `ImageAttachment.preview_url` to a renderable URL. If you need expiring URLs, do not persist the URL; generate it on demand when returning the attachment to the client.
171
172### AttachmentStore interface
173
174You implement the storage specifics by providing the `AttachmentStore` methods:
175
176```python
177class AttachmentStore(ABC, Generic[TContext]):
178 async def delete_attachment(self, attachment_id: str, context: TContext) -> None: ...
179 async def create_attachment(self, input: AttachmentCreateParams, context: TContext) -> Attachment: ...
180 def generate_attachment_id(self, mime_type: str, context: TContext) -> str: ...
181```
182
183Note: The store does not have to persist bytes itself. It can act as a proxy that issues signed URLs for upload and preview (e.g., S3/GCS/Azure), while your separate upload endpoint writes to object storage.
184
185### Attaching files to Agent SDK inputs
186
187You are also responsible for deciding how to attach attachments to Agent SDK inputs. You can store files in your own storage and attach them as base64-encoded payloads or upload them to the OpenAI Files API and provide the file ID to the Agent SDK.
188
189The example below shows how to create base64-encoded payloads for attachments by customizing a `ThreadItemConverter`. The helper `read_attachment_bytes` stands in for whatever storage accessor you provide (for example, fetching from S3 or a database) because `AttachmentStore` only handles ChatKit protocol calls.
190
191```python
192async def read_attachment_bytes(attachment_id: str) -> bytes:
193 """Replace with your blob-store fetch (S3, local disk, etc.)."""
194 ...
195
196
197class MyConverter(ThreadItemConverter):
198 async def attachment_to_message_content(
199 self, input: Attachment
200 ) -> ResponseInputContentParam:
201 content = await read_attachment_bytes(input.id)
202 data = (
203 "data:"
204 + str(input.mime_type)
205 + ";base64,"
206 + base64.b64encode(content).decode("utf-8")
207 )
208 if isinstance(input, ImageAttachment):
209 return ResponseInputImageParam(
210 type="input_image",
211 detail="auto",
212 image_url=data,
213 )
214 # Note: Agents SDK currently only supports pdf files as ResponseInputFileParam.
215 # To send other text file types, either convert them to pdf on the fly or
216 # add them as input text.
217 return ResponseInputFileParam(
218 type="input_file",
219 file_data=data,
220 filename=input.name or "unknown",
221 )
222
223# In respond(...):
224result = Runner.run_streamed(
225 assistant_agent,
226 await MyConverter().to_agent_input(input),
227 context=context,
228)
229```
230
231## Client tools usage
232
233The ChatKit server implementation can trigger client-side tools.
234
235The tool must be registered both when initializing ChatKit on the client and when setting up Agents SDK on the server.
236
237To trigger a client-side tool from Agents SDK, set `ctx.context.client_tool_call` in the tool implementation with the client-side tool name and arguments. The result of the client tool execution will be provided back to the model.
238
239**Note:** The agent behavior must be set to `tool_use_behavior=StopAtTools` with all client-side tools included in `stop_at_tool_names`. This causes the agent to stop generating new messages until the client tool call is acknowledged by the ChatKit UI.
240
241**Note:** Only one client tool call can be triggered per turn.
242
243**Note:** Client tools are client-side callbacks invoked by the agent during server-side inference. If you're interested in client-side callbacks triggered by a user interacting with a widget, refer to [client actions](actions.md/#client).
244
245```python
246@function_tool(description_override="Add an item to the user's todo list.")
247async def add_to_todo_list(ctx: RunContextWrapper[AgentContext], item: str) -> None:
248 ctx.context.client_tool_call = ClientToolCall(
249 name="add_to_todo_list",
250 arguments={"item": item},
251 )
252
253assistant_agent = Agent[AgentContext](
254 model="gpt-4.1",
255 name="Assistant",
256 instructions="You are a helpful assistant",
257 tools=[add_to_todo_list],
258 tool_use_behavior=StopAtTools(stop_at_tool_names=[add_to_todo_list.name]),
259)
260```
261
262## Agents SDK integration
263
264The ChatKit server is independent of Agents SDK. As long as correct events are returned from the `respond` method, the ChatKit UI will display the conversation as expected.
265
266The ChatKit library provides helpers to integrate with Agents SDK:
267
268- `AgentContext` - The context type that should be used when calling Agents SDK. It provides helpers to stream events from tool calls, render widgets, and initiate client tool calls.
269- `stream_agent_response` - A helper to convert a streamed Agents SDK run into ChatKit events.
270- `ThreadItemConverter` - A helper class that you'll probably extend to convert ChatKit thread items to Agents SDK input items.
271- `simple_to_agent_input` - A helper function that uses the default thread item conversions. The default conversion is limited, but useful for getting started quickly.
272
273```python
274async def respond([]
275 self,
276 thread: ThreadMetadata,
277 input: UserMessageItem | None,
278 context: Any,
279) -> AsyncIterator[ThreadStreamEvent]:
280 context = AgentContext(
281 thread=thread,
282 store=self.store,
283 request_context=context,
284 )
285
286 result = Runner.run_streamed(
287 self.assistant_agent,
288 await simple_to_agent_input(input) if input else [],
289 context=context,
290 )
291
292 async for event in stream_agent_response(context, result):
293 yield event
294```
295
296### ThreadItemConverter
297
298Extend `ThreadItemConverter` when your integration supports:
299
300- Attachments
301- @-mentions (entity tagging)
302- `HiddenContextItem`
303- Custom thread item formats
304
305```python
306from agents import Message, Runner, ResponseInputTextParam
307from chatkit.agents import AgentContext, ThreadItemConverter, stream_agent_response
308from chatkit.types import Attachment, HiddenContextItem, ThreadMetadata, UserMessageItem
309
310
311class MyThreadConverter(ThreadItemConverter):
312 async def attachment_to_message_content(
313 self, attachment: Attachment
314 ) -> ResponseInputTextParam:
315 content = await attachment_store.get_attachment_contents(attachment.id)
316 data_url = "data:%s;base64,%s" % (mime, base64.b64encode(raw).decode("utf-8"))
317 if isinstance(attachment, ImageAttachment):
318 return ResponseInputImageParam(
319 type="input_image",
320 detail="auto",
321 image_url=data_url,
322 )
323
324 # ..handle other attachment types
325
326 async def hidden_context_to_input(self, item: HiddenContextItem) -> Message:
327 return Message(
328 type="message",
329 role="system",
330 content=[
331 ResponseInputTextParam(
332 type="input_text",
333 text=f"<HIDDEN_CONTEXT>{item.content}</HIDDEN_CONTEXT>",
334 )
335 ],
336 )
337
338 async def tag_to_message_content(self, tag: UserMessageTagContent):
339 tag_context = await retrieve_context_for_tag(tag.id)
340 return ResponseInputTextParam(
341 type="input_text",
342 text=f"<TAG>Name:{tag.data.name}\nType:{tag.data.type}\nDetails:{tag_context}</TAG>"
343 )
344
345 # ..handle other @-mentions
346
347 # ..override defaults for other methods
348
349```
350
351## Widgets
352
353Widgets are rich UI components that can be displayed in chat. You can return a widget either directly from the `respond` method (if you want to do so unconditionally) or from a tool call triggered by the model.
354
355Example of a widget returned directly from the `respond` method:
356
357```python
358async def respond(
359 self,
360 thread: ThreadMetadata,
361 input: UserMessageItem | None,
362 context: Any,
363 ) -> AsyncIterator[ThreadStreamEvent]:
364 widget = Text(
365 id="description",
366 value="Text widget",
367 )
368
369 async for event in stream_widget(
370 thread,
371 widget,
372 generate_id=lambda item_type: self.store.generate_item_id(
373 item_type, thread, context
374 ),
375 ):
376 yield event
377```
378
379Example of a widget returned from a tool call:
380
381```python
382@function_tool(description_override="Display a sample widget to the user.")
383async def sample_widget(ctx: RunContextWrapper[AgentContext]) -> None:
384 widget = Text(
385 id="description",
386 value="Text widget",
387 )
388
389 await ctx.context.stream_widget(widget)
390```
391
392The examples above return a fully completed static widget. You can also stream an updating widget by yielding new versions of the widget from a generator function. The ChatKit framework will send updates for the parts of the widget that have changed.
393
394**Note:** Currently, only `<Text>` and `<Markdown>` components marked with an `id` have their text updates streamed.
395
396```python
397async def sample_widget(ctx: RunContextWrapper[AgentContext]) -> None:
398 description_text = Runner.run_streamed(
399 email_generator, "ChatKit is the best thing ever"
400 )
401
402 async def widget_generator() -> AsyncGenerator[Widget, None]:
403 text_widget_updates = accumulate_text(
404 description_text.stream_events(),
405 Text(
406 id="description",
407 value="",
408 streaming=True
409 ),
410 )
411
412 async for text_widget in text_widget_updates:
413 yield Card(
414 children=[text_widget]
415 )
416
417 await ctx.context.stream_widget(widget_generator())
418```
419
420In the example above, the `accumulate_text` function is used to stream the results of an Agents SDK run into a `Text` widget.
421
422### Defining a widget
423
424You may find it easier to write widgets in JSON. To you can parse JSON widgets to `WidgetRoot` instances for your server to stream:
425
426```python
427try:
428 WidgetRoot.model_validate_json(WIDGET_JSON_STRING)
429except ValidationError:
430 # handle invalid json
431```
432
433### Widget reference and examples
434
435See full reference of components, props, and examples in [widgets.md ➡️](./widgets.md).
436
437## Thread metadata
438
439ChatKit provides a way to store arbitrary information associated with a thread. This information is not sent to the UI.
440
441One use case for the metadata is to preserve the [`previous_response_id`](https://platform.openai.com/docs/api-reference/responses/create#responses-create-previous_response_id) and avoid having to re-send all items for an Agent SDK run.
442
443```python
444previous_response_id = thread.metadata.get("previous_response_id")
445
446# Run the Agent SDK run with the previous response id
447result = Runner.run_streamed(
448 agent,
449 input=...,
450 previous_response_id=previous_response_id,
451)
452
453# Save the previous response id for the next run
454thread.metadata["previous_response_id"] = result.response_id
455```
456
457## Automatic thread titles
458
459ChatKit does not automatically title threads, but you can easily implement your own logic to do so.
460
461First, decide when to trigger the thread title update. A simple approach might be to set the thread title the first time a user sends a message.
462
463```python
464from chatkit.agents import simple_to_agent_input
465
466async def maybe_update_thread_title(
467 self,
468 thread: ThreadMetadata,
469 input_item: UserMessageItem,
470) -> None:
471 if thread.title is not None:
472 return
473 agent_input = await simple_to_agent_input(input_item)
474 run = await Runner.run(title_agent, input=agent_input)
475 thread.title = run.final_output
476
477async def respond(
478 self,
479 thread: ThreadMetadata,
480 input: UserMessageItem | None,
481 context: Any,
482) -> AsyncIterator[ThreadStreamEvent]:
483 if input is not None:
484 asyncio.create_task(self.maybe_update_thread_title(thread, input))
485
486 # Generate the model response
487 ...
488```
489
490## Progress updates
491
492If your server-side tool takes a while to run, you can use the progress update event to display the progress to the user.
493
494```python
495@function_tool()
496async def long_running_tool(ctx: RunContextWrapper[AgentContext]) -> str:
497 await ctx.context.stream(
498 ProgressUpdateEvent(text="Loading a user profile...")
499 )
500
501 await asyncio.sleep(1)
502```
503
504The progress update will be automatically replaced by the next assistant message, widget, or another progress update.
505
506## Server context
507
508Sometimes it's useful to pass additional information (like `userId`) to the ChatKit server implementation. The `ChatKitServer.process` method accepts a `context` parameter that it passes to the `respond` method and all data store and file store methods.
509
510```python
511class MyChatKitServer(ChatKitServer):
512 async def respond(..., context) -> AsyncIterator[ThreadStreamEvent]:
513 # consume context["userId"]
514
515server.process(..., context={"userId": "user_123"})
516```
517
518Server context may be used to implement permission checks in AttachmentStore and Store.
519
520```python
521class MyChatKitServer(ChatKitServer):
522 async def load_attachment(..., context) -> Attachment:
523 # check context["userId"] has access to the file
524```
525