# Quick start

To get a basic ChatKit app running (a React chat UI talking to a Python server), clone and run the starter app:


```sh
git clone https://github.com/openai/openai-chatkit-starter-app.git
cd openai-chatkit-starter-app/chatkit
npm install
npm run dev
```

The sections below explain the core components and steps behind the starter app.

## Render chat UI

!!! note "Quick start uses React"
    This section shows the React integration using `@openai/chatkit-react`.  
    If you’re not using React, you can render ChatKit directly with vanilla JavaScript using `@openai/chatkit`. See the [ChatKit.js quick start](https://openai.github.io/chatkit-js/quickstart/) for details.

Install the React bindings:

```sh
npm install @openai/chatkit-react
```

In your `index.html`, load ChatKit.js:

```html
<!doctype html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <script src="https://cdn.platform.openai.com/deployments/chatkit/chatkit.js"></script>
  </head>
  <body>
    <div id="root"></div>
  </body>
</html>
```

Wire up a minimal React app. Point `api.url` at your ChatKit server endpoint and pass the domain key you configured there.

```tsx
import {ChatKit, useChatKit} from "@openai/chatkit-react";

export function App() {
  const chatkit = useChatKit({
    api: {
      url: "http://localhost:8000/chatkit",
      domainKey: "local-dev", // domain keys are optional in dev
    },
  });

  return <ChatKit control={chatkit.control} />;
}
```

The chat UI will render, but sending messages will fail until you start the server below and provide a store for threads and messages.

## Run your ChatKit server

Install the ChatKit Python package and expose a single `/chatkit` endpoint that forwards requests to a `ChatKitServer` instance.

```sh
pip install openai-chatkit
```

Create `main.py` with a minimal server that is hard-coded to always reply “Hello, world!”. You'll replace this with an actual call to a model in [Respond to a user message](guides/respond-to-user-message.md).

```python
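from datetime import datetime
from typing import AsyncIterator

from fastapi import FastAPI, Request
from fastapi.responses import Response, StreamingResponse

from chatkit.server import ChatKitServer, StreamingResult
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)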

app = FastAPI()


class MyChatKitServer(ChatKitServer[dict]):
    async def respond(
        self,
        thread: ThreadMetadata,
        input_user_message: UserMessageItem | None,
        context: dict,
    ) -> AsyncIterator[ThreadStreamEvent]:
        # Streams a fixed "Hello, world!" assistant message
        yield ThreadItemDoneEvent(
            item=AssistantMessageItem(
                thread_id=thread.id,
                id=self.store.generate_item_id("message", thread, context),
                created_at=datetime.now(),
                content=[AssistantMessageContent(text="Hello, world!")],
            ),
        )

# Create your server by passing a store implementation.
# MyChatKitStore is defined in the next section.
server = MyChatKitServer(store=MyChatKitStore())


@app.post("/chatkit")
async def chatkit(request: Request):
    result = await server.process(await request.body(), context={})
    if isinstance(result, StreamingResult):
        return StreamingResponse(result, media_type="text/event-stream")
    return Response(content=result.json, media_type="application/json")
```

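All ChatKit requests go to this single endpoint, and `ChatKitServer` routes each request internally. Set `api.url` on the React side to match (`http://localhost:8000/chatkit` in the React snippet above). The example uses FastAPI, so install `fastapi` and an ASGI server such as `uvicorn` alongside `openai-chatkit`, then start the app with, for example, `uvicorn main:app --port 8000`.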
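
The `text/event-stream` media type returned for streaming results is the Server-Sent Events format: each event travels as a `data:` line followed by a blank line. `ChatKitServer` produces the stream for you, so the following is only a sketch of the wire format. The `sse_frames` helper and the event payloads are made up for illustration and are not ChatKit's actual event encoding.

```python
import asyncio
import json
from typing import AsyncIterator, Iterable


async def sse_frames(events: Iterable[dict]) -> AsyncIterator[bytes]:
    """Encode each event dict as one Server-Sent Events frame.

    Hypothetical helper for illustration; not part of ChatKit.
    """
    for event in events:
        payload = json.dumps(event, separators=(",", ":"))
        # One SSE frame: a "data:" line terminated by a blank line.
        yield f"data: {payload}\n\n".encode("utf-8")


async def collect(events: Iterable[dict]) -> list:
    # Drain the async generator into a list so the frames can be inspected.
    return [frame async for frame in sse_frames(events)]


frames = asyncio.run(
    collect([{"type": "example.started"}, {"type": "example.done"}])
)
```

A browser or the ChatKit client reads these frames incrementally, which is why the endpoint returns a `StreamingResponse` rather than a single JSON body for streamed results.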


## Store chat data

ChatKit servers require a store to load and save threads, messages, and other items.

For this quickstart, use a small in-memory store so conversations persist while the process is running, without introducing a database. This keeps the example minimal while still matching real ChatKit behavior.


```python
from collections import defaultdict
from chatkit.store import NotFoundError, Store
from chatkit.types import Attachment, Page, ThreadItem, ThreadMetadata


class MyChatKitStore(Store[dict]):
    def __init__(self):
        self.threads: dict[str, ThreadMetadata] = {}
        self.items: dict[str, list[ThreadItem]] = defaultdict(list)

    async def load_thread(self, thread_id: str, context: dict) -> ThreadMetadata:
        if thread_id not in self.threads:
            raise NotFoundError(f"Thread {thread_id} not found")
        return self.threads[thread_id]

    async def save_thread(self, thread: ThreadMetadata, context: dict) -> None:
        self.threads[thread.id] = thread

    async def load_threads(
        self, limit: int, after: str | None, order: str, context: dict
    ) -> Page[ThreadMetadata]:
        threads = list(self.threads.values())
        return self._paginate(
            threads, after, limit, order, sort_key=lambda t: t.created_at, cursor_key=lambda t: t.id
        )

    async def load_thread_items(
        self, thread_id: str, after: str | None, limit: int, order: str, context: dict
    ) -> Page[ThreadItem]:
        items = self.items.get(thread_id, [])
        return self._paginate(
            items, after, limit, order, sort_key=lambda i: i.created_at, cursor_key=lambda i: i.id
        )

    async def add_thread_item(
        self, thread_id: str, item: ThreadItem, context: dict
    ) -> None:
        self.items[thread_id].append(item)

    async def save_item(
        self, thread_id: str, item: ThreadItem, context: dict
    ) -> None:
        items = self.items[thread_id]
        for idx, existing in enumerate(items):
            if existing.id == item.id:
                items[idx] = item
                return
        items.append(item)

    async def load_item(
        self, thread_id: str, item_id: str, context: dict
    ) -> ThreadItem:
        for item in self.items.get(thread_id, []):
            if item.id == item_id:
                return item
        raise NotFoundError(f"Item {item_id} not found in thread {thread_id}")

    async def delete_thread(self, thread_id: str, context: dict) -> None:
        self.threads.pop(thread_id, None)
        self.items.pop(thread_id, None)

    async def delete_thread_item(
        self, thread_id: str, item_id: str, context: dict
    ) -> None:
        self.items[thread_id] = [
            item for item in self.items.get(thread_id, []) if item.id != item_id
        ]

    def _paginate(self, rows: list, after: str | None, limit: int, order: str, sort_key, cursor_key):
        sorted_rows = sorted(rows, key=sort_key, reverse=order == "desc")
        start = 0
        if after:
            for idx, row in enumerate(sorted_rows):
                if cursor_key(row) == after:
                    start = idx + 1
                    break
        data = sorted_rows[start : start + limit]
        has_more = start + limit < len(sorted_rows)
        next_after = cursor_key(data[-1]) if has_more and data else None
        return Page(data=data, has_more=has_more, after=next_after)

    # Attachments are intentionally not implemented for the quickstart

    async def save_attachment(
        self, attachment: Attachment, context: dict
    ) -> None:
        raise NotImplementedError()

    async def load_attachment(
        self, attachment_id: str, context: dict
    ) -> Attachment:
        raise NotImplementedError()

    async def delete_attachment(self, attachment_id: str, context: dict) -> None:
        raise NotImplementedError()

```

This store keeps everything in process memory and implements only the methods required for basic chat: data is lost when the server restarts, and the attachment methods raise `NotImplementedError`.
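
To see how the cursor logic in `_paginate` behaves, here is a standalone sketch that reuses the same algorithm on plain tuples. `SimplePage` is a stand-in for `chatkit.types.Page` with the same three fields, and the row data is made up.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class SimplePage(Generic[T]):
    # Stand-in for chatkit.types.Page (assumption: same three fields as used above).
    data: list
    has_more: bool
    after: Optional[str]


def paginate(rows, after, limit, order, sort_key, cursor_key) -> SimplePage:
    ordered = sorted(rows, key=sort_key, reverse=order == "desc")
    start = 0
    if after:
        # Resume just past the row whose cursor matches `after`.
        for idx, row in enumerate(ordered):
            if cursor_key(row) == after:
                start = idx + 1
                break
    data = ordered[start : start + limit]
    has_more = start + limit < len(ordered)
    next_after = cursor_key(data[-1]) if has_more and data else None
    return SimplePage(data=data, has_more=has_more, after=next_after)


# Five fake rows: (id, created_at)
rows = [(f"item_{n}", n) for n in range(1, 6)]

pages = []
cursor = None
while True:
    page = paginate(
        rows, cursor, 2, "asc", sort_key=lambda r: r[1], cursor_key=lambda r: r[0]
    )
    pages.append([row[0] for row in page.data])
    if not page.has_more:
        break
    cursor = page.after  # Pass the last cursor back in to fetch the next page.
```

Each call returns at most `limit` rows plus the cursor of the last row, and the caller passes that cursor back as `after` until `has_more` is false.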

For production, replace this with a database-backed store (for example, Postgres or MySQL) so threads and items persist across restarts.
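
As a rough sketch of what database-backed persistence involves, the example below stores thread payloads as JSON rows. It uses SQLite from the Python standard library instead of Postgres or MySQL so it runs without a database server. `SqliteThreadTable` is a hypothetical helper, not a ChatKit class; a real store would subclass `Store` and serialize `ThreadMetadata` and `ThreadItem` objects according to your own schema.

```python
import json
import sqlite3


class SqliteThreadTable:
    """Hypothetical helper: persists thread payloads as JSON rows in SQLite."""

    def __init__(self, path: str = ":memory:") -> None:
        # Pass a file path instead of ":memory:" to survive process restarts.
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS threads "
            "(id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
        )

    def save(self, thread_id: str, payload: dict) -> None:
        # Saving an existing id overwrites the row, like the dict-based store.
        self.conn.execute(
            "INSERT OR REPLACE INTO threads (id, payload) VALUES (?, ?)",
            (thread_id, json.dumps(payload)),
        )
        self.conn.commit()

    def load(self, thread_id: str) -> dict:
        row = self.conn.execute(
            "SELECT payload FROM threads WHERE id = ?", (thread_id,)
        ).fetchone()
        if row is None:
            raise KeyError(f"Thread {thread_id} not found")
        return json.loads(row[0])

    def delete(self, thread_id: str) -> None:
        self.conn.execute("DELETE FROM threads WHERE id = ?", (thread_id,))
        self.conn.commit()


table = SqliteThreadTable()
table.save("thr_1", {"title": "First"})
table.save("thr_1", {"title": "Renamed"})
loaded = table.load("thr_1")
```

The same save, load, and delete shape maps onto the `save_thread`, `load_thread`, and `delete_thread` methods of the in-memory store.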


## Generate model responses

Replace the hard-coded “Hello, world!” reply from [Run your ChatKit server](#run-your-chatkit-server) with an Agents SDK call to generate real responses. Set `OPENAI_API_KEY` in your environment before running.
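
If you want the server to fail fast when the key is missing, a startup check along these lines is one option. `require_api_key` is a hypothetical helper, not part of ChatKit or the Agents SDK, and the optional `env` parameter exists only to make it easy to test.

```python
import os


def require_api_key(env=None) -> str:
    """Return OPENAI_API_KEY or raise a clear error (hypothetical helper)."""
    env = os.environ if env is None else env
    key = env.get("OPENAI_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "OPENAI_API_KEY is not set; export it before starting the server."
        )
    return key
```

Calling `require_api_key()` near the top of `main.py` surfaces a missing key at startup instead of on the first chat message.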

Use ChatKit's Agents SDK helpers to simplify request conversion and streaming. The `simple_to_agent_input` helper translates ChatKit thread items to agent input items, and `stream_agent_response` turns the streamed run into ChatKit events:


```python
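from typing import AsyncIterator

from agents import Agent, Runner
from chatkit.agents import AgentContext, simple_to_agent_input, stream_agent_response
from chatkit.server import ChatKitServer
from chatkit.types import ThreadMetadata, ThreadStreamEvent, UserMessageItem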

assistant = Agent(
    name="assistant",
    instructions="You are a helpful assistant.",
    model="gpt-4.1-mini",
)

class MyChatKitServer(ChatKitServer[dict]):
    async def respond(
        self,
        thread: ThreadMetadata,
        input_user_message: UserMessageItem | None,
        context: dict,
    ) -> AsyncIterator[ThreadStreamEvent]:
        # Load the 20 most recent thread items (including the new user message),
        # then restore chronological order before converting them to model input
        items_page = await self.store.load_thread_items(
            thread.id,
            after=None,
            limit=20,
            order="desc",
            context=context,
        )
        input_items = await simple_to_agent_input(list(reversed(items_page.data)))

        # Stream the run through ChatKit events
        agent_context = AgentContext(thread=thread, store=self.store, request_context=context)
        result = Runner.run_streamed(assistant, input_items, context=agent_context)
        async for event in stream_agent_response(agent_context, result):
            yield event
```
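
The `limit` and `order` arguments passed to `load_thread_items` decide which slice of the thread reaches the model. The sketch below uses plain tuples rather than ChatKit types to compare an oldest-first window with a newest-first window reversed back into chronological order. `window` is a hypothetical helper and the history is made up.

```python
def window(items, limit, order):
    """Return up to `limit` items sorted by timestamp in the given order."""
    ordered = sorted(items, key=lambda item: item[1], reverse=order == "desc")
    return ordered[:limit]


# 30 fake items: (id, created_at)
history = [(f"msg_{n}", n) for n in range(1, 31)]

# Oldest-first: keeps the first 20 items and drops everything after them.
oldest_first = window(history, limit=20, order="asc")

# Newest-first, then reversed: keeps the last 20 items in chronological order.
newest_first = window(history, limit=20, order="desc")
recent_chronological = list(reversed(newest_first))
```

Once a thread has more than `limit` items, the oldest-first window stops at `msg_20` and never reaches the latest user message, while the reversed newest-first window ends with it.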