openai/openai-python

Public

mirrored from https://github.com/openai/openai-python · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v1.107.3

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

examples/responses/background_streaming_async.py

53 lines · mode · blame

f588695fstainless-app[bot]1 years ago1import asyncio
2from typing import List
3
4import rich
5from pydantic import BaseModel
6
7from openai import AsyncOpenAI
8
9
class Step(BaseModel):
    """Schema for a single entry of ``MathResponse.steps``.

    Both fields are plain strings; the model adds no validation beyond
    what pydantic applies to ``str`` fields.
    """

    # Presumably the reasoning behind this step (name-based; confirm).
    explanation: str
    # Presumably the intermediate result produced by this step (confirm).
    output: str
13
14
class MathResponse(BaseModel):
    """Structured output schema passed as ``text_format`` in ``main``.

    Holds an ordered list of ``Step`` entries plus a final answer string.
    """

    # The individual steps, in the order they were returned.
    steps: List[Step]
    # The final answer, kept as a string.
    final_answer: str
18
19
async def main() -> None:
    """Start a background streamed response, interrupt it, then resume it.

    The first stream is requested with ``background=True`` and is abandoned
    once the event with sequence number ``resume_after`` has been seen. A
    second stream is then opened for the same response id with
    ``starting_after=resume_after`` and read to the end, after which the
    final parsed response is printed.

    Raises:
        RuntimeError: If the first stream ended or was interrupted before a
            ``response.created`` event supplied a response id.
    """
    # Single source of truth for the interruption point: the first stream
    # stops at this sequence number and the second resumes right after it.
    resume_after = 10

    client = AsyncOpenAI()
    response_id = None  # named to avoid shadowing the builtin ``id``

    async with client.responses.stream(
        input="solve 8x + 31 = 2",
        model="gpt-4o-2024-08-06",
        text_format=MathResponse,
        background=True,
    ) as stream:
        async for event in stream:
            if event.type == "response.created":
                # Remember the id so the response can be re-attached to later.
                response_id = event.response.id
            if "output_text" in event.type:
                rich.print(event)
            if event.sequence_number == resume_after:
                # Deliberately drop the connection mid-stream.
                break

    print("Interrupted. Continuing...")

    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if response_id is None:
        raise RuntimeError("no 'response.created' event was received; cannot resume the stream")

    async with client.responses.stream(
        response_id=response_id,
        starting_after=resume_after,
        text_format=MathResponse,
    ) as stream:
        async for event in stream:
            if "output_text" in event.type:
                rich.print(event)

        # On the async stream ``get_final_response`` is a coroutine; without
        # ``await`` this would print the coroutine object, not the response.
        rich.print(await stream.get_final_response())
50
51
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())