openai/openai-python

Public

mirrored from https://github.com/openai/openai-python Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
codex/gate-pypi-publish

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

tests/lib/test_bedrock.py

488 lines · mode code

1from __future__ import annotations
2
3import json
4from typing import Any, Union, Protocol, cast
5
6import httpx
7import pytest
8from httpx import URL
9from respx import MockRouter
10
11from openai import OpenAIError, NotFoundError
12from tests.utils import update_env
13from openai._types import Omit
14from openai.lib.bedrock import BedrockOpenAI, AsyncBedrockOpenAI
15
# Either flavour of the Bedrock client; used to annotate the parametrized
# ``client_cls`` argument of the tests in this module.
Client = Union[BedrockOpenAI, AsyncBedrockOpenAI]
17
def _usage_body() -> dict[str, Any]:
    """Return a fresh, all-zero ``usage`` payload.

    Each fixture gets its own copy so that mutating one canned body (including
    its nested ``*_details`` dicts) can never leak into another.
    """
    return {
        "input_tokens": 0,
        "input_tokens_details": {"cached_tokens": 0},
        "output_tokens": 0,
        "output_tokens_details": {"reasoning_tokens": 0},
        "total_tokens": 0,
    }


# Canned JSON body returned by the mocked ``/responses`` routes.
RESPONSE_BODY: dict[str, Any] = {
    "id": "resp_123",
    "object": "response",
    "created_at": 0,
    "status": "completed",
    "background": False,
    "error": None,
    "incomplete_details": None,
    "instructions": None,
    "max_output_tokens": None,
    "max_tool_calls": None,
    "model": "gpt-4o",
    "output": [],
    "parallel_tool_calls": True,
    "previous_response_id": None,
    "prompt_cache_key": None,
    "reasoning": {"effort": None, "summary": None},
    "safety_identifier": None,
    "service_tier": "default",
    "store": True,
    "temperature": 1.0,
    "text": {"format": {"type": "text"}, "verbosity": "medium"},
    "tool_choice": "auto",
    "tools": [],
    "top_logprobs": 0,
    "top_p": 1.0,
    "truncation": "disabled",
    "usage": _usage_body(),
    "user": None,
    "metadata": {},
}

# Canned JSON body returned by the mocked ``/responses/compact`` route.
# Its ``usage`` is equal to, but no longer the same object as, RESPONSE_BODY's.
COMPACTED_RESPONSE_BODY: dict[str, Any] = {
    "id": "resp_123",
    "created_at": 0,
    "object": "response.compaction",
    "output": [],
    "usage": _usage_body(),
}

# Canned (empty) page returned by the mocked ``/input_items`` route.
INPUT_ITEMS_BODY: dict[str, Any] = {
    "object": "list",
    "data": [],
    "first_id": "item_123",
    "last_id": "item_123",
    "has_more": False,
}

# Canned JSON body returned by the mocked ``/responses/input_tokens`` route.
INPUT_TOKENS_BODY: dict[str, Any] = {
    "object": "response.input_tokens",
    "input_tokens": 1,
}
73
74
class MockRequestCall(Protocol):
    """Structural type for one recorded mock call.

    The tests ``cast`` ``respx_mock.calls`` to a list of this protocol so the
    captured request can be inspected with static typing.
    """

    # The outgoing request captured for this call.
    request: httpx.Request
77
78
def make_sync_client(**kwargs: Any) -> BedrockOpenAI:
    """Build a ``BedrockOpenAI`` backed by an ``httpx.Client(trust_env=False)``.

    All keyword arguments are forwarded unchanged to the ``BedrockOpenAI``
    constructor.
    """
    isolated_http = httpx.Client(trust_env=False)
    return BedrockOpenAI(http_client=isolated_http, **kwargs)
81
82
def make_async_client(**kwargs: Any) -> AsyncBedrockOpenAI:
    """Build an ``AsyncBedrockOpenAI`` backed by an ``httpx.AsyncClient(trust_env=False)``.

    All keyword arguments are forwarded unchanged to the ``AsyncBedrockOpenAI``
    constructor.
    """
    isolated_http = httpx.AsyncClient(trust_env=False)
    return AsyncBedrockOpenAI(http_client=isolated_http, **kwargs)
85
86
def response_created_sse() -> str:
    """Return one ``response.created`` server-sent event that embeds ``RESPONSE_BODY``."""
    payload = json.dumps({"type": "response.created", "sequence_number": 0, "response": RESPONSE_BODY})
    # One SSE frame: an ``event:`` line, a ``data:`` line, then a blank line.
    return "event: response.created\ndata: " + payload + "\n\n"
90
91
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_region_derived_base_url(client_cls: type[Client]) -> None:
    """With only ``AWS_REGION`` set, the base URL is derived from that region."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    expected_url = URL("https://bedrock-mantle.us-east-1.api.aws/openai/v1/")

    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION="us-east-1", AWS_DEFAULT_REGION=Omit()):
        client = build_client(api_key="token")

    assert client.base_url == expected_url
100
101
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_bedrock_config_precedence(client_cls: type[Client]) -> None:
    """Explicit ``base_url`` / ``api_key`` arguments win over every AWS_* environment value."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    with update_env(
        AWS_BEDROCK_BASE_URL="https://env.example.com/openai/v1",
        AWS_BEARER_TOKEN_BEDROCK="env token",
        AWS_REGION="us-east-1",
        AWS_DEFAULT_REGION="us-west-2",
    ):
        client = build_client(
            base_url="https://explicit.example.com/openai/v1/responses",
            api_key="explicit token",
        )

    # The trailing ``/responses`` segment of the explicit URL is expected to be dropped.
    assert client.base_url == URL("https://explicit.example.com/openai/v1/")
    assert client.api_key == "explicit token"
124
125
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_bedrock_region_precedence(client_cls: type[Client]) -> None:
    """Region lookup order: ``aws_region`` argument, then ``AWS_REGION``, then ``AWS_DEFAULT_REGION``."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    # Both environment regions are set: the argument wins, otherwise AWS_REGION does.
    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION="us-east-1", AWS_DEFAULT_REGION="us-west-2"):
        from_argument = build_client(aws_region="eu-west-1", api_key="token")
        from_aws_region = build_client(api_key="token")

    # Only the default region remains, so it is the fallback.
    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION="us-west-2"):
        from_default_region = build_client(api_key="token")

    assert from_argument.base_url == URL("https://bedrock-mantle.eu-west-1.api.aws/openai/v1/")
    assert from_aws_region.base_url == URL("https://bedrock-mantle.us-east-1.api.aws/openai/v1/")
    assert from_default_region.base_url == URL("https://bedrock-mantle.us-west-2.api.aws/openai/v1/")
146
147
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_normalizes_responses_url(client_cls: type[Client]) -> None:
    """A ``base_url`` ending in ``/responses`` is normalized to the ``/openai/v1/`` root."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    client = build_client(base_url="https://example.com/openai/v1/responses", api_key="token")

    assert client.base_url == URL("https://example.com/openai/v1/")
157
158
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_requires_endpoint_configuration(client_cls: type[Client]) -> None:
    """Constructing a client with no base URL and no region raises ``OpenAIError``."""
    no_endpoint_env = update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION=Omit())
    expect_error = pytest.raises(OpenAIError, match="Must provide one of the `base_url` or `aws_region`")

    with no_endpoint_env, expect_error:
        client_cls(api_key="token")
164
165
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_does_not_use_openai_api_key(client_cls: type[Client]) -> None:
    """``OPENAI_API_KEY`` is not accepted as a credential when the Bedrock token is absent."""
    only_openai_key_env = update_env(
        OPENAI_API_KEY="openai token",
        AWS_BEARER_TOKEN_BEDROCK=Omit(),
        AWS_BEDROCK_BASE_URL="https://example.com/openai/v1",
    )

    with only_openai_key_env, pytest.raises(OpenAIError, match="AWS_BEARER_TOKEN_BEDROCK"):
        client_cls()
175
176
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_rejects_static_token_and_provider(client_cls: type[Client]) -> None:
    """Passing both ``api_key`` and ``bedrock_token_provider`` is rejected."""

    def provider() -> str:
        return "provider token"

    with pytest.raises(OpenAIError, match="mutually exclusive"):
        client_cls(
            base_url="https://example.com/openai/v1",
            api_key="token",
            bedrock_token_provider=provider,
        )
185
186
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_requires_refreshable_tokens_to_use_provider_option(client_cls: type[Client]) -> None:
    """A callable passed as ``api_key`` is rejected with an error naming ``bedrock_token_provider``."""

    def refreshable_token() -> str:
        return "provider token"

    with pytest.raises(OpenAIError, match="bedrock_token_provider"):
        client_cls(
            base_url="https://example.com/openai/v1",
            api_key=refreshable_token,  # type: ignore[arg-type]
        )
194
195
@pytest.mark.respx()
def test_token_provider_refresh_sync(respx_mock: MockRouter) -> None:
    """The token provider is consulted again for the retry that follows a 500."""
    responses_route = respx_mock.post("https://example.com/openai/v1/responses")
    responses_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(200, json=RESPONSE_BODY),
        ]
    )
    token_source = iter(["first", "second"])
    client = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.Client(trust_env=False),
        max_retries=1,
    )

    client.responses.create(model="gpt-4o", input="hello")

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    initial_request, retried_request = recorded[0].request, recorded[1].request
    assert initial_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
217
218
@pytest.mark.asyncio
@pytest.mark.respx()
async def test_token_provider_refresh_async(respx_mock: MockRouter) -> None:
    """Async variant: the token provider is consulted again for the retry that follows a 500."""
    responses_route = respx_mock.post("https://example.com/openai/v1/responses")
    responses_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(200, json=RESPONSE_BODY),
        ]
    )
    token_source = iter(["first", "second"])
    client = AsyncBedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.AsyncClient(trust_env=False),
        max_retries=1,
    )

    await client.responses.create(model="gpt-4o", input="hello")

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    initial_request, retried_request = recorded[0].request, recorded[1].request
    assert initial_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
241
242
def test_preserves_token_provider_across_with_options() -> None:
    """A client copied via ``with_options`` still yields the provider's token."""
    original = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: "provider token",
        http_client=httpx.Client(trust_env=False),
    )

    clone = original.with_options(timeout=1)
    refreshed_token = clone._refresh_api_key()

    assert refreshed_token == "provider token"
253
254
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_api_key_replaces_token_provider(client_cls: type[Client]) -> None:
    """Copying with a static ``api_key`` drops the previously configured token provider."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    client = build_client(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: "provider token",
    )

    clone = client.with_options(api_key="static token")

    assert clone.api_key == "static token"
    assert clone._bedrock_token_provider is None
273
274
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_aws_region_recomputes_region_derived_base_url(client_cls: type[Client]) -> None:
    """Changing ``aws_region`` on a copy re-derives a base URL that came from the region."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION=Omit()):
        client = build_client(aws_region="us-east-1", api_key="token")

        # NOTE(review): the original indentation was lost in extraction; the copy is
        # kept inside the environment override so ambient AWS_* values cannot
        # influence it — confirm against the upstream file.
        clone = client.with_options(aws_region="eu-west-1")

    assert clone.aws_region == "eu-west-1"
    assert clone.base_url == URL("https://bedrock-mantle.eu-west-1.api.aws/openai/v1/")
288
289
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_aws_region_keeps_explicit_base_url(client_cls: type[Client]) -> None:
    """Changing ``aws_region`` on a copy leaves an explicitly supplied base URL untouched."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    client = build_client(base_url="https://example.com/openai/v1", aws_region="us-east-1", api_key="token")

    clone = client.with_options(aws_region="eu-west-1")

    assert clone.aws_region == "eu-west-1"
    assert clone.base_url == URL("https://example.com/openai/v1/")
302
303
@pytest.mark.parametrize(
    "copy_kwargs",
    [
        {"admin_api_key": "admin token"},
        {"workload_identity": cast(Any, {})},
    ],
)
def test_rejects_non_bedrock_copy_auth(copy_kwargs: dict[str, Any]) -> None:
    """``with_options`` refuses auth options other than the Bedrock bearer token."""
    bedrock_client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")
    expected_message = "only supports Bedrock bearer token authentication"

    with pytest.raises(OpenAIError, match=expected_message):
        bedrock_client.with_options(**copy_kwargs)
316
317
@pytest.mark.respx()
def test_passes_non_responses_resources_through(respx_mock: MockRouter) -> None:
    """Chat-completions requests reach the server; its 404 surfaces as ``NotFoundError``."""
    chat_url = "https://example.com/openai/v1/chat/completions"
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support chat completions here"}},
        headers={"x-request-id": "req_chat"},
    )
    respx_mock.post(chat_url).mock(return_value=upstream_404)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support chat completions here") as exc_info:
        client.chat.completions.create(model="gpt-4o", messages=[])

    assert exc_info.value.request_id == "req_chat"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    assert recorded[0].request.url == URL(chat_url)
335
336
@pytest.mark.asyncio
@pytest.mark.respx()
async def test_passes_non_responses_resources_through_async(respx_mock: MockRouter) -> None:
    """Async variant: chat-completions requests reach the server; its 404 surfaces as ``NotFoundError``."""
    chat_url = "https://example.com/openai/v1/chat/completions"
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support chat completions here"}},
        headers={"x-request-id": "req_chat"},
    )
    respx_mock.post(chat_url).mock(return_value=upstream_404)
    client = make_async_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support chat completions here") as exc_info:
        await client.chat.completions.create(model="gpt-4o", messages=[])

    assert exc_info.value.request_id == "req_chat"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    assert recorded[0].request.url == URL(chat_url)
355
356
@pytest.mark.respx()
def test_passes_responses_features_through(respx_mock: MockRouter) -> None:
    """The ``tools`` argument is sent to the server unchanged in the request body."""
    canned_ok = httpx.Response(200, json=RESPONSE_BODY)
    respx_mock.post("https://example.com/openai/v1/responses").mock(return_value=canned_ok)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    result = client.responses.create(
        model="gpt-4o",
        input="hello",
        tools=[{"type": "web_search_preview"}],  # type: ignore[list-item]
    )

    assert result.id == "resp_123"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    sent_body = json.loads(recorded[0].request.content)
    assert sent_body["tools"] == [{"type": "web_search_preview"}]
373
374
@pytest.mark.respx()
def test_passes_admin_security_routes_through(respx_mock: MockRouter) -> None:
    """Admin invite listing is sent with the bearer token; the server's 404 surfaces as ``NotFoundError``."""
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support organization invites here"}},
        headers={"x-request-id": "req_admin"},
    )
    respx_mock.get("https://example.com/openai/v1/organization/invites").mock(return_value=upstream_404)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support organization invites here"):
        # ``list(...)`` consumes the returned page, as in the original test.
        list(client.admin.organization.invites.list())

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    first_request = recorded[0].request
    assert first_request.headers["Authorization"] == "Bearer token"
391
392
@pytest.mark.respx()
def test_refreshes_token_provider_for_admin_security_routes(respx_mock: MockRouter) -> None:
    """On admin routes too, the retry after a 500 carries a freshly provided token."""
    invites_route = respx_mock.get("https://example.com/openai/v1/organization/invites")
    invites_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(
                404,
                json={"error": {"message": "AWS does not support organization invites here"}},
                headers={"x-request-id": "req_admin"},
            ),
        ]
    )
    token_source = iter(["first", "second"])
    client = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.Client(trust_env=False),
        max_retries=1,
    )

    with pytest.raises(NotFoundError, match="AWS does not support organization invites here"):
        list(client.admin.organization.invites.list())

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    initial_request, retried_request = recorded[0].request, recorded[1].request
    assert initial_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
419
420
@pytest.mark.respx()
def test_allows_responses_http_methods(respx_mock: MockRouter) -> None:
    """Every Responses endpoint exercised here succeeds and is sent with the bearer token."""
    responses_url = "https://example.com/openai/v1/responses"
    item_url = f"{responses_url}/resp_123"

    respx_mock.post(responses_url).mock(return_value=httpx.Response(200, json=RESPONSE_BODY))
    # NOTE(review): registration order is preserved from the original; presumably the
    # query-specific GET routes must precede the bare retrieve route — confirm
    # against respx's route-matching rules before reordering.
    respx_mock.get(f"{item_url}?starting_after=1&stream=true").mock(
        return_value=httpx.Response(200, text=response_created_sse(), headers={"Content-Type": "text/event-stream"})
    )
    respx_mock.get(f"{item_url}?stream=true").mock(
        return_value=httpx.Response(200, text=response_created_sse(), headers={"Content-Type": "text/event-stream"})
    )
    respx_mock.get(item_url).mock(return_value=httpx.Response(200, json=RESPONSE_BODY))
    respx_mock.post(f"{item_url}/cancel").mock(return_value=httpx.Response(200, json=RESPONSE_BODY))
    respx_mock.post(f"{responses_url}/compact").mock(return_value=httpx.Response(200, json=COMPACTED_RESPONSE_BODY))
    respx_mock.get(f"{item_url}/input_items").mock(return_value=httpx.Response(200, json=INPUT_ITEMS_BODY))
    respx_mock.post(f"{responses_url}/input_tokens").mock(return_value=httpx.Response(200, json=INPUT_TOKENS_BODY))
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    created = client.responses.create(model="gpt-4o", input="hello", background=True)
    assert created.id == "resp_123"

    retrieved = client.responses.retrieve("resp_123")
    assert retrieved.id == "resp_123"

    resumed_stream = client.responses.retrieve("resp_123", starting_after=1, stream=True)
    assert [event.type for event in resumed_stream] == ["response.created"]

    full_stream = client.responses.retrieve("resp_123", stream=True)
    assert [event.type for event in full_stream] == ["response.created"]

    cancelled = client.responses.cancel("resp_123")
    assert cancelled.id == "resp_123"

    compacted = client.responses.compact(model="gpt-4o")
    assert compacted.object == "response.compaction"

    assert list(client.responses.input_items.list("resp_123")) == []

    token_count = client.responses.input_tokens.count(model="gpt-4o", input="hello")
    assert token_count.input_tokens == 1

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    auth_values = {call.request.headers["Authorization"] for call in recorded}
    assert auth_values == {"Bearer token"}
462
463
@pytest.mark.respx()
def test_allows_sse_and_response_wrappers(respx_mock: MockRouter) -> None:
    """Streaming, ``with_raw_response`` and ``with_streaming_response`` all work for ``responses.create``."""
    # One canned reply per call below, consumed in order.
    canned_replies = [
        httpx.Response(200, text=response_created_sse(), headers={"Content-Type": "text/event-stream"}),
        httpx.Response(200, json=RESPONSE_BODY),
        httpx.Response(200, json=RESPONSE_BODY),
    ]
    respx_mock.post("https://example.com/openai/v1/responses").mock(side_effect=canned_replies)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    streamed_events = list(client.responses.create(model="gpt-4o", input="hello", stream=True))
    assert [event.type for event in streamed_events] == ["response.created"]

    raw = client.responses.with_raw_response.create(model="gpt-4o", input="hello")
    assert raw.parse().id == "resp_123"

    with client.responses.with_streaming_response.create(model="gpt-4o", input="hello") as streamed:
        assert streamed.parse().id == "resp_123"
483
484
def test_does_not_guard_responses_connect() -> None:
    """``responses.connect()`` can be called on a Bedrock client and returns an object."""
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    connection = client.responses.connect()

    assert connection is not None
489