openai/openai-python

Public

mirrored from https://github.com/openai/openai-pythonAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
v1.13.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

src/openai/_base_client.py

1954lines · modecode

1from __future__ import annotations
2
3import json
4import time
5import uuid
6import email
7import asyncio
8import inspect
9import logging
10import platform
11import warnings
12import email.utils
13from types import TracebackType
14from random import random
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Type,
20 Union,
21 Generic,
22 Mapping,
23 TypeVar,
24 Iterable,
25 Iterator,
26 Optional,
27 Generator,
28 AsyncIterator,
29 cast,
30 overload,
31)
32from functools import lru_cache
33from typing_extensions import Literal, override, get_origin
34
35import anyio
36import httpx
37import distro
38import pydantic
39from httpx import URL, Limits
40from pydantic import PrivateAttr
41
42from . import _exceptions
43from ._qs import Querystring
44from ._files import to_httpx_files, async_to_httpx_files
45from ._types import (
46 NOT_GIVEN,
47 Body,
48 Omit,
49 Query,
50 Headers,
51 Timeout,
52 NotGiven,
53 ResponseT,
54 Transport,
55 AnyMapping,
56 PostParser,
57 ProxiesTypes,
58 RequestFiles,
59 HttpxSendArgs,
60 AsyncTransport,
61 RequestOptions,
62 ModelBuilderProtocol,
63)
64from ._utils import is_dict, is_list, is_given, is_mapping
65from ._compat import model_copy, model_dump
66from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type
67from ._response import (
68 APIResponse,
69 BaseAPIResponse,
70 AsyncAPIResponse,
71 extract_response_type,
72)
73from ._constants import (
74 DEFAULT_LIMITS,
75 DEFAULT_TIMEOUT,
76 MAX_RETRY_DELAY,
77 DEFAULT_MAX_RETRIES,
78 INITIAL_RETRY_DELAY,
79 RAW_RESPONSE_HEADER,
80 OVERRIDE_CAST_TO_HEADER,
81)
82from ._streaming import Stream, AsyncStream
83from ._exceptions import (
84 APIStatusError,
85 APITimeoutError,
86 APIConnectionError,
87 APIResponseValidationError,
88)
89from ._legacy_response import LegacyAPIResponse
90
91log: logging.Logger = logging.getLogger(__name__)
92
93# TODO: make base page type vars covariant
94SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
95AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")
96
97
98_T = TypeVar("_T")
99_T_co = TypeVar("_T_co", covariant=True)
100
101_StreamT = TypeVar("_StreamT", bound=Stream[Any])
102_AsyncStreamT = TypeVar("_AsyncStreamT", bound=AsyncStream[Any])
103
104if TYPE_CHECKING:
105 from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
106else:
107 try:
108 from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
109 except ImportError:
110 # taken from https://github.com/encode/httpx/blob/3ba5fe0d7ac70222590e759c31442b1cab263791/httpx/_config.py#L366
111 HTTPX_DEFAULT_TIMEOUT = Timeout(5.0)
112
113
114class PageInfo:
115 """Stores the necessary information to build the request to retrieve the next page.
116
117 Either `url` or `params` must be set.
118 """
119
120 url: URL | NotGiven
121 params: Query | NotGiven
122
123 @overload
124 def __init__(
125 self,
126 *,
127 url: URL,
128 ) -> None:
129 ...
130
131 @overload
132 def __init__(
133 self,
134 *,
135 params: Query,
136 ) -> None:
137 ...
138
139 def __init__(
140 self,
141 *,
142 url: URL | NotGiven = NOT_GIVEN,
143 params: Query | NotGiven = NOT_GIVEN,
144 ) -> None:
145 self.url = url
146 self.params = params
147
148
149class BasePage(GenericModel, Generic[_T]):
150 """
151 Defines the core interface for pagination.
152
153 Type Args:
154 ModelT: The pydantic model that represents an item in the response.
155
156 Methods:
157 has_next_page(): Check if there is another page available
158 next_page_info(): Get the necessary information to make a request for the next page
159 """
160
161 _options: FinalRequestOptions = PrivateAttr()
162 _model: Type[_T] = PrivateAttr()
163
164 def has_next_page(self) -> bool:
165 items = self._get_page_items()
166 if not items:
167 return False
168 return self.next_page_info() is not None
169
170 def next_page_info(self) -> Optional[PageInfo]:
171 ...
172
173 def _get_page_items(self) -> Iterable[_T]: # type: ignore[empty-body]
174 ...
175
176 def _params_from_url(self, url: URL) -> httpx.QueryParams:
177 # TODO: do we have to preprocess params here?
178 return httpx.QueryParams(cast(Any, self._options.params)).merge(url.params)
179
180 def _info_to_options(self, info: PageInfo) -> FinalRequestOptions:
181 options = model_copy(self._options)
182 options._strip_raw_response_header()
183
184 if not isinstance(info.params, NotGiven):
185 options.params = {**options.params, **info.params}
186 return options
187
188 if not isinstance(info.url, NotGiven):
189 params = self._params_from_url(info.url)
190 url = info.url.copy_with(params=params)
191 options.params = dict(url.params)
192 options.url = str(url)
193 return options
194
195 raise ValueError("Unexpected PageInfo state")
196
197
198class BaseSyncPage(BasePage[_T], Generic[_T]):
199 _client: SyncAPIClient = pydantic.PrivateAttr()
200
201 def _set_private_attributes(
202 self,
203 client: SyncAPIClient,
204 model: Type[_T],
205 options: FinalRequestOptions,
206 ) -> None:
207 self._model = model
208 self._client = client
209 self._options = options
210
211 # Pydantic uses a custom `__iter__` method to support casting BaseModels
212 # to dictionaries. e.g. dict(model).
213 # As we want to support `for item in page`, this is inherently incompatible
214 # with the default pydantic behaviour. It is not possible to support both
215 # use cases at once. Fortunately, this is not a big deal as all other pydantic
216 # methods should continue to work as expected as there is an alternative method
217 # to cast a model to a dictionary, model.dict(), which is used internally
218 # by pydantic.
219 def __iter__(self) -> Iterator[_T]: # type: ignore
220 for page in self.iter_pages():
221 for item in page._get_page_items():
222 yield item
223
224 def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
225 page = self
226 while True:
227 yield page
228 if page.has_next_page():
229 page = page.get_next_page()
230 else:
231 return
232
233 def get_next_page(self: SyncPageT) -> SyncPageT:
234 info = self.next_page_info()
235 if not info:
236 raise RuntimeError(
237 "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
238 )
239
240 options = self._info_to_options(info)
241 return self._client._request_api_list(self._model, page=self.__class__, options=options)
242
243
244class AsyncPaginator(Generic[_T, AsyncPageT]):
245 def __init__(
246 self,
247 client: AsyncAPIClient,
248 options: FinalRequestOptions,
249 page_cls: Type[AsyncPageT],
250 model: Type[_T],
251 ) -> None:
252 self._model = model
253 self._client = client
254 self._options = options
255 self._page_cls = page_cls
256
257 def __await__(self) -> Generator[Any, None, AsyncPageT]:
258 return self._get_page().__await__()
259
260 async def _get_page(self) -> AsyncPageT:
261 def _parser(resp: AsyncPageT) -> AsyncPageT:
262 resp._set_private_attributes(
263 model=self._model,
264 options=self._options,
265 client=self._client,
266 )
267 return resp
268
269 self._options.post_parser = _parser
270
271 return await self._client.request(self._page_cls, self._options)
272
273 async def __aiter__(self) -> AsyncIterator[_T]:
274 # https://github.com/microsoft/pyright/issues/3464
275 page = cast(
276 AsyncPageT,
277 await self, # type: ignore
278 )
279 async for item in page:
280 yield item
281
282
283class BaseAsyncPage(BasePage[_T], Generic[_T]):
284 _client: AsyncAPIClient = pydantic.PrivateAttr()
285
286 def _set_private_attributes(
287 self,
288 model: Type[_T],
289 client: AsyncAPIClient,
290 options: FinalRequestOptions,
291 ) -> None:
292 self._model = model
293 self._client = client
294 self._options = options
295
296 async def __aiter__(self) -> AsyncIterator[_T]:
297 async for page in self.iter_pages():
298 for item in page._get_page_items():
299 yield item
300
301 async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
302 page = self
303 while True:
304 yield page
305 if page.has_next_page():
306 page = await page.get_next_page()
307 else:
308 return
309
310 async def get_next_page(self: AsyncPageT) -> AsyncPageT:
311 info = self.next_page_info()
312 if not info:
313 raise RuntimeError(
314 "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
315 )
316
317 options = self._info_to_options(info)
318 return await self._client._request_api_list(self._model, page=self.__class__, options=options)
319
320
321_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
322_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
323
324
325class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
326 _client: _HttpxClientT
327 _version: str
328 _base_url: URL
329 max_retries: int
330 timeout: Union[float, Timeout, None]
331 _limits: httpx.Limits
332 _proxies: ProxiesTypes | None
333 _transport: Transport | AsyncTransport | None
334 _strict_response_validation: bool
335 _idempotency_header: str | None
336 _default_stream_cls: type[_DefaultStreamT] | None = None
337
338 def __init__(
339 self,
340 *,
341 version: str,
342 base_url: str | URL,
343 _strict_response_validation: bool,
344 max_retries: int = DEFAULT_MAX_RETRIES,
345 timeout: float | Timeout | None = DEFAULT_TIMEOUT,
346 limits: httpx.Limits,
347 transport: Transport | AsyncTransport | None,
348 proxies: ProxiesTypes | None,
349 custom_headers: Mapping[str, str] | None = None,
350 custom_query: Mapping[str, object] | None = None,
351 ) -> None:
352 self._version = version
353 self._base_url = self._enforce_trailing_slash(URL(base_url))
354 self.max_retries = max_retries
355 self.timeout = timeout
356 self._limits = limits
357 self._proxies = proxies
358 self._transport = transport
359 self._custom_headers = custom_headers or {}
360 self._custom_query = custom_query or {}
361 self._strict_response_validation = _strict_response_validation
362 self._idempotency_header = None
363
364 def _enforce_trailing_slash(self, url: URL) -> URL:
365 if url.raw_path.endswith(b"/"):
366 return url
367 return url.copy_with(raw_path=url.raw_path + b"/")
368
369 def _make_status_error_from_response(
370 self,
371 response: httpx.Response,
372 ) -> APIStatusError:
373 if response.is_closed and not response.is_stream_consumed:
374 # We can't read the response body as it has been closed
375 # before it was read. This can happen if an event hook
376 # raises a status error.
377 body = None
378 err_msg = f"Error code: {response.status_code}"
379 else:
380 err_text = response.text.strip()
381 body = err_text
382
383 try:
384 body = json.loads(err_text)
385 err_msg = f"Error code: {response.status_code} - {body}"
386 except Exception:
387 err_msg = err_text or f"Error code: {response.status_code}"
388
389 return self._make_status_error(err_msg, body=body, response=response)
390
391 def _make_status_error(
392 self,
393 err_msg: str,
394 *,
395 body: object,
396 response: httpx.Response,
397 ) -> _exceptions.APIStatusError:
398 raise NotImplementedError()
399
400 def _remaining_retries(
401 self,
402 remaining_retries: Optional[int],
403 options: FinalRequestOptions,
404 ) -> int:
405 return remaining_retries if remaining_retries is not None else options.get_max_retries(self.max_retries)
406
407 def _build_headers(self, options: FinalRequestOptions) -> httpx.Headers:
408 custom_headers = options.headers or {}
409 headers_dict = _merge_mappings(self.default_headers, custom_headers)
410 self._validate_headers(headers_dict, custom_headers)
411
412 # headers are case-insensitive while dictionaries are not.
413 headers = httpx.Headers(headers_dict)
414
415 idempotency_header = self._idempotency_header
416 if idempotency_header and options.method.lower() != "get" and idempotency_header not in headers:
417 headers[idempotency_header] = options.idempotency_key or self._idempotency_key()
418
419 return headers
420
421 def _prepare_url(self, url: str) -> URL:
422 """
423 Merge a URL argument together with any 'base_url' on the client,
424 to create the URL used for the outgoing request.
425 """
426 # Copied from httpx's `_merge_url` method.
427 merge_url = URL(url)
428 if merge_url.is_relative_url:
429 merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
430 return self.base_url.copy_with(raw_path=merge_raw_path)
431
432 return merge_url
433
434 def _build_request(
435 self,
436 options: FinalRequestOptions,
437 ) -> httpx.Request:
438 if log.isEnabledFor(logging.DEBUG):
439 log.debug("Request options: %s", model_dump(options, exclude_unset=True))
440
441 kwargs: dict[str, Any] = {}
442
443 json_data = options.json_data
444 if options.extra_json is not None:
445 if json_data is None:
446 json_data = cast(Body, options.extra_json)
447 elif is_mapping(json_data):
448 json_data = _merge_mappings(json_data, options.extra_json)
449 else:
450 raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")
451
452 headers = self._build_headers(options)
453 params = _merge_mappings(self._custom_query, options.params)
454 content_type = headers.get("Content-Type")
455
456 # If the given Content-Type header is multipart/form-data then it
457 # has to be removed so that httpx can generate the header with
458 # additional information for us as it has to be in this form
459 # for the server to be able to correctly parse the request:
460 # multipart/form-data; boundary=---abc--
461 if content_type is not None and content_type.startswith("multipart/form-data"):
462 if "boundary" not in content_type:
463 # only remove the header if the boundary hasn't been explicitly set
464 # as the caller doesn't want httpx to come up with their own boundary
465 headers.pop("Content-Type")
466
467 # As we are now sending multipart/form-data instead of application/json
468 # we need to tell httpx to use it, https://www.python-httpx.org/advanced/#multipart-file-encoding
469 if json_data:
470 if not is_dict(json_data):
471 raise TypeError(
472 f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
473 )
474 kwargs["data"] = self._serialize_multipartform(json_data)
475
476 # TODO: report this error to httpx
477 return self._client.build_request( # pyright: ignore[reportUnknownMemberType]
478 headers=headers,
479 timeout=self.timeout if isinstance(options.timeout, NotGiven) else options.timeout,
480 method=options.method,
481 url=self._prepare_url(options.url),
482 # the `Query` type that we use is incompatible with qs'
483 # `Params` type as it needs to be typed as `Mapping[str, object]`
484 # so that passing a `TypedDict` doesn't cause an error.
485 # https://github.com/microsoft/pyright/issues/3526#event-6715453066
486 params=self.qs.stringify(cast(Mapping[str, Any], params)) if params else None,
487 json=json_data,
488 files=options.files,
489 **kwargs,
490 )
491
492 def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
493 items = self.qs.stringify_items(
494 # TODO: type ignore is required as stringify_items is well typed but we can't be
495 # well typed without heavy validation.
496 data, # type: ignore
497 array_format="brackets",
498 )
499 serialized: dict[str, object] = {}
500 for key, value in items:
501 existing = serialized.get(key)
502
503 if not existing:
504 serialized[key] = value
505 continue
506
507 # If a value has already been set for this key then that
508 # means we're sending data like `array[]=[1, 2, 3]` and we
509 # need to tell httpx that we want to send multiple values with
510 # the same key which is done by using a list or a tuple.
511 #
512 # Note: 2d arrays should never result in the same key at both
513 # levels so it's safe to assume that if the value is a list,
514 # it was because we changed it to be a list.
515 if is_list(existing):
516 existing.append(value)
517 else:
518 serialized[key] = [existing, value]
519
520 return serialized
521
522 def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
523 if not is_given(options.headers):
524 return cast_to
525
526 # make a copy of the headers so we don't mutate user-input
527 headers = dict(options.headers)
528
529 # we internally support defining a temporary header to override the
530 # default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
531 # see _response.py for implementation details
532 override_cast_to = headers.pop(OVERRIDE_CAST_TO_HEADER, NOT_GIVEN)
533 if is_given(override_cast_to):
534 options.headers = headers
535 return cast(Type[ResponseT], override_cast_to)
536
537 return cast_to
538
539 def _should_stream_response_body(self, request: httpx.Request) -> bool:
540 return request.headers.get(RAW_RESPONSE_HEADER) == "stream" # type: ignore[no-any-return]
541
542 def _process_response_data(
543 self,
544 *,
545 data: object,
546 cast_to: type[ResponseT],
547 response: httpx.Response,
548 ) -> ResponseT:
549 if data is None:
550 return cast(ResponseT, None)
551
552 if cast_to is object:
553 return cast(ResponseT, data)
554
555 try:
556 if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
557 return cast(ResponseT, cast_to.build(response=response, data=data))
558
559 if self._strict_response_validation:
560 return cast(ResponseT, validate_type(type_=cast_to, value=data))
561
562 return cast(ResponseT, construct_type(type_=cast_to, value=data))
563 except pydantic.ValidationError as err:
564 raise APIResponseValidationError(response=response, body=data) from err
565
566 @property
567 def qs(self) -> Querystring:
568 return Querystring()
569
570 @property
571 def custom_auth(self) -> httpx.Auth | None:
572 return None
573
574 @property
575 def auth_headers(self) -> dict[str, str]:
576 return {}
577
578 @property
579 def default_headers(self) -> dict[str, str | Omit]:
580 return {
581 "Accept": "application/json",
582 "Content-Type": "application/json",
583 "User-Agent": self.user_agent,
584 **self.platform_headers(),
585 **self.auth_headers,
586 **self._custom_headers,
587 }
588
589 def _validate_headers(
590 self,
591 headers: Headers, # noqa: ARG002
592 custom_headers: Headers, # noqa: ARG002
593 ) -> None:
594 """Validate the given default headers and custom headers.
595
596 Does nothing by default.
597 """
598 return
599
600 @property
601 def user_agent(self) -> str:
602 return f"{self.__class__.__name__}/Python {self._version}"
603
604 @property
605 def base_url(self) -> URL:
606 return self._base_url
607
608 @base_url.setter
609 def base_url(self, url: URL | str) -> None:
610 self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))
611
612 def platform_headers(self) -> Dict[str, str]:
613 return platform_headers(self._version)
614
615 def _parse_retry_after_header(self, response_headers: Optional[httpx.Headers] = None) -> float | None:
616 """Returns a float of the number of seconds (not milliseconds) to wait after retrying, or None if unspecified.
617
618 About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
619 See also https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax
620 """
621 if response_headers is None:
622 return None
623
624 # First, try the non-standard `retry-after-ms` header for milliseconds,
625 # which is more precise than integer-seconds `retry-after`
626 try:
627 retry_ms_header = response_headers.get("retry-after-ms", None)
628 return float(retry_ms_header) / 1000
629 except (TypeError, ValueError):
630 pass
631
632 # Next, try parsing `retry-after` header as seconds (allowing nonstandard floats).
633 retry_header = response_headers.get("retry-after")
634 try:
635 # note: the spec indicates that this should only ever be an integer
636 # but if someone sends a float there's no reason for us to not respect it
637 return float(retry_header)
638 except (TypeError, ValueError):
639 pass
640
641 # Last, try parsing `retry-after` as a date.
642 retry_date_tuple = email.utils.parsedate_tz(retry_header)
643 if retry_date_tuple is None:
644 return None
645
646 retry_date = email.utils.mktime_tz(retry_date_tuple)
647 return float(retry_date - time.time())
648
649 def _calculate_retry_timeout(
650 self,
651 remaining_retries: int,
652 options: FinalRequestOptions,
653 response_headers: Optional[httpx.Headers] = None,
654 ) -> float:
655 max_retries = options.get_max_retries(self.max_retries)
656
657 # If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
658 retry_after = self._parse_retry_after_header(response_headers)
659 if retry_after is not None and 0 < retry_after <= 60:
660 return retry_after
661
662 nb_retries = max_retries - remaining_retries
663
664 # Apply exponential backoff, but not more than the max.
665 sleep_seconds = min(INITIAL_RETRY_DELAY * pow(2.0, nb_retries), MAX_RETRY_DELAY)
666
667 # Apply some jitter, plus-or-minus half a second.
668 jitter = 1 - 0.25 * random()
669 timeout = sleep_seconds * jitter
670 return timeout if timeout >= 0 else 0
671
672 def _should_retry(self, response: httpx.Response) -> bool:
673 # Note: this is not a standard header
674 should_retry_header = response.headers.get("x-should-retry")
675
676 # If the server explicitly says whether or not to retry, obey.
677 if should_retry_header == "true":
678 log.debug("Retrying as header `x-should-retry` is set to `true`")
679 return True
680 if should_retry_header == "false":
681 log.debug("Not retrying as header `x-should-retry` is set to `false`")
682 return False
683
684 # Retry on request timeouts.
685 if response.status_code == 408:
686 log.debug("Retrying due to status code %i", response.status_code)
687 return True
688
689 # Retry on lock timeouts.
690 if response.status_code == 409:
691 log.debug("Retrying due to status code %i", response.status_code)
692 return True
693
694 # Retry on rate limits.
695 if response.status_code == 429:
696 log.debug("Retrying due to status code %i", response.status_code)
697 return True
698
699 # Retry internal errors.
700 if response.status_code >= 500:
701 log.debug("Retrying due to status code %i", response.status_code)
702 return True
703
704 log.debug("Not retrying")
705 return False
706
707 def _idempotency_key(self) -> str:
708 return f"stainless-python-retry-{uuid.uuid4()}"
709
710
711class SyncHttpxClientWrapper(httpx.Client):
712 def __del__(self) -> None:
713 try:
714 self.close()
715 except Exception:
716 pass
717
718
719class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]):
720 _client: httpx.Client
721 _default_stream_cls: type[Stream[Any]] | None = None
722
723 def __init__(
724 self,
725 *,
726 version: str,
727 base_url: str | URL,
728 max_retries: int = DEFAULT_MAX_RETRIES,
729 timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
730 transport: Transport | None = None,
731 proxies: ProxiesTypes | None = None,
732 limits: Limits | None = None,
733 http_client: httpx.Client | None = None,
734 custom_headers: Mapping[str, str] | None = None,
735 custom_query: Mapping[str, object] | None = None,
736 _strict_response_validation: bool,
737 ) -> None:
738 if limits is not None:
739 warnings.warn(
740 "The `connection_pool_limits` argument is deprecated. The `http_client` argument should be passed instead",
741 category=DeprecationWarning,
742 stacklevel=3,
743 )
744 if http_client is not None:
745 raise ValueError("The `http_client` argument is mutually exclusive with `connection_pool_limits`")
746 else:
747 limits = DEFAULT_LIMITS
748
749 if transport is not None:
750 warnings.warn(
751 "The `transport` argument is deprecated. The `http_client` argument should be passed instead",
752 category=DeprecationWarning,
753 stacklevel=3,
754 )
755 if http_client is not None:
756 raise ValueError("The `http_client` argument is mutually exclusive with `transport`")
757
758 if proxies is not None:
759 warnings.warn(
760 "The `proxies` argument is deprecated. The `http_client` argument should be passed instead",
761 category=DeprecationWarning,
762 stacklevel=3,
763 )
764 if http_client is not None:
765 raise ValueError("The `http_client` argument is mutually exclusive with `proxies`")
766
767 if not is_given(timeout):
768 # if the user passed in a custom http client with a non-default
769 # timeout set then we use that timeout.
770 #
771 # note: there is an edge case here where the user passes in a client
772 # where they've explicitly set the timeout to match the default timeout
773 # as this check is structural, meaning that we'll think they didn't
774 # pass in a timeout and will ignore it
775 if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
776 timeout = http_client.timeout
777 else:
778 timeout = DEFAULT_TIMEOUT
779
780 super().__init__(
781 version=version,
782 limits=limits,
783 # cast to a valid type because mypy doesn't understand our type narrowing
784 timeout=cast(Timeout, timeout),
785 proxies=proxies,
786 base_url=base_url,
787 transport=transport,
788 max_retries=max_retries,
789 custom_query=custom_query,
790 custom_headers=custom_headers,
791 _strict_response_validation=_strict_response_validation,
792 )
793 self._client = http_client or SyncHttpxClientWrapper(
794 base_url=base_url,
795 # cast to a valid type because mypy doesn't understand our type narrowing
796 timeout=cast(Timeout, timeout),
797 proxies=proxies,
798 transport=transport,
799 limits=limits,
800 follow_redirects=True,
801 )
802
803 def is_closed(self) -> bool:
804 return self._client.is_closed
805
806 def close(self) -> None:
807 """Close the underlying HTTPX client.
808
809 The client will *not* be usable after this.
810 """
811 # If an error is thrown while constructing a client, self._client
812 # may not be present
813 if hasattr(self, "_client"):
814 self._client.close()
815
816 def __enter__(self: _T) -> _T:
817 return self
818
819 def __exit__(
820 self,
821 exc_type: type[BaseException] | None,
822 exc: BaseException | None,
823 exc_tb: TracebackType | None,
824 ) -> None:
825 self.close()
826
827 def _prepare_options(
828 self,
829 options: FinalRequestOptions, # noqa: ARG002
830 ) -> None:
831 """Hook for mutating the given options"""
832 return None
833
834 def _prepare_request(
835 self,
836 request: httpx.Request, # noqa: ARG002
837 ) -> None:
838 """This method is used as a callback for mutating the `Request` object
839 after it has been constructed.
840 This is useful for cases where you want to add certain headers based off of
841 the request properties, e.g. `url`, `method` etc.
842 """
843 return None
844
845 @overload
846 def request(
847 self,
848 cast_to: Type[ResponseT],
849 options: FinalRequestOptions,
850 remaining_retries: Optional[int] = None,
851 *,
852 stream: Literal[True],
853 stream_cls: Type[_StreamT],
854 ) -> _StreamT:
855 ...
856
857 @overload
858 def request(
859 self,
860 cast_to: Type[ResponseT],
861 options: FinalRequestOptions,
862 remaining_retries: Optional[int] = None,
863 *,
864 stream: Literal[False] = False,
865 ) -> ResponseT:
866 ...
867
868 @overload
869 def request(
870 self,
871 cast_to: Type[ResponseT],
872 options: FinalRequestOptions,
873 remaining_retries: Optional[int] = None,
874 *,
875 stream: bool = False,
876 stream_cls: Type[_StreamT] | None = None,
877 ) -> ResponseT | _StreamT:
878 ...
879
880 def request(
881 self,
882 cast_to: Type[ResponseT],
883 options: FinalRequestOptions,
884 remaining_retries: Optional[int] = None,
885 *,
886 stream: bool = False,
887 stream_cls: type[_StreamT] | None = None,
888 ) -> ResponseT | _StreamT:
889 return self._request(
890 cast_to=cast_to,
891 options=options,
892 stream=stream,
893 stream_cls=stream_cls,
894 remaining_retries=remaining_retries,
895 )
896
897 def _request(
898 self,
899 *,
900 cast_to: Type[ResponseT],
901 options: FinalRequestOptions,
902 remaining_retries: int | None,
903 stream: bool,
904 stream_cls: type[_StreamT] | None,
905 ) -> ResponseT | _StreamT:
906 cast_to = self._maybe_override_cast_to(cast_to, options)
907 self._prepare_options(options)
908
909 retries = self._remaining_retries(remaining_retries, options)
910 request = self._build_request(options)
911 self._prepare_request(request)
912
913 kwargs: HttpxSendArgs = {}
914 if self.custom_auth is not None:
915 kwargs["auth"] = self.custom_auth
916
917 try:
918 response = self._client.send(
919 request,
920 stream=stream or self._should_stream_response_body(request=request),
921 **kwargs,
922 )
923 except httpx.TimeoutException as err:
924 log.debug("Encountered httpx.TimeoutException", exc_info=True)
925
926 if retries > 0:
927 return self._retry_request(
928 options,
929 cast_to,
930 retries,
931 stream=stream,
932 stream_cls=stream_cls,
933 response_headers=None,
934 )
935
936 log.debug("Raising timeout error")
937 raise APITimeoutError(request=request) from err
938 except Exception as err:
939 log.debug("Encountered Exception", exc_info=True)
940
941 if retries > 0:
942 return self._retry_request(
943 options,
944 cast_to,
945 retries,
946 stream=stream,
947 stream_cls=stream_cls,
948 response_headers=None,
949 )
950
951 log.debug("Raising connection error")
952 raise APIConnectionError(request=request) from err
953
954 log.debug(
955 'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase
956 )
957
958 try:
959 response.raise_for_status()
960 except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code
961 log.debug("Encountered httpx.HTTPStatusError", exc_info=True)
962
963 if retries > 0 and self._should_retry(err.response):
964 err.response.close()
965 return self._retry_request(
966 options,
967 cast_to,
968 retries,
969 err.response.headers,
970 stream=stream,
971 stream_cls=stream_cls,
972 )
973
974 # If the response is streamed then we need to explicitly read the response
975 # to completion before attempting to access the response text.
976 if not err.response.is_closed:
977 err.response.read()
978
979 log.debug("Re-raising status error")
980 raise self._make_status_error_from_response(err.response) from None
981
982 return self._process_response(
983 cast_to=cast_to,
984 options=options,
985 response=response,
986 stream=stream,
987 stream_cls=stream_cls,
988 )
989
990 def _retry_request(
991 self,
992 options: FinalRequestOptions,
993 cast_to: Type[ResponseT],
994 remaining_retries: int,
995 response_headers: httpx.Headers | None,
996 *,
997 stream: bool,
998 stream_cls: type[_StreamT] | None,
999 ) -> ResponseT | _StreamT:
1000 remaining = remaining_retries - 1
1001 if remaining == 1:
1002 log.debug("1 retry left")
1003 else:
1004 log.debug("%i retries left", remaining)
1005
1006 timeout = self._calculate_retry_timeout(remaining, options, response_headers)
1007 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1008
1009 # In a synchronous context we are blocking the entire thread. Up to the library user to run the client in a
1010 # different thread if necessary.
1011 time.sleep(timeout)
1012
1013 return self._request(
1014 options=options,
1015 cast_to=cast_to,
1016 remaining_retries=remaining,
1017 stream=stream,
1018 stream_cls=stream_cls,
1019 )
1020
1021 def _process_response(
1022 self,
1023 *,
1024 cast_to: Type[ResponseT],
1025 options: FinalRequestOptions,
1026 response: httpx.Response,
1027 stream: bool,
1028 stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
1029 ) -> ResponseT:
1030 if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
1031 return cast(
1032 ResponseT,
1033 LegacyAPIResponse(
1034 raw=response,
1035 client=self,
1036 cast_to=cast_to,
1037 stream=stream,
1038 stream_cls=stream_cls,
1039 options=options,
1040 ),
1041 )
1042
1043 origin = get_origin(cast_to) or cast_to
1044
1045 if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
1046 if not issubclass(origin, APIResponse):
1047 raise TypeError(f"API Response types must subclass {APIResponse}; Received {origin}")
1048
1049 response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
1050 return cast(
1051 ResponseT,
1052 response_cls(
1053 raw=response,
1054 client=self,
1055 cast_to=extract_response_type(response_cls),
1056 stream=stream,
1057 stream_cls=stream_cls,
1058 options=options,
1059 ),
1060 )
1061
1062 if cast_to == httpx.Response:
1063 return cast(ResponseT, response)
1064
1065 api_response = APIResponse(
1066 raw=response,
1067 client=self,
1068 cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
1069 stream=stream,
1070 stream_cls=stream_cls,
1071 options=options,
1072 )
1073 if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
1074 return cast(ResponseT, api_response)
1075
1076 return api_response.parse()
1077
1078 def _request_api_list(
1079 self,
1080 model: Type[object],
1081 page: Type[SyncPageT],
1082 options: FinalRequestOptions,
1083 ) -> SyncPageT:
1084 def _parser(resp: SyncPageT) -> SyncPageT:
1085 resp._set_private_attributes(
1086 client=self,
1087 model=model,
1088 options=options,
1089 )
1090 return resp
1091
1092 options.post_parser = _parser
1093
1094 return self.request(page, options, stream=False)
1095
1096 @overload
1097 def get(
1098 self,
1099 path: str,
1100 *,
1101 cast_to: Type[ResponseT],
1102 options: RequestOptions = {},
1103 stream: Literal[False] = False,
1104 ) -> ResponseT:
1105 ...
1106
1107 @overload
1108 def get(
1109 self,
1110 path: str,
1111 *,
1112 cast_to: Type[ResponseT],
1113 options: RequestOptions = {},
1114 stream: Literal[True],
1115 stream_cls: type[_StreamT],
1116 ) -> _StreamT:
1117 ...
1118
1119 @overload
1120 def get(
1121 self,
1122 path: str,
1123 *,
1124 cast_to: Type[ResponseT],
1125 options: RequestOptions = {},
1126 stream: bool,
1127 stream_cls: type[_StreamT] | None = None,
1128 ) -> ResponseT | _StreamT:
1129 ...
1130
1131 def get(
1132 self,
1133 path: str,
1134 *,
1135 cast_to: Type[ResponseT],
1136 options: RequestOptions = {},
1137 stream: bool = False,
1138 stream_cls: type[_StreamT] | None = None,
1139 ) -> ResponseT | _StreamT:
1140 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1141 # cast is required because mypy complains about returning Any even though
1142 # it understands the type variables
1143 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1144
1145 @overload
1146 def post(
1147 self,
1148 path: str,
1149 *,
1150 cast_to: Type[ResponseT],
1151 body: Body | None = None,
1152 options: RequestOptions = {},
1153 files: RequestFiles | None = None,
1154 stream: Literal[False] = False,
1155 ) -> ResponseT:
1156 ...
1157
1158 @overload
1159 def post(
1160 self,
1161 path: str,
1162 *,
1163 cast_to: Type[ResponseT],
1164 body: Body | None = None,
1165 options: RequestOptions = {},
1166 files: RequestFiles | None = None,
1167 stream: Literal[True],
1168 stream_cls: type[_StreamT],
1169 ) -> _StreamT:
1170 ...
1171
1172 @overload
1173 def post(
1174 self,
1175 path: str,
1176 *,
1177 cast_to: Type[ResponseT],
1178 body: Body | None = None,
1179 options: RequestOptions = {},
1180 files: RequestFiles | None = None,
1181 stream: bool,
1182 stream_cls: type[_StreamT] | None = None,
1183 ) -> ResponseT | _StreamT:
1184 ...
1185
1186 def post(
1187 self,
1188 path: str,
1189 *,
1190 cast_to: Type[ResponseT],
1191 body: Body | None = None,
1192 options: RequestOptions = {},
1193 files: RequestFiles | None = None,
1194 stream: bool = False,
1195 stream_cls: type[_StreamT] | None = None,
1196 ) -> ResponseT | _StreamT:
1197 opts = FinalRequestOptions.construct(
1198 method="post", url=path, json_data=body, files=to_httpx_files(files), **options
1199 )
1200 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1201
1202 def patch(
1203 self,
1204 path: str,
1205 *,
1206 cast_to: Type[ResponseT],
1207 body: Body | None = None,
1208 options: RequestOptions = {},
1209 ) -> ResponseT:
1210 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1211 return self.request(cast_to, opts)
1212
1213 def put(
1214 self,
1215 path: str,
1216 *,
1217 cast_to: Type[ResponseT],
1218 body: Body | None = None,
1219 files: RequestFiles | None = None,
1220 options: RequestOptions = {},
1221 ) -> ResponseT:
1222 opts = FinalRequestOptions.construct(
1223 method="put", url=path, json_data=body, files=to_httpx_files(files), **options
1224 )
1225 return self.request(cast_to, opts)
1226
1227 def delete(
1228 self,
1229 path: str,
1230 *,
1231 cast_to: Type[ResponseT],
1232 body: Body | None = None,
1233 options: RequestOptions = {},
1234 ) -> ResponseT:
1235 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1236 return self.request(cast_to, opts)
1237
1238 def get_api_list(
1239 self,
1240 path: str,
1241 *,
1242 model: Type[object],
1243 page: Type[SyncPageT],
1244 body: Body | None = None,
1245 options: RequestOptions = {},
1246 method: str = "get",
1247 ) -> SyncPageT:
1248 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1249 return self._request_api_list(model, page, opts)
1250
1251
1252class AsyncHttpxClientWrapper(httpx.AsyncClient):
1253 def __del__(self) -> None:
1254 try:
1255 # TODO(someday): support non asyncio runtimes here
1256 asyncio.get_running_loop().create_task(self.aclose())
1257 except Exception:
1258 pass
1259
1260
1261class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]):
1262 _client: httpx.AsyncClient
1263 _default_stream_cls: type[AsyncStream[Any]] | None = None
1264
1265 def __init__(
1266 self,
1267 *,
1268 version: str,
1269 base_url: str | URL,
1270 _strict_response_validation: bool,
1271 max_retries: int = DEFAULT_MAX_RETRIES,
1272 timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
1273 transport: AsyncTransport | None = None,
1274 proxies: ProxiesTypes | None = None,
1275 limits: Limits | None = None,
1276 http_client: httpx.AsyncClient | None = None,
1277 custom_headers: Mapping[str, str] | None = None,
1278 custom_query: Mapping[str, object] | None = None,
1279 ) -> None:
1280 if limits is not None:
1281 warnings.warn(
1282 "The `connection_pool_limits` argument is deprecated. The `http_client` argument should be passed instead",
1283 category=DeprecationWarning,
1284 stacklevel=3,
1285 )
1286 if http_client is not None:
1287 raise ValueError("The `http_client` argument is mutually exclusive with `connection_pool_limits`")
1288 else:
1289 limits = DEFAULT_LIMITS
1290
1291 if transport is not None:
1292 warnings.warn(
1293 "The `transport` argument is deprecated. The `http_client` argument should be passed instead",
1294 category=DeprecationWarning,
1295 stacklevel=3,
1296 )
1297 if http_client is not None:
1298 raise ValueError("The `http_client` argument is mutually exclusive with `transport`")
1299
1300 if proxies is not None:
1301 warnings.warn(
1302 "The `proxies` argument is deprecated. The `http_client` argument should be passed instead",
1303 category=DeprecationWarning,
1304 stacklevel=3,
1305 )
1306 if http_client is not None:
1307 raise ValueError("The `http_client` argument is mutually exclusive with `proxies`")
1308
1309 if not is_given(timeout):
1310 # if the user passed in a custom http client with a non-default
1311 # timeout set then we use that timeout.
1312 #
1313 # note: there is an edge case here where the user passes in a client
1314 # where they've explicitly set the timeout to match the default timeout
1315 # as this check is structural, meaning that we'll think they didn't
1316 # pass in a timeout and will ignore it
1317 if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
1318 timeout = http_client.timeout
1319 else:
1320 timeout = DEFAULT_TIMEOUT
1321
1322 super().__init__(
1323 version=version,
1324 base_url=base_url,
1325 limits=limits,
1326 # cast to a valid type because mypy doesn't understand our type narrowing
1327 timeout=cast(Timeout, timeout),
1328 proxies=proxies,
1329 transport=transport,
1330 max_retries=max_retries,
1331 custom_query=custom_query,
1332 custom_headers=custom_headers,
1333 _strict_response_validation=_strict_response_validation,
1334 )
1335 self._client = http_client or AsyncHttpxClientWrapper(
1336 base_url=base_url,
1337 # cast to a valid type because mypy doesn't understand our type narrowing
1338 timeout=cast(Timeout, timeout),
1339 proxies=proxies,
1340 transport=transport,
1341 limits=limits,
1342 follow_redirects=True,
1343 )
1344
1345 def is_closed(self) -> bool:
1346 return self._client.is_closed
1347
1348 async def close(self) -> None:
1349 """Close the underlying HTTPX client.
1350
1351 The client will *not* be usable after this.
1352 """
1353 await self._client.aclose()
1354
1355 async def __aenter__(self: _T) -> _T:
1356 return self
1357
1358 async def __aexit__(
1359 self,
1360 exc_type: type[BaseException] | None,
1361 exc: BaseException | None,
1362 exc_tb: TracebackType | None,
1363 ) -> None:
1364 await self.close()
1365
1366 async def _prepare_options(
1367 self,
1368 options: FinalRequestOptions, # noqa: ARG002
1369 ) -> None:
1370 """Hook for mutating the given options"""
1371 return None
1372
1373 async def _prepare_request(
1374 self,
1375 request: httpx.Request, # noqa: ARG002
1376 ) -> None:
1377 """This method is used as a callback for mutating the `Request` object
1378 after it has been constructed.
1379 This is useful for cases where you want to add certain headers based off of
1380 the request properties, e.g. `url`, `method` etc.
1381 """
1382 return None
1383
1384 @overload
1385 async def request(
1386 self,
1387 cast_to: Type[ResponseT],
1388 options: FinalRequestOptions,
1389 *,
1390 stream: Literal[False] = False,
1391 remaining_retries: Optional[int] = None,
1392 ) -> ResponseT:
1393 ...
1394
1395 @overload
1396 async def request(
1397 self,
1398 cast_to: Type[ResponseT],
1399 options: FinalRequestOptions,
1400 *,
1401 stream: Literal[True],
1402 stream_cls: type[_AsyncStreamT],
1403 remaining_retries: Optional[int] = None,
1404 ) -> _AsyncStreamT:
1405 ...
1406
1407 @overload
1408 async def request(
1409 self,
1410 cast_to: Type[ResponseT],
1411 options: FinalRequestOptions,
1412 *,
1413 stream: bool,
1414 stream_cls: type[_AsyncStreamT] | None = None,
1415 remaining_retries: Optional[int] = None,
1416 ) -> ResponseT | _AsyncStreamT:
1417 ...
1418
1419 async def request(
1420 self,
1421 cast_to: Type[ResponseT],
1422 options: FinalRequestOptions,
1423 *,
1424 stream: bool = False,
1425 stream_cls: type[_AsyncStreamT] | None = None,
1426 remaining_retries: Optional[int] = None,
1427 ) -> ResponseT | _AsyncStreamT:
1428 return await self._request(
1429 cast_to=cast_to,
1430 options=options,
1431 stream=stream,
1432 stream_cls=stream_cls,
1433 remaining_retries=remaining_retries,
1434 )
1435
1436 async def _request(
1437 self,
1438 cast_to: Type[ResponseT],
1439 options: FinalRequestOptions,
1440 *,
1441 stream: bool,
1442 stream_cls: type[_AsyncStreamT] | None,
1443 remaining_retries: int | None,
1444 ) -> ResponseT | _AsyncStreamT:
1445 cast_to = self._maybe_override_cast_to(cast_to, options)
1446 await self._prepare_options(options)
1447
1448 retries = self._remaining_retries(remaining_retries, options)
1449 request = self._build_request(options)
1450 await self._prepare_request(request)
1451
1452 kwargs: HttpxSendArgs = {}
1453 if self.custom_auth is not None:
1454 kwargs["auth"] = self.custom_auth
1455
1456 try:
1457 response = await self._client.send(
1458 request,
1459 stream=stream or self._should_stream_response_body(request=request),
1460 **kwargs,
1461 )
1462 except httpx.TimeoutException as err:
1463 log.debug("Encountered httpx.TimeoutException", exc_info=True)
1464
1465 if retries > 0:
1466 return await self._retry_request(
1467 options,
1468 cast_to,
1469 retries,
1470 stream=stream,
1471 stream_cls=stream_cls,
1472 response_headers=None,
1473 )
1474
1475 log.debug("Raising timeout error")
1476 raise APITimeoutError(request=request) from err
1477 except Exception as err:
1478 log.debug("Encountered Exception", exc_info=True)
1479
1480 if retries > 0:
1481 return await self._retry_request(
1482 options,
1483 cast_to,
1484 retries,
1485 stream=stream,
1486 stream_cls=stream_cls,
1487 response_headers=None,
1488 )
1489
1490 log.debug("Raising connection error")
1491 raise APIConnectionError(request=request) from err
1492
1493 log.debug(
1494 'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase
1495 )
1496
1497 try:
1498 response.raise_for_status()
1499 except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code
1500 log.debug("Encountered httpx.HTTPStatusError", exc_info=True)
1501
1502 if retries > 0 and self._should_retry(err.response):
1503 await err.response.aclose()
1504 return await self._retry_request(
1505 options,
1506 cast_to,
1507 retries,
1508 err.response.headers,
1509 stream=stream,
1510 stream_cls=stream_cls,
1511 )
1512
1513 # If the response is streamed then we need to explicitly read the response
1514 # to completion before attempting to access the response text.
1515 if not err.response.is_closed:
1516 await err.response.aread()
1517
1518 log.debug("Re-raising status error")
1519 raise self._make_status_error_from_response(err.response) from None
1520
1521 return await self._process_response(
1522 cast_to=cast_to,
1523 options=options,
1524 response=response,
1525 stream=stream,
1526 stream_cls=stream_cls,
1527 )
1528
1529 async def _retry_request(
1530 self,
1531 options: FinalRequestOptions,
1532 cast_to: Type[ResponseT],
1533 remaining_retries: int,
1534 response_headers: httpx.Headers | None,
1535 *,
1536 stream: bool,
1537 stream_cls: type[_AsyncStreamT] | None,
1538 ) -> ResponseT | _AsyncStreamT:
1539 remaining = remaining_retries - 1
1540 if remaining == 1:
1541 log.debug("1 retry left")
1542 else:
1543 log.debug("%i retries left", remaining)
1544
1545 timeout = self._calculate_retry_timeout(remaining, options, response_headers)
1546 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1547
1548 await anyio.sleep(timeout)
1549
1550 return await self._request(
1551 options=options,
1552 cast_to=cast_to,
1553 remaining_retries=remaining,
1554 stream=stream,
1555 stream_cls=stream_cls,
1556 )
1557
1558 async def _process_response(
1559 self,
1560 *,
1561 cast_to: Type[ResponseT],
1562 options: FinalRequestOptions,
1563 response: httpx.Response,
1564 stream: bool,
1565 stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
1566 ) -> ResponseT:
1567 if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
1568 return cast(
1569 ResponseT,
1570 LegacyAPIResponse(
1571 raw=response,
1572 client=self,
1573 cast_to=cast_to,
1574 stream=stream,
1575 stream_cls=stream_cls,
1576 options=options,
1577 ),
1578 )
1579
1580 origin = get_origin(cast_to) or cast_to
1581
1582 if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
1583 if not issubclass(origin, AsyncAPIResponse):
1584 raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")
1585
1586 response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
1587 return cast(
1588 "ResponseT",
1589 response_cls(
1590 raw=response,
1591 client=self,
1592 cast_to=extract_response_type(response_cls),
1593 stream=stream,
1594 stream_cls=stream_cls,
1595 options=options,
1596 ),
1597 )
1598
1599 if cast_to == httpx.Response:
1600 return cast(ResponseT, response)
1601
1602 api_response = AsyncAPIResponse(
1603 raw=response,
1604 client=self,
1605 cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
1606 stream=stream,
1607 stream_cls=stream_cls,
1608 options=options,
1609 )
1610 if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
1611 return cast(ResponseT, api_response)
1612
1613 return await api_response.parse()
1614
1615 def _request_api_list(
1616 self,
1617 model: Type[_T],
1618 page: Type[AsyncPageT],
1619 options: FinalRequestOptions,
1620 ) -> AsyncPaginator[_T, AsyncPageT]:
1621 return AsyncPaginator(client=self, options=options, page_cls=page, model=model)
1622
1623 @overload
1624 async def get(
1625 self,
1626 path: str,
1627 *,
1628 cast_to: Type[ResponseT],
1629 options: RequestOptions = {},
1630 stream: Literal[False] = False,
1631 ) -> ResponseT:
1632 ...
1633
1634 @overload
1635 async def get(
1636 self,
1637 path: str,
1638 *,
1639 cast_to: Type[ResponseT],
1640 options: RequestOptions = {},
1641 stream: Literal[True],
1642 stream_cls: type[_AsyncStreamT],
1643 ) -> _AsyncStreamT:
1644 ...
1645
1646 @overload
1647 async def get(
1648 self,
1649 path: str,
1650 *,
1651 cast_to: Type[ResponseT],
1652 options: RequestOptions = {},
1653 stream: bool,
1654 stream_cls: type[_AsyncStreamT] | None = None,
1655 ) -> ResponseT | _AsyncStreamT:
1656 ...
1657
1658 async def get(
1659 self,
1660 path: str,
1661 *,
1662 cast_to: Type[ResponseT],
1663 options: RequestOptions = {},
1664 stream: bool = False,
1665 stream_cls: type[_AsyncStreamT] | None = None,
1666 ) -> ResponseT | _AsyncStreamT:
1667 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1668 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1669
1670 @overload
1671 async def post(
1672 self,
1673 path: str,
1674 *,
1675 cast_to: Type[ResponseT],
1676 body: Body | None = None,
1677 files: RequestFiles | None = None,
1678 options: RequestOptions = {},
1679 stream: Literal[False] = False,
1680 ) -> ResponseT:
1681 ...
1682
1683 @overload
1684 async def post(
1685 self,
1686 path: str,
1687 *,
1688 cast_to: Type[ResponseT],
1689 body: Body | None = None,
1690 files: RequestFiles | None = None,
1691 options: RequestOptions = {},
1692 stream: Literal[True],
1693 stream_cls: type[_AsyncStreamT],
1694 ) -> _AsyncStreamT:
1695 ...
1696
1697 @overload
1698 async def post(
1699 self,
1700 path: str,
1701 *,
1702 cast_to: Type[ResponseT],
1703 body: Body | None = None,
1704 files: RequestFiles | None = None,
1705 options: RequestOptions = {},
1706 stream: bool,
1707 stream_cls: type[_AsyncStreamT] | None = None,
1708 ) -> ResponseT | _AsyncStreamT:
1709 ...
1710
1711 async def post(
1712 self,
1713 path: str,
1714 *,
1715 cast_to: Type[ResponseT],
1716 body: Body | None = None,
1717 files: RequestFiles | None = None,
1718 options: RequestOptions = {},
1719 stream: bool = False,
1720 stream_cls: type[_AsyncStreamT] | None = None,
1721 ) -> ResponseT | _AsyncStreamT:
1722 opts = FinalRequestOptions.construct(
1723 method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1724 )
1725 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1726
1727 async def patch(
1728 self,
1729 path: str,
1730 *,
1731 cast_to: Type[ResponseT],
1732 body: Body | None = None,
1733 options: RequestOptions = {},
1734 ) -> ResponseT:
1735 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1736 return await self.request(cast_to, opts)
1737
1738 async def put(
1739 self,
1740 path: str,
1741 *,
1742 cast_to: Type[ResponseT],
1743 body: Body | None = None,
1744 files: RequestFiles | None = None,
1745 options: RequestOptions = {},
1746 ) -> ResponseT:
1747 opts = FinalRequestOptions.construct(
1748 method="put", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1749 )
1750 return await self.request(cast_to, opts)
1751
1752 async def delete(
1753 self,
1754 path: str,
1755 *,
1756 cast_to: Type[ResponseT],
1757 body: Body | None = None,
1758 options: RequestOptions = {},
1759 ) -> ResponseT:
1760 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1761 return await self.request(cast_to, opts)
1762
1763 def get_api_list(
1764 self,
1765 path: str,
1766 *,
1767 model: Type[_T],
1768 page: Type[AsyncPageT],
1769 body: Body | None = None,
1770 options: RequestOptions = {},
1771 method: str = "get",
1772 ) -> AsyncPaginator[_T, AsyncPageT]:
1773 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1774 return self._request_api_list(model, page, opts)
1775
1776
1777def make_request_options(
1778 *,
1779 query: Query | None = None,
1780 extra_headers: Headers | None = None,
1781 extra_query: Query | None = None,
1782 extra_body: Body | None = None,
1783 idempotency_key: str | None = None,
1784 timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
1785 post_parser: PostParser | NotGiven = NOT_GIVEN,
1786) -> RequestOptions:
1787 """Create a dict of type RequestOptions without keys of NotGiven values."""
1788 options: RequestOptions = {}
1789 if extra_headers is not None:
1790 options["headers"] = extra_headers
1791
1792 if extra_body is not None:
1793 options["extra_json"] = cast(AnyMapping, extra_body)
1794
1795 if query is not None:
1796 options["params"] = query
1797
1798 if extra_query is not None:
1799 options["params"] = {**options.get("params", {}), **extra_query}
1800
1801 if not isinstance(timeout, NotGiven):
1802 options["timeout"] = timeout
1803
1804 if idempotency_key is not None:
1805 options["idempotency_key"] = idempotency_key
1806
1807 if is_given(post_parser):
1808 # internal
1809 options["post_parser"] = post_parser # type: ignore
1810
1811 return options
1812
1813
1814class OtherPlatform:
1815 def __init__(self, name: str) -> None:
1816 self.name = name
1817
1818 @override
1819 def __str__(self) -> str:
1820 return f"Other:{self.name}"
1821
1822
1823Platform = Union[
1824 OtherPlatform,
1825 Literal[
1826 "MacOS",
1827 "Linux",
1828 "Windows",
1829 "FreeBSD",
1830 "OpenBSD",
1831 "iOS",
1832 "Android",
1833 "Unknown",
1834 ],
1835]
1836
1837
1838def get_platform() -> Platform:
1839 try:
1840 system = platform.system().lower()
1841 platform_name = platform.platform().lower()
1842 except Exception:
1843 return "Unknown"
1844
1845 if "iphone" in platform_name or "ipad" in platform_name:
1846 # Tested using Python3IDE on an iPhone 11 and Pythonista on an iPad 7
1847 # system is Darwin and platform_name is a string like:
1848 # - Darwin-21.6.0-iPhone12,1-64bit
1849 # - Darwin-21.6.0-iPad7,11-64bit
1850 return "iOS"
1851
1852 if system == "darwin":
1853 return "MacOS"
1854
1855 if system == "windows":
1856 return "Windows"
1857
1858 if "android" in platform_name:
1859 # Tested using Pydroid 3
1860 # system is Linux and platform_name is a string like 'Linux-5.10.81-android12-9-00001-geba40aecb3b7-ab8534902-aarch64-with-libc'
1861 return "Android"
1862
1863 if system == "linux":
1864 # https://distro.readthedocs.io/en/latest/#distro.id
1865 distro_id = distro.id()
1866 if distro_id == "freebsd":
1867 return "FreeBSD"
1868
1869 if distro_id == "openbsd":
1870 return "OpenBSD"
1871
1872 return "Linux"
1873
1874 if platform_name:
1875 return OtherPlatform(platform_name)
1876
1877 return "Unknown"
1878
1879
1880@lru_cache(maxsize=None)
1881def platform_headers(version: str) -> Dict[str, str]:
1882 return {
1883 "X-Stainless-Lang": "python",
1884 "X-Stainless-Package-Version": version,
1885 "X-Stainless-OS": str(get_platform()),
1886 "X-Stainless-Arch": str(get_architecture()),
1887 "X-Stainless-Runtime": get_python_runtime(),
1888 "X-Stainless-Runtime-Version": get_python_version(),
1889 }
1890
1891
1892class OtherArch:
1893 def __init__(self, name: str) -> None:
1894 self.name = name
1895
1896 @override
1897 def __str__(self) -> str:
1898 return f"other:{self.name}"
1899
1900
1901Arch = Union[OtherArch, Literal["x32", "x64", "arm", "arm64", "unknown"]]
1902
1903
1904def get_python_runtime() -> str:
1905 try:
1906 return platform.python_implementation()
1907 except Exception:
1908 return "unknown"
1909
1910
1911def get_python_version() -> str:
1912 try:
1913 return platform.python_version()
1914 except Exception:
1915 return "unknown"
1916
1917
1918def get_architecture() -> Arch:
1919 try:
1920 python_bitness, _ = platform.architecture()
1921 machine = platform.machine().lower()
1922 except Exception:
1923 return "unknown"
1924
1925 if machine in ("arm64", "aarch64"):
1926 return "arm64"
1927
1928 # TODO: untested
1929 if machine == "arm":
1930 return "arm"
1931
1932 if machine == "x86_64":
1933 return "x64"
1934
1935 # TODO: untested
1936 if python_bitness == "32bit":
1937 return "x32"
1938
1939 if machine:
1940 return OtherArch(machine)
1941
1942 return "unknown"
1943
1944
1945def _merge_mappings(
1946 obj1: Mapping[_T_co, Union[_T, Omit]],
1947 obj2: Mapping[_T_co, Union[_T, Omit]],
1948) -> Dict[_T_co, _T]:
1949 """Merge two mappings of the same type, removing any values that are instances of `Omit`.
1950
1951 In cases with duplicate keys the second mapping takes precedence.
1952 """
1953 merged = {**obj1, **obj2}
1954 return {key: value for key, value in merged.items() if not isinstance(value, Omit)}
1955