openai/openai-python

Public

mirrored from https://github.com/openai/openai-python · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v1.97.2

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

examples/realtime/push_to_talk_app.py

283 lines · mode: code

1#!/usr/bin/env uv run
2####################################################################
3# Sample TUI app with a push to talk interface to the Realtime API #
4# If you have `uv` installed and the `OPENAI_API_KEY` #
5# environment variable set, you can run this example with just #
6# #
7# `./examples/realtime/push_to_talk_app.py` #
8# #
9# On Mac, you'll also need `brew install portaudio ffmpeg` #
10####################################################################
11#
12# /// script
13# requires-python = ">=3.9"
14# dependencies = [
15# "textual",
16# "numpy",
17# "pyaudio",
18# "pydub",
19# "sounddevice",
20# "openai[realtime]",
21# ]
22#
23# [tool.uv.sources]
24# openai = { path = "../../", editable = true }
25# ///
26from __future__ import annotations
27
28import base64
29import asyncio
30from typing import Any, cast
31from typing_extensions import override
32
33from textual import events
34from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
35from textual.app import App, ComposeResult
36from textual.widgets import Button, Static, RichLog
37from textual.reactive import reactive
38from textual.containers import Container
39
40from openai import AsyncOpenAI
41from openai.types.beta.realtime.session import Session
42from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
43
44
class SessionDisplay(Static):
    """Widget that renders the ID of the current realtime session."""

    # Empty string until a session ID has been assigned to the widget.
    session_id = reactive("")

    @override
    def render(self) -> str:
        """Return the session ID line, or a placeholder while no ID is set."""
        if not self.session_id:
            return "Connecting..."
        return f"Session ID: {self.session_id}"
53
54
class AudioStatusIndicator(Static):
    """Widget that tells the user whether audio is currently being recorded."""

    # Toggled by the app when recording starts and stops.
    is_recording = reactive(False)

    @override
    def render(self) -> str:
        """Return the prompt matching the current recording state."""
        if self.is_recording:
            return "🔴 Recording... (Press K to stop)"
        return "⚪ Press K to start recording (Q to quit)"
66
67
class RealtimeApp(App[None]):
    """Textual app with a push-to-talk interface to the Realtime API.

    Two workers are started on mount: one consumes events from the realtime
    connection (session updates, audio deltas, transcript deltas) and one
    streams microphone audio to the connection while recording is enabled.
    Pressing ``K`` toggles recording and ``Q`` quits.
    """

    CSS = """
        Screen {
            background: #1a1b26; /* Dark blue-grey background */
        }

        Container {
            border: double rgb(91, 164, 91);
        }

        Horizontal {
            width: 100%;
        }

        #input-container {
            height: 5; /* Explicit height for input container */
            margin: 1 1;
            padding: 1 2;
        }

        Input {
            width: 80%;
            height: 3; /* Explicit height for input */
        }

        Button {
            width: 20%;
            height: 3; /* Explicit height for button */
        }

        #bottom-pane {
            width: 100%;
            height: 82%; /* Reduced to make room for session display */
            border: round rgb(205, 133, 63);
            content-align: center middle;
        }

        #status-indicator {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        #session-display {
            height: 3;
            content-align: center middle;
            background: #2a2b36;
            border: solid rgb(91, 164, 91);
            margin: 1 1;
        }

        Static {
            color: white;
        }
    """

    client: AsyncOpenAI
    # Set while the user wants microphone audio to be streamed.
    should_send_audio: asyncio.Event
    audio_player: AudioPlayerAsync
    # Item ID of the most recent audio delta handed to the player.
    last_audio_item_id: str | None
    # None until the realtime connection has been opened.
    connection: AsyncRealtimeConnection | None
    # Latest session object received from the server, if any.
    session: Session | None
    # Set once `connection` has been assigned.
    connected: asyncio.Event

    def __init__(self) -> None:
        """Create the API client, audio player and synchronisation events."""
        super().__init__()
        self.connection = None
        self.session = None
        self.client = AsyncOpenAI()
        self.audio_player = AudioPlayerAsync()
        self.last_audio_item_id = None
        self.should_send_audio = asyncio.Event()
        self.connected = asyncio.Event()

    @override
    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        with Container():
            yield SessionDisplay(id="session-display")
            yield AudioStatusIndicator(id="status-indicator")
            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)

    async def on_mount(self) -> None:
        """Start the connection-handling and microphone-streaming workers."""
        self.run_worker(self.handle_realtime_connection())
        self.run_worker(self.send_mic_audio())

    async def handle_realtime_connection(self) -> None:
        """Open the realtime connection and process its events until it ends.

        Stores the connection on ``self.connection`` and sets ``self.connected``
        so other coroutines can obtain it via ``_get_connection``.
        """
        async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview") as conn:
            self.connection = conn
            self.connected.set()

            # note: this is the default and can be omitted
            # if you want to manually handle VAD yourself, then set `'turn_detection': None`
            await conn.session.update(session={"turn_detection": {"type": "server_vad"}})

            # Transcript text accumulated so far, keyed by item ID.
            acc_items: dict[str, str] = {}

            async for event in conn:
                if event.type == "session.created":
                    self.session = event.session
                    session_display = self.query_one(SessionDisplay)
                    assert event.session.id is not None
                    session_display.session_id = event.session.id

                elif event.type == "session.updated":
                    self.session = event.session

                elif event.type == "response.audio.delta":
                    # A new item ID means a new audio item: reset the player's frame count.
                    if event.item_id != self.last_audio_item_id:
                        self.audio_player.reset_frame_count()
                        self.last_audio_item_id = event.item_id

                    self.audio_player.add_data(base64.b64decode(event.delta))

                elif event.type == "response.audio_transcript.delta":
                    transcript = acc_items.get(event.item_id, "") + event.delta
                    acc_items[event.item_id] = transcript

                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
                    bottom_pane = self.query_one("#bottom-pane", RichLog)
                    bottom_pane.clear()
                    bottom_pane.write(transcript)

    async def _get_connection(self) -> AsyncRealtimeConnection:
        """Wait until the realtime connection is available, then return it."""
        await self.connected.wait()
        assert self.connection is not None
        return self.connection

    async def send_mic_audio(self) -> None:
        """Stream microphone audio to the realtime connection while recording is on.

        Reads ``int16`` chunks of ``SAMPLE_RATE * 0.02`` frames from a
        ``sounddevice`` input stream and appends them, base64-encoded, to the
        connection's input audio buffer. The stream is stopped and closed when
        the loop exits for any reason.
        """
        import sounddevice as sd  # type: ignore

        # NOTE(review): this flag is never reset, so `response.cancel` is only
        # sent before the very first chunk of the first recording — confirm
        # whether later recordings should also cancel an in-progress response.
        sent_audio = False

        device_info = sd.query_devices()
        print(device_info)

        read_size = int(SAMPLE_RATE * 0.02)

        stream = sd.InputStream(
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            dtype="int16",
        )
        stream.start()

        status_indicator = self.query_one(AudioStatusIndicator)

        try:
            while True:
                if stream.read_available < read_size:
                    # Not enough frames captured yet; yield to other tasks and retry.
                    await asyncio.sleep(0)
                    continue

                await self.should_send_audio.wait()
                status_indicator.is_recording = True

                data, _ = stream.read(read_size)

                connection = await self._get_connection()
                if not sent_audio:
                    # Awaited directly rather than via an unreferenced
                    # `asyncio.create_task`: a task nobody holds a reference to
                    # may be garbage-collected before it runs, and awaiting also
                    # guarantees the cancel is sent before the first audio chunk.
                    await connection.send({"type": "response.cancel"})
                    sent_audio = True

                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))

                await asyncio.sleep(0)
        except KeyboardInterrupt:
            pass
        finally:
            stream.stop()
            stream.close()

    async def on_key(self, event: events.Key) -> None:
        """Handle key press events."""
        if event.key == "enter":
            # `compose` does not create a Button, so `query_one(Button)` would
            # raise when none exists. Press the first Button only if one is
            # present (e.g. added by a subclass); otherwise ignore the key.
            button = next(iter(self.query(Button)), None)
            if button is not None:
                button.press()
            return

        if event.key == "q":
            self.exit()
            return

        if event.key == "k":
            status_indicator = self.query_one(AudioStatusIndicator)
            if status_indicator.is_recording:
                self.should_send_audio.clear()
                status_indicator.is_recording = False

                if self.session and self.session.turn_detection is None:
                    # The default in the API is that the model will automatically detect when the user has
                    # stopped talking and then start responding itself.
                    #
                    # However if we're in manual `turn_detection` mode then we need to
                    # manually tell the model to commit the audio buffer and start responding.
                    conn = await self._get_connection()
                    await conn.input_audio_buffer.commit()
                    await conn.response.create()
            else:
                self.should_send_audio.set()
                status_indicator.is_recording = True
279
280
if __name__ == "__main__":
    # Build the app and hand control to Textual's run loop.
    RealtimeApp().run()
284