openai/openai-python

Public

mirrored from https://github.com/openai/openai-pythonAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
v1.28.1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

helpers.md

238lines · modecode

1# Streaming Helpers
2
3OpenAI supports streaming responses when interacting with the [Assistant](#assistant-streaming-api) APIs.
4
5## Assistant Streaming API
6
7OpenAI supports streaming responses from Assistants. The SDK provides convenience wrappers around the API
8so you can subscribe to the types of events you are interested in as well as receive accumulated responses.
9
10More information can be found in the documentation: [Assistant Streaming](https://platform.openai.com/docs/assistants/overview?lang=python)
11
12#### An example of creating a run and subscribing to some events
13
14You can subscribe to events by creating an event handler class and overloading the relevant event handlers.
15
16```python
17from typing_extensions import override
18from openai import AssistantEventHandler, OpenAI
19from openai.types.beta.threads import Text, TextDelta
20from openai.types.beta.threads.runs import ToolCall, ToolCallDelta
21
22client = openai.OpenAI()
23
24# First, we create a EventHandler class to define
25# how we want to handle the events in the response stream.
26
27class EventHandler(AssistantEventHandler):
28 @override
29 def on_text_created(self, text: Text) -> None:
30 print(f"\nassistant > ", end="", flush=True)
31
32 @override
33 def on_text_delta(self, delta: TextDelta, snapshot: Text):
34 print(delta.value, end="", flush=True)
35
36 @override
37 def on_tool_call_created(self, tool_call: ToolCall):
38 print(f"\nassistant > {tool_call.type}\n", flush=True)
39
40 @override
41 def on_tool_call_delta(self, delta: ToolCallDelta, snapshot: ToolCall):
42 if delta.type == "code_interpreter" and delta.code_interpreter:
43 if delta.code_interpreter.input:
44 print(delta.code_interpreter.input, end="", flush=True)
45 if delta.code_interpreter.outputs:
46 print(f"\n\noutput >", flush=True)
47 for output in delta.code_interpreter.outputs:
48 if output.type == "logs":
49 print(f"\n{output.logs}", flush=True)
50
51# Then, we use the `stream` SDK helper
52# with the `EventHandler` class to create the Run
53# and stream the response.
54
55with client.beta.threads.runs.stream(
56 thread_id="thread_id",
57 assistant_id="assistant_id",
58 event_handler=EventHandler(),
59) as stream:
60 stream.until_done()
61```
62
63#### An example of iterating over events
64
65You can also iterate over all the streamed events.
66
67```python
68with client.beta.threads.runs.stream(
69 thread_id=thread.id,
70 assistant_id=assistant.id
71) as stream:
72 for event in stream:
73 # Print the text from text delta events
74 if event.event == "thread.message.delta" and event.data.delta.content:
75 print(event.data.delta.content[0].text)
76```
77
78#### An example of iterating over text
79
80You can also iterate over just the text deltas received
81
82```python
83with client.beta.threads.runs.stream(
84 thread_id=thread.id,
85 assistant_id=assistant.id
86) as stream:
87 for text in stream.text_deltas:
88 print(text)
89```
90
91### Creating Streams
92
93There are three helper methods for creating streams:
94
95```python
96client.beta.threads.runs.stream()
97```
98
99This method can be used to start and stream the response to an existing run with an associated thread
100that is already populated with messages.
101
102```python
103client.beta.threads.create_and_run_stream()
104```
105
106This method can be used to add a message to a thread, start a run and then stream the response.
107
108```python
109client.beta.threads.runs.submit_tool_outputs_stream()
110```
111
112This method can be used to submit a tool output to a run waiting on the output and start a stream.
113
114### Assistant Events
115
116The assistant API provides events you can subscribe to for the following events.
117
118```python
119def on_event(self, event: AssistantStreamEvent)
120```
121
122This allows you to subscribe to all the possible raw events sent by the OpenAI streaming API.
123In many cases it will be more convenient to subscribe to a more specific set of events for your use case.
124
125More information on the types of events can be found here: [Events](https://platform.openai.com/docs/api-reference/assistants-streaming/events)
126
127```python
128def on_run_step_created(self, run_step: RunStep)
129def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep)
130def on_run_step_done(self, run_step: RunStep)
131```
132
133These events allow you to subscribe to the creation, delta and completion of a RunStep.
134
135For more information on how Runs and RunSteps work see the documentation [Runs and RunSteps](https://platform.openai.com/docs/assistants/how-it-works/runs-and-run-steps)
136
137```python
138def on_message_created(self, message: Message)
139def on_message_delta(self, delta: MessageDelta, snapshot: Message)
140def on_message_done(self, message: Message)
141```
142
143This allows you to subscribe to Message creation, delta and completion events. Messages can contain
144different types of content that can be sent from a model (and events are available for specific content types).
145For convenience, the delta event includes both the incremental update and an accumulated snapshot of the content.
146
147More information on messages can be found
148on in the documentation page [Message](https://platform.openai.com/docs/api-reference/messages/object).
149
150```python
151def on_text_created(self, text: Text)
152def on_text_delta(self, delta: TextDelta, snapshot: Text)
153def on_text_done(self, text: Text)
154```
155
156These events allow you to subscribe to the creation, delta and completion of a Text content (a specific type of message).
157For convenience, the delta event includes both the incremental update and an accumulated snapshot of the content.
158
159```python
160def on_image_file_done(self, image_file: ImageFile)
161```
162
163Image files are not sent incrementally so an event is provided for when a image file is available.
164
165```python
166def on_tool_call_created(self, tool_call: ToolCall)
167def on_tool_call_delta(self, delta: ToolCallDelta, snapshot: ToolCall)
168def on_tool_call_done(self, tool_call: ToolCall)
169```
170
171These events allow you to subscribe to events for the creation, delta and completion of a ToolCall.
172
173More information on tools can be found here [Tools](https://platform.openai.com/docs/assistants/tools)
174
175```python
176def on_end(self)
177```
178
179The last event send when a stream ends.
180
181```python
182def on_timeout(self)
183```
184
185This event is triggered if the request times out.
186
187```python
188def on_exception(self, exception: Exception)
189```
190
191This event is triggered if an exception occurs during streaming.
192
193### Assistant Methods
194
195The assistant streaming object also provides a few methods for convenience:
196
197```python
198def current_event() -> AssistantStreamEvent | None
199def current_run() -> Run | None
200def current_message_snapshot() -> Message | None
201def current_run_step_snapshot() -> RunStep | None
202```
203
204These methods are provided to allow you to access additional context from within event handlers. In many cases
205the handlers should include all the information you need for processing, but if additional context is required it
206can be accessed.
207
208Note: There is not always a relevant context in certain situations (these will be `None` in those cases).
209
210```python
211def get_final_run(self) -> Run
212def get_final_run_steps(self) -> List[RunStep]
213def get_final_messages(self) -> List[Message]
214```
215
216These methods are provided for convenience to collect information at the end of a stream. Calling these events
217will trigger consumption of the stream until completion and then return the relevant accumulated objects.
218
219# Polling Helpers
220
221When interacting with the API some actions such as starting a Run and adding files to vector stores are asynchronous and take time to complete.
222The SDK includes helper functions which will poll the status until it reaches a terminal state and then return the resulting object.
223If an API method results in an action which could benefit from polling there will be a corresponding version of the
224method ending in `_and_poll`.
225
226All methods also allow you to set the polling frequency, how often the API is checked for an update, via a function argument (`poll_interval_ms`).
227
228The polling methods are:
229
230```python
231client.beta.threads.create_and_run_poll(...)
232client.beta.threads.runs.create_and_poll(...)
233client.beta.threads.runs.submit_tool_ouptputs_and_poll(...)
234client.beta.vector_stores.files.upload_and_poll(...)
235client.beta.vector_stores.files.create_and_poll(...)
236client.beta.vector_stores.file_batches.create_and_poll(...)
237client.beta.vector_stores.file_batches.upload_and_poll(...)
238```
239