openai/chatkit-python

Public

mirrored from https://github.com/openai/chatkit-pythonAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
6df9ee60da4de3f063e752e60bd3ceef920f4e78

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

tests/test_agents.py

1492lines · modeblame

f688d870victor-openai8 months ago1import asyncio
2import json
3from collections.abc import AsyncIterator
4from datetime import datetime
5from typing import cast
6from unittest.mock import AsyncMock, Mock
7
8import pytest
9from agents import (
10Agent,
11GuardrailFunctionOutput,
12InputGuardrail,
13InputGuardrailResult,
14InputGuardrailTripwireTriggered,
15OutputGuardrail,
16OutputGuardrailResult,
17OutputGuardrailTripwireTriggered,
18RawResponsesStreamEvent,
19RunContextWrapper,
20RunItemStreamEvent,
21Runner,
22RunResultStreaming,
23StreamEvent,
24ToolCallItem,
25)
26from agents._run_impl import QueueCompleteSentinel
27from openai.types.responses import (
28EasyInputMessageParam,
29ResponseFileSearchToolCall,
30ResponseInputContentParam,
31ResponseInputTextParam,
32ResponseOutputItemAddedEvent,
33ResponseOutputItemDoneEvent,
34ResponseOutputMessage,
35ResponseReasoningItem,
36)
37from openai.types.responses.response_content_part_added_event import (
38ResponseContentPartAddedEvent,
39)
40from openai.types.responses.response_file_search_tool_call import Result
bf293721Jiwon Kim7 months ago41from openai.types.responses.response_input_item_param import Message
6403dffeDavid Weedon7 months ago42from openai.types.responses.response_output_text import (
43AnnotationContainerFileCitation as ResponsesAnnotationContainerFileCitation,
44)
f688d870victor-openai8 months ago45from openai.types.responses.response_output_text import (
46AnnotationFileCitation as ResponsesAnnotationFileCitation,
47)
48from openai.types.responses.response_output_text import (
49AnnotationFilePath as ResponsesAnnotationFilePath,
50)
51from openai.types.responses.response_output_text import (
52AnnotationURLCitation as ResponsesAnnotationURLCitation,
53)
54from openai.types.responses.response_output_text import (
55ResponseOutputText,
56)
57from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent
58from openai.types.responses.response_text_done_event import ResponseTextDoneEvent
59
60from chatkit.agents import (
61AgentContext,
62ThreadItemConverter,
63accumulate_text,
64simple_to_agent_input,
65stream_agent_response,
66)
67from chatkit.types import (
68Annotation,
69AssistantMessageContent,
70AssistantMessageContentPartAdded,
6403dffeDavid Weedon7 months ago71AssistantMessageContentPartAnnotationAdded,
f688d870victor-openai8 months ago72AssistantMessageContentPartDone,
73AssistantMessageContentPartTextDelta,
74AssistantMessageItem,
75Attachment,
76ClientToolCallItem,
77CustomSummary,
78CustomTask,
79DurationSummary,
80FileSource,
bf293721Jiwon Kim7 months ago81HiddenContextItem,
f688d870victor-openai8 months ago82InferenceOptions,
83Page,
84TaskItem,
85ThoughtTask,
86Thread,
87ThreadItemAddedEvent,
88ThreadItemDoneEvent,
643a02f7Jiwon Kim7 months ago89ThreadItemUpdatedEvent,
f688d870victor-openai8 months ago90ThreadStreamEvent,
91URLSource,
92UserMessageItem,
93UserMessageTagContent,
94UserMessageTextContent,
95WidgetItem,
96Workflow,
97WorkflowItem,
98WorkflowTaskAdded,
99WorkflowTaskUpdated,
100)
101from chatkit.widgets import Card, Text
102
# Thread fixture shared by every test in this module.
thread = Thread(
    id="123",
    title="Test",
    created_at=datetime.now(),
    items=Page(),
)


def _fake_generate_item_id(item_type, thread, context):
    """Return a predictable id built only from ``item_type``."""
    return f"{item_type}_id"


# Store double: ids are deterministic, loading yields an empty page, and
# writes are recorded by an AsyncMock.
mock_store = Mock()
mock_store.generate_item_id = _fake_generate_item_id
mock_store.load_thread_items = AsyncMock(return_value=Page())
mock_store.add_thread_item = AsyncMock()
109
110
class RunResult(RunResultStreaming):
    """``RunResultStreaming`` subclass whose event queue is driven by hand.

    Tests push events with :meth:`add_event` and finish the run with
    :meth:`done` or one of the ``throw_*_guardrails`` helpers.
    """

    def add_event(self, event: StreamEvent):
        """Put ``event`` on the internal event queue without waiting."""
        self._event_queue.put_nowait(event)

    def done(self):
        """Flag the run as complete and enqueue the completion sentinel."""
        self.is_complete = True
        self._event_queue.put_nowait(QueueCompleteSentinel())

    def throw_input_guardrails(self):
        """Store a tripped input-guardrail exception, then complete the run."""
        tripped_output = GuardrailFunctionOutput(
            output_info=None,
            tripwire_triggered=True,
        )
        guardrail_result = InputGuardrailResult(
            guardrail=Mock(spec=InputGuardrail),
            output=tripped_output,
        )
        self._stored_exception = InputGuardrailTripwireTriggered(guardrail_result)
        self.done()

    def throw_output_guardrails(self):
        """Store a tripped output-guardrail exception, then complete the run."""
        tripped_output = GuardrailFunctionOutput(
            output_info=None,
            tripwire_triggered=True,
        )
        guardrail_result = OutputGuardrailResult(
            guardrail=Mock(spec=OutputGuardrail),
            output=tripped_output,
            agent=Mock(spec=Agent),
            agent_output=None,
        )
        self._stored_exception = OutputGuardrailTripwireTriggered(guardrail_result)
        self.done()
146
147
def make_result() -> RunResult:
    """Build an empty, not-yet-complete ``RunResult`` with fresh queues."""
    return RunResult(
        # Run inputs and agent state.
        input=[],
        context_wrapper=Mock(spec=RunContextWrapper),
        current_agent=Agent(name="test"),
        current_turn=0,
        max_turns=10,
        trace=None,
        # Nothing has been produced yet.
        new_items=[],
        raw_responses=[],
        final_output=None,
        is_complete=False,
        # Guardrail bookkeeping starts empty.
        input_guardrail_results=[],
        output_guardrail_results=[],
        tool_input_guardrail_results=[],
        tool_output_guardrail_results=[],
        # Private streaming plumbing; the event queue is what tests feed.
        _current_agent_output_schema=None,
        _event_queue=asyncio.Queue(),
        _input_guardrail_queue=asyncio.Queue(),
        _output_guardrails_task=None,
        _run_impl_task=None,
        _stored_exception=None,
    )
171
172
async def all_events(
    events: AsyncIterator[ThreadStreamEvent],
) -> list[ThreadStreamEvent]:
    """Drain ``events`` and return everything it yielded, in order."""
    collected: list[ThreadStreamEvent] = []
    async for item in events:
        collected.append(item)
    return collected
177
178
async def test_returns_widget_item():
    """A widget streamed as a single value comes back as one done event."""

    def hello_card() -> Card:
        # Built fresh each time so the final comparison is by value.
        return Card(children=[Text(value="Hello, world!")])

    run = make_result()
    run.add_event(RunItemStreamEvent(name="tool_called", item=Mock(spec=ToolCallItem)))

    agent_context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    await agent_context.stream_widget(hello_card())
    run.done()

    emitted = await all_events(
        stream_agent_response(context=agent_context, result=run)
    )

    assert len(emitted) == 1
    only_event = emitted[0]
    assert isinstance(only_event, ThreadItemDoneEvent)
    assert isinstance(only_event.item, WidgetItem)
    assert only_event.item.widget == hello_card()
201
202
async def test_returns_widget_item_generator():
    """A widget generator whose text grows yields added, delta, then done."""

    def card_with_prefix(length: int) -> Card:
        return Card(children=[Text(id="text", value="Hello, world"[:length])])

    async def growing_widget():
        # First an empty text node, then the full twelve-character string.
        for length in (0, 12):
            yield card_with_prefix(length)

    run = make_result()
    run.add_event(RunItemStreamEvent(name="tool_called", item=Mock(spec=ToolCallItem)))

    agent_context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    await agent_context.stream_widget(growing_widget())
    run.done()

    emitted = await all_events(
        stream_agent_response(context=agent_context, result=run)
    )

    assert len(emitted) == 3
    added, updated, finished = emitted

    assert isinstance(added, ThreadItemAddedEvent)
    assert isinstance(added.item, WidgetItem)
    assert added.item.widget == Card(children=[Text(id="text", value="")])

    assert isinstance(updated, ThreadItemUpdatedEvent)
    assert updated.update.type == "widget.streaming_text.value_delta"
    assert updated.update.component_id == "text"
    assert updated.update.delta == "Hello, world"

    assert isinstance(finished, ThreadItemDoneEvent)
    assert isinstance(finished.item, WidgetItem)
    assert finished.item.widget == Card(
        children=[Text(id="text", value="Hello, world")]
    )
244
245
async def test_returns_widget_full_replace_generator():
    """A structurally different second widget is sent as a root update."""

    def replacement_card() -> Card:
        return Card(children=[Text(key="other text", value="World!", streaming=False)])

    async def swapping_widget():
        yield Card(children=[Text(id="text", value="Hello!")])
        yield replacement_card()

    run = make_result()
    run.add_event(RunItemStreamEvent(name="tool_called", item=Mock(spec=ToolCallItem)))

    agent_context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    await agent_context.stream_widget(swapping_widget())
    run.done()

    emitted = await all_events(
        stream_agent_response(context=agent_context, result=run)
    )

    assert len(emitted) == 3
    added, updated, finished = emitted

    assert isinstance(added, ThreadItemAddedEvent)
    assert isinstance(added.item, WidgetItem)
    assert added.item.widget == Card(children=[Text(id="text", value="Hello!")])

    assert isinstance(updated, ThreadItemUpdatedEvent)
    assert updated.update.type == "widget.root.updated"
    assert updated.update.widget == replacement_card()

    assert isinstance(finished, ThreadItemDoneEvent)
    assert isinstance(finished.item, WidgetItem)
    assert finished.item.widget == replacement_card()
285
286
async def test_accumulate_text():
    """``accumulate_text`` turns text deltas into successive ``Text`` snapshots.

    The expected sequence is the initial widget, one snapshot per delta with
    the text accumulated so far, and a last snapshot with ``streaming=False``.
    """

    def delta(text: str) -> RawResponsesStreamEvent:
        """Build a raw output-text delta event carrying ``text``."""
        return RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseTextDeltaEvent(
                type="response.output_text.delta",
                delta=text,
                content_index=0,
                item_id="123",
                logprobs=[],
                output_index=0,
                sequence_number=0,
            ),
        )

    # Only the hand-fed result is needed. The previous version also called
    # Runner.run_streamed(...) and immediately discarded its return value,
    # which launched a real agent run as a side effect of a unit test.
    result = make_result()
    result.add_event(delta("Hello, "))
    result.add_event(delta("world!"))
    result.done()

    events = [
        event
        async for event in accumulate_text(
            result.stream_events(), Text(key="text", value="", streaming=True)
        )
    ]
    assert events == [
        Text(key="text", value="", streaming=True),
        Text(key="text", value="Hello, ", streaming=True),
        Text(key="text", value="Hello, world!", streaming=True),
        Text(key="text", value="Hello, world!", streaming=False),
    ]
323
324
async def test_input_item_converter_quotes_last_user_message():
    """Only the last user message's ``quoted_text`` becomes an extra input.

    Two user messages both carry ``quoted_text``; the converted input holds
    the two messages followed by a single quote message for the second one.
    """
    items = [
        UserMessageItem(
            id="123",
            content=[UserMessageTextContent(text="Hello!")],
            attachments=[],
            inference_options=InferenceOptions(),
            thread_id=thread.id,
            quoted_text="Hi!",
            created_at=datetime.now(),
        ),
        UserMessageItem(
            id="123",
            content=[UserMessageTextContent(text="I'm well, thank you!")],
            attachments=[],
            inference_options=InferenceOptions(),
            thread_id=thread.id,
            quoted_text="How are you doing?",
            created_at=datetime.now(),
        ),
    ]

    # An unused nested ``throw_exception`` helper used to live here; it was
    # never passed to anything, so it has been removed.
    input_items = await simple_to_agent_input(items)
    assert len(input_items) == 3
    assert input_items[0] == {
        "content": [
            {
                "text": "Hello!",
                "type": "input_text",
            },
        ],
        "role": "user",
        "type": "message",
    }
    assert input_items[1] == {
        "content": [
            {
                "text": "I'm well, thank you!",
                "type": "input_text",
            },
        ],
        "role": "user",
        "type": "message",
    }
    assert input_items[2] == {
        "content": [
            {
                "text": "The user is referring to this in particular: \nHow are you doing?",
                "type": "input_text",
            },
        ],
        "role": "user",
        "type": "message",
    }
384
385
async def test_input_item_converter_to_input_items_mixed():
    """User, assistant and widget items each convert to their own input."""

    def user_text_message(text: str) -> dict:
        # Expected shape of a converted plain-text user message.
        return {
            "content": [{"text": text, "type": "input_text"}],
            "role": "user",
            "type": "message",
        }

    def assistant_text_part(text: str) -> dict:
        # Expected shape of one converted assistant content part.
        return {
            "annotations": [],
            "text": text,
            "logprobs": None,
            "type": "output_text",
        }

    first_user = UserMessageItem(
        id="123",
        content=[UserMessageTextContent(text="Hello!")],
        attachments=[],
        inference_options=InferenceOptions(),
        thread_id=thread.id,
        quoted_text="Hi!",
        created_at=datetime.now(),
    )
    second_user = UserMessageItem(
        id="123",
        content=[UserMessageTextContent(text="I'm well, thank you!")],
        attachments=[],
        inference_options=InferenceOptions(),
        thread_id=thread.id,
        quoted_text="How are you doing?",
        created_at=datetime.now(),
    )
    assistant = AssistantMessageItem(
        id="123",
        content=[
            AssistantMessageContent(text="How are you doing?"),
            AssistantMessageContent(text="Can't do that"),
        ],
        thread_id=thread.id,
        created_at=datetime.now(),
    )
    widget = WidgetItem(
        id="wd_123",
        widget=Card(children=[Text(value="Hello, world!")]),
        thread_id=thread.id,
        created_at=datetime.now(),
    )

    input_items = await simple_to_agent_input(
        [first_user, second_user, assistant, widget]
    )

    assert len(input_items) == 4
    assert input_items[0] == user_text_message("Hello!")
    assert input_items[1] == user_text_message("I'm well, thank you!")
    assert input_items[2] == {
        "content": [
            assistant_text_part("How are you doing?"),
            assistant_text_part("Can't do that"),
        ],
        "type": "message",
        "role": "assistant",
    }

    # The widget is described to the model as a user-role text message.
    assert "type" in input_items[3]
    widget_message = cast(EasyInputMessageParam, input_items[3])
    assert widget_message.get("type") == "message"
    assert widget_message.get("role") == "user"
    description = widget_message.get("content")[0]["text"]  # type: ignore
    assert (
        "The following graphical UI widget (id: wd_123) was displayed to the user"
        in description
    )
    assert "Hello, world!" in description
    assert "created_at" not in description
474
475
async def test_input_item_converter_user_input_with_tags():
    """A converter overriding ``tag_to_message_content`` handles tag content.

    The tag shows up as ``@Hello!`` in the user message, and a second user
    message carries the @-mention preamble plus the converter's own output.
    """

    class MyThreadItemConverter(ThreadItemConverter):
        async def tag_to_message_content(self, tag):
            combined = tag.text + " " + tag.data["key"]
            return ResponseInputTextParam(type="input_text", text=combined)

    tag = UserMessageTagContent(
        text="Hello!", type="input_tag", id="hello", data={"key": "value"}
    )
    message = UserMessageItem(
        id="123",
        content=[tag],
        attachments=[],
        inference_options=InferenceOptions(),
        thread_id=thread.id,
        created_at=datetime.now(),
    )

    converted = await MyThreadItemConverter().to_agent_input([message])

    mention_preamble = (
        "# User-provided context for @-mentions\n"
        "- When referencing resolved entities, use their canonical names **without** '@'.\n"
        "- The '@' form appears only in user text and should not be echoed."
    )

    assert len(converted) == 2
    assert converted[0] == {
        "content": [{"text": "@Hello!", "type": "input_text"}],
        "role": "user",
        "type": "message",
    }
    assert converted[1] == {
        "content": [
            {"text": mention_preamble, "type": "input_text"},
            {"text": "Hello! value", "type": "input_text"},
        ],
        "role": "user",
        "type": "message",
    }
525
526
async def test_input_item_converter_user_input_with_tags_throws_by_default():
    """Converting tag content with the default converter is not implemented."""
    tag = UserMessageTagContent(text="Hello!", type="input_tag", id="hello", data={})
    message = UserMessageItem(
        id="123",
        content=[tag],
        attachments=[],
        inference_options=InferenceOptions(),
        thread_id=thread.id,
        created_at=datetime.now(),
    )

    with pytest.raises(NotImplementedError):
        await simple_to_agent_input([message])
545
546
async def test_input_item_converter_for_hidden_context_with_string_content():
    """String hidden context is wrapped in a ``<HiddenContext>`` user message."""
    hidden = HiddenContextItem(
        id="123",
        content="User pressed the red button",
        thread_id=thread.id,
        created_at=datetime.now(),
    )

    # No custom converter is needed when the content is a plain string.
    converted = await simple_to_agent_input([hidden])

    expected_text = "Hidden context for the agent (not shown to the user):\n<HiddenContext>\nUser pressed the red button\n</HiddenContext>"
    assert len(converted) == 1
    assert converted[0] == {
        "content": [{"text": expected_text, "type": "input_text"}],
        "role": "user",
        "type": "message",
    }
570
571
async def test_input_item_converter_for_hidden_context_with_non_string_content():
    """Non-string hidden context needs a custom ``hidden_context_to_input``."""

    class MyThreadItemConverter(ThreadItemConverter):
        async def hidden_context_to_input(self, item: HiddenContextItem):
            joined_keys = ",".join(item.content.keys())
            text_part = ResponseInputTextParam(
                type="input_text", text=f"<HIDDEN>{joined_keys}</HIDDEN>"
            )
            return Message(type="message", content=[text_part], role="user")

    hidden_items = [
        HiddenContextItem(
            id="123",
            content={"harry": "potter", "hermione": "granger"},
            thread_id=thread.id,
            created_at=datetime.now(),
        )
    ]

    # The stock converter refuses dict content...
    with pytest.raises(NotImplementedError):
        await simple_to_agent_input(hidden_items)

    # ...while the subclass above renders it from the dict's keys.
    converted = await MyThreadItemConverter().to_agent_input(hidden_items)
    assert len(converted) == 1
    assert converted[0] == {
        "content": [
            {"text": "<HIDDEN>harry,hermione</HIDDEN>", "type": "input_text"},
        ],
        "role": "user",
        "type": "message",
    }
611
612
async def test_input_item_converter_with_client_tool_call():
    """Client tool calls convert to ``function_call`` / ``function_call_output``.

    The pending call item yields the call input and the completed item with
    the same ``call_id`` yields the output input; a task item in between is
    rendered as a user-role note.
    """
    tool_arguments = {"foo": "bar"}
    tool_output = {"success": True}

    user_message = UserMessageItem(
        id="123",
        content=[UserMessageTextContent(text="Call a client tool call xyz")],
        attachments=[],
        inference_options=InferenceOptions(),
        thread_id=thread.id,
        quoted_text="Hi!",
        created_at=datetime.now(),
    )
    task = TaskItem(
        id="tsk_123",
        created_at=datetime.now(),
        task=CustomTask(title="Called xyx"),
        thread_id=thread.id,
    )
    pending_call = ClientToolCallItem(
        id="ctc_123",
        thread_id=thread.id,
        created_at=datetime.now(),
        name="xyz",
        arguments={"foo": "bar"},
        call_id="ctc_123",
    )
    completed_call = ClientToolCallItem(
        id="ctc_123_done",
        thread_id=thread.id,
        created_at=datetime.now(),
        name="xyz",
        arguments={"foo": "bar"},
        call_id="ctc_123",
        status="completed",
        output={"success": True},
    )

    input_items = await simple_to_agent_input(
        [user_message, task, pending_call, completed_call]
    )

    assert len(input_items) == 4
    message_input, task_input, call_input, output_input = input_items

    assert message_input == {
        "content": [{"text": "Call a client tool call xyz", "type": "input_text"}],
        "role": "user",
        "type": "message",
    }
    assert task_input == {
        "content": [
            {
                "text": "A message was displayed to the user that the following task was performed:\n<Task>\nCalled xyx\n</Task>",
                "type": "input_text",
            },
        ],
        "type": "message",
        "role": "user",
    }
    assert call_input == {
        "type": "function_call",
        "name": "xyz",
        "arguments": json.dumps(tool_arguments),
        "call_id": "ctc_123",
    }
    assert output_input == {
        "type": "function_call_output",
        "call_id": "ctc_123",
        "output": json.dumps(tool_output),
    }
683
684
async def test_stream_agent_response_yields_context_events_without_streaming_events():
    """Context events are yielded even when the run itself emits nothing.

    After the one context event is delivered the stream stays pending until
    the run is marked done, at which point iteration ends.
    """
    context = AgentContext(
        previous_response_id=None, thread=thread, store=mock_store, request_context=None
    )
    result = make_result()

    event = ThreadItemAddedEvent(
        item=WidgetItem(
            id="123",
            created_at=datetime.now(),
            thread_id=thread.id,
            widget=Card(children=[Text(id="text", value="Hello, world!")]),
        ),
    )

    await context.stream(event)

    response_streamer = stream_agent_response(context, result)
    event = await response_streamer.__anext__()

    assert event.type == "thread.item.added"

    # Request another event; it has not been awaited yet, so it is pending.
    future = asyncio.ensure_future(response_streamer.__anext__())
    assert future.done() is False

    result.done()

    # Completing the run must end the stream rather than produce an event.
    with pytest.raises(StopAsyncIteration):
        await future

    assert future.done() is True
719
720
async def test_stream_agent_response_maps_events():
    """Context events and mapped run events are both yielded by the stream.

    One context event and one raw text delta are queued; the first two
    streamed events are an added and an updated event (order not asserted),
    and the stream ends once the run is marked done.
    """
    context = AgentContext(
        previous_response_id=None, thread=thread, store=mock_store, request_context=None
    )
    result = make_result()

    event = ThreadItemAddedEvent(
        item=WidgetItem(
            id="123",
            created_at=datetime.now(),
            thread_id=thread.id,
            widget=Card(children=[Text(id="text", value="Hello, world!")]),
        ),
    )

    await context.stream(event)
    result.add_event(
        RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseTextDeltaEvent(
                type="response.output_text.delta",
                delta="Hello, world!",
                content_index=0,
                item_id="123",
                logprobs=[],
                output_index=0,
                sequence_number=0,
            ),
        )
    )

    response_streamer = stream_agent_response(context, result)
    event1 = await response_streamer.__anext__()
    event2 = await response_streamer.__anext__()

    # Compared as a set because the relative order is not part of the contract
    # being tested here.
    assert {event1.type, event2.type} == {
        "thread.item.added",
        "thread.item.updated",
    }

    # Request another event; it has not been awaited yet, so it is pending.
    future = asyncio.ensure_future(response_streamer.__anext__())
    assert future.done() is False

    result.done()

    # Completing the run must end the stream rather than produce an event.
    with pytest.raises(StopAsyncIteration):
        await future

    assert future.done() is True
773
774
def _raw_response_event(data) -> RawResponsesStreamEvent:
    """Wrap ``data`` in the raw-response envelope used by the cases below."""
    return RawResponsesStreamEvent(type="raw_response_event", data=data)


# Each case pairs one raw response event with the single thread event that
# ``stream_agent_response`` is expected to emit for it.
_EVENT_MAPPING_CASES = [
    # Text delta -> content-part text delta.
    (
        _raw_response_event(
            ResponseTextDeltaEvent(
                type="response.output_text.delta",
                delta="Hello, world!",
                content_index=0,
                item_id="123",
                logprobs=[],
                output_index=0,
                sequence_number=0,
            )
        ),
        ThreadItemUpdatedEvent(
            item_id="123",
            update=AssistantMessageContentPartTextDelta(
                content_index=0,
                delta="Hello, world!",
            ),
        ),
    ),
    # Content part added -> content-part added.
    (
        _raw_response_event(
            ResponseContentPartAddedEvent(
                type="response.content_part.added",
                part=ResponseOutputText(
                    type="output_text",
                    text="New content",
                    annotations=[],
                ),
                content_index=1,
                item_id="123",
                output_index=0,
                sequence_number=1,
            )
        ),
        ThreadItemUpdatedEvent(
            item_id="123",
            update=AssistantMessageContentPartAdded(
                content_index=1,
                content=AssistantMessageContent(text="New content", annotations=[]),
            ),
        ),
    ),
    # Text done -> content-part done.
    (
        _raw_response_event(
            ResponseTextDoneEvent(
                type="response.output_text.done",
                text="Final text",
                content_index=0,
                item_id="123",
                logprobs=[],
                output_index=0,
                sequence_number=2,
            )
        ),
        ThreadItemUpdatedEvent(
            item_id="123",
            update=AssistantMessageContentPartDone(
                content_index=0,
                content=AssistantMessageContent(
                    text="Final text",
                    annotations=[],
                ),
            ),
        ),
    ),
    # Annotation added (payload is a Mock) -> annotation-added update.
    (
        _raw_response_event(
            Mock(
                type="response.output_text.annotation.added",
                annotation=ResponsesAnnotationFileCitation(
                    type="file_citation",
                    file_id="file_123",
                    filename="file.txt",
                    index=5,
                ),
                content_index=0,
                item_id="123",
                annotation_index=0,
                output_index=0,
                sequence_number=3,
            )
        ),
        ThreadItemUpdatedEvent(
            item_id="123",
            update=AssistantMessageContentPartAnnotationAdded(
                content_index=0,
                annotation_index=0,
                annotation=Annotation(
                    source=FileSource(filename="file.txt", title="file.txt"),
                    index=5,
                ),
            ),
        ),
    ),
]


@pytest.mark.parametrize("raw_event,expected_event", _EVENT_MAPPING_CASES)
async def test_event_mapping(raw_event, expected_event):
    """One raw response event maps to exactly the expected thread event."""
    agent_context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    run = make_result()
    run.add_event(raw_event)
    run.done()

    emitted = await all_events(stream_agent_response(agent_context, run))

    # A falsy expectation means "this raw event produces nothing".
    expected = [expected_event] if expected_event else []
    assert emitted == expected
893
894
async def test_stream_agent_response_emits_annotation_added_events():
    """Annotation-added raw events become annotation updates on the item.

    Three annotations are queued. Only the container-file and URL citations
    are expected in the output, numbered 0 and 1; the file citation with an
    empty filename is expected to produce no event.
    """
    item_id = "item_123"
    incoming_annotations = [
        ResponsesAnnotationFileCitation(
            type="file_citation",
            file_id="file_invalid",
            filename="",
            index=0,
        ),
        ResponsesAnnotationContainerFileCitation(
            type="container_file_citation",
            container_id="container_1",
            file_id="file_123",
            filename="container.txt",
            start_index=0,
            end_index=3,
        ),
        ResponsesAnnotationURLCitation(
            type="url_citation",
            url="https://example.com",
            title="Example",
            start_index=1,
            end_index=5,
        ),
    ]

    agent_context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    run = make_result()

    # The list position doubles as both annotation_index and sequence_number.
    for position, annotation in enumerate(incoming_annotations):
        payload = Mock(
            type="response.output_text.annotation.added",
            annotation=annotation,
            content_index=0,
            item_id=item_id,
            annotation_index=position,
            output_index=0,
            sequence_number=position,
        )
        run.add_event(RawResponsesStreamEvent(type="raw_response_event", data=payload))
    run.done()

    emitted = await all_events(stream_agent_response(agent_context, run))

    expected_container = ThreadItemUpdatedEvent(
        item_id=item_id,
        update=AssistantMessageContentPartAnnotationAdded(
            content_index=0,
            annotation_index=0,
            annotation=Annotation(
                source=FileSource(filename="container.txt", title="container.txt"),
                index=3,
            ),
        ),
    )
    expected_url = ThreadItemUpdatedEvent(
        item_id=item_id,
        update=AssistantMessageContentPartAnnotationAdded(
            content_index=0,
            annotation_index=1,
            annotation=Annotation(
                source=URLSource(
                    url="https://example.com",
                    title="Example",
                ),
                index=5,
            ),
        ),
    )
    assert emitted == [expected_container, expected_url]
978
979
@pytest.mark.parametrize("throw_guardrail", ["input", "output"])
async def test_stream_agent_response_yields_item_removed_event(throw_guardrail):
    """A tripped guardrail removes every item streamed so far, then raises.

    Three items are produced (one from the run, two from the context). After
    the guardrail trips, the stream is expected to emit a removal event for
    each of them and then raise the guardrail exception.
    """
    context = AgentContext(
        previous_response_id=None, thread=thread, store=mock_store, request_context=None
    )
    result = make_result()
    result.add_event(
        RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                item=ResponseOutputMessage(
                    id="1",
                    content=[
                        ResponseOutputText(
                            annotations=[], type="output_text", text="Hello, world!"
                        )
                    ],
                    role="assistant",
                    status="completed",
                    type="message",
                ),
                output_index=0,
                sequence_number=0,
            ),
        )
    )
    await context.stream(
        ThreadItemAddedEvent(
            item=AssistantMessageItem(
                id="2",
                content=[AssistantMessageContent(text="Hello, world!")],
                thread_id=thread.id,
                created_at=datetime.now(),
            ),
        )
    )

    await context.stream(
        ThreadItemDoneEvent(
            item=WidgetItem(
                id="3",
                created_at=datetime.now(),
                thread_id=thread.id,
                widget=Card(children=[Text(id="text", value="Hello, world!")]),
            ),
        )
    )

    iterator = stream_agent_response(context, result)

    # Pull exactly the three events queued above before tripping the
    # guardrail, leaving the iterator open for the second phase.
    events = [await anext(iterator) for _ in range(3)]

    if throw_guardrail == "input":
        result.throw_input_guardrails()
    else:
        result.throw_output_guardrails()

    # pytest.raises fails the test if nothing is raised and lets any other
    # exception propagate with its own traceback. The old try/except caught
    # its own "should have been thrown" assertion and re-reported it as an
    # unexpected exception.
    with pytest.raises(
        (InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered)
    ):
        async for event in iterator:
            events.append(event)

    deleted_item_ids = {
        event.item_id for event in events if event.type == "thread.item.removed"
    }
    assert deleted_item_ids == {"1", "2", "3"}
1058
1059
async def test_stream_agent_response_assistant_message_content_types():
    """A finished output message becomes one assistant item with annotations.

    The run emits a completed file-search call followed by a completed
    message whose first text part carries four annotations. Exactly one
    done event is expected, for the message; of the four annotations only
    the file, container-file and URL citations appear in the result.
    """
    # The previous version also constructed an AgentContext here and threw
    # it away; the only context needed is the one created just before use.
    result = make_result()

    result.add_event(
        RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseOutputItemDoneEvent(
                type="response.output_item.done",
                item=ResponseFileSearchToolCall(
                    id="fs_0",
                    queries=["Hello, world!"],
                    status="completed",
                    type="file_search_call",
                    results=[
                        Result(
                            file_id="f_123",
                            filename="test.txt",
                            text="Hello, world!",
                            score=1.0,
                        ),
                        Result(
                            file_id="f_123",
                            filename="test.txt",
                            text="Hello, friends!",
                            score=0.5,
                        ),
                    ],
                ),
                output_index=0,
                sequence_number=0,
            ),
        )
    )
    result.add_event(
        RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseOutputItemDoneEvent(
                type="response.output_item.done",
                item=ResponseOutputMessage(
                    id="1",
                    content=[
                        ResponseOutputText(
                            annotations=[
                                ResponsesAnnotationFileCitation(
                                    type="file_citation",
                                    file_id="f_123",
                                    index=0,
                                    filename="test.txt",
                                ),
                                ResponsesAnnotationContainerFileCitation(
                                    type="container_file_citation",
                                    container_id="container_1",
                                    file_id="f_456",
                                    filename="container.txt",
                                    start_index=0,
                                    end_index=3,
                                ),
                                ResponsesAnnotationURLCitation(
                                    type="url_citation",
                                    url="https://www.google.com",
                                    title="Google",
                                    start_index=0,
                                    end_index=10,
                                ),
                                ResponsesAnnotationFilePath(
                                    type="file_path",
                                    file_id="123",
                                    index=0,
                                ),
                            ],
                            text="Hello, world!",
                            type="output_text",
                        ),
                        ResponseOutputText(
                            annotations=[],
                            text="Can't do that",
                            type="output_text",
                        ),
                    ],
                    role="assistant",
                    status="completed",
                    type="message",
                ),
                output_index=0,
                sequence_number=0,
            ),
        )
    )

    result.done()

    context = AgentContext(
        previous_response_id=None, thread=thread, store=mock_store, request_context=None
    )
    events = await all_events(stream_agent_response(context, result))
    assert len(events) == 1
    assert isinstance(events[0], ThreadItemDoneEvent)
    message = events[0].item
    assert isinstance(message, AssistantMessageItem)
    assert message.content == [
        AssistantMessageContent(
            annotations=[
                Annotation(
                    source=FileSource(
                        filename="test.txt",
                        title="test.txt",
                    ),
                    index=0,
                ),
                Annotation(
                    source=FileSource(
                        filename="container.txt",
                        title="container.txt",
                    ),
                    index=3,
                ),
                Annotation(
                    source=URLSource(
                        url="https://www.google.com",
                        title="Google",
                    ),
                    index=10,
                ),
            ],
            text="Hello, world!",
        ),
        AssistantMessageContent(text="Can't do that", annotations=[]),
    ]
    assert message.id == "1"
1192
1193
async def test_workflow_streams_first_thought():
    """Check how reasoning-summary events surface as workflow updates.

    A reasoning item is queued, followed by delta/done events for two
    summary parts. The expected stream is: the workflow item being added,
    the first thought added from its first delta and then updated twice
    (second delta, done), and the second thought added once with its
    complete text.
    """
    context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    result = make_result()

    # The reasoning output item opens the sequence.
    result.add_event(
        RawResponsesStreamEvent(
            type="raw_response_event",
            data=ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                item=ResponseReasoningItem(
                    id="resp_1",
                    summary=[],
                    type="reasoning",
                ),
                output_index=0,
                sequence_number=0,
            ),
        )
    )

    # (event type, summary_index, extra payload) for each summary event,
    # in the order they are queued.
    summary_events = [
        # first thought
        ("response.reasoning_summary_text.delta", 0, {"delta": "Think"}),
        ("response.reasoning_summary_text.delta", 0, {"delta": "ing 1"}),
        ("response.reasoning_summary_text.done", 0, {"text": "Thinking 1"}),
        # second thought
        ("response.reasoning_summary_text.delta", 1, {"delta": "Think"}),
        ("response.reasoning_summary_text.delta", 1, {"delta": "ing 2"}),
        ("response.reasoning_summary_text.done", 1, {"text": "Thinking 2"}),
    ]
    for event_type, summary_index, payload in summary_events:
        result.add_event(
            RawResponsesStreamEvent(
                type="raw_response_event",
                data=Mock(
                    type=event_type,
                    item_id="resp_1",
                    summary_index=summary_index,
                    **payload,
                ),
            )
        )

    result.done()
    stream = stream_agent_response(context, result)

    # Workflow added
    added = await anext(stream)
    assert isinstance(added, ThreadItemAddedEvent)
    assert context.workflow_item is not None
    assert context.workflow_item.workflow.type == "reasoning"
    assert len(context.workflow_item.workflow.tasks) == 0
    assert added == ThreadItemAddedEvent(item=context.workflow_item)

    # (task count after the event, update class, thought content, task index)
    expected_updates = [
        (1, WorkflowTaskAdded, "Think", 0),  # first thought added
        (1, WorkflowTaskUpdated, "Thinking 1", 0),  # first thought delta
        (1, WorkflowTaskUpdated, "Thinking 1", 0),  # first thought done
        (2, WorkflowTaskAdded, "Thinking 2", 1),  # second thought added (not streamed)
    ]
    for task_count, update_cls, content, task_index in expected_updates:
        event = await anext(stream)
        assert context.workflow_item is not None
        assert len(context.workflow_item.workflow.tasks) == task_count
        assert isinstance(event, ThreadItemUpdatedEvent)
        assert event == ThreadItemUpdatedEvent(
            item_id=context.workflow_item.id,
            update=update_cls(
                task=ThoughtTask(content=content),
                task_index=task_index,
            ),
        )

    # Exhaust whatever the stream still has to emit.
    async for _ in stream:
        pass
1353
1354
async def test_workflow_ends_on_message():
    """Check that a message output item closes the open reasoning workflow.

    After the workflow is added and one thought is recorded, the arrival of
    an assistant message item is expected to yield a ThreadItemDoneEvent for
    the workflow, clear ``context.workflow_item``, and leave the workflow
    collapsed with a DurationSummary.
    """
    context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    result = make_result()

    # first thought
    reasoning_added = RawResponsesStreamEvent(
        type="raw_response_event",
        data=ResponseOutputItemAddedEvent(
            type="response.output_item.added",
            item=ResponseReasoningItem(
                id="resp_1",
                summary=[],
                type="reasoning",
            ),
            output_index=0,
            sequence_number=0,
        ),
    )
    thought_done = RawResponsesStreamEvent(
        type="raw_response_event",
        data=Mock(
            type="response.reasoning_summary_text.done",
            item_id="resp_1",
            summary_index=0,
            text="Thinking 1",
        ),
    )
    # not reasoning
    message_added = RawResponsesStreamEvent(
        type="raw_response_event",
        data=ResponseOutputItemAddedEvent(
            type="response.output_item.added",
            item=ResponseOutputMessage(
                id="m_1",
                content=[],
                role="assistant",
                status="in_progress",
                type="message",
            ),
            output_index=0,
            sequence_number=0,
        ),
    )
    for queued in (reasoning_added, thought_done, message_added):
        result.add_event(queued)

    result.done()
    stream = stream_agent_response(context, result)

    # Workflow added
    added = await anext(stream)
    assert isinstance(added, ThreadItemAddedEvent)
    assert context.workflow_item is not None
    assert context.workflow_item.workflow.type == "reasoning"
    assert len(context.workflow_item.workflow.tasks) == 0
    assert added == ThreadItemAddedEvent(item=context.workflow_item)

    # First thought done
    updated = await anext(stream)
    assert context.workflow_item is not None
    assert len(context.workflow_item.workflow.tasks) == 1
    assert isinstance(updated, ThreadItemUpdatedEvent)
    assert updated == ThreadItemUpdatedEvent(
        item_id=context.workflow_item.id,
        update=WorkflowTaskAdded(
            task=ThoughtTask(content="Thinking 1"),
            task_index=0,
        ),
    )

    # Workflow ended
    finished = await anext(stream)
    assert isinstance(finished, ThreadItemDoneEvent)
    assert finished.item.type == "workflow"
    assert context.workflow_item is None
    # Summary and expanded are handled by the end_workflow method
    assert isinstance(finished.item.workflow.summary, DurationSummary)
    assert finished.item.workflow.expanded is False

    # Exhaust whatever the stream still has to emit.
    async for _ in stream:
        pass
1446
1447
async def test_existing_workflow_summary_not_overwritten_on_automatic_end():
    """Check that a preset workflow summary survives the automatic end.

    The context starts with a custom workflow that already carries a
    CustomSummary. When an assistant message item arrives, the first event is
    expected to be the workflow's ThreadItemDoneEvent with that summary
    unchanged.
    """
    context = AgentContext(
        previous_response_id=None,
        thread=thread,
        store=mock_store,
        request_context=None,
    )
    result = make_result()

    # Seed the context with a workflow that already has its own summary.
    preset_workflow = Workflow(
        type="custom",
        tasks=[],
        summary=CustomSummary(title="Test"),
    )
    context.workflow_item = WorkflowItem(
        id="wf_1",
        created_at=datetime.now(),
        workflow=preset_workflow,
        thread_id=thread.id,
    )

    message_added = ResponseOutputItemAddedEvent(
        type="response.output_item.added",
        item=ResponseOutputMessage(
            id="m_1",
            content=[],
            role="assistant",
            status="in_progress",
            type="message",
        ),
        output_index=0,
        sequence_number=0,
    )
    result.add_event(
        RawResponsesStreamEvent(type="raw_response_event", data=message_added)
    )

    result.done()
    stream = stream_agent_response(context, result)

    finished = await anext(stream)

    assert isinstance(finished, ThreadItemDoneEvent)
    assert context.workflow_item is None
    assert finished.item.type == "workflow"
    assert finished.item.workflow.summary == CustomSummary(title="Test")

    # Exhaust whatever the stream still has to emit.
    async for _ in stream:
        pass