openai/openai-python

Public

mirrored from https://github.com/openai/openai-python (Available)

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v1.92.1

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

src/openai/_base_client.py

2017lines · modecode

1from __future__ import annotations
2
3import sys
4import json
5import time
6import uuid
7import email
8import asyncio
9import inspect
10import logging
11import platform
12import email.utils
13from types import TracebackType
14from random import random
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Type,
20 Union,
21 Generic,
22 Mapping,
23 TypeVar,
24 Iterable,
25 Iterator,
26 Optional,
27 Generator,
28 AsyncIterator,
29 cast,
30 overload,
31)
32from typing_extensions import Literal, override, get_origin
33
34import anyio
35import httpx
36import distro
37import pydantic
38from httpx import URL
39from pydantic import PrivateAttr
40
41from . import _exceptions
42from ._qs import Querystring
43from ._files import to_httpx_files, async_to_httpx_files
44from ._types import (
45 NOT_GIVEN,
46 Body,
47 Omit,
48 Query,
49 Headers,
50 Timeout,
51 NotGiven,
52 ResponseT,
53 AnyMapping,
54 PostParser,
55 RequestFiles,
56 HttpxSendArgs,
57 RequestOptions,
58 HttpxRequestFiles,
59 ModelBuilderProtocol,
60)
61from ._utils import SensitiveHeadersFilter, is_dict, is_list, asyncify, is_given, lru_cache, is_mapping
62from ._compat import PYDANTIC_V2, model_copy, model_dump
63from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type
64from ._response import (
65 APIResponse,
66 BaseAPIResponse,
67 AsyncAPIResponse,
68 extract_response_type,
69)
70from ._constants import (
71 DEFAULT_TIMEOUT,
72 MAX_RETRY_DELAY,
73 DEFAULT_MAX_RETRIES,
74 INITIAL_RETRY_DELAY,
75 RAW_RESPONSE_HEADER,
76 OVERRIDE_CAST_TO_HEADER,
77 DEFAULT_CONNECTION_LIMITS,
78)
79from ._streaming import Stream, SSEDecoder, AsyncStream, SSEBytesDecoder
80from ._exceptions import (
81 APIStatusError,
82 APITimeoutError,
83 APIConnectionError,
84 APIResponseValidationError,
85)
86from ._legacy_response import LegacyAPIResponse
87
# Module-level logger; every record emitted through it passes through the
# SDK's `SensitiveHeadersFilter`.
log: logging.Logger = logging.getLogger(__name__)
log.addFilter(SensitiveHeadersFilter())

# TODO: make base page type vars covariant
SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")


_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)

_StreamT = TypeVar("_StreamT", bound=Stream[Any])
_AsyncStreamT = TypeVar("_AsyncStreamT", bound=AsyncStream[Any])

# `HTTPX_DEFAULT_TIMEOUT` is httpx's own default timeout configuration. It is
# imported from a private httpx module, so at runtime we fall back to a
# hard-coded equivalent when that import is unavailable.
if not TYPE_CHECKING:
    try:
        from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
    except ImportError:
        # taken from https://github.com/encode/httpx/blob/3ba5fe0d7ac70222590e759c31442b1cab263791/httpx/_config.py#L366
        HTTPX_DEFAULT_TIMEOUT = Timeout(5.0)
else:
    from httpx._config import (
        DEFAULT_TIMEOUT_CONFIG,  # pyright: ignore[reportPrivateImportUsage]
    )

    HTTPX_DEFAULT_TIMEOUT = DEFAULT_TIMEOUT_CONFIG
114
115
class PageInfo:
    """Stores the necessary information to build the request to retrieve the next page.

    One of `url`, `params` or `json` is expected to be set; the overloads
    below accept exactly one of them per call.
    """

    url: URL | NotGiven
    params: Query | NotGiven
    json: Body | NotGiven

    @overload
    def __init__(
        self,
        *,
        url: URL,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        params: Query,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        json: Body,
    ) -> None: ...

    def __init__(
        self,
        *,
        url: URL | NotGiven = NOT_GIVEN,
        json: Body | NotGiven = NOT_GIVEN,
        params: Query | NotGiven = NOT_GIVEN,
    ) -> None:
        self.url = url
        self.json = json
        self.params = params

    @override
    def __repr__(self) -> str:
        # Decide which field to show by whether it was *given*, not by its
        # truthiness: an empty-but-provided value such as `json={}` must still
        # be reported as `json=...` rather than falling through to `params`.
        cls_name = self.__class__.__name__
        if not isinstance(self.url, NotGiven):
            return f"{cls_name}(url={self.url})"
        if not isinstance(self.json, NotGiven):
            return f"{cls_name}(json={self.json})"
        return f"{cls_name}(params={self.params})"
165
166
class BasePage(GenericModel, Generic[_T]):
    """
    Core pagination interface shared by the sync and async page classes.

    Type Args:
        _T: The type of a single item in the page.

    Methods:
        has_next_page(): Check if there is another page available
        next_page_info(): Get the necessary information to make a request for the next page
    """

    _options: FinalRequestOptions = PrivateAttr()
    _model: Type[_T] = PrivateAttr()

    def has_next_page(self) -> bool:
        """Return True when this page has items and `next_page_info()` yields something."""
        if not self._get_page_items():
            return False
        return self.next_page_info() is not None

    def next_page_info(self) -> Optional[PageInfo]: ...

    def _get_page_items(self) -> Iterable[_T]:  # type: ignore[empty-body]
        ...

    def _params_from_url(self, url: URL) -> httpx.QueryParams:
        """Merge the query params of `url` on top of the current request's params."""
        # TODO: do we have to preprocess params here?
        current_params = httpx.QueryParams(cast(Any, self._options.params))
        return current_params.merge(url.params)

    def _info_to_options(self, info: PageInfo) -> FinalRequestOptions:
        """Build the request options for the next page from `info`.

        Works on a copy of the stored options. `params` is consulted first,
        then `url`, then `json`.

        Raises:
            TypeError: if a JSON page cursor (or the existing JSON body) is not a mapping.
            ValueError: if none of the `PageInfo` fields was given.
        """
        next_options = model_copy(self._options)
        next_options._strip_raw_response_header()

        page_params = info.params
        if not isinstance(page_params, NotGiven):
            next_options.params = {**next_options.params, **page_params}
            return next_options

        page_url = info.url
        if not isinstance(page_url, NotGiven):
            merged_url = page_url.copy_with(params=self._params_from_url(page_url))
            next_options.params = dict(merged_url.params)
            next_options.url = str(merged_url)
            return next_options

        page_json = info.json
        if isinstance(page_json, NotGiven):
            raise ValueError("Unexpected PageInfo state")

        if not is_mapping(page_json):
            raise TypeError("Pagination is only supported with mappings")

        current_json = next_options.json_data
        if not current_json:
            next_options.json_data = {**page_json}
        elif is_mapping(current_json):
            # the page cursor's keys win over the keys of the existing body
            next_options.json_data = {**current_json, **page_json}
        else:
            raise TypeError("Pagination is only supported with mappings")
        return next_options
226
227
class BaseSyncPage(BasePage[_T], Generic[_T]):
    """Synchronous page: iterating it walks the items of this and all following pages."""

    _client: SyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        client: SyncAPIClient,
        model: Type[_T],
        options: FinalRequestOptions,
    ) -> None:
        """Attach the client, item model and request options used to fetch further pages."""
        needs_private_storage = PYDANTIC_V2 and getattr(self, "__pydantic_private__", None) is None
        if needs_private_storage:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    # Pydantic defines its own `__iter__` so that models can be cast to
    # dictionaries, e.g. dict(model). Supporting `for item in page` is
    # incompatible with that behaviour and both cannot be offered at once.
    # That is acceptable: other pydantic methods keep working and
    # model.dict(), which pydantic uses internally, remains available for
    # converting a model to a dictionary.
    def __iter__(self) -> Iterator[_T]:  # type: ignore
        for current_page in self.iter_pages():
            for entry in current_page._get_page_items():
                yield entry

    def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
        """Yield this page, then each following page for as long as one is available."""
        current = self
        yield current
        while current.has_next_page():
            current = current.get_next_page()
            yield current

    def get_next_page(self: SyncPageT) -> SyncPageT:
        """Fetch the page after this one.

        Raises:
            RuntimeError: if `next_page_info()` reports no further page.
        """
        info = self.next_page_info()
        if info:
            next_options = self._info_to_options(info)
            return self._client._request_api_list(self._model, page=self.__class__, options=next_options)

        raise RuntimeError(
            "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
        )
275
276
class AsyncPaginator(Generic[_T, AsyncPageT]):
    """Awaitable / async-iterable handle for a paginated request.

    Awaiting it performs the request and returns the first page; iterating it
    with `async for` yields the items produced by iterating that page.
    """

    def __init__(
        self,
        client: AsyncAPIClient,
        options: FinalRequestOptions,
        page_cls: Type[AsyncPageT],
        model: Type[_T],
    ) -> None:
        self._client = client
        self._options = options
        self._page_cls = page_cls
        self._model = model

    def __await__(self) -> Generator[Any, None, AsyncPageT]:
        return self._get_page().__await__()

    async def _get_page(self) -> AsyncPageT:
        """Request the page, wiring the private attributes onto the parsed result."""

        def _attach_private_attributes(parsed_page: AsyncPageT) -> AsyncPageT:
            parsed_page._set_private_attributes(
                model=self._model,
                options=self._options,
                client=self._client,
            )
            return parsed_page

        # note: this stores the parser on the options object held by this paginator
        self._options.post_parser = _attach_private_attributes

        return await self._client.request(self._page_cls, self._options)

    async def __aiter__(self) -> AsyncIterator[_T]:
        # https://github.com/microsoft/pyright/issues/3464
        first_page = cast(
            AsyncPageT,
            await self,  # type: ignore
        )
        async for entry in first_page:
            yield entry
314
315
class BaseAsyncPage(BasePage[_T], Generic[_T]):
    """Asynchronous page: `async for` over it walks the items of this and all following pages."""

    _client: AsyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        model: Type[_T],
        client: AsyncAPIClient,
        options: FinalRequestOptions,
    ) -> None:
        """Attach the item model, client and request options used to fetch further pages."""
        needs_private_storage = PYDANTIC_V2 and getattr(self, "__pydantic_private__", None) is None
        if needs_private_storage:
            self.__pydantic_private__ = {}

        self._model = model
        self._client = client
        self._options = options

    async def __aiter__(self) -> AsyncIterator[_T]:
        async for current_page in self.iter_pages():
            for entry in current_page._get_page_items():
                yield entry

    async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
        """Yield this page, then each following page for as long as one is available."""
        current = self
        yield current
        while current.has_next_page():
            current = await current.get_next_page()
            yield current

    async def get_next_page(self: AsyncPageT) -> AsyncPageT:
        """Fetch the page after this one.

        Raises:
            RuntimeError: if `next_page_info()` reports no further page.
        """
        info = self.next_page_info()
        if info:
            next_options = self._info_to_options(info)
            return await self._client._request_api_list(self._model, page=self.__class__, options=next_options)

        raise RuntimeError(
            "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
        )
355
356
# Type parameters of `BaseClient`: the concrete httpx client flavour and the
# stream class that matches it.
_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
359
360
361class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
362 _client: _HttpxClientT
363 _version: str
364 _base_url: URL
365 max_retries: int
366 timeout: Union[float, Timeout, None]
367 _strict_response_validation: bool
368 _idempotency_header: str | None
369 _default_stream_cls: type[_DefaultStreamT] | None = None
370
371 def __init__(
372 self,
373 *,
374 version: str,
375 base_url: str | URL,
376 _strict_response_validation: bool,
377 max_retries: int = DEFAULT_MAX_RETRIES,
378 timeout: float | Timeout | None = DEFAULT_TIMEOUT,
379 custom_headers: Mapping[str, str] | None = None,
380 custom_query: Mapping[str, object] | None = None,
381 ) -> None:
382 self._version = version
383 self._base_url = self._enforce_trailing_slash(URL(base_url))
384 self.max_retries = max_retries
385 self.timeout = timeout
386 self._custom_headers = custom_headers or {}
387 self._custom_query = custom_query or {}
388 self._strict_response_validation = _strict_response_validation
389 self._idempotency_header = None
390 self._platform: Platform | None = None
391
392 if max_retries is None: # pyright: ignore[reportUnnecessaryComparison]
393 raise TypeError(
394 "max_retries cannot be None. If you want to disable retries, pass `0`; if you want unlimited retries, pass `math.inf` or a very high number; if you want the default behavior, pass `openai.DEFAULT_MAX_RETRIES`"
395 )
396
397 def _enforce_trailing_slash(self, url: URL) -> URL:
398 if url.raw_path.endswith(b"/"):
399 return url
400 return url.copy_with(raw_path=url.raw_path + b"/")
401
402 def _make_status_error_from_response(
403 self,
404 response: httpx.Response,
405 ) -> APIStatusError:
406 if response.is_closed and not response.is_stream_consumed:
407 # We can't read the response body as it has been closed
408 # before it was read. This can happen if an event hook
409 # raises a status error.
410 body = None
411 err_msg = f"Error code: {response.status_code}"
412 else:
413 err_text = response.text.strip()
414 body = err_text
415
416 try:
417 body = json.loads(err_text)
418 err_msg = f"Error code: {response.status_code} - {body}"
419 except Exception:
420 err_msg = err_text or f"Error code: {response.status_code}"
421
422 return self._make_status_error(err_msg, body=body, response=response)
423
424 def _make_status_error(
425 self,
426 err_msg: str,
427 *,
428 body: object,
429 response: httpx.Response,
430 ) -> _exceptions.APIStatusError:
431 raise NotImplementedError()
432
433 def _build_headers(self, options: FinalRequestOptions, *, retries_taken: int = 0) -> httpx.Headers:
434 custom_headers = options.headers or {}
435 headers_dict = _merge_mappings(self.default_headers, custom_headers)
436 self._validate_headers(headers_dict, custom_headers)
437
438 # headers are case-insensitive while dictionaries are not.
439 headers = httpx.Headers(headers_dict)
440
441 idempotency_header = self._idempotency_header
442 if idempotency_header and options.idempotency_key and idempotency_header not in headers:
443 headers[idempotency_header] = options.idempotency_key
444
445 # Don't set these headers if they were already set or removed by the caller. We check
446 # `custom_headers`, which can contain `Omit()`, instead of `headers` to account for the removal case.
447 lower_custom_headers = [header.lower() for header in custom_headers]
448 if "x-stainless-retry-count" not in lower_custom_headers:
449 headers["x-stainless-retry-count"] = str(retries_taken)
450 if "x-stainless-read-timeout" not in lower_custom_headers:
451 timeout = self.timeout if isinstance(options.timeout, NotGiven) else options.timeout
452 if isinstance(timeout, Timeout):
453 timeout = timeout.read
454 if timeout is not None:
455 headers["x-stainless-read-timeout"] = str(timeout)
456
457 return headers
458
459 def _prepare_url(self, url: str) -> URL:
460 """
461 Merge a URL argument together with any 'base_url' on the client,
462 to create the URL used for the outgoing request.
463 """
464 # Copied from httpx's `_merge_url` method.
465 merge_url = URL(url)
466 if merge_url.is_relative_url:
467 merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
468 return self.base_url.copy_with(raw_path=merge_raw_path)
469
470 return merge_url
471
472 def _make_sse_decoder(self) -> SSEDecoder | SSEBytesDecoder:
473 return SSEDecoder()
474
475 def _build_request(
476 self,
477 options: FinalRequestOptions,
478 *,
479 retries_taken: int = 0,
480 ) -> httpx.Request:
481 if log.isEnabledFor(logging.DEBUG):
482 log.debug("Request options: %s", model_dump(options, exclude_unset=True))
483
484 kwargs: dict[str, Any] = {}
485
486 json_data = options.json_data
487 if options.extra_json is not None:
488 if json_data is None:
489 json_data = cast(Body, options.extra_json)
490 elif is_mapping(json_data):
491 json_data = _merge_mappings(json_data, options.extra_json)
492 else:
493 raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")
494
495 headers = self._build_headers(options, retries_taken=retries_taken)
496 params = _merge_mappings(self.default_query, options.params)
497 content_type = headers.get("Content-Type")
498 files = options.files
499
500 # If the given Content-Type header is multipart/form-data then it
501 # has to be removed so that httpx can generate the header with
502 # additional information for us as it has to be in this form
503 # for the server to be able to correctly parse the request:
504 # multipart/form-data; boundary=---abc--
505 if content_type is not None and content_type.startswith("multipart/form-data"):
506 if "boundary" not in content_type:
507 # only remove the header if the boundary hasn't been explicitly set
508 # as the caller doesn't want httpx to come up with their own boundary
509 headers.pop("Content-Type")
510
511 # As we are now sending multipart/form-data instead of application/json
512 # we need to tell httpx to use it, https://www.python-httpx.org/advanced/clients/#multipart-file-encoding
513 if json_data:
514 if not is_dict(json_data):
515 raise TypeError(
516 f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
517 )
518 kwargs["data"] = self._serialize_multipartform(json_data)
519
520 # httpx determines whether or not to send a "multipart/form-data"
521 # request based on the truthiness of the "files" argument.
522 # This gets around that issue by generating a dict value that
523 # evaluates to true.
524 #
525 # https://github.com/encode/httpx/discussions/2399#discussioncomment-3814186
526 if not files:
527 files = cast(HttpxRequestFiles, ForceMultipartDict())
528
529 prepared_url = self._prepare_url(options.url)
530 if "_" in prepared_url.host:
531 # work around https://github.com/encode/httpx/discussions/2880
532 kwargs["extensions"] = {"sni_hostname": prepared_url.host.replace("_", "-")}
533
534 # TODO: report this error to httpx
535 return self._client.build_request( # pyright: ignore[reportUnknownMemberType]
536 headers=headers,
537 timeout=self.timeout if isinstance(options.timeout, NotGiven) else options.timeout,
538 method=options.method,
539 url=prepared_url,
540 # the `Query` type that we use is incompatible with qs'
541 # `Params` type as it needs to be typed as `Mapping[str, object]`
542 # so that passing a `TypedDict` doesn't cause an error.
543 # https://github.com/microsoft/pyright/issues/3526#event-6715453066
544 params=self.qs.stringify(cast(Mapping[str, Any], params)) if params else None,
545 json=json_data if is_given(json_data) else None,
546 files=files,
547 **kwargs,
548 )
549
550 def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
551 items = self.qs.stringify_items(
552 # TODO: type ignore is required as stringify_items is well typed but we can't be
553 # well typed without heavy validation.
554 data, # type: ignore
555 array_format="brackets",
556 )
557 serialized: dict[str, object] = {}
558 for key, value in items:
559 existing = serialized.get(key)
560
561 if not existing:
562 serialized[key] = value
563 continue
564
565 # If a value has already been set for this key then that
566 # means we're sending data like `array[]=[1, 2, 3]` and we
567 # need to tell httpx that we want to send multiple values with
568 # the same key which is done by using a list or a tuple.
569 #
570 # Note: 2d arrays should never result in the same key at both
571 # levels so it's safe to assume that if the value is a list,
572 # it was because we changed it to be a list.
573 if is_list(existing):
574 existing.append(value)
575 else:
576 serialized[key] = [existing, value]
577
578 return serialized
579
580 def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
581 if not is_given(options.headers):
582 return cast_to
583
584 # make a copy of the headers so we don't mutate user-input
585 headers = dict(options.headers)
586
587 # we internally support defining a temporary header to override the
588 # default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
589 # see _response.py for implementation details
590 override_cast_to = headers.pop(OVERRIDE_CAST_TO_HEADER, NOT_GIVEN)
591 if is_given(override_cast_to):
592 options.headers = headers
593 return cast(Type[ResponseT], override_cast_to)
594
595 return cast_to
596
597 def _should_stream_response_body(self, request: httpx.Request) -> bool:
598 return request.headers.get(RAW_RESPONSE_HEADER) == "stream" # type: ignore[no-any-return]
599
600 def _process_response_data(
601 self,
602 *,
603 data: object,
604 cast_to: type[ResponseT],
605 response: httpx.Response,
606 ) -> ResponseT:
607 if data is None:
608 return cast(ResponseT, None)
609
610 if cast_to is object:
611 return cast(ResponseT, data)
612
613 try:
614 if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
615 return cast(ResponseT, cast_to.build(response=response, data=data))
616
617 if self._strict_response_validation:
618 return cast(ResponseT, validate_type(type_=cast_to, value=data))
619
620 return cast(ResponseT, construct_type(type_=cast_to, value=data))
621 except pydantic.ValidationError as err:
622 raise APIResponseValidationError(response=response, body=data) from err
623
624 @property
625 def qs(self) -> Querystring:
626 return Querystring()
627
628 @property
629 def custom_auth(self) -> httpx.Auth | None:
630 return None
631
632 @property
633 def auth_headers(self) -> dict[str, str]:
634 return {}
635
636 @property
637 def default_headers(self) -> dict[str, str | Omit]:
638 return {
639 "Accept": "application/json",
640 "Content-Type": "application/json",
641 "User-Agent": self.user_agent,
642 **self.platform_headers(),
643 **self.auth_headers,
644 **self._custom_headers,
645 }
646
647 @property
648 def default_query(self) -> dict[str, object]:
649 return {
650 **self._custom_query,
651 }
652
653 def _validate_headers(
654 self,
655 headers: Headers, # noqa: ARG002
656 custom_headers: Headers, # noqa: ARG002
657 ) -> None:
658 """Validate the given default headers and custom headers.
659
660 Does nothing by default.
661 """
662 return
663
664 @property
665 def user_agent(self) -> str:
666 return f"{self.__class__.__name__}/Python {self._version}"
667
668 @property
669 def base_url(self) -> URL:
670 return self._base_url
671
672 @base_url.setter
673 def base_url(self, url: URL | str) -> None:
674 self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))
675
676 def platform_headers(self) -> Dict[str, str]:
677 # the actual implementation is in a separate `lru_cache` decorated
678 # function because adding `lru_cache` to methods will leak memory
679 # https://github.com/python/cpython/issues/88476
680 return platform_headers(self._version, platform=self._platform)
681
682 def _parse_retry_after_header(self, response_headers: Optional[httpx.Headers] = None) -> float | None:
683 """Returns a float of the number of seconds (not milliseconds) to wait after retrying, or None if unspecified.
684
685 About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
686 See also https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax
687 """
688 if response_headers is None:
689 return None
690
691 # First, try the non-standard `retry-after-ms` header for milliseconds,
692 # which is more precise than integer-seconds `retry-after`
693 try:
694 retry_ms_header = response_headers.get("retry-after-ms", None)
695 return float(retry_ms_header) / 1000
696 except (TypeError, ValueError):
697 pass
698
699 # Next, try parsing `retry-after` header as seconds (allowing nonstandard floats).
700 retry_header = response_headers.get("retry-after")
701 try:
702 # note: the spec indicates that this should only ever be an integer
703 # but if someone sends a float there's no reason for us to not respect it
704 return float(retry_header)
705 except (TypeError, ValueError):
706 pass
707
708 # Last, try parsing `retry-after` as a date.
709 retry_date_tuple = email.utils.parsedate_tz(retry_header)
710 if retry_date_tuple is None:
711 return None
712
713 retry_date = email.utils.mktime_tz(retry_date_tuple)
714 return float(retry_date - time.time())
715
716 def _calculate_retry_timeout(
717 self,
718 remaining_retries: int,
719 options: FinalRequestOptions,
720 response_headers: Optional[httpx.Headers] = None,
721 ) -> float:
722 max_retries = options.get_max_retries(self.max_retries)
723
724 # If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
725 retry_after = self._parse_retry_after_header(response_headers)
726 if retry_after is not None and 0 < retry_after <= 60:
727 return retry_after
728
729 # Also cap retry count to 1000 to avoid any potential overflows with `pow`
730 nb_retries = min(max_retries - remaining_retries, 1000)
731
732 # Apply exponential backoff, but not more than the max.
733 sleep_seconds = min(INITIAL_RETRY_DELAY * pow(2.0, nb_retries), MAX_RETRY_DELAY)
734
735 # Apply some jitter, plus-or-minus half a second.
736 jitter = 1 - 0.25 * random()
737 timeout = sleep_seconds * jitter
738 return timeout if timeout >= 0 else 0
739
740 def _should_retry(self, response: httpx.Response) -> bool:
741 # Note: this is not a standard header
742 should_retry_header = response.headers.get("x-should-retry")
743
744 # If the server explicitly says whether or not to retry, obey.
745 if should_retry_header == "true":
746 log.debug("Retrying as header `x-should-retry` is set to `true`")
747 return True
748 if should_retry_header == "false":
749 log.debug("Not retrying as header `x-should-retry` is set to `false`")
750 return False
751
752 # Retry on request timeouts.
753 if response.status_code == 408:
754 log.debug("Retrying due to status code %i", response.status_code)
755 return True
756
757 # Retry on lock timeouts.
758 if response.status_code == 409:
759 log.debug("Retrying due to status code %i", response.status_code)
760 return True
761
762 # Retry on rate limits.
763 if response.status_code == 429:
764 log.debug("Retrying due to status code %i", response.status_code)
765 return True
766
767 # Retry internal errors.
768 if response.status_code >= 500:
769 log.debug("Retrying due to status code %i", response.status_code)
770 return True
771
772 log.debug("Not retrying")
773 return False
774
775 def _idempotency_key(self) -> str:
776 return f"stainless-python-retry-{uuid.uuid4()}"
777
778
class _DefaultHttpxClient(httpx.Client):
    """`httpx.Client` that fills in this SDK's defaults for options the caller left out."""

    def __init__(self, **kwargs: Any) -> None:
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        # explicit caller values always win over the defaults
        for option, default in sdk_defaults:
            kwargs.setdefault(option, default)
        super().__init__(**kwargs)
785
786
# Type checkers see the plain `httpx.Client`; at runtime the name refers to
# the subclass that applies the SDK defaults.
if not TYPE_CHECKING:
    DefaultHttpxClient = _DefaultHttpxClient
else:
    DefaultHttpxClient = httpx.Client
    """An alias to `httpx.Client` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.Client` will result in httpx's defaults being used, not ours.
    """
797
798
class SyncHttpxClientWrapper(DefaultHttpxClient):
    """HTTP client used by default; makes a best-effort attempt to close itself when finalized."""

    def __del__(self) -> None:
        # Finalizers must never raise. The `is_closed` check lives inside the
        # `try` as well: if construction failed part-way, the state it reads
        # may not exist yet, and an exception escaping `__del__` is only
        # reported as an "Exception ignored" message.
        try:
            if self.is_closed:
                return

            self.close()
        except Exception:
            # deliberate best-effort cleanup; nothing useful can be done here
            pass
808
809
810class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]):
811 _client: httpx.Client
812 _default_stream_cls: type[Stream[Any]] | None = None
813
814 def __init__(
815 self,
816 *,
817 version: str,
818 base_url: str | URL,
819 max_retries: int = DEFAULT_MAX_RETRIES,
820 timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
821 http_client: httpx.Client | None = None,
822 custom_headers: Mapping[str, str] | None = None,
823 custom_query: Mapping[str, object] | None = None,
824 _strict_response_validation: bool,
825 ) -> None:
826 if not is_given(timeout):
827 # if the user passed in a custom http client with a non-default
828 # timeout set then we use that timeout.
829 #
830 # note: there is an edge case here where the user passes in a client
831 # where they've explicitly set the timeout to match the default timeout
832 # as this check is structural, meaning that we'll think they didn't
833 # pass in a timeout and will ignore it
834 if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
835 timeout = http_client.timeout
836 else:
837 timeout = DEFAULT_TIMEOUT
838
839 if http_client is not None and not isinstance(http_client, httpx.Client): # pyright: ignore[reportUnnecessaryIsInstance]
840 raise TypeError(
841 f"Invalid `http_client` argument; Expected an instance of `httpx.Client` but got {type(http_client)}"
842 )
843
844 super().__init__(
845 version=version,
846 # cast to a valid type because mypy doesn't understand our type narrowing
847 timeout=cast(Timeout, timeout),
848 base_url=base_url,
849 max_retries=max_retries,
850 custom_query=custom_query,
851 custom_headers=custom_headers,
852 _strict_response_validation=_strict_response_validation,
853 )
854 self._client = http_client or SyncHttpxClientWrapper(
855 base_url=base_url,
856 # cast to a valid type because mypy doesn't understand our type narrowing
857 timeout=cast(Timeout, timeout),
858 )
859
860 def is_closed(self) -> bool:
861 return self._client.is_closed
862
863 def close(self) -> None:
864 """Close the underlying HTTPX client.
865
866 The client will *not* be usable after this.
867 """
868 # If an error is thrown while constructing a client, self._client
869 # may not be present
870 if hasattr(self, "_client"):
871 self._client.close()
872
873 def __enter__(self: _T) -> _T:
874 return self
875
876 def __exit__(
877 self,
878 exc_type: type[BaseException] | None,
879 exc: BaseException | None,
880 exc_tb: TracebackType | None,
881 ) -> None:
882 self.close()
883
884 def _prepare_options(
885 self,
886 options: FinalRequestOptions, # noqa: ARG002
887 ) -> FinalRequestOptions:
888 """Hook for mutating the given options"""
889 return options
890
891 def _prepare_request(
892 self,
893 request: httpx.Request, # noqa: ARG002
894 ) -> None:
895 """This method is used as a callback for mutating the `Request` object
896 after it has been constructed.
897 This is useful for cases where you want to add certain headers based off of
898 the request properties, e.g. `url`, `method` etc.
899 """
900 return None
901
902 @overload
903 def request(
904 self,
905 cast_to: Type[ResponseT],
906 options: FinalRequestOptions,
907 *,
908 stream: Literal[True],
909 stream_cls: Type[_StreamT],
910 ) -> _StreamT: ...
911
912 @overload
913 def request(
914 self,
915 cast_to: Type[ResponseT],
916 options: FinalRequestOptions,
917 *,
918 stream: Literal[False] = False,
919 ) -> ResponseT: ...
920
921 @overload
922 def request(
923 self,
924 cast_to: Type[ResponseT],
925 options: FinalRequestOptions,
926 *,
927 stream: bool = False,
928 stream_cls: Type[_StreamT] | None = None,
929 ) -> ResponseT | _StreamT: ...
930
    def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_StreamT] | None = None,
    ) -> ResponseT | _StreamT:
        """Send the request described by `options` and return the processed response.

        The request is attempted at most `max_retries + 1` times. A timeout or any
        other exception raised while sending is retried while retries remain, and so
        is an error status code for which `_should_retry()` returns true.

        Args:
            cast_to: The type the response is processed into; it may be replaced by
                the result of `_maybe_override_cast_to()`.
            options: The request options. They are copied up front so that each
                attempt starts again from the options that were passed in.
            stream: Whether the response body should be streamed.
            stream_cls: The stream class that is forwarded to `_process_response()`.

        Returns:
            The value returned by `_process_response()` for the final response.

        Raises:
            APITimeoutError: Sending timed out and there were no retries left.
            APIConnectionError: Sending raised any other exception and there were
                no retries left.
            Exception: The error built by `_make_status_error_from_response()` when
                a 4xx/5xx response was received and was not retried.
        """
        cast_to = self._maybe_override_cast_to(cast_to, options)

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)
        if input_options.idempotency_key is None and input_options.method.lower() != "get":
            # ensure the idempotency key is reused between requests
            input_options.idempotency_key = self._idempotency_key()

        response: httpx.Response | None = None
        max_retries = input_options.get_max_retries(self.max_retries)

        # bound before the loop so the name also exists if the loop body never runs
        retries_taken = 0
        for retries_taken in range(max_retries + 1):
            # every attempt works on a fresh copy that the `_prepare_options` hook may change
            options = model_copy(input_options)
            options = self._prepare_options(options)

            remaining_retries = max_retries - retries_taken
            request = self._build_request(options, retries_taken=retries_taken)
            self._prepare_request(request)

            # only pass the optional `send()` arguments that were actually configured
            kwargs: HttpxSendArgs = {}
            if self.custom_auth is not None:
                kwargs["auth"] = self.custom_auth

            if options.follow_redirects is not None:
                kwargs["follow_redirects"] = options.follow_redirects

            log.debug("Sending HTTP Request: %s %s", request.method, request.url)

            # reset so that a response from a previous attempt is never reused
            response = None
            try:
                response = self._client.send(
                    request,
                    stream=stream or self._should_stream_response_body(request=request),
                    **kwargs,
                )
            except httpx.TimeoutException as err:
                log.debug("Encountered httpx.TimeoutException", exc_info=True)

                if remaining_retries > 0:
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising timeout error")
                raise APITimeoutError(request=request) from err
            except Exception as err:
                # anything else raised while sending is reported as a connection error
                log.debug("Encountered Exception", exc_info=True)

                if remaining_retries > 0:
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising connection error")
                raise APIConnectionError(request=request) from err

            log.debug(
                'HTTP Response: %s %s "%i %s" %s',
                request.method,
                request.url,
                response.status_code,
                response.reason_phrase,
                response.headers,
            )
            log.debug("request_id: %s", response.headers.get("x-request-id"))

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
                log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

                if remaining_retries > 0 and self._should_retry(err.response):
                    # close the failed response before waiting for the next attempt
                    err.response.close()
                    self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=response,
                    )
                    continue

                # If the response is streamed then we need to explicitly read the response
                # to completion before attempting to access the response text.
                if not err.response.is_closed:
                    err.response.read()

                log.debug("Re-raising status error")
                # `from None` hides the httpx error from the raised exception's context
                raise self._make_status_error_from_response(err.response) from None

            # the attempt succeeded, stop retrying
            break

        assert response is not None, "could not resolve response (should never happen)"
        return self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )
1050
1051 def _sleep_for_retry(
1052 self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None
1053 ) -> None:
1054 remaining_retries = max_retries - retries_taken
1055 if remaining_retries == 1:
1056 log.debug("1 retry left")
1057 else:
1058 log.debug("%i retries left", remaining_retries)
1059
1060 timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None)
1061 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1062
1063 time.sleep(timeout)
1064
1065 def _process_response(
1066 self,
1067 *,
1068 cast_to: Type[ResponseT],
1069 options: FinalRequestOptions,
1070 response: httpx.Response,
1071 stream: bool,
1072 stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
1073 retries_taken: int = 0,
1074 ) -> ResponseT:
1075 if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
1076 return cast(
1077 ResponseT,
1078 LegacyAPIResponse(
1079 raw=response,
1080 client=self,
1081 cast_to=cast_to,
1082 stream=stream,
1083 stream_cls=stream_cls,
1084 options=options,
1085 retries_taken=retries_taken,
1086 ),
1087 )
1088
1089 origin = get_origin(cast_to) or cast_to
1090
1091 if (
1092 inspect.isclass(origin)
1093 and issubclass(origin, BaseAPIResponse)
1094 # we only want to actually return the custom BaseAPIResponse class if we're
1095 # returning the raw response, or if we're not streaming SSE, as if we're streaming
1096 # SSE then `cast_to` doesn't actively reflect the type we need to parse into
1097 and (not stream or bool(response.request.headers.get(RAW_RESPONSE_HEADER)))
1098 ):
1099 if not issubclass(origin, APIResponse):
1100 raise TypeError(f"API Response types must subclass {APIResponse}; Received {origin}")
1101
1102 response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
1103 return cast(
1104 ResponseT,
1105 response_cls(
1106 raw=response,
1107 client=self,
1108 cast_to=extract_response_type(response_cls),
1109 stream=stream,
1110 stream_cls=stream_cls,
1111 options=options,
1112 retries_taken=retries_taken,
1113 ),
1114 )
1115
1116 if cast_to == httpx.Response:
1117 return cast(ResponseT, response)
1118
1119 api_response = APIResponse(
1120 raw=response,
1121 client=self,
1122 cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
1123 stream=stream,
1124 stream_cls=stream_cls,
1125 options=options,
1126 retries_taken=retries_taken,
1127 )
1128 if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
1129 return cast(ResponseT, api_response)
1130
1131 return api_response.parse()
1132
1133 def _request_api_list(
1134 self,
1135 model: Type[object],
1136 page: Type[SyncPageT],
1137 options: FinalRequestOptions,
1138 ) -> SyncPageT:
1139 def _parser(resp: SyncPageT) -> SyncPageT:
1140 resp._set_private_attributes(
1141 client=self,
1142 model=model,
1143 options=options,
1144 )
1145 return resp
1146
1147 options.post_parser = _parser
1148
1149 return self.request(page, options, stream=False)
1150
1151 @overload
1152 def get(
1153 self,
1154 path: str,
1155 *,
1156 cast_to: Type[ResponseT],
1157 options: RequestOptions = {},
1158 stream: Literal[False] = False,
1159 ) -> ResponseT: ...
1160
1161 @overload
1162 def get(
1163 self,
1164 path: str,
1165 *,
1166 cast_to: Type[ResponseT],
1167 options: RequestOptions = {},
1168 stream: Literal[True],
1169 stream_cls: type[_StreamT],
1170 ) -> _StreamT: ...
1171
1172 @overload
1173 def get(
1174 self,
1175 path: str,
1176 *,
1177 cast_to: Type[ResponseT],
1178 options: RequestOptions = {},
1179 stream: bool,
1180 stream_cls: type[_StreamT] | None = None,
1181 ) -> ResponseT | _StreamT: ...
1182
1183 def get(
1184 self,
1185 path: str,
1186 *,
1187 cast_to: Type[ResponseT],
1188 options: RequestOptions = {},
1189 stream: bool = False,
1190 stream_cls: type[_StreamT] | None = None,
1191 ) -> ResponseT | _StreamT:
1192 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1193 # cast is required because mypy complains about returning Any even though
1194 # it understands the type variables
1195 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1196
1197 @overload
1198 def post(
1199 self,
1200 path: str,
1201 *,
1202 cast_to: Type[ResponseT],
1203 body: Body | None = None,
1204 options: RequestOptions = {},
1205 files: RequestFiles | None = None,
1206 stream: Literal[False] = False,
1207 ) -> ResponseT: ...
1208
1209 @overload
1210 def post(
1211 self,
1212 path: str,
1213 *,
1214 cast_to: Type[ResponseT],
1215 body: Body | None = None,
1216 options: RequestOptions = {},
1217 files: RequestFiles | None = None,
1218 stream: Literal[True],
1219 stream_cls: type[_StreamT],
1220 ) -> _StreamT: ...
1221
1222 @overload
1223 def post(
1224 self,
1225 path: str,
1226 *,
1227 cast_to: Type[ResponseT],
1228 body: Body | None = None,
1229 options: RequestOptions = {},
1230 files: RequestFiles | None = None,
1231 stream: bool,
1232 stream_cls: type[_StreamT] | None = None,
1233 ) -> ResponseT | _StreamT: ...
1234
1235 def post(
1236 self,
1237 path: str,
1238 *,
1239 cast_to: Type[ResponseT],
1240 body: Body | None = None,
1241 options: RequestOptions = {},
1242 files: RequestFiles | None = None,
1243 stream: bool = False,
1244 stream_cls: type[_StreamT] | None = None,
1245 ) -> ResponseT | _StreamT:
1246 opts = FinalRequestOptions.construct(
1247 method="post", url=path, json_data=body, files=to_httpx_files(files), **options
1248 )
1249 return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
1250
1251 def patch(
1252 self,
1253 path: str,
1254 *,
1255 cast_to: Type[ResponseT],
1256 body: Body | None = None,
1257 options: RequestOptions = {},
1258 ) -> ResponseT:
1259 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1260 return self.request(cast_to, opts)
1261
1262 def put(
1263 self,
1264 path: str,
1265 *,
1266 cast_to: Type[ResponseT],
1267 body: Body | None = None,
1268 files: RequestFiles | None = None,
1269 options: RequestOptions = {},
1270 ) -> ResponseT:
1271 opts = FinalRequestOptions.construct(
1272 method="put", url=path, json_data=body, files=to_httpx_files(files), **options
1273 )
1274 return self.request(cast_to, opts)
1275
1276 def delete(
1277 self,
1278 path: str,
1279 *,
1280 cast_to: Type[ResponseT],
1281 body: Body | None = None,
1282 options: RequestOptions = {},
1283 ) -> ResponseT:
1284 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1285 return self.request(cast_to, opts)
1286
1287 def get_api_list(
1288 self,
1289 path: str,
1290 *,
1291 model: Type[object],
1292 page: Type[SyncPageT],
1293 body: Body | None = None,
1294 options: RequestOptions = {},
1295 method: str = "get",
1296 ) -> SyncPageT:
1297 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1298 return self._request_api_list(model, page, opts)
1299
1300
class _DefaultAsyncHttpxClient(httpx.AsyncClient):
    """An `httpx.AsyncClient` that fills in this SDK's defaults for options that were not passed."""

    def __init__(self, **kwargs: Any) -> None:
        # `setdefault` leaves every option alone that the caller passed explicitly
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        for option, default in sdk_defaults:
            kwargs.setdefault(option, default)

        super().__init__(**kwargs)
1307
1308
# `httpx_aiohttp` is optional: without it `_DefaultAioHttpClient` is still defined,
# but creating an instance fails with an explanatory error.
try:
    import httpx_aiohttp
except ImportError:

    class _DefaultAioHttpClient(httpx.AsyncClient):
        """Stand-in that raises as soon as somebody tries to instantiate it."""

        def __init__(self, **_kwargs: Any) -> None:
            raise RuntimeError("To use the aiohttp client you must have installed the package with the `aiohttp` extra")
else:

    class _DefaultAioHttpClient(httpx_aiohttp.HttpxAiohttpClient):  # type: ignore
        """The `httpx_aiohttp` client with this SDK's defaults for options that were not passed."""

        def __init__(self, **kwargs: Any) -> None:
            # `setdefault` leaves every option alone that the caller passed explicitly
            sdk_defaults = (
                ("timeout", DEFAULT_TIMEOUT),
                ("limits", DEFAULT_CONNECTION_LIMITS),
                ("follow_redirects", True),
            )
            for option, default in sdk_defaults:
                kwargs.setdefault(option, default)

            super().__init__(**kwargs)
1325
1326
# The public names are bound twice: type checkers see plain `httpx.AsyncClient`
# aliases, while at runtime the names refer to the private subclasses that apply
# this SDK's defaults.
if TYPE_CHECKING:
    DefaultAsyncHttpxClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.AsyncClient` will result in httpx's defaults being used, not ours.
    """

    DefaultAioHttpClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that changes the default HTTP transport to `aiohttp`."""
else:
    # the classes that are actually instantiated at runtime
    DefaultAsyncHttpxClient = _DefaultAsyncHttpxClient
    DefaultAioHttpClient = _DefaultAioHttpClient
1341
1342
class AsyncHttpxClientWrapper(DefaultAsyncHttpxClient):
    """Default async client that tries to close itself when it is garbage collected."""

    def __del__(self) -> None:
        # Best effort only: any failure, e.g. because no event loop is running,
        # is deliberately ignored.
        if not self.is_closed:
            try:
                # TODO(someday): support non asyncio runtimes here
                running_loop = asyncio.get_running_loop()
                running_loop.create_task(self.aclose())
            except Exception:
                pass
1353
1354
class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]):
    # the httpx client that is used to send every request
    _client: httpx.AsyncClient
    _default_stream_cls: type[AsyncStream[Any]] | None = None

    def __init__(
        self,
        *,
        version: str,
        base_url: str | URL,
        _strict_response_validation: bool,
        max_retries: int = DEFAULT_MAX_RETRIES,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        http_client: httpx.AsyncClient | None = None,
        custom_headers: Mapping[str, str] | None = None,
        custom_query: Mapping[str, object] | None = None,
    ) -> None:
        """Set up the client and the `httpx.AsyncClient` it sends requests with.

        When `timeout` is not given it is taken from `http_client` if that client
        has a timeout other than httpx's default, otherwise `DEFAULT_TIMEOUT` is
        used. When `http_client` is not given an `AsyncHttpxClientWrapper` is
        created for `base_url` with the resolved timeout.

        Raises:
            TypeError: `http_client` is neither `None` nor an `httpx.AsyncClient`.
        """
        if not is_given(timeout):
            # A custom http client with a non-default timeout wins over our default.
            #
            # note: the comparison is structural, so a client whose timeout was
            # explicitly set to httpx's default value is indistinguishable from one
            # without a timeout and its timeout is ignored
            timeout = (
                http_client.timeout
                if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT
                else DEFAULT_TIMEOUT
            )

        if http_client is not None and not isinstance(http_client, httpx.AsyncClient):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got {type(http_client)}"
            )

        # cast to a valid type because mypy doesn't understand our type narrowing
        resolved_timeout = cast(Timeout, timeout)

        super().__init__(
            version=version,
            base_url=base_url,
            timeout=resolved_timeout,
            max_retries=max_retries,
            custom_query=custom_query,
            custom_headers=custom_headers,
            _strict_response_validation=_strict_response_validation,
        )

        if http_client:
            self._client = http_client
        else:
            self._client = AsyncHttpxClientWrapper(
                base_url=base_url,
                timeout=resolved_timeout,
            )
1404
1405 def is_closed(self) -> bool:
1406 return self._client.is_closed
1407
1408 async def close(self) -> None:
1409 """Close the underlying HTTPX client.
1410
1411 The client will *not* be usable after this.
1412 """
1413 await self._client.aclose()
1414
1415 async def __aenter__(self: _T) -> _T:
1416 return self
1417
1418 async def __aexit__(
1419 self,
1420 exc_type: type[BaseException] | None,
1421 exc: BaseException | None,
1422 exc_tb: TracebackType | None,
1423 ) -> None:
1424 await self.close()
1425
1426 async def _prepare_options(
1427 self,
1428 options: FinalRequestOptions, # noqa: ARG002
1429 ) -> FinalRequestOptions:
1430 """Hook for mutating the given options"""
1431 return options
1432
1433 async def _prepare_request(
1434 self,
1435 request: httpx.Request, # noqa: ARG002
1436 ) -> None:
1437 """This method is used as a callback for mutating the `Request` object
1438 after it has been constructed.
1439 This is useful for cases where you want to add certain headers based off of
1440 the request properties, e.g. `url`, `method` etc.
1441 """
1442 return None
1443
    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[False] = False,
    ) -> ResponseT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT: ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT: ...

    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Send the request described by `options` and return the processed response.

        The request is attempted at most `max_retries + 1` times. A timeout or any
        other exception raised while sending is retried while retries remain, and so
        is an error status code for which `_should_retry()` returns true.

        Args:
            cast_to: The type the response is processed into; it may be replaced by
                the result of `_maybe_override_cast_to()`.
            options: The request options. They are copied up front so that each
                attempt starts again from the options that were passed in.
            stream: Whether the response body should be streamed.
            stream_cls: The stream class that is forwarded to `_process_response()`.

        Returns:
            The value returned by `_process_response()` for the final response.

        Raises:
            APITimeoutError: Sending timed out and there were no retries left.
            APIConnectionError: Sending raised any other exception and there were
                no retries left.
            Exception: The error built by `_make_status_error_from_response()` when
                a 4xx/5xx response was received and was not retried.
        """
        if self._platform is None:
            # `get_platform` can make blocking IO calls so we
            # execute it earlier while we are in an async context
            self._platform = await asyncify(get_platform)()

        cast_to = self._maybe_override_cast_to(cast_to, options)

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)
        if input_options.idempotency_key is None and input_options.method.lower() != "get":
            # ensure the idempotency key is reused between requests
            input_options.idempotency_key = self._idempotency_key()

        response: httpx.Response | None = None
        max_retries = input_options.get_max_retries(self.max_retries)

        # bound before the loop so the name also exists if the loop body never runs
        retries_taken = 0
        for retries_taken in range(max_retries + 1):
            # every attempt works on a fresh copy that the `_prepare_options` hook may change
            options = model_copy(input_options)
            options = await self._prepare_options(options)

            remaining_retries = max_retries - retries_taken
            request = self._build_request(options, retries_taken=retries_taken)
            await self._prepare_request(request)

            # only pass the optional `send()` arguments that were actually configured
            kwargs: HttpxSendArgs = {}
            if self.custom_auth is not None:
                kwargs["auth"] = self.custom_auth

            if options.follow_redirects is not None:
                kwargs["follow_redirects"] = options.follow_redirects

            log.debug("Sending HTTP Request: %s %s", request.method, request.url)

            # reset so that a response from a previous attempt is never reused
            response = None
            try:
                response = await self._client.send(
                    request,
                    stream=stream or self._should_stream_response_body(request=request),
                    **kwargs,
                )
            except httpx.TimeoutException as err:
                log.debug("Encountered httpx.TimeoutException", exc_info=True)

                if remaining_retries > 0:
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising timeout error")
                raise APITimeoutError(request=request) from err
            except Exception as err:
                # anything else raised while sending is reported as a connection error
                log.debug("Encountered Exception", exc_info=True)

                if remaining_retries > 0:
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=None,
                    )
                    continue

                log.debug("Raising connection error")
                raise APIConnectionError(request=request) from err

            log.debug(
                'HTTP Response: %s %s "%i %s" %s',
                request.method,
                request.url,
                response.status_code,
                response.reason_phrase,
                response.headers,
            )
            log.debug("request_id: %s", response.headers.get("x-request-id"))

            try:
                response.raise_for_status()
            except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
                log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

                if remaining_retries > 0 and self._should_retry(err.response):
                    # close the failed response before waiting for the next attempt
                    await err.response.aclose()
                    await self._sleep_for_retry(
                        retries_taken=retries_taken,
                        max_retries=max_retries,
                        options=input_options,
                        response=response,
                    )
                    continue

                # If the response is streamed then we need to explicitly read the response
                # to completion before attempting to access the response text.
                if not err.response.is_closed:
                    await err.response.aread()

                log.debug("Re-raising status error")
                # `from None` hides the httpx error from the raised exception's context
                raise self._make_status_error_from_response(err.response) from None

            # the attempt succeeded, stop retrying
            break

        assert response is not None, "could not resolve response (should never happen)"
        return await self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
            retries_taken=retries_taken,
        )
1597
1598 async def _sleep_for_retry(
1599 self, *, retries_taken: int, max_retries: int, options: FinalRequestOptions, response: httpx.Response | None
1600 ) -> None:
1601 remaining_retries = max_retries - retries_taken
1602 if remaining_retries == 1:
1603 log.debug("1 retry left")
1604 else:
1605 log.debug("%i retries left", remaining_retries)
1606
1607 timeout = self._calculate_retry_timeout(remaining_retries, options, response.headers if response else None)
1608 log.info("Retrying request to %s in %f seconds", options.url, timeout)
1609
1610 await anyio.sleep(timeout)
1611
1612 async def _process_response(
1613 self,
1614 *,
1615 cast_to: Type[ResponseT],
1616 options: FinalRequestOptions,
1617 response: httpx.Response,
1618 stream: bool,
1619 stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
1620 retries_taken: int = 0,
1621 ) -> ResponseT:
1622 if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
1623 return cast(
1624 ResponseT,
1625 LegacyAPIResponse(
1626 raw=response,
1627 client=self,
1628 cast_to=cast_to,
1629 stream=stream,
1630 stream_cls=stream_cls,
1631 options=options,
1632 retries_taken=retries_taken,
1633 ),
1634 )
1635
1636 origin = get_origin(cast_to) or cast_to
1637
1638 if (
1639 inspect.isclass(origin)
1640 and issubclass(origin, BaseAPIResponse)
1641 # we only want to actually return the custom BaseAPIResponse class if we're
1642 # returning the raw response, or if we're not streaming SSE, as if we're streaming
1643 # SSE then `cast_to` doesn't actively reflect the type we need to parse into
1644 and (not stream or bool(response.request.headers.get(RAW_RESPONSE_HEADER)))
1645 ):
1646 if not issubclass(origin, AsyncAPIResponse):
1647 raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")
1648
1649 response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
1650 return cast(
1651 "ResponseT",
1652 response_cls(
1653 raw=response,
1654 client=self,
1655 cast_to=extract_response_type(response_cls),
1656 stream=stream,
1657 stream_cls=stream_cls,
1658 options=options,
1659 retries_taken=retries_taken,
1660 ),
1661 )
1662
1663 if cast_to == httpx.Response:
1664 return cast(ResponseT, response)
1665
1666 api_response = AsyncAPIResponse(
1667 raw=response,
1668 client=self,
1669 cast_to=cast("type[ResponseT]", cast_to), # pyright: ignore[reportUnnecessaryCast]
1670 stream=stream,
1671 stream_cls=stream_cls,
1672 options=options,
1673 retries_taken=retries_taken,
1674 )
1675 if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
1676 return cast(ResponseT, api_response)
1677
1678 return await api_response.parse()
1679
1680 def _request_api_list(
1681 self,
1682 model: Type[_T],
1683 page: Type[AsyncPageT],
1684 options: FinalRequestOptions,
1685 ) -> AsyncPaginator[_T, AsyncPageT]:
1686 return AsyncPaginator(client=self, options=options, page_cls=page, model=model)
1687
1688 @overload
1689 async def get(
1690 self,
1691 path: str,
1692 *,
1693 cast_to: Type[ResponseT],
1694 options: RequestOptions = {},
1695 stream: Literal[False] = False,
1696 ) -> ResponseT: ...
1697
1698 @overload
1699 async def get(
1700 self,
1701 path: str,
1702 *,
1703 cast_to: Type[ResponseT],
1704 options: RequestOptions = {},
1705 stream: Literal[True],
1706 stream_cls: type[_AsyncStreamT],
1707 ) -> _AsyncStreamT: ...
1708
1709 @overload
1710 async def get(
1711 self,
1712 path: str,
1713 *,
1714 cast_to: Type[ResponseT],
1715 options: RequestOptions = {},
1716 stream: bool,
1717 stream_cls: type[_AsyncStreamT] | None = None,
1718 ) -> ResponseT | _AsyncStreamT: ...
1719
1720 async def get(
1721 self,
1722 path: str,
1723 *,
1724 cast_to: Type[ResponseT],
1725 options: RequestOptions = {},
1726 stream: bool = False,
1727 stream_cls: type[_AsyncStreamT] | None = None,
1728 ) -> ResponseT | _AsyncStreamT:
1729 opts = FinalRequestOptions.construct(method="get", url=path, **options)
1730 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1731
1732 @overload
1733 async def post(
1734 self,
1735 path: str,
1736 *,
1737 cast_to: Type[ResponseT],
1738 body: Body | None = None,
1739 files: RequestFiles | None = None,
1740 options: RequestOptions = {},
1741 stream: Literal[False] = False,
1742 ) -> ResponseT: ...
1743
1744 @overload
1745 async def post(
1746 self,
1747 path: str,
1748 *,
1749 cast_to: Type[ResponseT],
1750 body: Body | None = None,
1751 files: RequestFiles | None = None,
1752 options: RequestOptions = {},
1753 stream: Literal[True],
1754 stream_cls: type[_AsyncStreamT],
1755 ) -> _AsyncStreamT: ...
1756
1757 @overload
1758 async def post(
1759 self,
1760 path: str,
1761 *,
1762 cast_to: Type[ResponseT],
1763 body: Body | None = None,
1764 files: RequestFiles | None = None,
1765 options: RequestOptions = {},
1766 stream: bool,
1767 stream_cls: type[_AsyncStreamT] | None = None,
1768 ) -> ResponseT | _AsyncStreamT: ...
1769
1770 async def post(
1771 self,
1772 path: str,
1773 *,
1774 cast_to: Type[ResponseT],
1775 body: Body | None = None,
1776 files: RequestFiles | None = None,
1777 options: RequestOptions = {},
1778 stream: bool = False,
1779 stream_cls: type[_AsyncStreamT] | None = None,
1780 ) -> ResponseT | _AsyncStreamT:
1781 opts = FinalRequestOptions.construct(
1782 method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1783 )
1784 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
1785
1786 async def patch(
1787 self,
1788 path: str,
1789 *,
1790 cast_to: Type[ResponseT],
1791 body: Body | None = None,
1792 options: RequestOptions = {},
1793 ) -> ResponseT:
1794 opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
1795 return await self.request(cast_to, opts)
1796
1797 async def put(
1798 self,
1799 path: str,
1800 *,
1801 cast_to: Type[ResponseT],
1802 body: Body | None = None,
1803 files: RequestFiles | None = None,
1804 options: RequestOptions = {},
1805 ) -> ResponseT:
1806 opts = FinalRequestOptions.construct(
1807 method="put", url=path, json_data=body, files=await async_to_httpx_files(files), **options
1808 )
1809 return await self.request(cast_to, opts)
1810
1811 async def delete(
1812 self,
1813 path: str,
1814 *,
1815 cast_to: Type[ResponseT],
1816 body: Body | None = None,
1817 options: RequestOptions = {},
1818 ) -> ResponseT:
1819 opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
1820 return await self.request(cast_to, opts)
1821
1822 def get_api_list(
1823 self,
1824 path: str,
1825 *,
1826 model: Type[_T],
1827 page: Type[AsyncPageT],
1828 body: Body | None = None,
1829 options: RequestOptions = {},
1830 method: str = "get",
1831 ) -> AsyncPaginator[_T, AsyncPageT]:
1832 opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
1833 return self._request_api_list(model, page, opts)
1834
1835
def make_request_options(
    *,
    query: Query | None = None,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    idempotency_key: str | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    post_parser: PostParser | NotGiven = NOT_GIVEN,
) -> RequestOptions:
    """Create a dict of type RequestOptions without keys of NotGiven values.

    Arguments that are `None` are left out as well, except for `timeout`, where
    `None` is stored like any other value. `query` and `extra_query` both end up
    under `"params"`; on duplicate keys the entries of `extra_query` win.
    """
    request_options: RequestOptions = {}

    if extra_headers is not None:
        request_options["headers"] = extra_headers

    if extra_body is not None:
        request_options["extra_json"] = cast(AnyMapping, extra_body)

    if extra_query is not None:
        # merge on top of `query`, or on top of nothing when there is no `query`
        base_params = query if query is not None else {}
        request_options["params"] = {**base_params, **extra_query}
    elif query is not None:
        request_options["params"] = query

    if not isinstance(timeout, NotGiven):
        request_options["timeout"] = timeout

    if idempotency_key is not None:
        request_options["idempotency_key"] = idempotency_key

    if is_given(post_parser):
        # internal
        request_options["post_parser"] = post_parser  # type: ignore

    return request_options
1871
1872
class ForceMultipartDict(Dict[str, None]):
    """A dict that is truthy no matter whether it contains any items."""

    def __bool__(self) -> bool:
        # a plain empty dict would be falsy; this one never is
        return True
1876
1877
1878class OtherPlatform:
1879 def __init__(self, name: str) -> None:
1880 self.name = name
1881
1882 @override
1883 def __str__(self) -> str:
1884 return f"Other:{self.name}"
1885
1886
# A platform is either one of the names below or, for anything else, an
# `OtherPlatform` that carries the platform's name.
Platform = Union[
    OtherPlatform,
    Literal[
        "MacOS",
        "Linux",
        "Windows",
        "FreeBSD",
        "OpenBSD",
        "iOS",
        "Android",
        "Unknown",
    ],
]
1900
1901
def get_platform() -> Platform:
    """Detect the platform that the interpreter is running on.

    Returns:
        One of the `Platform` names, an `OtherPlatform` carrying the lowercased
        `platform.platform()` string when the system is not recognised, or
        `"Unknown"` when the `platform` module raises or reports nothing.
    """
    try:
        system = platform.system().lower()
        platform_name = platform.platform().lower()
    except Exception:
        return "Unknown"

    if "iphone" in platform_name or "ipad" in platform_name:
        # Tested using Python3IDE on an iPhone 11 and Pythonista on an iPad 7
        # system is Darwin and platform_name is a string like:
        # - Darwin-21.6.0-iPhone12,1-64bit
        # - Darwin-21.6.0-iPad7,11-64bit
        return "iOS"

    if system == "darwin":
        return "MacOS"

    if system == "windows":
        return "Windows"

    if "android" in platform_name:
        # Tested using Pydroid 3
        # system is Linux and platform_name is a string like 'Linux-5.10.81-android12-9-00001-geba40aecb3b7-ab8534902-aarch64-with-libc'
        return "Android"

    # The BSDs report their own name from `platform.system()`, not "Linux", so
    # they have to be recognised here; otherwise they would end up as `Other:...`.
    if system == "freebsd":
        return "FreeBSD"

    if system == "openbsd":
        return "OpenBSD"

    if system == "linux":
        # https://distro.readthedocs.io/en/latest/#distro.id
        distro_id = distro.id()
        # kept for backwards compatibility with the previous detection logic
        if distro_id == "freebsd":
            return "FreeBSD"

        if distro_id == "openbsd":
            return "OpenBSD"

        return "Linux"

    if platform_name:
        return OtherPlatform(platform_name)

    return "Unknown"
1942
1943
@lru_cache(maxsize=None)
def platform_headers(version: str, *, platform: Platform | None) -> Dict[str, str]:
    """Build the `X-Stainless-*` headers that describe the package and its runtime.

    The platform is detected with `get_platform()` when `platform` is not
    given. Results are cached per combination of arguments.
    """
    headers: Dict[str, str] = {
        "X-Stainless-Lang": "python",
        "X-Stainless-Package-Version": version,
    }
    headers["X-Stainless-OS"] = str(platform or get_platform())
    headers["X-Stainless-Arch"] = str(get_architecture())
    headers["X-Stainless-Runtime"] = get_python_runtime()
    headers["X-Stainless-Runtime-Version"] = get_python_version()
    return headers
1954
1955
1956class OtherArch:
1957 def __init__(self, name: str) -> None:
1958 self.name = name
1959
1960 @override
1961 def __str__(self) -> str:
1962 return f"other:{self.name}"
1963
1964
# An architecture is either one of the names below or, for anything else, an
# `OtherArch` that carries the machine name.
Arch = Union[OtherArch, Literal["x32", "x64", "arm", "arm64", "unknown"]]
1966
1967
def get_python_runtime() -> str:
    """Return the name of the Python implementation, or `"unknown"` if it can't be determined."""
    try:
        runtime = platform.python_implementation()
    except Exception:
        return "unknown"
    return runtime
1973
1974
def get_python_version() -> str:
    """Return the version of the running Python, or `"unknown"` if it can't be determined."""
    try:
        version = platform.python_version()
    except Exception:
        return "unknown"
    return version
1980
1981
def get_architecture() -> Arch:
    """Detect the CPU architecture that the interpreter is running on.

    Returns:
        One of the `Arch` names, an `OtherArch` carrying the lowercased
        `platform.machine()` value when it is not recognised, or `"unknown"`
        when the `platform` module raises or reports nothing.
    """
    try:
        machine = platform.machine().lower()
    except Exception:
        return "unknown"

    if machine in ("arm64", "aarch64"):
        return "arm64"

    # TODO: untested
    if machine == "arm":
        return "arm"

    # Windows reports 64-bit Intel/AMD machines as "AMD64" rather than "x86_64"
    if machine in ("x86_64", "amd64"):
        return "x64"

    # TODO: untested
    if sys.maxsize <= 2**32:
        return "x32"

    if machine:
        return OtherArch(machine)

    return "unknown"
2006
2007
def _merge_mappings(
    obj1: Mapping[_T_co, Union[_T, Omit]],
    obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
    """Merge two mappings of the same type, removing any values that are instances of `Omit`.

    In cases with duplicate keys the second mapping takes precedence.
    """
    combined: Dict[_T_co, Union[_T, Omit]] = dict(obj1)
    combined.update(obj2)

    result: Dict[_T_co, _T] = {}
    for key, value in combined.items():
        # `Omit` marks an entry that has to be dropped from the result
        if isinstance(value, Omit):
            continue
        result[key] = value
    return result