openai/chatkit-python

Public

mirrored from https://github.com/openai/chatkit-python (Available)

Code · Commits · Issues · Pull requests · Actions · Insights · Security
dacc133c280b39b9334d06ea73f0f1c199e59927

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

chatkit/agents.py

1267 lines · mode: code

1import asyncio
2import json
3from collections import defaultdict
4from collections.abc import AsyncIterator
5from datetime import datetime
6from inspect import cleandoc
7from typing import (
8 Annotated,
9 Any,
10 AsyncGenerator,
11 Generic,
12 Sequence,
13 TypeVar,
14 cast,
15)
16
17from agents import (
18 InputGuardrailTripwireTriggered,
19 OutputGuardrailTripwireTriggered,
20 RunResultStreaming,
21 StreamEvent,
22 TResponseInputItem,
23)
24from openai.types.responses import (
25 EasyInputMessageParam,
26 ResponseFunctionToolCall,
27 ResponseFunctionToolCallParam,
28 ResponseInputContentParam,
29 ResponseInputImageParam,
30 ResponseInputMessageContentListParam,
31 ResponseInputTextParam,
32 ResponseOutputText,
33)
34from openai.types.responses.response_input_item_param import (
35 FunctionCallOutput,
36 Message,
37)
38from openai.types.responses.response_output_message import Content
39from openai.types.responses.response_output_text import (
40 Annotation as ResponsesAnnotation,
41)
42from openai.types.responses.response_output_text import (
43 AnnotationContainerFileCitation,
44 AnnotationFileCitation,
45 AnnotationURLCitation,
46)
47from pydantic import BaseModel, ConfigDict, SkipValidation, TypeAdapter
48from typing_extensions import assert_never
49
50from .server import stream_widget
51from .store import Store, StoreItemType
52from .types import (
53 Annotation,
54 AssistantMessageContent,
55 AssistantMessageContentPartAdded,
56 AssistantMessageContentPartAnnotationAdded,
57 AssistantMessageContentPartDone,
58 AssistantMessageContentPartTextDelta,
59 AssistantMessageItem,
60 Attachment,
61 ClientToolCallItem,
62 DurationSummary,
63 EndOfTurnItem,
64 FileSource,
65 GeneratedImage,
66 GeneratedImageItem,
67 GeneratedImageUpdated,
68 HiddenContextItem,
69 SDKHiddenContextItem,
70 StructuredInputItem,
71 Task,
72 TaskItem,
73 ThoughtTask,
74 ThreadItem,
75 ThreadItemAddedEvent,
76 ThreadItemDoneEvent,
77 ThreadItemRemovedEvent,
78 ThreadItemUpdatedEvent,
79 ThreadMetadata,
80 ThreadStreamEvent,
81 URLSource,
82 UserMessageItem,
83 UserMessageTagContent,
84 UserMessageTextContent,
85 WidgetItem,
86 Workflow,
87 WorkflowItem,
88 WorkflowSummary,
89 WorkflowTaskAdded,
90 WorkflowTaskUpdated,
91)
92from .widgets import Markdown, Text, WidgetRoot
93
94
class ClientToolCall(BaseModel):
    """
    Returned from tool methods to indicate a client-side tool call.

    `stream_agent_response` checks `AgentContext.client_tool_call` at the end
    of a run and, when it is set, emits a `ClientToolCallItem` built from the
    name and arguments stored here.
    """

    # Name of the client-side tool being called.
    name: str
    # Arguments forwarded with the call.
    arguments: dict[str, Any]
102
103
# Marker placed on an AgentContext event queue by `AgentContext._complete()`.
# `_AsyncQueueIterator` stops iterating when it dequeues an instance of it.
class _QueueCompleteSentinel: ...
105
106
# Type of the request context that AgentContext carries and hands to the
# Store methods it calls.
TContext = TypeVar("TContext")
108
109
class AgentContext(BaseModel, Generic[TContext]):
    """Per-run context shared between tools and `stream_agent_response`.

    Holds the thread being worked on, the store, and the caller's request
    context, plus mutable run state (the active workflow item, the image
    being generated, a pending client tool call). Helper methods enqueue
    `ThreadStreamEvent`s on an internal queue that `stream_agent_response`
    merges into its output.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Thread the run belongs to.
    thread: ThreadMetadata
    # Backing store; used here for id generation.
    store: Annotated[Store[TContext], SkipValidation]
    # Opaque caller-supplied context, forwarded to every store call.
    request_context: TContext
    previous_response_id: str | None = None
    # Set by a tool to request a client-side tool call at the end of the run.
    client_tool_call: ClientToolCall | None = None
    # Workflow currently being streamed, if any.
    workflow_item: WorkflowItem | None = None
    # Image generation item currently being streamed, if any.
    generated_image_item: GeneratedImageItem | None = None
    # Events produced by the helpers below, consumed by stream_agent_response.
    # NOTE(review): the default Queue is created once at class-definition time;
    # confirm pydantic gives each instance its own copy of this private default.
    _events: asyncio.Queue[ThreadStreamEvent | _QueueCompleteSentinel] = asyncio.Queue()

    def generate_id(
        self, type: StoreItemType, thread: ThreadMetadata | None = None
    ) -> str:
        """Generate a new store-backed id for the given item type.

        Args:
            type: Kind of id to generate. "thread" yields a thread id; any
                other value yields an item id of that type.
            thread: Thread the item id is generated for. Defaults to
                `self.thread`. Ignored when `type` is "thread".

        Returns:
            The id produced by the store.
        """
        if type == "thread":
            return self.store.generate_thread_id(self.request_context)
        return self.store.generate_item_id(
            type, thread or self.thread, self.request_context
        )

    async def stream_widget(
        self,
        widget: WidgetRoot | AsyncGenerator[WidgetRoot, None],
        copy_text: str | None = None,
    ) -> None:
        """Stream a widget into the thread by enqueueing widget events.

        Args:
            widget: A widget, or an async generator of widget states.
            copy_text: Optional text forwarded to the module-level
                `stream_widget` helper.
        """
        async for event in stream_widget(
            self.thread,
            widget,
            copy_text,
            lambda item_type: self.store.generate_item_id(
                item_type, self.thread, self.request_context
            ),
        ):
            await self._events.put(event)

    async def end_workflow(
        self, summary: WorkflowSummary | None = None, expanded: bool = False
    ) -> None:
        """Finalize the active workflow item, optionally attaching a summary.

        Does nothing when no workflow is active. Otherwise streams a
        `ThreadItemDoneEvent` for the workflow and clears `workflow_item`.

        Args:
            summary: Summary to store on the workflow. When omitted and the
                workflow has no summary yet, a `DurationSummary` measured
                from the item's `created_at` is used.
            expanded: Value written to the workflow's `expanded` flag.
        """
        if not self.workflow_item:
            # No workflow to end
            return

        if summary is not None:
            self.workflow_item.workflow.summary = summary
        elif self.workflow_item.workflow.summary is None:
            # If no summary was set or provided, set a basic work summary
            delta = datetime.now() - self.workflow_item.created_at
            duration = int(delta.total_seconds())
            self.workflow_item.workflow.summary = DurationSummary(duration=duration)
        self.workflow_item.workflow.expanded = expanded
        await self.stream(ThreadItemDoneEvent(item=self.workflow_item))
        self.workflow_item = None

    async def start_workflow(self, workflow: Workflow) -> None:
        """Begin streaming a new workflow item.

        The item always becomes the active `workflow_item`. Its added event
        is streamed immediately for "reasoning" workflows and for workflows
        that already have tasks; otherwise it is deferred until the first
        task is added via `add_workflow_task`.
        """
        self.workflow_item = WorkflowItem(
            id=self.generate_id("workflow"),
            created_at=datetime.now(),
            workflow=workflow,
            thread_id=self.thread.id,
        )

        if workflow.type != "reasoning" and len(workflow.tasks) == 0:
            # Defer sending added event until we have tasks
            return

        await self.stream(ThreadItemAddedEvent(item=self.workflow_item))

    async def update_workflow_task(self, task: Task, task_index: int) -> None:
        """Update an existing workflow task and stream the delta.

        Args:
            task: New value for the task.
            task_index: Position of the task in the active workflow.

        Raises:
            ValueError: If there is no active workflow.
        """
        if self.workflow_item is None:
            raise ValueError("Workflow is not set")
        # ensure reference is updated in case task is a copy
        self.workflow_item.workflow.tasks[task_index] = task
        await self.stream(
            ThreadItemUpdatedEvent(
                item_id=self.workflow_item.id,
                update=WorkflowTaskUpdated(
                    task=task,
                    task_index=task_index,
                ),
            )
        )

    async def add_workflow_task(self, task: Task) -> None:
        """Append a workflow task and stream the appropriate event.

        Creates a "custom" workflow when none is active. The first task of a
        non-reasoning workflow triggers the (possibly deferred) item-added
        event; every other case streams a task-added update.
        """
        self.workflow_item = self.workflow_item or WorkflowItem(
            id=self.generate_id("workflow"),
            created_at=datetime.now(),
            workflow=Workflow(type="custom", tasks=[]),
            thread_id=self.thread.id,
        )
        workflow = self.workflow_item.workflow
        workflow.tasks.append(task)
        # The task was just appended, so it sits at the last position.
        # `tasks.index(task)` searches by equality and would report the index
        # of an earlier task that happens to compare equal to this one.
        task_index = len(workflow.tasks) - 1

        if workflow.type != "reasoning" and len(workflow.tasks) == 1:
            await self.stream(ThreadItemAddedEvent(item=self.workflow_item))
        else:
            await self.stream(
                ThreadItemUpdatedEvent(
                    item_id=self.workflow_item.id,
                    update=WorkflowTaskAdded(
                        task=task,
                        task_index=task_index,
                    ),
                )
            )

    async def stream(self, event: ThreadStreamEvent) -> None:
        """Enqueue a ThreadStreamEvent for downstream processing."""
        await self._events.put(event)

    def _complete(self):
        """Enqueue the completion sentinel, without awaiting."""
        self._events.put_nowait(_QueueCompleteSentinel())
228
229
230T1 = TypeVar("T1")
231T2 = TypeVar("T2")
232
233
234async def _merge_generators(
235 a: AsyncIterator[T1],
236 b: AsyncIterator[T2],
237) -> AsyncIterator[T1 | T2]:
238 pending: list[AsyncIterator[T1 | T2]] = [a, b]
239 pending_tasks: dict[asyncio.Task, AsyncIterator[T1 | T2]] = {
240 asyncio.ensure_future(g.__anext__()): g for g in pending
241 }
242 while len(pending_tasks) > 0:
243 done, _ = await asyncio.wait(
244 pending_tasks.keys(), return_when="FIRST_COMPLETED"
245 )
246 stop = False
247 for d in done:
248 try:
249 result = d.result()
250 yield result
251 dg = pending_tasks[d]
252 pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
253 except StopAsyncIteration:
254 stop = True
255 finally:
256 del pending_tasks[d]
257 if stop:
258 for task in pending_tasks.keys():
259 if not task.cancel():
260 try:
261 yield task.result()
262 except asyncio.CancelledError:
263 pass
264 except asyncio.InvalidStateError:
265 pass
266 break
267
268
class _EventWrapper:
    """Wrapper around an event dequeued from an AgentContext event queue.

    `stream_agent_response` uses `isinstance(event, _EventWrapper)` to tell
    these events apart from the Agents SDK stream events they are merged with.
    """

    def __init__(self, event: ThreadStreamEvent):
        self.event = event
272
273
class _AsyncQueueIterator(AsyncIterator[_EventWrapper]):
    """Async iterator view over an event queue.

    Each dequeued event is handed out wrapped in an `_EventWrapper`.
    Iteration ends when a `_QueueCompleteSentinel` is dequeued, or
    immediately if the iterator was already marked completed.
    """

    def __init__(
        self, queue: asyncio.Queue[ThreadStreamEvent | _QueueCompleteSentinel]
    ):
        self.completed = False
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.completed:
            dequeued = await self.queue.get()
            if not isinstance(dequeued, _QueueCompleteSentinel):
                return _EventWrapper(dequeued)
            # Sentinel reached: remember it so later calls stop right away.
            self.completed = True
        raise StopAsyncIteration

    def drain_and_complete(self) -> None:
        """Synchronously discard everything queued and mark the iterator done.

        Meant for cleanup paths that must not await. Every queued entry is
        thrown away, the completion sentinel included if it is present.
        """
        backlog = self.queue
        while not backlog.empty():
            backlog.get_nowait()
        self.completed = True
307
308
class StreamingThoughtTracker(BaseModel):
    """Tracks the reasoning-summary thought that is currently being streamed.

    `stream_agent_response` creates one for the first thought of a reasoning
    workflow and matches later delta/done events against `item_id` and `index`.
    """

    # `item_id` of the reasoning summary event that started the thought.
    item_id: str
    # `summary_index` of that event.
    index: int
    # The workflow task whose content accumulates the streamed text.
    task: ThoughtTask
313
314
class ResponseStreamConverter:
    """Adapter between streamed Agents SDK output and ChatKit values.

    `stream_agent_response` calls into an instance of this class whenever it
    needs to turn streamed data (image generation results, partial image
    updates, citations) into the values placed on ChatKit thread items and
    thread stream events. Override individual methods to customize a
    conversion.
    """

    partial_images: int | None = None
    """
    Number of partial image updates expected per image generation result.

    Used to scale a partial image index into a progress value within [0, 1].
    When left unset, every partial image update reports a progress of 0.
    """

    def __init__(self, *, partial_images: int | None = None):
        """
        Args:
            partial_images: Number of partial image updates expected per image
                generation result, or None to skip progress normalization.
        """
        self.partial_images = partial_images

    async def base64_image_to_url(
        self,
        image_id: str,
        base64_image: str,
        partial_image_index: int | None = None,
    ) -> str:
        """
        Produce the URL stored on thread items for a generated image.

        The default implementation embeds the image as a PNG data URL.

        Args:
            image_id: ID of the image generation call; identical for every
                partial update of the same image.
            base64_image: Base64-encoded image data.
            partial_image_index: Zero-based index of the partial update, when
                the image is a partial one.

        Returns:
            A URL string.
        """
        data_url = f"data:image/png;base64,{base64_image}"
        return data_url

    def partial_image_index_to_progress(self, partial_image_index: int) -> float:
        """
        Map a partial image index onto a progress fraction.

        Args:
            partial_image_index: Zero-based index of the partial update.

        Returns:
            The index divided by `partial_images`, capped at 1.0; 0.0 when
            `partial_images` is unset or not positive.
        """
        expected = self.partial_images
        if expected is not None and expected > 0:
            return min(1.0, partial_image_index / expected)
        return 0.0

    async def file_citation_to_annotation(
        self, file_citation: AnnotationFileCitation
    ) -> Annotation | None:
        """Build an assistant message annotation from a Responses API file citation.

        Returns None when the citation carries no filename.
        """
        name = file_citation.filename
        if name:
            return Annotation(
                source=FileSource(filename=name, title=name),
                index=file_citation.index,
            )
        return None

    async def container_file_citation_to_annotation(
        self, container_file_citation: AnnotationContainerFileCitation
    ) -> Annotation | None:
        """Build an assistant message annotation from a Responses API container file citation.

        Returns None when the citation carries no filename.
        """
        name = container_file_citation.filename
        if name:
            return Annotation(
                source=FileSource(filename=name, title=name),
                index=container_file_citation.end_index,
            )
        return None

    async def url_citation_to_annotation(
        self, url_citation: AnnotationURLCitation
    ) -> Annotation | None:
        """Build an assistant message annotation from a Responses API URL citation."""
        source = URLSource(url=url_citation.url, title=url_citation.title)
        return Annotation(source=source, index=url_citation.end_index)
413
414
# Shared converter used by `stream_agent_response` when the caller does not
# pass one. Built with default arguments, so `partial_images` is None.
_DEFAULT_RESPONSE_STREAM_CONVERTER = ResponseStreamConverter()
416
417
async def _convert_content(
    content: Content, converter: ResponseStreamConverter
) -> AssistantMessageContent:
    """Turn a Responses output content part into assistant message content.

    "output_text" parts keep their text and every annotation that could be
    converted. Any other part is treated as a refusal: its refusal text is
    used and there are no annotations.
    """
    if content.type != "output_text":
        return AssistantMessageContent(
            text=content.refusal,
            annotations=[],
        )

    kept = []
    for raw in content.annotations:
        converted = await _convert_annotation(raw, converter)
        # Annotations without a ChatKit equivalent come back as None.
        if converted is not None:
            kept.append(converted)
    return AssistantMessageContent(
        text=content.text,
        annotations=kept,
    )
436
437
# Built once at import time and reused for every annotation: constructing a
# TypeAdapter is comparatively expensive and this function runs per annotation.
_RESPONSES_ANNOTATION_ADAPTER = TypeAdapter[ResponsesAnnotation](ResponsesAnnotation)


async def _convert_annotation(
    raw_annotation: object, converter: ResponseStreamConverter
) -> Annotation | None:
    """Convert a raw Responses annotation into a ChatKit annotation.

    Args:
        raw_annotation: The annotation as delivered by the client; may be a
            typed object, a dict, or an untyped object.
        converter: Supplies the per-citation-type conversion methods.

    Returns:
        The converted annotation, or None when the annotation type is not
        handled or the converter declines to convert it.

    Raises:
        pydantic.ValidationError: If `raw_annotation` cannot be validated as
            a Responses annotation.
    """
    # There is a bug in the OpenAPI client that sometimes parses the annotation delta event into the wrong class,
    # resulting in the annotation being a dict or untyped object instead of a ResponsesAnnotation
    annotation = _RESPONSES_ANNOTATION_ADAPTER.validate_python(raw_annotation)

    if annotation.type == "file_citation":
        return await converter.file_citation_to_annotation(annotation)

    if annotation.type == "url_citation":
        return await converter.url_citation_to_annotation(annotation)

    if annotation.type == "container_file_citation":
        return await converter.container_file_citation_to_annotation(annotation)

    return None
457
458
459def _function_call_metadata(raw_item: object) -> tuple[str | None, str | None]:
460 if isinstance(raw_item, dict):
461 if raw_item.get("type") != "function_call":
462 return None, None
463
464 call_id = raw_item.get("call_id")
465 item_id = raw_item.get("id")
466 return (
467 call_id if isinstance(call_id, str) else None,
468 item_id if isinstance(item_id, str) else None,
469 )
470
471 if isinstance(raw_item, ResponseFunctionToolCall):
472 return raw_item.call_id, raw_item.id
473
474 return None, None
475
476
async def stream_agent_response(
    context: AgentContext,
    result: RunResultStreaming,
    *,
    converter: ResponseStreamConverter = _DEFAULT_RESPONSE_STREAM_CONVERTER,
) -> AsyncIterator[ThreadStreamEvent]:
    """
    Convert a streamed Agents SDK run into ChatKit thread stream events.

    This function consumes a streaming run result, merged with the events that
    tools enqueue on `context`, and yields `ThreadStreamEvent` objects as the
    run progresses.

    Args:
        context: The AgentContext to use for the stream.
        result: The RunResultStreaming to convert.
        converter: Defines overridable methods for adapting streamed data (such as image
            generation results and partial updates) into the forms expected by ChatKit.

    Yields:
        Thread stream events representing the run result.

    Raises:
        InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered:
            Re-raised after a `ThreadItemRemovedEvent` has been yielded for
            every item produced so far.
    """
    current_item_id = None
    current_tool_call = None
    ctx = context
    thread = context.thread
    queue_iterator = _AsyncQueueIterator(context._events)
    # Ids of items emitted during this run, so they can be removed on a tripwire.
    produced_items = set()
    streaming_thought: None | StreamingThoughtTracker = None
    # item_id -> content_index -> annotation count
    item_annotation_count: defaultdict[str, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )

    # check if the last item in the thread was a workflow or a client tool call
    # if it was a client tool call, check if the second last item was a workflow
    # if either was, continue the workflow
    items = await context.store.load_thread_items(
        thread.id, None, 2, "desc", context.request_context
    )
    last_item = items.data[0] if len(items.data) > 0 else None
    second_last_item = items.data[1] if len(items.data) > 1 else None

    if last_item and last_item.type == "workflow":
        ctx.workflow_item = last_item
    elif (
        last_item
        and last_item.type == "client_tool_call"
        and second_last_item
        and second_last_item.type == "workflow"
    ):
        ctx.workflow_item = second_last_item

    def end_workflow(item: WorkflowItem):
        """Close `item` and return the done event for it."""
        if item == ctx.workflow_item:
            ctx.workflow_item = None
        delta = datetime.now() - item.created_at
        duration = int(delta.total_seconds())
        if item.workflow.summary is None:
            item.workflow.summary = DurationSummary(duration=duration)
        # Default to closing all workflows
        # To keep a workflow open on completion, close it explicitly with
        # AgentContext.end_workflow(expanded=True)
        item.workflow.expanded = False
        return ThreadItemDoneEvent(item=item)

    try:
        async for event in _merge_generators(result.stream_events(), queue_iterator):
            # Events emitted from agent context helpers
            if isinstance(event, _EventWrapper):
                event = event.event
                if (
                    event.type == "thread.item.added"
                    or event.type == "thread.item.done"
                ):
                    # End the current workflow if visual item is added after it
                    if (
                        ctx.workflow_item
                        and ctx.workflow_item.id != event.item.id
                        and event.item.type != "client_tool_call"
                        and event.item.type != "hidden_context_item"
                    ):
                        yield end_workflow(ctx.workflow_item)

                    # track the current workflow if one is added
                    if (
                        event.type == "thread.item.added"
                        and event.item.type == "workflow"
                    ):
                        ctx.workflow_item = event.item

                    # track integration produced items so we can clean them up if
                    # there is a guardrail tripwire
                    produced_items.add(event.item.id)
                yield event
                continue

            if event.type == "run_item_stream_event":
                event = event.item
                if event.type == "tool_call_item":
                    current_tool_call, current_item_id = _function_call_metadata(
                        event.raw_item
                    )
                    if current_tool_call:
                        assert current_item_id
                        produced_items.add(current_item_id)
                continue

            if event.type != "raw_response_event":
                # Ignore everything else that isn't a raw response event
                continue

            # Handle Responses events
            event = event.data
            if event.type == "response.content_part.added":
                if event.part.type == "reasoning_text":
                    continue
                content = await _convert_content(event.part, converter)
                yield ThreadItemUpdatedEvent(
                    item_id=event.item_id,
                    update=AssistantMessageContentPartAdded(
                        content_index=event.content_index,
                        content=content,
                    ),
                )
            elif event.type == "response.output_text.delta":
                yield ThreadItemUpdatedEvent(
                    item_id=event.item_id,
                    update=AssistantMessageContentPartTextDelta(
                        content_index=event.content_index,
                        delta=event.delta,
                    ),
                )
            elif event.type == "response.output_text.done":
                yield ThreadItemUpdatedEvent(
                    item_id=event.item_id,
                    update=AssistantMessageContentPartDone(
                        content_index=event.content_index,
                        content=AssistantMessageContent(
                            text=event.text,
                            annotations=[],
                        ),
                    ),
                )
            elif event.type == "response.output_text.annotation.added":
                annotation = await _convert_annotation(event.annotation, converter)
                if annotation:
                    # Manually track annotation indices per content part in case we drop an annotation that
                    # we can't convert to our internal representation (e.g. missing filename).
                    annotation_index = item_annotation_count[event.item_id][
                        event.content_index
                    ]
                    item_annotation_count[event.item_id][event.content_index] = (
                        annotation_index + 1
                    )
                    yield ThreadItemUpdatedEvent(
                        item_id=event.item_id,
                        update=AssistantMessageContentPartAnnotationAdded(
                            content_index=event.content_index,
                            annotation_index=annotation_index,
                            annotation=annotation,
                        ),
                    )
                continue
            elif event.type == "response.output_item.added":
                item = event.item
                if item.type == "reasoning" and not ctx.workflow_item:
                    ctx.workflow_item = WorkflowItem(
                        id=ctx.generate_id("workflow"),
                        created_at=datetime.now(),
                        workflow=Workflow(type="reasoning", tasks=[]),
                        thread_id=thread.id,
                    )
                    produced_items.add(ctx.workflow_item.id)
                    yield ThreadItemAddedEvent(item=ctx.workflow_item)
                if item.type == "message":
                    # An assistant message closes whatever workflow is open.
                    if ctx.workflow_item:
                        yield end_workflow(ctx.workflow_item)
                    produced_items.add(item.id)
                    yield ThreadItemAddedEvent(
                        item=AssistantMessageItem(
                            # Reusing the Responses message ID
                            id=item.id,
                            thread_id=thread.id,
                            content=[
                                await _convert_content(c, converter)
                                for c in item.content
                            ],
                            created_at=datetime.now(),
                        ),
                    )
                elif item.type == "image_generation_call":
                    ctx.generated_image_item = GeneratedImageItem(
                        id=ctx.generate_id("message"),
                        thread_id=thread.id,
                        created_at=datetime.now(),
                        image=None,
                    )
                    produced_items.add(ctx.generated_image_item.id)
                    yield ThreadItemAddedEvent(item=ctx.generated_image_item)
            elif event.type == "response.image_generation_call.partial_image":
                if not ctx.generated_image_item:
                    continue

                url = await converter.base64_image_to_url(
                    image_id=event.item_id,
                    base64_image=event.partial_image_b64,
                    partial_image_index=event.partial_image_index,
                )
                progress = converter.partial_image_index_to_progress(
                    event.partial_image_index
                )

                ctx.generated_image_item.image = GeneratedImage(
                    id=event.item_id, url=url
                )

                yield ThreadItemUpdatedEvent(
                    item_id=ctx.generated_image_item.id,
                    update=GeneratedImageUpdated(
                        image=ctx.generated_image_item.image, progress=progress
                    ),
                )
            elif event.type == "response.reasoning_summary_text.delta":
                if not ctx.workflow_item:
                    continue

                # stream the first thought in a new workflow so that we can show it earlier
                if (
                    ctx.workflow_item.workflow.type == "reasoning"
                    and len(ctx.workflow_item.workflow.tasks) == 0
                ):
                    streaming_thought = StreamingThoughtTracker(
                        item_id=event.item_id,
                        index=event.summary_index,
                        task=ThoughtTask(content=event.delta),
                    )
                    ctx.workflow_item.workflow.tasks.append(streaming_thought.task)
                    yield ThreadItemUpdatedEvent(
                        item_id=ctx.workflow_item.id,
                        update=WorkflowTaskAdded(
                            task=streaming_thought.task,
                            task_index=0,
                        ),
                    )
                elif (
                    streaming_thought
                    and streaming_thought.task in ctx.workflow_item.workflow.tasks
                    and event.item_id == streaming_thought.item_id
                    and event.summary_index == streaming_thought.index
                ):
                    streaming_thought.task.content += event.delta
                    yield ThreadItemUpdatedEvent(
                        item_id=ctx.workflow_item.id,
                        update=WorkflowTaskUpdated(
                            task=streaming_thought.task,
                            task_index=ctx.workflow_item.workflow.tasks.index(
                                streaming_thought.task
                            ),
                        ),
                    )
            elif event.type == "response.reasoning_summary_text.done":
                if ctx.workflow_item:
                    if (
                        streaming_thought
                        and streaming_thought.task in ctx.workflow_item.workflow.tasks
                        and event.item_id == streaming_thought.item_id
                        and event.summary_index == streaming_thought.index
                    ):
                        # Finalize the thought that was being streamed.
                        task = streaming_thought.task
                        task.content = event.text
                        streaming_thought = None
                        update = WorkflowTaskUpdated(
                            task=task,
                            task_index=ctx.workflow_item.workflow.tasks.index(task),
                        )
                    else:
                        task = ThoughtTask(content=event.text)
                        ctx.workflow_item.workflow.tasks.append(task)
                        # The thought was just appended, so it is the last
                        # task. `tasks.index(task)` compares by equality and
                        # would return the index of an earlier thought with
                        # the same content.
                        update = WorkflowTaskAdded(
                            task=task,
                            task_index=len(ctx.workflow_item.workflow.tasks) - 1,
                        )
                    yield ThreadItemUpdatedEvent(
                        item_id=ctx.workflow_item.id,
                        update=update,
                    )
            elif event.type == "response.output_item.done":
                item = event.item
                if item.type == "message":
                    produced_items.add(item.id)
                    yield ThreadItemDoneEvent(
                        item=AssistantMessageItem(
                            # Reusing the Responses message ID
                            id=item.id,
                            thread_id=thread.id,
                            content=[
                                await _convert_content(c, converter)
                                for c in item.content
                            ],
                            created_at=datetime.now(),
                        ),
                    )
                elif item.type == "image_generation_call" and item.result:
                    if not ctx.generated_image_item:
                        continue

                    url = await converter.base64_image_to_url(
                        image_id=item.id,
                        base64_image=item.result,
                    )
                    image = GeneratedImage(id=item.id, url=url)

                    ctx.generated_image_item.image = image
                    yield ThreadItemDoneEvent(item=ctx.generated_image_item)

                    ctx.generated_image_item = None

    except (InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered):
        # Retract everything this run has shown so far.
        for item_id in produced_items:
            yield ThreadItemRemovedEvent(item_id=item_id)

        # Drain remaining events without processing them
        context._complete()
        queue_iterator.drain_and_complete()

        raise

    context._complete()

    # Drain remaining events
    async for event in queue_iterator:
        yield event.event

    # If there is still an active workflow at the end of the run, store
    # its current state so that we can continue it in the next turn.
    if ctx.workflow_item:
        await ctx.store.add_thread_item(
            thread.id, ctx.workflow_item, ctx.request_context
        )

    if context.client_tool_call:
        # Reuse the ids of the last function call seen in the stream when
        # available; otherwise generate fresh ones.
        yield ThreadItemDoneEvent(
            item=ClientToolCallItem(
                id=current_item_id
                or context.store.generate_item_id(
                    "tool_call", thread, context.request_context
                ),
                thread_id=thread.id,
                name=context.client_tool_call.name,
                arguments=context.client_tool_call.arguments,
                created_at=datetime.now(),
                call_id=current_tool_call
                or context.store.generate_item_id(
                    "tool_call", thread, context.request_context
                ),
            ),
        )
834
835
# Widget type accepted and yielded by `accumulate_text`; restricted to the
# Markdown and Text widgets.
TWidget = TypeVar("TWidget", bound=Markdown | Text)
837
838
async def accumulate_text(
    events: AsyncIterator[StreamEvent],
    base_widget: TWidget,
) -> AsyncIterator[TWidget]:
    """Yield successive copies of a widget as streamed text accumulates.

    The untouched `base_widget` is yielded first. Each output text delta
    found in `events` then produces a copy whose `value` is all text seen so
    far. Once the stream ends, a last copy is yielded with the full text and
    `streaming` set to False.
    """
    yield base_widget

    accumulated = ""
    async for stream_event in events:
        # Only raw Responses text deltas contribute; skip everything else.
        if (
            stream_event.type != "raw_response_event"
            or stream_event.data.type != "response.output_text.delta"
        ):
            continue
        accumulated += stream_event.data.delta
        yield base_widget.model_copy(update={"value": accumulated})

    yield base_widget.model_copy(update={"value": accumulated, "streaming": False})
851
852
853class ThreadItemConverter:
854 """
855 Converts thread items to Agent SDK input items.
856 Widgets, Tasks, and Workflows have default conversions but can be customized.
857 Attachments, Tags, and HiddenContextItems require custom handling based on the use case.
858 Other item types are converted automatically.
859 """
860
861 async def attachment_to_message_content(
862 self, attachment: Attachment
863 ) -> ResponseInputContentParam:
864 """
865 Convert an attachment in a user message into a message content part to send to the model.
866 Required when attachments are enabled.
867 """
868 raise NotImplementedError(
869 "An Attachment was included in a UserMessageItem but Converter.attachment_to_message_content was not implemented"
870 )
871
872 async def tag_to_message_content(
873 self, tag: UserMessageTagContent
874 ) -> ResponseInputContentParam:
875 """
876 Convert a tag in a user message into a message content part to send to the model as context.
877 Required when tags are used.
878 """
879 raise NotImplementedError(
880 "A Tag was included in a UserMessageItem but Converter.tag_to_message_content is not implemented"
881 )
882
883 async def generated_image_to_input(
884 self, item: GeneratedImageItem
885 ) -> TResponseInputItem | list[TResponseInputItem] | None:
886 """
887 Convert a GeneratedImageItem into input item(s) to send to the model.
888 Override this method to customize the conversion of generated images, such as when your
889 generated image url is not publicly reachable.
890 """
891 if not item.image:
892 return None
893
894 return Message(
895 type="message",
896 content=[
897 ResponseInputTextParam(
898 type="input_text",
899 text="The following image was generated by the agent.",
900 ),
901 ResponseInputImageParam(
902 type="input_image",
903 detail="auto",
904 image_url=item.image.url,
905 ),
906 ],
907 role="user",
908 )
909
910 async def hidden_context_to_input(
911 self, item: HiddenContextItem
912 ) -> TResponseInputItem | list[TResponseInputItem] | None:
913 """
914 Convert a HiddenContextItem into input item(s) to send to the model.
915 Required to override when HiddenContextItems with non-string content are used.
916 """
917 if not isinstance(item.content, str):
918 raise NotImplementedError(
919 "HiddenContextItems with non-string content were present in a user message but a Converter.hidden_context_to_input was not implemented"
920 )
921
922 text = (
923 "Hidden context for the agent (not shown to the user):\n"
924 f"<HiddenContext>\n{item.content}\n</HiddenContext>"
925 )
926 return Message(
927 type="message",
928 content=[
929 ResponseInputTextParam(
930 type="input_text",
931 text=text,
932 )
933 ],
934 role="user",
935 )
936
937 async def sdk_hidden_context_to_input(
938 self, item: SDKHiddenContextItem
939 ) -> TResponseInputItem | list[TResponseInputItem] | None:
940 """
941 Convert a SDKHiddenContextItem into input item to send to the model.
942 This is used by the ChatKit Python SDK for storing additional context
943 for internal operations.
944 Override if you want to wrap the content in a different format.
945 """
946 text = (
947 "Hidden context for the agent (not shown to the user):\n"
948 f"<HiddenContext>\n{item.content}\n</HiddenContext>"
949 )
950 return Message(
951 type="message",
952 content=[
953 ResponseInputTextParam(
954 type="input_text",
955 text=text,
956 )
957 ],
958 role="user",
959 )
960
961 async def task_to_input(
962 self, item: TaskItem
963 ) -> TResponseInputItem | list[TResponseInputItem] | None:
964 """
965 Convert a TaskItem into input item(s) to send to the model.
966 """
967 if item.task.type != "custom" or (
968 not item.task.title and not item.task.content
969 ):
970 return None
971 title = f"{item.task.title}" if item.task.title else ""
972 content = f"{item.task.content}" if item.task.content else ""
973 task_text = f"{title}: {content}" if title and content else title or content
974 text = f"A message was displayed to the user that the following task was performed:\n<Task>\n{task_text}\n</Task>"
975 return Message(
976 type="message",
977 content=[
978 ResponseInputTextParam(
979 type="input_text",
980 text=text,
981 )
982 ],
983 role="user",
984 )
985
986 async def structured_input_to_input(
987 self, item: StructuredInputItem
988 ) -> TResponseInputItem | list[TResponseInputItem] | None:
989 """
990 Convert a StructuredInputItem into input item(s) to send to the model.
991 """
992 lines = []
993 for question in item.inputs:
994 answer = question.answer
995 if answer is None:
996 lines.append(f"- {question.question}: unanswered")
997 elif answer.skipped:
998 lines.append(f"- {question.question}: skipped")
999 else:
1000 lines.append(f"- {question.question}: {', '.join(answer.values)}")
1001
1002 text = (
1003 "A structured input request was displayed to the user with the following "
1004 f"status: {item.status}\n<StructuredInput>\n"
1005 + "\n".join(lines)
1006 + "\n</StructuredInput>"
1007 )
1008 return Message(
1009 type="message",
1010 content=[
1011 ResponseInputTextParam(
1012 type="input_text",
1013 text=text,
1014 ),
1015 ],
1016 role="user",
1017 )
1018
async def workflow_to_input(
    self, item: WorkflowItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert a WorkflowItem into input item(s) to send to the model.

    By default, each task of type "custom" that has a title and/or content
    becomes one user message describing the task. All other tasks are
    skipped, so the returned list may be empty.
    """
    task_texts: list[str] = []
    for task in item.workflow.tasks:
        # Check the type first: only custom tasks are inspected any further.
        if task.type != "custom":
            continue
        # Keep whichever of title/content is present, in that order.
        present = [f"{value}" for value in (task.title, task.content) if value]
        if not present:
            continue
        task_texts.append(": ".join(present))

    return [
        Message(
            type="message",
            content=[
                ResponseInputTextParam(
                    type="input_text",
                    text=f"A message was displayed to the user that the following task was performed:\n<Task>\n{task_text}\n</Task>",
                )
            ],
            role="user",
        )
        for task_text in task_texts
    ]
1048
async def widget_to_input(
    self, item: WidgetItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert a WidgetItem into input item(s) to send to the model.

    By default, a WidgetItem is converted to a single user message whose text
    is a short description (including the item id) immediately followed by
    the widget's JSON dump, with unset and None fields excluded.
    """
    intro = f"The following graphical UI widget (id: {item.id}) was displayed to the user:"
    widget_json = item.widget.model_dump_json(
        exclude_unset=True,
        exclude_none=True,
    )
    text_part = ResponseInputTextParam(
        type="input_text",
        text=intro + widget_json,
    )
    return Message(
        type="message",
        content=[text_part],
        role="user",
    )
1069
async def user_message_to_input(
    self, item: UserMessageItem, is_last_message: bool = True
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert a UserMessageItem into input item(s) to send to the model.

    The first returned message carries the user's text (tags rendered as
    ``@<text>``) followed by the converted attachments. It is followed by
    optional user-role context messages: the quoted text (only when
    ``is_last_message`` is true) and the resolved content of the @-mentions.
    """
    # The user's text exactly as typed, with tags rendered as @key.
    rendered_parts: list[str] = []
    # First occurrence of every tag keyed by its text; dict insertion order
    # gives an order-preserving dedupe.
    tags_by_text: dict[str, UserMessageTagContent] = {}

    for part in item.content:
        if isinstance(part, UserMessageTextContent):
            rendered_parts.append(part.text)
        elif isinstance(part, UserMessageTagContent):
            rendered_parts.append(f"@{part.text}")
            tags_by_text.setdefault(part.text, part)
        else:
            assert_never(part)

    message_content = [
        ResponseInputTextParam(type="input_text", text="".join(rendered_parts))
    ]
    for attachment in item.attachments:
        message_content.append(await self.attachment_to_message_content(attachment))

    user_message = Message(
        role="user",
        type="message",
        content=message_content,
    )

    # Context messages placed after the user's own message.
    follow_ups: list[TResponseInputItem] = []

    if item.quoted_text and is_last_message:
        quoted_part = ResponseInputTextParam(
            type="input_text",
            text=f"The user is referring to this in particular: \n{item.quoted_text}",
        )
        follow_ups.append(
            Message(
                role="user",
                type="message",
                content=[quoted_part],
            )
        )

    if tags_by_text:
        # tag_to_message_content should return summarized text items
        resolved_tags: ResponseInputMessageContentListParam = [
            await self.tag_to_message_content(tag) for tag in tags_by_text.values()
        ]
        mention_header = ResponseInputTextParam(
            type="input_text",
            text=cleandoc("""
                # User-provided context for @-mentions
                - When referencing resolved entities, use their canonical names **without** '@'.
                - The '@' form appears only in user text and should not be echoed.
            """).strip(),
        )
        follow_ups.append(
            Message(
                role="user",
                type="message",
                content=[mention_header, *resolved_tags],
            )
        )

    return [user_message, *follow_ups]
1152
async def assistant_message_to_input(
    self, item: AssistantMessageItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert an AssistantMessageItem into input item(s) to send to the model.

    Every content part is sent back as an ``output_text`` entry holding the
    part's text. Annotations are currently dropped.
    """
    content_parts = []
    for part in item.content:
        output_text = ResponseOutputText(
            type="output_text",
            text=part.text,
            annotations=[],  # TODO: these should be sent back as well
        )
        # The content param type doesn't cover assistant output content, so
        # the dumped dict is cast to satisfy the type checker.
        content_parts.append(cast(ResponseInputContentParam, output_text.model_dump()))

    return EasyInputMessageParam(
        type="message",
        content=content_parts,
        role="assistant",
    )
1172
async def client_tool_call_to_input(
    self, item: ClientToolCallItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert a ClientToolCallItem into input item(s) to send to the model.

    Pending calls produce ``None`` because they cannot be sent to the model.
    Any other call becomes a function call followed by its output, with the
    arguments and the output JSON-encoded.
    """
    if item.status == "pending":
        return None

    function_call = ResponseFunctionToolCallParam(
        type="function_call",
        call_id=item.call_id,
        name=item.name,
        arguments=json.dumps(item.arguments),
    )
    function_output = FunctionCallOutput(
        type="function_call_output",
        call_id=item.call_id,
        output=json.dumps(item.output),
    )
    return [function_call, function_output]
1193
async def end_of_turn_to_input(
    self, item: EndOfTurnItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
    """
    Convert an EndOfTurnItem into input item(s) to send to the model.

    End-of-turn items are only used for UI hints, so nothing is sent to the
    model. You shouldn't need to override this.
    """
    return None
1199
1200 async def _thread_item_to_input_item(
1201 self,
1202 item: ThreadItem,
1203 is_last_message: bool = True,
1204 ) -> list[TResponseInputItem]:
1205 match item:
1206 case UserMessageItem():
1207 out = await self.user_message_to_input(item, is_last_message) or []
1208 return out if isinstance(out, list) else [out]
1209 case AssistantMessageItem():
1210 out = await self.assistant_message_to_input(item) or []
1211 return out if isinstance(out, list) else [out]
1212 case ClientToolCallItem():
1213 out = await self.client_tool_call_to_input(item) or []
1214 return out if isinstance(out, list) else [out]
1215 case EndOfTurnItem():
1216 out = await self.end_of_turn_to_input(item) or []
1217 return out if isinstance(out, list) else [out]
1218 case WidgetItem():
1219 out = await self.widget_to_input(item) or []
1220 return out if isinstance(out, list) else [out]
1221 case WorkflowItem():
1222 out = await self.workflow_to_input(item) or []
1223 return out if isinstance(out, list) else [out]
1224 case TaskItem():
1225 out = await self.task_to_input(item) or []
1226 return out if isinstance(out, list) else [out]
1227 case StructuredInputItem():
1228 out = await self.structured_input_to_input(item) or []
1229 return out if isinstance(out, list) else [out]
1230 case HiddenContextItem():
1231 out = await self.hidden_context_to_input(item) or []
1232 return out if isinstance(out, list) else [out]
1233 case SDKHiddenContextItem():
1234 out = await self.sdk_hidden_context_to_input(item) or []
1235 return out if isinstance(out, list) else [out]
1236 case GeneratedImageItem():
1237 out = await self.generated_image_to_input(item) or []
1238 return out if isinstance(out, list) else [out]
1239 case _:
1240 assert_never(item)
1241
async def to_agent_input(
    self,
    thread_items: Sequence[ThreadItem] | ThreadItem,
) -> list[TResponseInputItem]:
    """
    Convert one thread item, or a sequence of them, into model input items.

    Items are converted one after another, in order, and the results are
    concatenated. Only the item in the final position is converted with
    ``is_last_message=True``.

    Args:
        thread_items: A single thread item or a sequence of thread items.

    Returns:
        The flattened list of input items; empty for an empty sequence.
    """
    if isinstance(thread_items, Sequence):
        # Shallow copy in case the caller mutates the sequence while we await.
        items = list(thread_items)
    else:
        items = [thread_items]

    # Compare positions rather than identities: an object that occurs more
    # than once in the sequence must only count as "last" at the final index.
    last_index = len(items) - 1
    output: list[TResponseInputItem] = []
    for index, item in enumerate(items):
        output.extend(
            await self._thread_item_to_input_item(
                item,
                is_last_message=index == last_index,
            )
        )
    return output
1260
1261
# Module-level converter instance used by simple_to_agent_input.
_DEFAULT_CONVERTER = ThreadItemConverter()
1263
1264
def simple_to_agent_input(thread_items: Sequence[ThreadItem] | ThreadItem):
    """Helper that converts thread items using the default ThreadItemConverter.

    This function is not itself ``async``: it returns the result of calling
    the converter's async ``to_agent_input`` method, so the returned value
    must be awaited to obtain the input items.
    """
    converter = _DEFAULT_CONVERTER
    return converter.to_agent_input(thread_items)
1268