openai/openai-python

Public

mirrored from https://github.com/openai/openai-python · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
fb69e674f3caaf451ad55ad92d430d0a50e7c0a4

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

src/openai/_base_client.py

2000 lines · mode: code

1from __future__ import annotations
2
3import sys
4import json
5import time
6import uuid
7import email
8import asyncio
9import inspect
10import logging
11import platform
12import email.utils
13from types import TracebackType
14from random import random
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Type,
20 Union,
21 Generic,
22 Mapping,
23 TypeVar,
24 Iterable,
25 Iterator,
26 Optional,
27 Generator,
28 AsyncIterator,
29 cast,
30 overload,
31)
32from typing_extensions import Literal, override, get_origin
33
34import anyio
35import httpx
36import distro
37import pydantic
38from httpx import URL
39from pydantic import PrivateAttr
40
41from . import _exceptions
42from ._qs import Querystring
43from ._files import to_httpx_files, async_to_httpx_files
44from ._types import (
45 NOT_GIVEN,
46 Body,
47 Omit,
48 Query,
49 Headers,
50 Timeout,
51 NotGiven,
52 ResponseT,
53 AnyMapping,
54 PostParser,
55 RequestFiles,
56 HttpxSendArgs,
57 RequestOptions,
58 HttpxRequestFiles,
59 ModelBuilderProtocol,
60)
61from ._utils import SensitiveHeadersFilter, is_dict, is_list, asyncify, is_given, lru_cache, is_mapping
62from ._compat import PYDANTIC_V2, model_copy, model_dump
63from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type
64from ._response import (
65 APIResponse,
66 BaseAPIResponse,
67 AsyncAPIResponse,
68 extract_response_type,
69)
70from ._constants import (
71 DEFAULT_TIMEOUT,
72 MAX_RETRY_DELAY,
73 DEFAULT_MAX_RETRIES,
74 INITIAL_RETRY_DELAY,
75 RAW_RESPONSE_HEADER,
76 OVERRIDE_CAST_TO_HEADER,
77 DEFAULT_CONNECTION_LIMITS,
78)
79from ._streaming import Stream, SSEDecoder, AsyncStream, SSEBytesDecoder
80from ._exceptions import (
81 APIStatusError,
82 APITimeoutError,
83 APIConnectionError,
84 APIResponseValidationError,
85)
86from ._legacy_response import LegacyAPIResponse
87
# Module logger; every record passes through the SDK's `SensitiveHeadersFilter`.
log: logging.Logger = logging.getLogger(__name__)
log.addFilter(SensitiveHeadersFilter())

# TODO: make base page type vars covariant
SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")


# Generic helpers used throughout this module.
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)

# Stream type variables, bounded by the sync / async stream base classes.
_StreamT = TypeVar("_StreamT", bound=Stream[Any])
_AsyncStreamT = TypeVar("_AsyncStreamT", bound=AsyncStream[Any])

if TYPE_CHECKING:
    from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
else:
    # `httpx._config` is a private module, so fall back to a hard-coded copy
    # of the default when the import is not available.
    try:
        from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
    except ImportError:
        # taken from https://github.com/encode/httpx/blob/3ba5fe0d7ac70222590e759c31442b1cab263791/httpx/_config.py#L366
        HTTPX_DEFAULT_TIMEOUT = Timeout(5.0)
110
111
class PageInfo:
    """Describes how to build the request that fetches the next page.

    Either `url` or `params` must be set.
    """

    url: URL | NotGiven
    params: Query | NotGiven

    @overload
    def __init__(self, *, url: URL) -> None: ...

    @overload
    def __init__(self, *, params: Query) -> None: ...

    def __init__(
        self,
        *,
        url: URL | NotGiven = NOT_GIVEN,
        params: Query | NotGiven = NOT_GIVEN,
    ) -> None:
        self.url, self.params = url, params

    @override
    def __repr__(self) -> str:
        # Show whichever locator is set, preferring the URL when it is truthy.
        cls_name = self.__class__.__name__
        detail = f"url={self.url}" if self.url else f"params={self.params}"
        return f"{cls_name}({detail})"
149
150
class BasePage(GenericModel, Generic[_T]):
    """
    Core pagination interface shared by the sync and async page models.

    Type Args:
        ModelT: The pydantic model that represents an item in the response.

    Methods:
        has_next_page(): Check if there is another page available
        next_page_info(): Get the necessary information to make a request for the next page
    """

    _options: FinalRequestOptions = PrivateAttr()
    _model: Type[_T] = PrivateAttr()

    def has_next_page(self) -> bool:
        """Return True when this page has items and a next page can be described."""
        if not self._get_page_items():
            return False
        return self.next_page_info() is not None

    def next_page_info(self) -> Optional[PageInfo]: ...

    def _get_page_items(self) -> Iterable[_T]:  # type: ignore[empty-body]
        ...

    def _params_from_url(self, url: URL) -> httpx.QueryParams:
        """Merge the original request params with the query params carried by `url`."""
        # TODO: do we have to preprocess params here?
        original_params = httpx.QueryParams(cast(Any, self._options.params))
        return original_params.merge(url.params)

    def _info_to_options(self, info: PageInfo) -> FinalRequestOptions:
        """Build the request options for the page described by `info`.

        Works on a copy of the options that produced this page, so the stored
        options are not modified.

        Raises:
            ValueError: if neither `info.params` nor `info.url` is set.
        """
        options = model_copy(self._options)
        options._strip_raw_response_header()

        next_params, next_url = info.params, info.url

        # `params` takes precedence: overlay them on the existing query params.
        if not isinstance(next_params, NotGiven):
            options.params = {**options.params, **next_params}
            return options

        # Otherwise follow the URL, folding the existing params into its query string.
        if not isinstance(next_url, NotGiven):
            merged_params = self._params_from_url(next_url)
            url = next_url.copy_with(params=merged_params)
            options.params = dict(url.params)
            options.url = str(url)
            return options

        raise ValueError("Unexpected PageInfo state")
197
198
class BaseSyncPage(BasePage[_T], Generic[_T]):
    """Synchronous page model that can fetch its successors through a `SyncAPIClient`."""

    _client: SyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        client: SyncAPIClient,
        model: Type[_T],
        options: FinalRequestOptions,
    ) -> None:
        """Attach the client, item model and request options that produced this page."""
        needs_private_store = PYDANTIC_V2 and getattr(self, "__pydantic_private__", None) is None
        if needs_private_store:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    # Pydantic uses a custom `__iter__` method to support casting BaseModels
    # to dictionaries. e.g. dict(model).
    # As we want to support `for item in page`, this is inherently incompatible
    # with the default pydantic behaviour. It is not possible to support both
    # use cases at once. Fortunately, this is not a big deal as all other pydantic
    # methods should continue to work as expected as there is an alternative method
    # to cast a model to a dictionary, model.dict(), which is used internally
    # by pydantic.
    def __iter__(self) -> Iterator[_T]:  # type: ignore
        """Yield every item of this page and of all following pages."""
        for page in self.iter_pages():
            yield from page._get_page_items()

    def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
        """Yield this page, then each subsequent page until none remain."""
        page = self
        yield page
        while page.has_next_page():
            page = page.get_next_page()
            yield page

    def get_next_page(self: SyncPageT) -> SyncPageT:
        """Request and return the page after this one.

        Raises:
            RuntimeError: if `next_page_info()` reports no further page.
        """
        info = self.next_page_info()
        if not info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        next_options = self._info_to_options(info)
        return self._client._request_api_list(self._model, page=self.__class__, options=next_options)
246
247
class AsyncPaginator(Generic[_T, AsyncPageT]):
    """Lazy handle for a paginated request.

    Awaiting it yields the first page; iterating it with `async for` yields
    the items produced by iterating that page.
    """

    def __init__(
        self,
        client: AsyncAPIClient,
        options: FinalRequestOptions,
        page_cls: Type[AsyncPageT],
        model: Type[_T],
    ) -> None:
        self._client = client
        self._options = options
        self._page_cls = page_cls
        self._model = model

    def __await__(self) -> Generator[Any, None, AsyncPageT]:
        return self._get_page().__await__()

    async def _get_page(self) -> AsyncPageT:
        """Send the request and return the parsed page with its private attributes set."""

        def _attach_private_attributes(page: AsyncPageT) -> AsyncPageT:
            page._set_private_attributes(
                model=self._model,
                options=self._options,
                client=self._client,
            )
            return page

        # The page needs its client/model/options to be able to fetch later pages.
        self._options.post_parser = _attach_private_attributes

        return await self._client.request(self._page_cls, self._options)

    async def __aiter__(self) -> AsyncIterator[_T]:
        # https://github.com/microsoft/pyright/issues/3464
        first_page = cast(
            AsyncPageT,
            await self,  # type: ignore
        )
        async for item in first_page:
            yield item
285
286
class BaseAsyncPage(BasePage[_T], Generic[_T]):
    """Asynchronous page model that can fetch its successors through an `AsyncAPIClient`."""

    _client: AsyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        model: Type[_T],
        client: AsyncAPIClient,
        options: FinalRequestOptions,
    ) -> None:
        """Attach the item model, client and request options that produced this page."""
        needs_private_store = PYDANTIC_V2 and getattr(self, "__pydantic_private__", None) is None
        if needs_private_store:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    async def __aiter__(self) -> AsyncIterator[_T]:
        """Yield every item of this page and of all following pages."""
        async for page in self.iter_pages():
            for item in page._get_page_items():
                yield item

    async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
        """Yield this page, then each subsequent page until none remain."""
        page = self
        yield page
        while page.has_next_page():
            page = await page.get_next_page()
            yield page

    async def get_next_page(self: AsyncPageT) -> AsyncPageT:
        """Request and return the page after this one.

        Raises:
            RuntimeError: if `next_page_info()` reports no further page.
        """
        info = self.next_page_info()
        if not info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        next_options = self._info_to_options(info)
        return await self._client._request_api_list(self._model, page=self.__class__, options=next_options)
326
327
# Type parameters of `BaseClient`: the concrete httpx client class and the
# stream class used by default.
_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
330
331
class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
    """Request-building and retry-policy logic, parameterised over the httpx
    client type and the default stream class."""

    # httpx client used to build outgoing requests.
    _client: _HttpxClientT
    # SDK version, reported in the `User-Agent` and platform headers.
    _version: str
    # Base URL; always stored with a trailing slash.
    _base_url: URL
    # Default retry budget, used when request options do not override it.
    max_retries: int
    # Default timeout, used when request options do not specify one.
    timeout: Union[float, Timeout, None]
    # When true, responses are validated with `validate_type` rather than built with `construct_type`.
    _strict_response_validation: bool
    # Name of the idempotency header added to non-GET requests; `None` disables it.
    _idempotency_header: str | None
    # NOTE(review): not read in this part of the file; presumably the stream class used when none is passed.
    _default_stream_cls: type[_DefaultStreamT] | None = None
341
def __init__(
    self,
    *,
    version: str,
    base_url: str | URL,
    _strict_response_validation: bool,
    max_retries: int = DEFAULT_MAX_RETRIES,
    timeout: float | Timeout | None = DEFAULT_TIMEOUT,
    custom_headers: Mapping[str, str] | None = None,
    custom_query: Mapping[str, object] | None = None,
) -> None:
    """Store the client-wide configuration.

    Args:
        version: SDK version string.
        base_url: API base URL; a trailing slash is appended if missing.
        _strict_response_validation: whether responses are strictly validated.
        max_retries: default retry budget; must not be `None`.
        timeout: default request timeout.
        custom_headers: extra headers merged into the default headers.
        custom_query: extra query params merged into every request.

    Raises:
        TypeError: if `max_retries` is `None`.
    """
    self._version = version
    self._base_url = self._enforce_trailing_slash(URL(base_url))

    self.timeout = timeout
    self.max_retries = max_retries
    self._strict_response_validation = _strict_response_validation

    self._custom_query = custom_query or {}
    self._custom_headers = custom_headers or {}

    self._platform: Platform | None = None
    self._idempotency_header = None

    if max_retries is None:  # pyright: ignore[reportUnnecessaryComparison]
        raise TypeError(
            "max_retries cannot be None. If you want to disable retries, pass `0`; if you want unlimited retries, pass `math.inf` or a very high number; if you want the default behavior, pass `openai.DEFAULT_MAX_RETRIES`"
        )
367
368 def _enforce_trailing_slash(self, url: URL) -> URL:
369 if url.raw_path.endswith(b"/"):
370 return url
371 return url.copy_with(raw_path=url.raw_path + b"/")
372
373 def _make_status_error_from_response(
374 self,
375 response: httpx.Response,
376 ) -> APIStatusError:
377 if response.is_closed and not response.is_stream_consumed:
378 # We can't read the response body as it has been closed
379 # before it was read. This can happen if an event hook
380 # raises a status error.
381 body = None
382 err_msg = f"Error code: {response.status_code}"
383 else:
384 err_text = response.text.strip()
385 body = err_text
386
387 try:
388 body = json.loads(err_text)
389 err_msg = f"Error code: {response.status_code} - {body}"
390 except Exception:
391 err_msg = err_text or f"Error code: {response.status_code}"
392
393 return self._make_status_error(err_msg, body=body, response=response)
394
395 def _make_status_error(
396 self,
397 err_msg: str,
398 *,
399 body: object,
400 response: httpx.Response,
401 ) -> _exceptions.APIStatusError:
402 raise NotImplementedError()
403
def _build_headers(self, options: FinalRequestOptions, *, retries_taken: int = 0) -> httpx.Headers:
    """Merge default and per-request headers and add the SDK bookkeeping headers.

    Adds, unless the caller already set or removed them: the idempotency
    header (non-GET requests only), `x-stainless-retry-count` and
    `x-stainless-read-timeout`.
    """
    custom_headers = options.headers or {}
    merged = _merge_mappings(self.default_headers, custom_headers)
    self._validate_headers(merged, custom_headers)

    # headers are case-insensitive while dictionaries are not.
    headers = httpx.Headers(merged)

    idempotency_header = self._idempotency_header
    if idempotency_header and options.method.lower() != "get" and idempotency_header not in headers:
        headers[idempotency_header] = options.idempotency_key or self._idempotency_key()

    # Don't set these headers if they were already set or removed by the caller. We check
    # `custom_headers`, which can contain `Omit()`, instead of `headers` to account for the removal case.
    caller_header_names = {name.lower() for name in custom_headers}

    if "x-stainless-retry-count" not in caller_header_names:
        headers["x-stainless-retry-count"] = str(retries_taken)

    if "x-stainless-read-timeout" not in caller_header_names:
        timeout = options.timeout if not isinstance(options.timeout, NotGiven) else self.timeout
        if isinstance(timeout, Timeout):
            # Only the read component of a structured timeout is reported.
            timeout = timeout.read
        if timeout is not None:
            headers["x-stainless-read-timeout"] = str(timeout)

    return headers
429
def _prepare_url(self, url: str) -> URL:
    """
    Combine `url` with the client's `base_url` to produce the URL of the
    outgoing request. Absolute URLs are returned as-is.
    """
    # Copied from httpx's `_merge_url` method.
    merge_url = URL(url)
    if not merge_url.is_relative_url:
        return merge_url

    # The base path always ends with `/`, so drop any leading slash from the relative part.
    merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
    return self.base_url.copy_with(raw_path=merge_raw_path)
442
def _make_sse_decoder(self) -> SSEDecoder | SSEBytesDecoder:
    """Return a new `SSEDecoder` instance."""
    decoder = SSEDecoder()
    return decoder
445
def _build_request(
    self,
    options: FinalRequestOptions,
    *,
    retries_taken: int = 0,
) -> httpx.Request:
    """Translate `options` into an `httpx.Request` via the underlying client.

    Merges `extra_json` into the JSON body, builds headers and query params,
    and switches to multipart encoding when the Content-Type asks for it.

    Raises:
        RuntimeError: if `extra_json` is given but the JSON body is not a mapping.
        TypeError: if a multipart request has a non-dict JSON body.
    """
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Request options: %s", model_dump(options, exclude_unset=True))

    kwargs: dict[str, Any] = {}

    json_data = options.json_data
    extra_json = options.extra_json
    if extra_json is not None:
        if json_data is None:
            json_data = cast(Body, extra_json)
        elif is_mapping(json_data):
            json_data = _merge_mappings(json_data, extra_json)
        else:
            raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")

    headers = self._build_headers(options, retries_taken=retries_taken)
    params = _merge_mappings(self.default_query, options.params)
    content_type = headers.get("Content-Type")
    files = options.files

    # If the given Content-Type header is multipart/form-data then it
    # has to be removed so that httpx can generate the header with
    # additional information for us as it has to be in this form
    # for the server to be able to correctly parse the request:
    # multipart/form-data; boundary=---abc--
    if content_type is not None and content_type.startswith("multipart/form-data"):
        # only remove the header if the boundary hasn't been explicitly set
        # as the caller doesn't want httpx to come up with their own boundary
        if "boundary" not in content_type:
            headers.pop("Content-Type")

        # As we are now sending multipart/form-data instead of application/json
        # we need to tell httpx to use it, https://www.python-httpx.org/advanced/clients/#multipart-file-encoding
        if json_data:
            if not is_dict(json_data):
                raise TypeError(
                    f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
                )
            kwargs["data"] = self._serialize_multipartform(json_data)

        # httpx determines whether or not to send a "multipart/form-data"
        # request based on the truthiness of the "files" argument.
        # This gets around that issue by generating a dict value that
        # evaluates to true.
        #
        # https://github.com/encode/httpx/discussions/2399#discussioncomment-3814186
        if not files:
            files = cast(HttpxRequestFiles, ForceMultipartDict())

    prepared_url = self._prepare_url(options.url)
    host = prepared_url.host
    if "_" in host:
        # work around https://github.com/encode/httpx/discussions/2880
        kwargs["extensions"] = {"sni_hostname": host.replace("_", "-")}

    # Per-request timeout wins over the client default.
    timeout = self.timeout if isinstance(options.timeout, NotGiven) else options.timeout

    # the `Query` type that we use is incompatible with qs'
    # `Params` type as it needs to be typed as `Mapping[str, object]`
    # so that passing a `TypedDict` doesn't cause an error.
    # https://github.com/microsoft/pyright/issues/3526#event-6715453066
    query = self.qs.stringify(cast(Mapping[str, Any], params)) if params else None

    # TODO: report this error to httpx
    return self._client.build_request(  # pyright: ignore[reportUnknownMemberType]
        headers=headers,
        timeout=timeout,
        method=options.method,
        url=prepared_url,
        params=query,
        json=json_data if is_given(json_data) else None,
        files=files,
        **kwargs,
    )
520
521 def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
522 items = self.qs.stringify_items(
523 # TODO: type ignore is required as stringify_items is well typed but we can't be
524 # well typed without heavy validation.
525 data, # type: ignore
526 array_format="brackets",
527 )
528 serialized: dict[str, object] = {}
529 for key, value in items:
530 existing = serialized.get(key)
531
532 if not existing:
533 serialized[key] = value
534 continue
535
536 # If a value has already been set for this key then that
537 # means we're sending data like `array[]=[1, 2, 3]` and we
538 # need to tell httpx that we want to send multiple values with
539 # the same key which is done by using a list or a tuple.
540 #
541 # Note: 2d arrays should never result in the same key at both
542 # levels so it's safe to assume that if the value is a list,
543 # it was because we changed it to be a list.
544 if is_list(existing):
545 existing.append(value)
546 else:
547 serialized[key] = [existing, value]
548
549 return serialized
550
def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
    """Return the type to parse the response into, honouring the internal override header.

    When the override header is present it is removed from `options.headers`
    (via a copied dict) and its value is returned in place of `cast_to`.
    """
    if not is_given(options.headers):
        return cast_to

    # make a copy of the headers so we don't mutate user-input
    headers = dict(options.headers)

    # we internally support defining a temporary header to override the
    # default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
    # see _response.py for implementation details
    override_cast_to = headers.pop(OVERRIDE_CAST_TO_HEADER, NOT_GIVEN)
    if not is_given(override_cast_to):
        return cast_to

    options.headers = headers
    return cast(Type[ResponseT], override_cast_to)
567
def _should_stream_response_body(self, request: httpx.Request) -> bool:
    """Return True when the request's raw-response header has the value "stream"."""
    raw_response_mode = request.headers.get(RAW_RESPONSE_HEADER)
    return raw_response_mode == "stream"  # type: ignore[no-any-return]
570
def _process_response_data(
    self,
    *,
    data: object,
    cast_to: type[ResponseT],
    response: httpx.Response,
) -> ResponseT:
    """Convert decoded response `data` into `cast_to`.

    `None` and `cast_to is object` pass the data through unchanged. Classes
    implementing `ModelBuilderProtocol` build themselves; everything else is
    validated or constructed depending on `_strict_response_validation`.

    Raises:
        APIResponseValidationError: if pydantic validation fails.
    """
    if data is None:
        return cast(ResponseT, None)

    if cast_to is object:
        return cast(ResponseT, data)

    try:
        if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
            return cast(ResponseT, cast_to.build(response=response, data=data))

        to_model = validate_type if self._strict_response_validation else construct_type
        return cast(ResponseT, to_model(type_=cast_to, value=data))
    except pydantic.ValidationError as err:
        raise APIResponseValidationError(response=response, body=data) from err
594
@property
def qs(self) -> Querystring:
    """A new `Querystring` serializer (created on every access)."""
    return Querystring()

@property
def custom_auth(self) -> httpx.Auth | None:
    """httpx auth handler passed when sending requests; `None` in this base class."""
    return None

@property
def auth_headers(self) -> dict[str, str]:
    """Authentication headers merged into the default headers; empty in this base class."""
    return {}
606
@property
def default_headers(self) -> dict[str, str | Omit]:
    """Headers sent with every request.

    Later sources override earlier ones: built-in defaults, platform headers,
    auth headers, then the client's custom headers.
    """
    headers: dict[str, str | Omit] = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": self.user_agent,
    }
    headers.update(self.platform_headers())
    headers.update(self.auth_headers)
    headers.update(self._custom_headers)
    return headers
617
@property
def default_query(self) -> dict[str, object]:
    """A fresh dict copy of the client's custom query params."""
    return dict(self._custom_query)
623
624 def _validate_headers(
625 self,
626 headers: Headers, # noqa: ARG002
627 custom_headers: Headers, # noqa: ARG002
628 ) -> None:
629 """Validate the given default headers and custom headers.
630
631 Does nothing by default.
632 """
633 return
634
@property
def user_agent(self) -> str:
    """`<ClassName>/Python <version>` for the concrete client class."""
    client_name = self.__class__.__name__
    return f"{client_name}/Python {self._version}"
638
@property
def base_url(self) -> URL:
    """The client's base URL (stored with a trailing slash)."""
    return self._base_url

@base_url.setter
def base_url(self, url: URL | str) -> None:
    # Accept either a string or a URL, and normalise the path to end with `/`.
    parsed = url if isinstance(url, URL) else URL(url)
    self._base_url = self._enforce_trailing_slash(parsed)
646
def platform_headers(self) -> Dict[str, str]:
    """Return the platform headers for this client's version and platform."""
    # the actual implementation is in a separate `lru_cache` decorated
    # function because adding `lru_cache` to methods will leak memory
    # https://github.com/python/cpython/issues/88476
    version, current_platform = self._version, self._platform
    return platform_headers(version, platform=current_platform)
652
653 def _parse_retry_after_header(self, response_headers: Optional[httpx.Headers] = None) -> float | None:
654 """Returns a float of the number of seconds (not milliseconds) to wait after retrying, or None if unspecified.
655
656 About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
657 See also https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax
658 """
659 if response_headers is None:
660 return None
661
662 # First, try the non-standard `retry-after-ms` header for milliseconds,
663 # which is more precise than integer-seconds `retry-after`
664 try:
665 retry_ms_header = response_headers.get("retry-after-ms", None)
666 return float(retry_ms_header) / 1000
667 except (TypeError, ValueError):
668 pass
669
670 # Next, try parsing `retry-after` header as seconds (allowing nonstandard floats).
671 retry_header = response_headers.get("retry-after")
672 try:
673 # note: the spec indicates that this should only ever be an integer
674 # but if someone sends a float there's no reason for us to not respect it
675 return float(retry_header)
676 except (TypeError, ValueError):
677 pass
678
679 # Last, try parsing `retry-after` as a date.
680 retry_date_tuple = email.utils.parsedate_tz(retry_header)
681 if retry_date_tuple is None:
682 return None
683
684 retry_date = email.utils.mktime_tz(retry_date_tuple)
685 return float(retry_date - time.time())
686
687 def _calculate_retry_timeout(
688 self,
689 remaining_retries: int,
690 options: FinalRequestOptions,
691 response_headers: Optional[httpx.Headers] = None,
692 ) -> float:
693 max_retries = options.get_max_retries(self.max_retries)
694
695 # If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
696 retry_after = self._parse_retry_after_header(response_headers)
697 if retry_after is not None and 0 < retry_after <= 60:
698 return retry_after
699
700 # Also cap retry count to 1000 to avoid any potential overflows with `pow`
701 nb_retries = min(max_retries - remaining_retries, 1000)
702
703 # Apply exponential backoff, but not more than the max.
704 sleep_seconds = min(INITIAL_RETRY_DELAY * pow(2.0, nb_retries), MAX_RETRY_DELAY)
705
706 # Apply some jitter, plus-or-minus half a second.
707 jitter = 1 - 0.25 * random()
708 timeout = sleep_seconds * jitter
709 return timeout if timeout >= 0 else 0
710
def _should_retry(self, response: httpx.Response) -> bool:
    """Decide whether a failed response should be retried.

    An explicit `x-should-retry: true|false` header wins. Otherwise retry on
    408 (request timeout), 409 (lock timeout), 429 (rate limit) and any
    status >= 500.
    """
    # Note: this is not a standard header
    should_retry_header = response.headers.get("x-should-retry")

    # If the server explicitly says whether or not to retry, obey.
    if should_retry_header == "true":
        log.debug("Retrying as header `x-should-retry` is set to `true`")
        return True
    if should_retry_header == "false":
        log.debug("Not retrying as header `x-should-retry` is set to `false`")
        return False

    status = response.status_code
    if status in (408, 409, 429) or status >= 500:
        log.debug("Retrying due to status code %i", status)
        return True

    log.debug("Not retrying")
    return False
745
746 def _idempotency_key(self) -> str:
747 return f"stainless-python-retry-{uuid.uuid4()}"
748
749
class _DefaultHttpxClient(httpx.Client):
    """`httpx.Client` that fills in the SDK's timeout, connection limits and
    redirect policy for any of those options the caller did not pass."""

    def __init__(self, **kwargs: Any) -> None:
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        for option, value in sdk_defaults:
            kwargs.setdefault(option, value)
        super().__init__(**kwargs)
756
757
# Type checkers see a plain `httpx.Client`; at runtime the name is bound to
# the subclass that applies the SDK defaults.
if TYPE_CHECKING:
    DefaultHttpxClient = httpx.Client
    """An alias to `httpx.Client` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.Client` will result in httpx's defaults being used, not ours.
    """
else:
    DefaultHttpxClient = _DefaultHttpxClient
768
769
class SyncHttpxClientWrapper(DefaultHttpxClient):
    """Default httpx client that makes a best-effort attempt to close itself on garbage collection."""

    def __del__(self) -> None:
        if not self.is_closed:
            try:
                self.close()
            except Exception:
                # Best-effort cleanup: errors raised while closing are deliberately ignored.
                pass
779
780
class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]):
    """Synchronous API client built on `httpx.Client`."""

    # Narrowed from the generic base: the synchronous httpx client.
    _client: httpx.Client
    _default_stream_cls: type[Stream[Any]] | None = None
784
def __init__(
    self,
    *,
    version: str,
    base_url: str | URL,
    max_retries: int = DEFAULT_MAX_RETRIES,
    timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
    http_client: httpx.Client | None = None,
    custom_headers: Mapping[str, str] | None = None,
    custom_query: Mapping[str, object] | None = None,
    _strict_response_validation: bool,
) -> None:
    """Configure the client and pick (or create) the underlying `httpx.Client`.

    Args:
        version: SDK version string.
        base_url: API base URL.
        max_retries: default retry budget.
        timeout: default timeout; when omitted it is taken from `http_client`
            if that client has a non-default timeout, else `DEFAULT_TIMEOUT`.
        http_client: caller-supplied httpx client; a `SyncHttpxClientWrapper`
            is created when omitted.
        custom_headers: extra headers for every request.
        custom_query: extra query params for every request.
        _strict_response_validation: whether responses are strictly validated.

    Raises:
        TypeError: if `http_client` is given but is not an `httpx.Client`.
    """
    # Validate the client before touching its attributes, so that a wrong-typed
    # argument gets the descriptive TypeError rather than an AttributeError
    # from the `.timeout` lookup below.
    if http_client is not None and not isinstance(http_client, httpx.Client):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError(
            f"Invalid `http_client` argument; Expected an instance of `httpx.Client` but got {type(http_client)}"
        )

    if not is_given(timeout):
        # if the user passed in a custom http client with a non-default
        # timeout set then we use that timeout.
        #
        # note: there is an edge case here where the user passes in a client
        # where they've explicitly set the timeout to match the default timeout
        # as this check is structural, meaning that we'll think they didn't
        # pass in a timeout and will ignore it
        if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
            timeout = http_client.timeout
        else:
            timeout = DEFAULT_TIMEOUT

    super().__init__(
        version=version,
        # cast to a valid type because mypy doesn't understand our type narrowing
        timeout=cast(Timeout, timeout),
        base_url=base_url,
        max_retries=max_retries,
        custom_query=custom_query,
        custom_headers=custom_headers,
        _strict_response_validation=_strict_response_validation,
    )
    self._client = http_client or SyncHttpxClientWrapper(
        base_url=base_url,
        # cast to a valid type because mypy doesn't understand our type narrowing
        timeout=cast(Timeout, timeout),
    )
830
def is_closed(self) -> bool:
    """Report whether the underlying httpx client has been closed."""
    http_client = self._client
    return http_client.is_closed
833
def close(self) -> None:
    """Close the underlying HTTPX client.

    The client will *not* be usable after this.
    """
    # If an error is thrown while constructing a client, self._client
    # may not be present
    if not hasattr(self, "_client"):
        return
    self._client.close()
843
844 def __enter__(self: _T) -> _T:
845 return self
846
847 def __exit__(
848 self,
849 exc_type: type[BaseException] | None,
850 exc: BaseException | None,
851 exc_tb: TracebackType | None,
852 ) -> None:
853 self.close()
854
855 def _prepare_options(
856 self,
857 options: FinalRequestOptions, # noqa: ARG002
858 ) -> FinalRequestOptions:
859 """Hook for mutating the given options"""
860 return options
861
862 def _prepare_request(
863 self,
864 request: httpx.Request, # noqa: ARG002
865 ) -> None:
866 """This method is used as a callback for mutating the `Request` object
867 after it has been constructed.
868 This is useful for cases where you want to add certain headers based off of
869 the request properties, e.g. `url`, `method` etc.
870 """
871 return None
872
@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: Literal[True],
    stream_cls: Type[_StreamT],
) -> _StreamT: ...

@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: Literal[False] = False,
) -> ResponseT: ...

@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: bool = False,
    stream_cls: Type[_StreamT] | None = None,
) -> ResponseT | _StreamT: ...

def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Send a request described by `options` and parse the response as `cast_to`.

    `remaining_retries`, when given, is converted into the number of retries
    already taken (relative to the effective retry budget) before delegating
    to `_request`.
    """
    retries_taken = 0
    if remaining_retries is not None:
        retries_taken = options.get_max_retries(self.max_retries) - remaining_retries

    return self._request(
        cast_to=cast_to,
        options=options,
        stream=stream,
        stream_cls=stream_cls,
        retries_taken=retries_taken,
    )
926
def _request(
    self,
    *,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    retries_taken: int,
    stream: bool,
    stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
    """Perform one attempt of a request, retrying through `_retry_request` when allowed.

    Raises:
        APITimeoutError: the send timed out and no retries remain.
        APIConnectionError: the send failed for another reason and no retries remain.
        APIStatusError: the response has a 4xx/5xx status and is not retried.
    """
    # create a copy of the options we were given so that if the
    # options are mutated later & we then retry, the retries are
    # given the original options
    input_options = model_copy(options)

    cast_to = self._maybe_override_cast_to(cast_to, options)
    options = self._prepare_options(options)

    remaining_retries = options.get_max_retries(self.max_retries) - retries_taken
    can_retry = remaining_retries > 0

    request = self._build_request(options, retries_taken=retries_taken)
    self._prepare_request(request)

    kwargs: HttpxSendArgs = {}
    if self.custom_auth is not None:
        kwargs["auth"] = self.custom_auth

    log.debug("Sending HTTP Request: %s %s", request.method, request.url)

    try:
        response = self._client.send(
            request,
            stream=stream or self._should_stream_response_body(request=request),
            **kwargs,
        )
    except httpx.TimeoutException as err:
        log.debug("Encountered httpx.TimeoutException", exc_info=True)

        if not can_retry:
            log.debug("Raising timeout error")
            raise APITimeoutError(request=request) from err

        return self._retry_request(
            input_options,
            cast_to,
            retries_taken=retries_taken,
            stream=stream,
            stream_cls=stream_cls,
            response_headers=None,
        )
    except Exception as err:
        log.debug("Encountered Exception", exc_info=True)

        if not can_retry:
            log.debug("Raising connection error")
            raise APIConnectionError(request=request) from err

        return self._retry_request(
            input_options,
            cast_to,
            retries_taken=retries_taken,
            stream=stream,
            stream_cls=stream_cls,
            response_headers=None,
        )

    log.debug(
        'HTTP Response: %s %s "%i %s" %s',
        request.method,
        request.url,
        response.status_code,
        response.reason_phrase,
        response.headers,
    )
    log.debug("request_id: %s", response.headers.get("x-request-id"))

    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
        log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

        failed_response = err.response
        if can_retry and self._should_retry(failed_response):
            # Close the failed response before retrying.
            failed_response.close()
            return self._retry_request(
                input_options,
                cast_to,
                retries_taken=retries_taken,
                response_headers=failed_response.headers,
                stream=stream,
                stream_cls=stream_cls,
            )

        # If the response is streamed then we need to explicitly read the response
        # to completion before attempting to access the response text.
        if not failed_response.is_closed:
            failed_response.read()

        log.debug("Re-raising status error")
        raise self._make_status_error_from_response(failed_response) from None

    return self._process_response(
        cast_to=cast_to,
        options=options,
        response=response,
        stream=stream,
        stream_cls=stream_cls,
        retries_taken=retries_taken,
    )
1033
def _retry_request(
    self,
    options: FinalRequestOptions,
    cast_to: Type[ResponseT],
    *,
    retries_taken: int,
    response_headers: httpx.Headers | None,
    stream: bool,
    stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
    """Sleep for the computed back-off delay, then issue the next attempt.

    `response_headers` are the headers of the failed response, or `None` when
    the failure happened before any response was received; they are forwarded
    to `_calculate_retry_timeout`.
    """
    retries_left = options.get_max_retries(self.max_retries) - retries_taken
    if retries_left != 1:
        log.debug("%i retries left", retries_left)
    else:
        log.debug("1 retry left")

    delay = self._calculate_retry_timeout(retries_left, options, response_headers)
    log.info("Retrying request to %s in %f seconds", options.url, delay)

    # In a synchronous context we are blocking the entire thread. Up to the library user to run the client in a
    # different thread if necessary.
    time.sleep(delay)

    next_attempt = retries_taken + 1
    return self._request(
        options=options,
        cast_to=cast_to,
        retries_taken=next_attempt,
        stream=stream,
        stream_cls=stream_cls,
    )
1064
def _process_response(
    self,
    *,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    response: httpx.Response,
    stream: bool,
    stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
    retries_taken: int = 0,
) -> ResponseT:
    """Turn a successful `httpx.Response` into the value the caller asked for.

    Resolution order:
      1. raw-response request header equal to "true" -> `LegacyAPIResponse`
      2. `cast_to` is a `BaseAPIResponse` subclass -> an instance of that class
      3. `cast_to` is `httpx.Response` -> the response itself
      4. otherwise an `APIResponse`, returned as-is when the raw-response
         header is set to any non-empty value, or parsed when it is not.

    Raises:
        TypeError: `cast_to` subclasses `BaseAPIResponse` but not `APIResponse`.
    """
    raw_response_flag = response.request.headers.get(RAW_RESPONSE_HEADER)

    if raw_response_flag == "true":
        legacy = LegacyAPIResponse(
            raw=response,
            client=self,
            cast_to=cast_to,
            stream=stream,
            stream_cls=stream_cls,
            options=options,
            retries_taken=retries_taken,
        )
        return cast(ResponseT, legacy)

    # For a parametrised generic, inspect the unsubscripted class.
    target_type = get_origin(cast_to) or cast_to

    if inspect.isclass(target_type) and issubclass(target_type, BaseAPIResponse):
        if not issubclass(target_type, APIResponse):
            raise TypeError(f"API Response types must subclass {APIResponse}; Received {target_type}")

        response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
        custom = response_cls(
            raw=response,
            client=self,
            cast_to=extract_response_type(response_cls),
            stream=stream,
            stream_cls=stream_cls,
            options=options,
            retries_taken=retries_taken,
        )
        return cast(ResponseT, custom)

    if cast_to == httpx.Response:
        return cast(ResponseT, response)

    wrapped = APIResponse(
        raw=response,
        client=self,
        cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
        stream=stream,
        stream_cls=stream_cls,
        options=options,
        retries_taken=retries_taken,
    )
    if bool(raw_response_flag):
        return cast(ResponseT, wrapped)

    return wrapped.parse()
1125
def _request_api_list(
    self,
    model: Type[object],
    page: Type[SyncPageT],
    options: FinalRequestOptions,
) -> SyncPageT:
    """Request a page of results, returning it as an instance of `page`.

    A post-parser is installed on `options` that hands the parsed page this
    client, the item `model` and the `options` used for the request.
    """

    def _attach_context(resp: SyncPageT) -> SyncPageT:
        resp._set_private_attributes(client=self, model=model, options=options)
        return resp

    options.post_parser = _attach_context

    return self.request(page, options, stream=False)
1143
@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: Literal[False] = False,
) -> ResponseT: ...


@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: Literal[True],
    stream_cls: type[_StreamT],
) -> _StreamT: ...


@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: bool,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT: ...


def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Issue a GET request to `path` and return the result of `self.request`."""
    final_options = FinalRequestOptions.construct(method="get", url=path, **options)
    result = self.request(cast_to, final_options, stream=stream, stream_cls=stream_cls)
    # cast is required because mypy complains about returning Any even though
    # it understands the type variables
    return cast(ResponseT, result)
1189
@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: Literal[False] = False,
) -> ResponseT: ...


@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: Literal[True],
    stream_cls: type[_StreamT],
) -> _StreamT: ...


@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: bool,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT: ...


def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Issue a POST request to `path`.

    `body` is passed on as `json_data` and `files` are converted with
    `to_httpx_files` before the request options are built.
    """
    httpx_files = to_httpx_files(files)
    final_options = FinalRequestOptions.construct(
        method="post",
        url=path,
        json_data=body,
        files=httpx_files,
        **options,
    )
    result = self.request(cast_to, final_options, stream=stream, stream_cls=stream_cls)
    return cast(ResponseT, result)
1243
def patch(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a PATCH request to `path`, passing `body` on as `json_data`."""
    final_options = FinalRequestOptions.construct(
        method="patch",
        url=path,
        json_data=body,
        **options,
    )
    return self.request(cast_to, final_options)
1254
def put(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    files: RequestFiles | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a PUT request to `path`.

    `body` is passed on as `json_data` and `files` are converted with
    `to_httpx_files` before the request options are built.
    """
    httpx_files = to_httpx_files(files)
    final_options = FinalRequestOptions.construct(
        method="put",
        url=path,
        json_data=body,
        files=httpx_files,
        **options,
    )
    return self.request(cast_to, final_options)
1268
def delete(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a DELETE request to `path`, passing `body` on as `json_data`."""
    final_options = FinalRequestOptions.construct(
        method="delete",
        url=path,
        json_data=body,
        **options,
    )
    return self.request(cast_to, final_options)
1279
def get_api_list(
    self,
    path: str,
    *,
    model: Type[object],
    page: Type[SyncPageT],
    body: Body | None = None,
    options: RequestOptions = {},
    method: str = "get",
) -> SyncPageT:
    """Build request options for a list endpoint and delegate to `_request_api_list`.

    `method` defaults to "get"; `body` is passed on as `json_data`.
    """
    final_options = FinalRequestOptions.construct(
        method=method,
        url=path,
        json_data=body,
        **options,
    )
    return self._request_api_list(model, page, final_options)
1292
1293
class _DefaultAsyncHttpxClient(httpx.AsyncClient):
    """`httpx.AsyncClient` that fills in this SDK's defaults for options the caller omitted."""

    def __init__(self, **kwargs: Any) -> None:
        # `setdefault` only applies a value when the caller did not pass one.
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        for option_name, fallback in sdk_defaults:
            kwargs.setdefault(option_name, fallback)

        super().__init__(**kwargs)
1300
1301
# Static type checkers see the plain `httpx.AsyncClient`; at runtime the name
# is bound to `_DefaultAsyncHttpxClient`, which applies the SDK defaults.
if TYPE_CHECKING:
    DefaultAsyncHttpxClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.AsyncClient` will result in httpx's defaults being used, not ours.
    """
else:
    DefaultAsyncHttpxClient = _DefaultAsyncHttpxClient
1312
1313
class AsyncHttpxClientWrapper(DefaultAsyncHttpxClient):
    """Default async httpx client that tries to close itself when garbage collected."""

    def __del__(self) -> None:
        if not self.is_closed:
            try:
                # TODO(someday): support non asyncio runtimes here
                running_loop = asyncio.get_running_loop()
                running_loop.create_task(self.aclose())
            except Exception:
                # Best effort only: a finalizer must never raise, and there may
                # be no running event loop to schedule the close on.
                pass
1324
1325
class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]):
    """Asynchronous API client built on top of `httpx.AsyncClient`.

    Provides the request/retry/response-processing pipeline plus thin
    `get`/`post`/`patch`/`put`/`delete` helpers, all as coroutines.
    """

    _client: httpx.AsyncClient
    _default_stream_cls: type[AsyncStream[Any]] | None = None

    def __init__(
        self,
        *,
        version: str,
        base_url: str | URL,
        _strict_response_validation: bool,
        max_retries: int = DEFAULT_MAX_RETRIES,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        custom_headers: Mapping[str, str] | None = None,
        custom_query: Mapping[str, object] | None = None,
    ) -> None:
        """Create the client.

        When `timeout` is not given, the timeout of a user-supplied
        `http_client` is used if it differs from httpx's own default; otherwise
        `DEFAULT_TIMEOUT` applies. When `http_client` is omitted an
        `AsyncHttpxClientWrapper` is created.

        Raises:
            TypeError: `http_client` is given but is not an `httpx.AsyncClient`.
        """
        if not is_given(timeout):
            # if the user passed in a custom http client with a non-default
            # timeout set then we use that timeout.
            #
            # note: there is an edge case here where the user passes in a client
            # where they've explicitly set the timeout to match the default timeout
            # as this check is structural, meaning that we'll think they didn't
            # pass in a timeout and will ignore it
            if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
                timeout = http_client.timeout
            else:
                timeout = DEFAULT_TIMEOUT

        if http_client is not None and not isinstance(http_client, httpx.AsyncClient):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got {type(http_client)}"
            )

        super().__init__(
            version=version,
            base_url=base_url,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
            max_retries=max_retries,
            custom_query=custom_query,
            custom_headers=custom_headers,
            _strict_response_validation=_strict_response_validation,
        )
        self._client = http_client or AsyncHttpxClientWrapper(
            base_url=base_url,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
        )

    def is_closed(self) -> bool:
        """Return whether the underlying HTTPX client has been closed."""
        return self._client.is_closed

    async def close(self) -> None:
        """Close the underlying HTTPX client.

        The client will *not* be usable after this.
        """
        await self._client.aclose()

    async def __aenter__(self: _T) -> _T:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the `async with` block always closes the client.
        await self.close()

    async def _prepare_options(
        self,
        options: FinalRequestOptions,  # noqa: ARG002
    ) -> FinalRequestOptions:
        """Hook for mutating the given options"""
        return options

    async def _prepare_request(
        self,
        request: httpx.Request,  # noqa: ARG002
    ) -> None:
        """This method is used as a callback for mutating the `Request` object
        after it has been constructed.
        This is useful for cases where you want to add certain headers based off of
        the request properties, e.g. `url`, `method` etc.
        """
        return None

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[False] = False,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
        remaining_retries: Optional[int] = None,
    ) -> _AsyncStreamT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT | _AsyncStreamT: ...

    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT | _AsyncStreamT:
        """Send a request described by `options` and return the processed result.

        `remaining_retries`, when given, is converted into the number of
        retries already taken relative to the effective maximum; otherwise the
        request starts with zero retries taken.
        """
        if remaining_retries is not None:
            retries_taken = options.get_max_retries(self.max_retries) - remaining_retries
        else:
            retries_taken = 0

        return await self._request(
            cast_to=cast_to,
            options=options,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )

    async def _request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
        retries_taken: int,
    ) -> ResponseT | _AsyncStreamT:
        """Perform a single attempt of a request and hand the outcome on.

        Failures are routed to `_retry_request` while retries remain.

        Raises:
            APITimeoutError: `httpx.TimeoutException` was raised while sending
                and no retries remain.
            APIConnectionError: any other exception was raised while sending
                and no retries remain.
            The error built by `_make_status_error_from_response`: the response
                has a 4xx/5xx status and is not retried.
        """
        if self._platform is None:
            # `get_platform` can make blocking IO calls so we
            # execute it earlier while we are in an async context
            self._platform = await asyncify(get_platform)()

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)

        cast_to = self._maybe_override_cast_to(cast_to, options)
        options = await self._prepare_options(options)

        remaining_retries = options.get_max_retries(self.max_retries) - retries_taken
        request = self._build_request(options, retries_taken=retries_taken)
        await self._prepare_request(request)

        kwargs: HttpxSendArgs = {}
        if self.custom_auth is not None:
            kwargs["auth"] = self.custom_auth

        # Same debug line the synchronous client emits before sending.
        log.debug("Sending HTTP Request: %s %s", request.method, request.url)

        try:
            response = await self._client.send(
                request,
                stream=stream or self._should_stream_response_body(request=request),
                **kwargs,
            )
        except httpx.TimeoutException as err:
            log.debug("Encountered httpx.TimeoutException", exc_info=True)

            if remaining_retries > 0:
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries_taken=retries_taken,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )

            log.debug("Raising timeout error")
            raise APITimeoutError(request=request) from err
        except Exception as err:
            log.debug("Encountered Exception", exc_info=True)

            if remaining_retries > 0:
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries_taken=retries_taken,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )

            log.debug("Raising connection error")
            raise APIConnectionError(request=request) from err

        # This line describes the *response*; label and fields match the
        # synchronous client (headers included, request id logged separately).
        log.debug(
            'HTTP Response: %s %s "%i %s" %s',
            request.method,
            request.url,
            response.status_code,
            response.reason_phrase,
            response.headers,
        )
        log.debug("request_id: %s", response.headers.get("x-request-id"))

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
            log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

            if remaining_retries > 0 and self._should_retry(err.response):
                # Release the connection before sleeping and trying again.
                await err.response.aclose()
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries_taken=retries_taken,
                    response_headers=err.response.headers,
                    stream=stream,
                    stream_cls=stream_cls,
                )

            # If the response is streamed then we need to explicitly read the response
            # to completion before attempting to access the response text.
            if not err.response.is_closed:
                await err.response.aread()

            log.debug("Re-raising status error")
            raise self._make_status_error_from_response(err.response) from None

        return await self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )

    async def _retry_request(
        self,
        options: FinalRequestOptions,
        cast_to: Type[ResponseT],
        *,
        retries_taken: int,
        response_headers: httpx.Headers | None,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
    ) -> ResponseT | _AsyncStreamT:
        """Sleep for the computed back-off delay, then issue the next attempt.

        `response_headers` are the headers of the failed response, or `None`
        when the failure happened before any response was received; they are
        forwarded to `_calculate_retry_timeout`.
        """
        remaining_retries = options.get_max_retries(self.max_retries) - retries_taken
        if remaining_retries == 1:
            log.debug("1 retry left")
        else:
            log.debug("%i retries left", remaining_retries)

        timeout = self._calculate_retry_timeout(remaining_retries, options, response_headers)
        log.info("Retrying request to %s in %f seconds", options.url, timeout)

        # Non-blocking sleep via anyio.
        await anyio.sleep(timeout)

        return await self._request(
            options=options,
            cast_to=cast_to,
            retries_taken=retries_taken + 1,
            stream=stream,
            stream_cls=stream_cls,
        )

    async def _process_response(
        self,
        *,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        response: httpx.Response,
        stream: bool,
        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
        retries_taken: int = 0,
    ) -> ResponseT:
        """Turn a successful `httpx.Response` into the value the caller asked for.

        Resolution order:
          1. raw-response request header equal to "true" -> `LegacyAPIResponse`
          2. `cast_to` is a `BaseAPIResponse` subclass -> an instance of that class
          3. `cast_to` is `httpx.Response` -> the response itself
          4. otherwise an `AsyncAPIResponse`, returned as-is when the
             raw-response header is set to any non-empty value, or parsed
             when it is not.

        Raises:
            TypeError: `cast_to` subclasses `BaseAPIResponse` but not
                `AsyncAPIResponse`.
        """
        if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
            return cast(
                ResponseT,
                LegacyAPIResponse(
                    raw=response,
                    client=self,
                    cast_to=cast_to,
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        # For a parametrised generic, inspect the unsubscripted class.
        origin = get_origin(cast_to) or cast_to

        if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
            if not issubclass(origin, AsyncAPIResponse):
                raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")

            response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
            return cast(
                "ResponseT",
                response_cls(
                    raw=response,
                    client=self,
                    cast_to=extract_response_type(response_cls),
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                    retries_taken=retries_taken,
                ),
            )

        if cast_to == httpx.Response:
            return cast(ResponseT, response)

        api_response = AsyncAPIResponse(
            raw=response,
            client=self,
            cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
            stream=stream,
            stream_cls=stream_cls,
            options=options,
            retries_taken=retries_taken,
        )
        if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
            return cast(ResponseT, api_response)

        return await api_response.parse()

    def _request_api_list(
        self,
        model: Type[_T],
        page: Type[AsyncPageT],
        options: FinalRequestOptions,
    ) -> AsyncPaginator[_T, AsyncPageT]:
        """Return an `AsyncPaginator` for the list request; nothing is sent here."""
        return AsyncPaginator(client=self, options=options, page_cls=page, model=model)

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...

    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Issue a GET request to `path` and return the result of `self.request`."""
        opts = FinalRequestOptions.construct(method="get", url=path, **options)
        return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...

    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Issue a POST request to `path`.

        `body` is passed on as `json_data` and `files` are converted with
        `async_to_httpx_files` before the request options are built.
        """
        opts = FinalRequestOptions.construct(
            method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
        )
        return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)

    async def patch(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a PATCH request to `path`, passing `body` on as `json_data`."""
        opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
        return await self.request(cast_to, opts)

    async def put(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a PUT request to `path`.

        `body` is passed on as `json_data` and `files` are converted with
        `async_to_httpx_files` before the request options are built.
        """
        opts = FinalRequestOptions.construct(
            method="put", url=path, json_data=body, files=await async_to_httpx_files(files), **options
        )
        return await self.request(cast_to, opts)

    async def delete(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a DELETE request to `path`, passing `body` on as `json_data`."""
        opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
        return await self.request(cast_to, opts)

    def get_api_list(
        self,
        path: str,
        *,
        model: Type[_T],
        page: Type[AsyncPageT],
        body: Body | None = None,
        options: RequestOptions = {},
        method: str = "get",
    ) -> AsyncPaginator[_T, AsyncPageT]:
        """Build request options for a list endpoint and delegate to `_request_api_list`.

        `method` defaults to "get"; `body` is passed on as `json_data`.
        """
        opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
        return self._request_api_list(model, page, opts)
1817
1818
def make_request_options(
    *,
    query: Query | None = None,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    idempotency_key: str | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    post_parser: PostParser | NotGiven = NOT_GIVEN,
) -> RequestOptions:
    """Create a dict of type RequestOptions without keys of NotGiven values.

    Arguments left at `None` (or at `NOT_GIVEN` for `timeout` and
    `post_parser`) produce no key. `extra_query` entries are merged over
    `query` entries under the single `params` key.
    """
    request_options: RequestOptions = {}

    if extra_headers is not None:
        request_options["headers"] = extra_headers

    if extra_body is not None:
        request_options["extra_json"] = cast(AnyMapping, extra_body)

    if extra_query is not None:
        # `extra_query` wins over `query` on duplicate keys.
        base_query = query if query is not None else {}
        request_options["params"] = {**base_query, **extra_query}
    elif query is not None:
        request_options["params"] = query

    if not isinstance(timeout, NotGiven):
        # `None` is a meaningful timeout value, so only the sentinel is skipped.
        request_options["timeout"] = timeout

    if idempotency_key is not None:
        request_options["idempotency_key"] = idempotency_key

    if is_given(post_parser):
        # internal
        request_options["post_parser"] = post_parser  # type: ignore

    return request_options
1854
1855
class ForceMultipartDict(Dict[str, None]):
    """A dict that is always truthy, even when it has no entries."""

    def __bool__(self) -> bool:
        # Override dict's emptiness-based truthiness.
        return True
1859
1860
class OtherPlatform:
    """Wrapper for a platform name that is not one of the well-known ones."""

    def __init__(self, name: str) -> None:
        self.name = name

    @override
    def __str__(self) -> str:
        # Rendered with an "Other:" prefix in front of the stored name.
        return "Other:{}".format(self.name)
1868
1869
# A platform value is either one of the well-known names listed below or an
# `OtherPlatform` instance carrying some other name.
Platform = Union[
    OtherPlatform,
    Literal[
        "MacOS",
        "Linux",
        "Windows",
        "FreeBSD",
        "OpenBSD",
        "iOS",
        "Android",
        "Unknown",
    ],
]
1883
1884
def get_platform() -> Platform:
    """Classify the host operating system.

    Returns one of the well-known platform names, an `OtherPlatform` wrapping
    the lower-cased `platform.platform()` string when nothing matches, or
    "Unknown" when the platform cannot be queried or that string is empty.
    """
    try:
        system = platform.system().lower()
        platform_name = platform.platform().lower()
    except Exception:
        return "Unknown"

    # Tested using Python3IDE on an iPhone 11 and Pythonista on an iPad 7
    # system is Darwin and platform_name is a string like:
    # - Darwin-21.6.0-iPhone12,1-64bit
    # - Darwin-21.6.0-iPad7,11-64bit
    # so this has to be checked before the plain Darwin case.
    if "iphone" in platform_name or "ipad" in platform_name:
        return "iOS"
    elif system == "darwin":
        return "MacOS"
    elif system == "windows":
        return "Windows"
    elif "android" in platform_name:
        # Tested using Pydroid 3
        # system is Linux and platform_name is a string like 'Linux-5.10.81-android12-9-00001-geba40aecb3b7-ab8534902-aarch64-with-libc'
        # so this has to be checked before the plain Linux case.
        return "Android"
    elif system == "linux":
        # https://distro.readthedocs.io/en/latest/#distro.id
        distribution = distro.id()
        if distribution == "freebsd":
            return "FreeBSD"
        elif distribution == "openbsd":
            return "OpenBSD"
        else:
            return "Linux"
    elif platform_name:
        return OtherPlatform(platform_name)
    else:
        return "Unknown"
1925
1926
@lru_cache(maxsize=None)
def platform_headers(version: str, *, platform: Platform | None) -> Dict[str, str]:
    """Build the `X-Stainless-*` headers describing the package and its runtime.

    `platform` is used for the OS header when truthy; otherwise
    `get_platform()` is called. The function is wrapped in `lru_cache`
    (imported from `._utils`; presumably functools-style memoisation — confirm).
    """
    os_label = str(platform or get_platform())
    arch_label = str(get_architecture())

    headers: Dict[str, str] = {}
    headers["X-Stainless-Lang"] = "python"
    headers["X-Stainless-Package-Version"] = version
    headers["X-Stainless-OS"] = os_label
    headers["X-Stainless-Arch"] = arch_label
    headers["X-Stainless-Runtime"] = get_python_runtime()
    headers["X-Stainless-Runtime-Version"] = get_python_version()
    return headers
1937
1938
class OtherArch:
    """Wrapper for an architecture name that is not one of the well-known ones."""

    def __init__(self, name: str) -> None:
        self.name = name

    @override
    def __str__(self) -> str:
        # Rendered with a lower-case "other:" prefix in front of the stored name.
        return "other:{}".format(self.name)
1946
1947
# An architecture value is either one of the well-known labels listed here or
# an `OtherArch` instance carrying some other machine name.
Arch = Union[OtherArch, Literal["x32", "x64", "arm", "arm64", "unknown"]]
1949
1950
def get_python_runtime() -> str:
    """Return `platform.python_implementation()`, or "unknown" if that call fails."""
    try:
        runtime = platform.python_implementation()
    except Exception:
        runtime = "unknown"
    return runtime
1956
1957
def get_python_version() -> str:
    """Return `platform.python_version()`, or "unknown" if that call fails."""
    try:
        version = platform.python_version()
    except Exception:
        version = "unknown"
    return version
1963
1964
def get_architecture() -> Arch:
    """Classify the CPU architecture reported by `platform.machine()`.

    Returns one of the well-known labels, an `OtherArch` wrapping the
    lower-cased machine name when it is not recognised, or "unknown" when the
    machine name cannot be read or is empty.
    """
    try:
        machine = platform.machine().lower()
    except Exception:
        return "unknown"

    arch: Arch
    if machine == "arm64" or machine == "aarch64":
        arch = "arm64"
    elif machine == "arm":
        # TODO: untested
        arch = "arm"
    elif machine == "x86_64":
        arch = "x64"
    elif sys.maxsize <= 2**32:
        # TODO: untested
        arch = "x32"
    elif machine:
        arch = OtherArch(machine)
    else:
        arch = "unknown"

    return arch
1989
1990
def _merge_mappings(
    obj1: Mapping[_T_co, Union[_T, Omit]],
    obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
    """Merge two mappings of the same type, removing any values that are instances of `Omit`.

    In cases with duplicate keys the second mapping takes precedence.
    """
    combined = dict(obj1)
    combined.update(obj2)

    result: Dict[_T_co, _T] = {}
    for key, value in combined.items():
        if isinstance(value, Omit):
            # An `Omit` value removes the key from the result.
            continue
        result[key] = value
    return result
2001