openai/openai-python
Public · mirrored from https://github.com/openai/openai-python · Available
tests/lib/test_bedrock.py
488 lines · mode: code
| 1 | from __future__ import annotations |
| 2 | |
| 3 | import json |
| 4 | from typing import Any, Union, Protocol, cast |
| 5 | |
| 6 | import httpx |
| 7 | import pytest |
| 8 | from httpx import URL |
| 9 | from respx import MockRouter |
| 10 | |
| 11 | from openai import OpenAIError, NotFoundError |
| 12 | from tests.utils import update_env |
| 13 | from openai._types import Omit |
| 14 | from openai.lib.bedrock import BedrockOpenAI, AsyncBedrockOpenAI |
| 15 | |
# Either flavor of the Bedrock client; annotates the parametrized `client_cls` argument in the tests below.
Client = Union[BedrockOpenAI, AsyncBedrockOpenAI]
| 17 | |
# Canned JSON payloads returned by the mocked endpoints in this module.

# Body used for mocked `/responses` replies (create, retrieve, cancel) and
# embedded in the SSE event built by `response_created_sse`.
RESPONSE_BODY: dict[str, Any] = {
    "id": "resp_123",
    "object": "response",
    "created_at": 0,
    "status": "completed",
    "background": False,
    "error": None,
    "incomplete_details": None,
    "instructions": None,
    "max_output_tokens": None,
    "max_tool_calls": None,
    "model": "gpt-4o",
    "output": [],
    "parallel_tool_calls": True,
    "previous_response_id": None,
    "prompt_cache_key": None,
    "reasoning": {"effort": None, "summary": None},
    "safety_identifier": None,
    "service_tier": "default",
    "store": True,
    "temperature": 1.0,
    "text": {"format": {"type": "text"}, "verbosity": "medium"},
    "tool_choice": "auto",
    "tools": [],
    "top_logprobs": 0,
    "top_p": 1.0,
    "truncation": "disabled",
    "usage": {
        "input_tokens": 0,
        "input_tokens_details": {"cached_tokens": 0},
        "output_tokens": 0,
        "output_tokens_details": {"reasoning_tokens": 0},
        "total_tokens": 0,
    },
    "user": None,
    "metadata": {},
}

# Body for the mocked `/responses/compact` reply; shares the usage dict above.
COMPACTED_RESPONSE_BODY: dict[str, Any] = {
    "id": "resp_123",
    "created_at": 0,
    "object": "response.compaction",
    "output": [],
    "usage": RESPONSE_BODY["usage"],
}

# Body for the mocked `/responses/{id}/input_items` reply (an empty page).
INPUT_ITEMS_BODY: dict[str, Any] = {
    "object": "list",
    "data": [],
    "first_id": "item_123",
    "last_id": "item_123",
    "has_more": False,
}

# Body for the mocked `/responses/input_tokens` reply.
INPUT_TOKENS_BODY: dict[str, Any] = {"object": "response.input_tokens", "input_tokens": 1}
| 73 | |
| 74 | |
class MockRequestCall(Protocol):
    """Structural type for the recorded respx calls: only the `request` attribute is read by these tests."""

    request: httpx.Request
| 77 | |
| 78 | |
def make_sync_client(**kwargs: Any) -> BedrockOpenAI:
    """Build a `BedrockOpenAI` backed by an `httpx.Client(trust_env=False)`, forwarding `kwargs` to the constructor."""
    transport = httpx.Client(trust_env=False)
    return BedrockOpenAI(http_client=transport, **kwargs)
| 81 | |
| 82 | |
def make_async_client(**kwargs: Any) -> AsyncBedrockOpenAI:
    """Build an `AsyncBedrockOpenAI` backed by an `httpx.AsyncClient(trust_env=False)`, forwarding `kwargs`."""
    transport = httpx.AsyncClient(trust_env=False)
    return AsyncBedrockOpenAI(http_client=transport, **kwargs)
| 85 | |
| 86 | |
def response_created_sse() -> str:
    """Return one server-sent `response.created` event whose data line is the JSON event wrapping `RESPONSE_BODY`."""
    payload = json.dumps({"type": "response.created", "sequence_number": 0, "response": RESPONSE_BODY})
    return "event: response.created\ndata: " + payload + "\n\n"
| 90 | |
| 91 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_region_derived_base_url(client_cls: type[Client]) -> None:
    """With `AWS_REGION="us-east-1"` and no explicit base URL, the client reports the us-east-1 mantle URL."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION="us-east-1", AWS_DEFAULT_REGION=Omit()):
        client = build_client(api_key="token")

    expected_url = URL("https://bedrock-mantle.us-east-1.api.aws/openai/v1/")
    assert client.base_url == expected_url
| 100 | |
| 101 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_bedrock_config_precedence(client_cls: type[Client]) -> None:
    """Constructor `base_url` / `api_key` are what the client reports even when competing env values are set."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    explicit_kwargs: dict[str, Any] = {
        "base_url": "https://explicit.example.com/openai/v1/responses",
        "api_key": "explicit token",
    }

    with update_env(
        AWS_BEDROCK_BASE_URL="https://env.example.com/openai/v1",
        AWS_BEARER_TOKEN_BEDROCK="env token",
        AWS_REGION="us-east-1",
        AWS_DEFAULT_REGION="us-west-2",
    ):
        client = build_client(**explicit_kwargs)

    # The trailing `/responses` segment of the explicit URL is not part of the reported base URL.
    assert client.base_url == URL("https://explicit.example.com/openai/v1/")
    assert client.api_key == "explicit token"
| 124 | |
| 125 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_bedrock_region_precedence(client_cls: type[Client]) -> None:
    """Region source order checked here: `aws_region` argument, then `AWS_REGION`, then `AWS_DEFAULT_REGION`."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    # Both region variables are set: once with an explicit argument, once without.
    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION="us-east-1", AWS_DEFAULT_REGION="us-west-2"):
        explicit_region_client = build_client(aws_region="eu-west-1", api_key="token")
        aws_region_client = build_client(api_key="token")

    # Only the default-region variable carries a value.
    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION="us-west-2"):
        default_region_client = build_client(api_key="token")

    assert explicit_region_client.base_url == URL("https://bedrock-mantle.eu-west-1.api.aws/openai/v1/")
    assert aws_region_client.base_url == URL("https://bedrock-mantle.us-east-1.api.aws/openai/v1/")
    assert default_region_client.base_url == URL("https://bedrock-mantle.us-west-2.api.aws/openai/v1/")
| 146 | |
| 147 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_normalizes_responses_url(client_cls: type[Client]) -> None:
    """A `base_url` ending in `/responses` is reported as the parent `/openai/v1/` URL."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    client = build_client(base_url="https://example.com/openai/v1/responses", api_key="token")

    normalized_url = URL("https://example.com/openai/v1/")
    assert client.base_url == normalized_url
| 157 | |
| 158 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_requires_endpoint_configuration(client_cls: type[Client]) -> None:
    """Constructing a client with no base URL and no region raises `OpenAIError` naming both options."""
    expect_error = pytest.raises(OpenAIError, match="Must provide one of the `base_url` or `aws_region`")

    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION=Omit()), expect_error:
        client_cls(api_key="token")
| 164 | |
| 165 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_does_not_use_openai_api_key(client_cls: type[Client]) -> None:
    """`OPENAI_API_KEY` alone is not accepted: the error raised mentions `AWS_BEARER_TOKEN_BEDROCK`."""
    expect_error = pytest.raises(OpenAIError, match="AWS_BEARER_TOKEN_BEDROCK")

    with update_env(
        OPENAI_API_KEY="openai token",
        AWS_BEARER_TOKEN_BEDROCK=Omit(),
        AWS_BEDROCK_BASE_URL="https://example.com/openai/v1",
    ), expect_error:
        client_cls()
| 175 | |
| 176 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_rejects_static_token_and_provider(client_cls: type[Client]) -> None:
    """Passing both `api_key` and `bedrock_token_provider` raises a "mutually exclusive" `OpenAIError`."""
    conflicting_kwargs: dict[str, Any] = {
        "base_url": "https://example.com/openai/v1",
        "api_key": "token",
        "bedrock_token_provider": lambda: "provider token",
    }

    with pytest.raises(OpenAIError, match="mutually exclusive"):
        client_cls(**conflicting_kwargs)
| 185 | |
| 186 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_requires_refreshable_tokens_to_use_provider_option(client_cls: type[Client]) -> None:
    """A callable passed as `api_key` is rejected with an error that mentions `bedrock_token_provider`."""
    expect_error = pytest.raises(OpenAIError, match="bedrock_token_provider")

    with expect_error:
        client_cls(
            base_url="https://example.com/openai/v1",
            api_key=lambda: "provider token",  # type: ignore[arg-type]
        )
| 194 | |
| 195 | |
@pytest.mark.respx()
def test_token_provider_refresh_sync(respx_mock: MockRouter) -> None:
    """The token provider is consulted per attempt: the request after a 500 carries the provider's next token."""
    token_source = iter(["first", "second"])
    responses_route = respx_mock.post("https://example.com/openai/v1/responses")
    responses_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(200, json=RESPONSE_BODY),
        ]
    )
    client = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.Client(trust_env=False),
        max_retries=1,
    )

    client.responses.create(model="gpt-4o", input="hello")

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    first_request, retried_request = recorded[0].request, recorded[1].request
    assert first_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
| 217 | |
| 218 | |
@pytest.mark.asyncio
@pytest.mark.respx()
async def test_token_provider_refresh_async(respx_mock: MockRouter) -> None:
    """Async variant: the request sent after a 500 carries the provider's next token."""
    token_source = iter(["first", "second"])
    responses_route = respx_mock.post("https://example.com/openai/v1/responses")
    responses_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(200, json=RESPONSE_BODY),
        ]
    )
    client = AsyncBedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.AsyncClient(trust_env=False),
        max_retries=1,
    )

    await client.responses.create(model="gpt-4o", input="hello")

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    first_request, retried_request = recorded[0].request, recorded[1].request
    assert first_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
| 241 | |
| 242 | |
def test_preserves_token_provider_across_with_options() -> None:
    """A copy made via `with_options(timeout=...)` still yields the provider's token from `_refresh_api_key()`."""
    original = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: "provider token",
        http_client=httpx.Client(trust_env=False),
    )

    copied_client = original.with_options(timeout=1)
    refreshed_token = copied_client._refresh_api_key()

    assert refreshed_token == "provider token"
| 253 | |
| 254 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_api_key_replaces_token_provider(client_cls: type[Client]) -> None:
    """`with_options(api_key=...)` on a provider-backed client yields a copy with the static key and no provider."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    client = build_client(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: "provider token",
    )

    copied_client = client.with_options(api_key="static token")

    assert copied_client.api_key == "static token"
    assert copied_client._bedrock_token_provider is None
| 273 | |
| 274 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_aws_region_recomputes_region_derived_base_url(client_cls: type[Client]) -> None:
    """When the base URL came from the region, `with_options(aws_region=...)` yields the new region's URL."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client

    with update_env(AWS_BEDROCK_BASE_URL=Omit(), AWS_REGION=Omit(), AWS_DEFAULT_REGION=Omit()):
        client = build_client(aws_region="us-east-1", api_key="token")

    copied_client = client.with_options(aws_region="eu-west-1")

    assert copied_client.aws_region == "eu-west-1"
    assert copied_client.base_url == URL("https://bedrock-mantle.eu-west-1.api.aws/openai/v1/")
| 288 | |
| 289 | |
@pytest.mark.parametrize("client_cls", [BedrockOpenAI, AsyncBedrockOpenAI])
def test_with_options_aws_region_keeps_explicit_base_url(client_cls: type[Client]) -> None:
    """When `base_url` was given explicitly, changing `aws_region` on a copy leaves the base URL untouched."""
    build_client = make_sync_client if client_cls is BedrockOpenAI else make_async_client
    client = build_client(base_url="https://example.com/openai/v1", aws_region="us-east-1", api_key="token")

    copied_client = client.with_options(aws_region="eu-west-1")

    assert copied_client.aws_region == "eu-west-1"
    assert copied_client.base_url == URL("https://example.com/openai/v1/")
| 302 | |
| 303 | |
@pytest.mark.parametrize(
    "copy_kwargs",
    [
        {"admin_api_key": "admin token"},
        {"workload_identity": cast(Any, {})},
    ],
)
def test_rejects_non_bedrock_copy_auth(copy_kwargs: dict[str, Any]) -> None:
    """`with_options` raises `OpenAIError` when given `admin_api_key` or `workload_identity`."""
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")
    expect_error = pytest.raises(OpenAIError, match="only supports Bedrock bearer token authentication")

    with expect_error:
        client.with_options(**copy_kwargs)
| 316 | |
| 317 | |
@pytest.mark.respx()
def test_passes_non_responses_resources_through(respx_mock: MockRouter) -> None:
    """A chat-completions call reaches the mocked endpoint, and its 404 surfaces as `NotFoundError`."""
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support chat completions here"}},
        headers={"x-request-id": "req_chat"},
    )
    respx_mock.post("https://example.com/openai/v1/chat/completions").mock(return_value=upstream_404)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support chat completions here") as exc:
        client.chat.completions.create(model="gpt-4o", messages=[])

    assert exc.value.request_id == "req_chat"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    sent_url = recorded[0].request.url
    assert sent_url == URL("https://example.com/openai/v1/chat/completions")
| 335 | |
| 336 | |
@pytest.mark.asyncio
@pytest.mark.respx()
async def test_passes_non_responses_resources_through_async(respx_mock: MockRouter) -> None:
    """Async variant: the chat-completions 404 from the mocked endpoint surfaces as `NotFoundError`."""
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support chat completions here"}},
        headers={"x-request-id": "req_chat"},
    )
    respx_mock.post("https://example.com/openai/v1/chat/completions").mock(return_value=upstream_404)
    client = make_async_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support chat completions here") as exc:
        await client.chat.completions.create(model="gpt-4o", messages=[])

    assert exc.value.request_id == "req_chat"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    sent_url = recorded[0].request.url
    assert sent_url == URL("https://example.com/openai/v1/chat/completions")
| 355 | |
| 356 | |
@pytest.mark.respx()
def test_passes_responses_features_through(respx_mock: MockRouter) -> None:
    """The `tools` argument given to `responses.create` appears unchanged in the JSON body that was sent."""
    responses_route = respx_mock.post("https://example.com/openai/v1/responses")
    responses_route.mock(return_value=httpx.Response(200, json=RESPONSE_BODY))
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    response = client.responses.create(
        model="gpt-4o",
        input="hello",
        tools=[{"type": "web_search_preview"}],  # type: ignore[list-item]
    )

    assert response.id == "resp_123"
    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    sent_body = json.loads(recorded[0].request.content)
    assert sent_body["tools"] == [{"type": "web_search_preview"}]
| 373 | |
| 374 | |
@pytest.mark.respx()
def test_passes_admin_security_routes_through(respx_mock: MockRouter) -> None:
    """Listing organization invites hits the mocked endpoint with the bearer token; its 404 is `NotFoundError`."""
    upstream_404 = httpx.Response(
        404,
        json={"error": {"message": "AWS does not support organization invites here"}},
        headers={"x-request-id": "req_admin"},
    )
    respx_mock.get("https://example.com/openai/v1/organization/invites").mock(return_value=upstream_404)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    with pytest.raises(NotFoundError, match="AWS does not support organization invites here"):
        list(client.admin.organization.invites.list())

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    sent_headers = recorded[0].request.headers
    assert sent_headers["Authorization"] == "Bearer token"
| 391 | |
| 392 | |
@pytest.mark.respx()
def test_refreshes_token_provider_for_admin_security_routes(respx_mock: MockRouter) -> None:
    """On the invites route, the request after a 500 carries the provider's next token."""
    token_source = iter(["first", "second"])
    invites_route = respx_mock.get("https://example.com/openai/v1/organization/invites")
    invites_route.mock(
        side_effect=[
            httpx.Response(500, json={"error": "server error"}),
            httpx.Response(
                404,
                json={"error": {"message": "AWS does not support organization invites here"}},
                headers={"x-request-id": "req_admin"},
            ),
        ]
    )
    client = BedrockOpenAI(
        base_url="https://example.com/openai/v1",
        bedrock_token_provider=lambda: next(token_source),
        http_client=httpx.Client(trust_env=False),
        max_retries=1,
    )

    with pytest.raises(NotFoundError, match="AWS does not support organization invites here"):
        list(client.admin.organization.invites.list())

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    first_request, retried_request = recorded[0].request, recorded[1].request
    assert first_request.headers["Authorization"] == "Bearer first"
    assert retried_request.headers["Authorization"] == "Bearer second"
| 419 | |
| 420 | |
@pytest.mark.respx()
def test_allows_responses_http_methods(respx_mock: MockRouter) -> None:
    """Exercise each mocked `/responses` endpoint once and check every request carried the bearer token."""
    sse_headers = {"Content-Type": "text/event-stream"}
    # Registration order is kept as-is: the query-string variants of the retrieve URL come before the bare one.
    mocked_routes = [
        (respx_mock.post, "https://example.com/openai/v1/responses", httpx.Response(200, json=RESPONSE_BODY)),
        (
            respx_mock.get,
            "https://example.com/openai/v1/responses/resp_123?starting_after=1&stream=true",
            httpx.Response(200, text=response_created_sse(), headers=sse_headers),
        ),
        (
            respx_mock.get,
            "https://example.com/openai/v1/responses/resp_123?stream=true",
            httpx.Response(200, text=response_created_sse(), headers=sse_headers),
        ),
        (
            respx_mock.get,
            "https://example.com/openai/v1/responses/resp_123",
            httpx.Response(200, json=RESPONSE_BODY),
        ),
        (
            respx_mock.post,
            "https://example.com/openai/v1/responses/resp_123/cancel",
            httpx.Response(200, json=RESPONSE_BODY),
        ),
        (
            respx_mock.post,
            "https://example.com/openai/v1/responses/compact",
            httpx.Response(200, json=COMPACTED_RESPONSE_BODY),
        ),
        (
            respx_mock.get,
            "https://example.com/openai/v1/responses/resp_123/input_items",
            httpx.Response(200, json=INPUT_ITEMS_BODY),
        ),
        (
            respx_mock.post,
            "https://example.com/openai/v1/responses/input_tokens",
            httpx.Response(200, json=INPUT_TOKENS_BODY),
        ),
    ]
    for register, url, reply in mocked_routes:
        register(url).mock(return_value=reply)

    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")
    responses = client.responses

    assert responses.create(model="gpt-4o", input="hello", background=True).id == "resp_123"
    assert responses.retrieve("resp_123").id == "resp_123"

    resumed_types = [event.type for event in responses.retrieve("resp_123", starting_after=1, stream=True)]
    assert resumed_types == ["response.created"]

    streamed_types = [event.type for event in responses.retrieve("resp_123", stream=True)]
    assert streamed_types == ["response.created"]

    assert responses.cancel("resp_123").id == "resp_123"
    assert responses.compact(model="gpt-4o").object == "response.compaction"
    assert list(responses.input_items.list("resp_123")) == []
    assert responses.input_tokens.count(model="gpt-4o", input="hello").input_tokens == 1

    recorded = cast("list[MockRequestCall]", respx_mock.calls)
    auth_values = {call.request.headers["Authorization"] for call in recorded}
    assert auth_values == {"Bearer token"}
| 462 | |
| 463 | |
@pytest.mark.respx()
def test_allows_sse_and_response_wrappers(respx_mock: MockRouter) -> None:
    """`responses.create` works as an SSE stream, via `with_raw_response`, and via `with_streaming_response`."""
    # One queued reply per call below, consumed in order.
    queued_replies = [
        httpx.Response(200, text=response_created_sse(), headers={"Content-Type": "text/event-stream"}),
        httpx.Response(200, json=RESPONSE_BODY),
        httpx.Response(200, json=RESPONSE_BODY),
    ]
    respx_mock.post("https://example.com/openai/v1/responses").mock(side_effect=queued_replies)
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    events = list(client.responses.create(model="gpt-4o", input="hello", stream=True))
    event_types = [event.type for event in events]
    assert event_types == ["response.created"]

    raw_response = client.responses.with_raw_response.create(model="gpt-4o", input="hello")
    assert raw_response.parse().id == "resp_123"

    with client.responses.with_streaming_response.create(model="gpt-4o", input="hello") as response:
        assert response.parse().id == "resp_123"
| 483 | |
| 484 | |
def test_does_not_guard_responses_connect() -> None:
    """`responses.connect()` can be called on the Bedrock client and returns an object rather than raising."""
    client = make_sync_client(base_url="https://example.com/openai/v1", api_key="token")

    connection = client.responses.connect()

    assert connection is not None
| 489 | |