openai/openai-python

Public

mirrored from https://github.com/openai/openai-python Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v1.61.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

examples/realtime/push_to_talk_app.py

281lines · modecode

1#!/usr/bin/env uv run
2####################################################################
3# Sample TUI app with a push to talk interface to the Realtime API #
4# If you have `uv` installed and the `OPENAI_API_KEY` #
5# environment variable set, you can run this example with just #
6# #
7# `./examples/realtime/push_to_talk_app.py` #
8####################################################################
9#
10# /// script
11# requires-python = ">=3.9"
12# dependencies = [
13# "textual",
14# "numpy",
15# "pyaudio",
16# "pydub",
17# "sounddevice",
18# "openai[realtime]",
19# ]
20#
21# [tool.uv.sources]
22# openai = { path = "../../", editable = true }
23# ///
24from __future__ import annotations
25
26import base64
27import asyncio
28from typing import Any, cast
29from typing_extensions import override
30
31from textual import events
32from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
33from textual.app import App, ComposeResult
34from textual.widgets import Button, Static, RichLog
35from textual.reactive import reactive
36from textual.containers import Container
37
38from openai import AsyncOpenAI
39from openai.types.beta.realtime.session import Session
40from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
41
42
class SessionDisplay(Static):
    """Widget that displays the realtime session ID, or a placeholder without one."""

    # Reactive attribute; starts out as the empty string.
    session_id = reactive("")

    @override
    def render(self) -> str:
        """Return the session ID line, or "Connecting..." while the ID is empty."""
        if not self.session_id:
            return "Connecting..."
        return f"Session ID: {self.session_id}"
51
52
class AudioStatusIndicator(Static):
    """Widget that displays whether audio is currently being recorded."""

    # Reactive attribute; starts out as False (not recording).
    is_recording = reactive(False)

    @override
    def render(self) -> str:
        """Return the status line that matches the current ``is_recording`` value."""
        if self.is_recording:
            return "🔴 Recording... (Press K to stop)"
        return "⚪ Press K to start recording (Q to quit)"
64
65
class RealtimeApp(App[None]):
    """Textual push-to-talk app that streams microphone audio to the Realtime API.

    Two workers are started on mount: one consumes server events (session
    created/updated, audio deltas, transcript deltas) and one reads microphone
    frames and appends them to the input audio buffer while recording is
    enabled. Press K to toggle recording and Q to quit.
    """

    CSS = """
        Screen {
            background: #1a1b26;  /* Dark blue-grey background */
        }

        Container {
            border: double rgb(91, 164, 91);
        }

        Horizontal {
            width: 100%;
        }

        #input-container {
            height: 5;  /* Explicit height for input container */
            margin: 1 1;
            padding: 1 2;
        }

        Input {
            width: 80%;
            height: 3;  /* Explicit height for input */
        }

        Button {
            width: 20%;
            height: 3;  /* Explicit height for button */
        }

        #bottom-pane {
            width: 100%;
            height: 82%;  /* Reduced to make room for session display */
            border: round rgb(205, 133, 63);
            content-align: center middle;
        }

        #status-indicator {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        #session-display {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        Static {
            color: white;
        }
    """

    client: AsyncOpenAI
    should_send_audio: asyncio.Event
    audio_player: AudioPlayerAsync
    last_audio_item_id: str | None
    connection: AsyncRealtimeConnection | None
    session: Session | None
    connected: asyncio.Event

    def __init__(self) -> None:
        """Create the API client, the audio player and the coordination events."""
        super().__init__()
        self.connection = None
        self.session = None
        self.client = AsyncOpenAI()
        self.audio_player = AudioPlayerAsync()
        self.last_audio_item_id = None
        # Set while the user wants microphone audio to be sent (toggled with K).
        self.should_send_audio = asyncio.Event()
        # Set once `self.connection` has been assigned.
        self.connected = asyncio.Event()

    @override
    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        with Container():
            yield SessionDisplay(id="session-display")
            yield AudioStatusIndicator(id="status-indicator")
            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)

    async def on_mount(self) -> None:
        """Start the connection handler and the microphone sender as workers."""
        self.run_worker(self.handle_realtime_connection())
        self.run_worker(self.send_mic_audio())

    async def handle_realtime_connection(self) -> None:
        """Open the realtime connection and dispatch the events it yields.

        Handles `session.created`, `session.updated`, `response.audio.delta`
        and `response.audio_transcript.delta`; other event types are ignored.
        """
        async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview") as conn:
            self.connection = conn
            # Unblock anything waiting in `_get_connection`.
            self.connected.set()

            # note: this is the default and can be omitted
            # if you want to manually handle VAD yourself, then set `'turn_detection': None`
            await conn.session.update(session={"turn_detection": {"type": "server_vad"}})

            # Transcript text accumulated so far, keyed by item ID.
            acc_items: dict[str, Any] = {}

            async for event in conn:
                if event.type == "session.created":
                    self.session = event.session
                    session_display = self.query_one(SessionDisplay)
                    assert event.session.id is not None
                    session_display.session_id = event.session.id
                    continue

                if event.type == "session.updated":
                    self.session = event.session
                    continue

                if event.type == "response.audio.delta":
                    # A new item ID means a new audio item started playing.
                    if event.item_id != self.last_audio_item_id:
                        self.audio_player.reset_frame_count()
                        self.last_audio_item_id = event.item_id

                    bytes_data = base64.b64decode(event.delta)
                    self.audio_player.add_data(bytes_data)
                    continue

                if event.type == "response.audio_transcript.delta":
                    acc_items[event.item_id] = acc_items.get(event.item_id, "") + event.delta

                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
                    bottom_pane = self.query_one("#bottom-pane", RichLog)
                    bottom_pane.clear()
                    bottom_pane.write(acc_items[event.item_id])
                    continue

    async def _get_connection(self) -> AsyncRealtimeConnection:
        """Wait until the connection has been established, then return it."""
        await self.connected.wait()
        assert self.connection is not None
        return self.connection

    async def send_mic_audio(self) -> None:
        """Read microphone frames and append them to the input audio buffer.

        Loops indefinitely. Frames are read and sent only while
        `should_send_audio` is set. The input stream is stopped and closed
        when the loop exits for any reason.
        """
        import sounddevice as sd  # type: ignore

        sent_audio = False
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task could be garbage-collected
        # before it has run to completion.
        background_tasks: set[asyncio.Task[Any]] = set()

        device_info = sd.query_devices()
        print(device_info)

        # Number of frames in 0.02 s of audio at SAMPLE_RATE.
        read_size = int(SAMPLE_RATE * 0.02)

        # Look the widget up before opening the stream so that a failed lookup
        # cannot leave a started stream behind without cleanup.
        status_indicator = self.query_one(AudioStatusIndicator)

        stream = sd.InputStream(
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            dtype="int16",
        )
        stream.start()

        try:
            while True:
                # Yield to the event loop until a full chunk is available.
                if stream.read_available < read_size:
                    await asyncio.sleep(0)
                    continue

                await self.should_send_audio.wait()
                status_indicator.is_recording = True

                data, _ = stream.read(read_size)

                connection = await self._get_connection()
                if not sent_audio:
                    # Sent once, before the first chunk of audio, without
                    # blocking the capture loop on the send.
                    cancel_task = asyncio.create_task(connection.send({"type": "response.cancel"}))
                    background_tasks.add(cancel_task)
                    cancel_task.add_done_callback(background_tasks.discard)
                    sent_audio = True

                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))

                await asyncio.sleep(0)
        except KeyboardInterrupt:
            pass
        finally:
            stream.stop()
            stream.close()

    async def on_key(self, event: events.Key) -> None:
        """Handle key press events.

        Enter presses the first Button if one is mounted, Q exits the app and
        K toggles recording.
        """
        if event.key == "enter":
            # `compose` does not mount a Button, so `query_one(Button)` would
            # raise NoMatches here; only press a button when one exists.
            button = next(iter(self.query(Button)), None)
            if button is not None:
                button.press()
            return

        if event.key == "q":
            self.exit()
            return

        if event.key == "k":
            status_indicator = self.query_one(AudioStatusIndicator)
            if status_indicator.is_recording:
                self.should_send_audio.clear()
                status_indicator.is_recording = False

                if self.session and self.session.turn_detection is None:
                    # The default in the API is that the model will automatically detect when the user has
                    # stopped talking and then start responding itself.
                    #
                    # However if we're in manual `turn_detection` mode then we need to
                    # manually tell the model to commit the audio buffer and start responding.
                    conn = await self._get_connection()
                    await conn.input_audio_buffer.commit()
                    await conn.response.create()
            else:
                self.should_send_audio.set()
                status_indicator.is_recording = True
277
278
if __name__ == "__main__":
    # Build and launch the TUI only when executed as a script.
    RealtimeApp().run()
282