openai/openai-python

Public

mirrored from https://github.com/openai/openai-python · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
v0.26.0

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

openai/api_requestor.py

668 lines · mode: code

1import asyncio
2import json
3import platform
4import sys
5import threading
6import warnings
7from contextlib import asynccontextmanager
8from json import JSONDecodeError
9from typing import (
10 AsyncGenerator,
11 AsyncIterator,
12 Dict,
13 Iterator,
14 Optional,
15 Tuple,
16 Union,
17 overload,
18)
19from urllib.parse import urlencode, urlsplit, urlunsplit
20
21import aiohttp
22import requests
23
24if sys.version_info >= (3, 8):
25 from typing import Literal
26else:
27 from typing_extensions import Literal
28
29import openai
30from openai import error, util, version
31from openai.openai_response import OpenAIResponse
32from openai.util import ApiType
33
34TIMEOUT_SECS = 600
35MAX_CONNECTION_RETRIES = 2
36
37# Has one attribute per thread, 'session'.
38_thread_context = threading.local()
39
40
41def _build_api_url(url, query):
42 scheme, netloc, path, base_query, fragment = urlsplit(url)
43
44 if base_query:
45 query = "%s&%s" % (base_query, query)
46
47 return urlunsplit((scheme, netloc, path, query, fragment))
48
49
50def _requests_proxies_arg(proxy) -> Optional[Dict[str, str]]:
51 """Returns a value suitable for the 'proxies' argument to 'requests.request."""
52 if proxy is None:
53 return None
54 elif isinstance(proxy, str):
55 return {"http": proxy, "https": proxy}
56 elif isinstance(proxy, dict):
57 return proxy.copy()
58 else:
59 raise ValueError(
60 "'openai.proxy' must be specified as either a string URL or a dict with string URL under the https and/or http keys."
61 )
62
63
64def _aiohttp_proxies_arg(proxy) -> Optional[str]:
65 """Returns a value suitable for the 'proxies' argument to 'aiohttp.ClientSession.request."""
66 if proxy is None:
67 return None
68 elif isinstance(proxy, str):
69 return proxy
70 elif isinstance(proxy, dict):
71 return proxy["https"] if "https" in proxy else proxy["http"]
72 else:
73 raise ValueError(
74 "'openai.proxy' must be specified as either a string URL or a dict with string URL under the https and/or http keys."
75 )
76
77
def _make_session() -> requests.Session:
    """Create a ``requests.Session`` configured from the global ``openai`` settings.

    Emits a warning when ``openai.verify_ssl_certs`` is falsy (the setting is
    not honoured), applies ``openai.proxy`` when one is configured, and mounts
    an ``HTTPAdapter`` with ``MAX_CONNECTION_RETRIES`` retries for
    ``https://`` URLs.
    """
    if not openai.verify_ssl_certs:
        warnings.warn("verify_ssl_certs is ignored; openai always verifies.")

    session = requests.Session()

    configured_proxies = _requests_proxies_arg(openai.proxy)
    if configured_proxies:
        session.proxies = configured_proxies

    retrying_adapter = requests.adapters.HTTPAdapter(
        max_retries=MAX_CONNECTION_RETRIES
    )
    session.mount("https://", retrying_adapter)
    return session
90
91
def parse_stream_helper(line):
    """Extract the payload of one server-sent-events (SSE) line.

    ``line`` may be ``bytes`` (decoded as UTF-8) or ``str``. Surrounding
    whitespace, including any trailing newline left by the line iterator,
    is stripped before matching, so the end-of-stream sentinel and blank
    event separators are recognised regardless of line endings.

    Returns:
        The text after the ``data:`` field name (with one optional leading
        space removed), or ``None`` when the line is empty, is not a
        ``data:`` line (e.g. an SSE comment or an ``event:``/``id:`` field),
        or is the ``data: [DONE]`` end-of-stream sentinel.
    """
    if not line:
        return None
    if hasattr(line, "decode"):
        line = line.decode("utf-8")
    line = line.strip()

    # Only `data:` fields carry JSON payloads; anything else cannot be parsed.
    if not line.startswith("data:"):
        return None
    payload = line[len("data:") :]
    # SSE allows a single space between the field name and its value.
    if payload.startswith(" "):
        payload = payload[1:]

    if payload == "[DONE]":
        return None
    return payload
104
105
def parse_stream(rbody):
    """Lazily yield the parsed payload of each line in ``rbody``.

    Every line is passed through ``parse_stream_helper``; lines for which it
    returns ``None`` are dropped.
    """
    yield from (
        payload
        for payload in map(parse_stream_helper, rbody)
        if payload is not None
    )
111
112
async def parse_stream_async(rbody: aiohttp.StreamReader):
    """Async counterpart of ``parse_stream`` for an aiohttp response body.

    Iterates ``rbody`` line by line and yields each line's parsed payload,
    skipping lines for which ``parse_stream_helper`` returns ``None``.
    """
    async for raw_line in rbody:
        payload = parse_stream_helper(raw_line)
        if payload is None:
            continue
        yield payload
118
119
class APIRequestor:
    """Builds, sends and interprets HTTP requests against the OpenAI API.

    Connection settings (API key, base URL, API type/version, organization)
    are fixed at construction time, each falling back to the corresponding
    module-level ``openai`` setting when not supplied.
    """

    def __init__(
        self,
        key=None,
        api_base=None,
        api_type=None,
        api_version=None,
        organization=None,
    ):
        self.api_base = api_base or openai.api_base
        self.api_key = key or util.default_api_key()
        self.api_type = (
            ApiType.from_str(api_type)
            if api_type
            else ApiType.from_str(openai.api_type)
        )
        self.api_version = api_version or openai.api_version
        self.organization = organization or openai.organization

    @classmethod
    def format_app_info(cls, info):
        """Render an ``openai.app_info`` mapping as a User-Agent fragment.

        Produces ``"name"``, ``"name/version"``, ``"name (url)"`` or
        ``"name/version (url)"``. ``info["name"]`` is required; ``version``
        and ``url`` are optional and skipped when missing or falsy.
        """
        formatted = info["name"]
        if info.get("version"):
            formatted += "/%s" % (info["version"],)
        if info.get("url"):
            formatted += " (%s)" % (info["url"],)
        return formatted

    @overload
    def request(
        self,
        method,
        url,
        params,
        headers,
        files,
        stream: Literal[True],
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[Iterator[OpenAIResponse], bool, str]:
        pass

    @overload
    def request(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        *,
        stream: Literal[True],
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[Iterator[OpenAIResponse], bool, str]:
        pass

    @overload
    def request(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        stream: Literal[False] = ...,
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[OpenAIResponse, bool, str]:
        pass

    @overload
    def request(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        stream: bool = ...,
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool, str]:
        pass

    def request(
        self,
        method,
        url,
        params=None,
        headers=None,
        files=None,
        stream: bool = False,
        request_id: Optional[str] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
    ) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool, str]:
        """Send a request synchronously and interpret the response.

        Returns:
            ``(response, got_stream, api_key)``: ``response`` is a single
            ``OpenAIResponse``, or an iterator of them when ``stream`` was
            requested and the server answered with ``text/event-stream``;
            ``got_stream`` says which of the two it is.
        """
        result = self.request_raw(
            method.lower(),
            url,
            params=params,
            supplied_headers=headers,
            files=files,
            stream=stream,
            request_id=request_id,
            request_timeout=request_timeout,
        )
        resp, got_stream = self._interpret_response(result, stream)
        return resp, got_stream, self.api_key

    @overload
    async def arequest(
        self,
        method,
        url,
        params,
        headers,
        files,
        stream: Literal[True],
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[AsyncGenerator[OpenAIResponse, None], bool, str]:
        pass

    @overload
    async def arequest(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        *,
        stream: Literal[True],
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[AsyncGenerator[OpenAIResponse, None], bool, str]:
        pass

    @overload
    async def arequest(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        stream: Literal[False] = ...,
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[OpenAIResponse, bool, str]:
        pass

    @overload
    async def arequest(
        self,
        method,
        url,
        params=...,
        headers=...,
        files=...,
        stream: bool = ...,
        request_id: Optional[str] = ...,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = ...,
    ) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool, str]:
        pass

    async def arequest(
        self,
        method,
        url,
        params=None,
        headers=None,
        files=None,
        stream: bool = False,
        request_id: Optional[str] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
    ) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool, str]:
        """Async counterpart of :meth:`request`.

        A streamed body is read lazily from the aiohttp connection, so the
        session obtained from ``aiohttp_session()`` must stay open until the
        stream has been consumed. For streaming responses its cleanup is
        therefore deferred to the returned async generator (run when the
        generator finishes or is closed); for all other outcomes, including
        errors, it is cleaned up before this coroutine returns or raises.

        NOTE: if a caller never iterates a returned stream, the generator's
        cleanup never runs and a session created here is not closed.
        """
        ctx = aiohttp_session()
        session = await ctx.__aenter__()
        got_stream = False
        try:
            result = await self.arequest_raw(
                method.lower(),
                url,
                session,
                params=params,
                supplied_headers=headers,
                files=files,
                request_id=request_id,
                request_timeout=request_timeout,
            )
            resp, got_stream = await self._interpret_async_response(result, stream)
        finally:
            # Streaming responses hand session cleanup to wrap_resp below.
            if not got_stream:
                await ctx.__aexit__(None, None, None)

        if got_stream:

            async def wrap_resp():
                try:
                    async for r in resp:
                        yield r
                finally:
                    await ctx.__aexit__(None, None, None)

            return wrap_resp(), got_stream, self.api_key
        return resp, got_stream, self.api_key

    def handle_error_response(self, rbody, rcode, resp, rheaders, stream_error=False):
        """Map an API error payload to the matching ``openai.error`` exception.

        The exception is returned (not raised) so the caller can raise it,
        except when ``resp`` carries no usable ``"error"`` object, in which
        case an ``error.APIError`` is raised directly.

        Args:
            rbody: Raw response body.
            rcode: HTTP status code.
            resp: Decoded JSON body; expected to contain an ``"error"`` dict.
            rheaders: Response headers.
            stream_error: True when the error arrived inside a stream.
        """
        try:
            error_data = resp["error"]
        except (KeyError, TypeError):
            error_data = None
        # A non-dict "error" value cannot be inspected below; treat it the
        # same as a missing one instead of crashing with AttributeError.
        if not isinstance(error_data, dict):
            raise error.APIError(
                "Invalid response object from API: %r (HTTP response code "
                "was %d)" % (rbody, rcode),
                rbody,
                rcode,
                resp,
            )

        if "internal_message" in error_data:
            # Append server-side detail; tolerate a missing/None "message".
            error_data["message"] = "%s\n\n%s" % (
                error_data.get("message") or "",
                error_data["internal_message"],
            )

        util.log_info(
            "OpenAI API error received",
            error_code=error_data.get("code"),
            error_type=error_data.get("type"),
            error_message=error_data.get("message"),
            error_param=error_data.get("param"),
            stream_error=stream_error,
        )

        # Rate limits were previously coded as 400's with code 'rate_limit'
        if rcode == 429:
            return error.RateLimitError(
                error_data.get("message"), rbody, rcode, resp, rheaders
            )
        elif rcode in [400, 404, 415]:
            return error.InvalidRequestError(
                error_data.get("message"),
                error_data.get("param"),
                error_data.get("code"),
                rbody,
                rcode,
                resp,
                rheaders,
            )
        elif rcode == 401:
            return error.AuthenticationError(
                error_data.get("message"), rbody, rcode, resp, rheaders
            )
        elif rcode == 403:
            return error.PermissionError(
                error_data.get("message"), rbody, rcode, resp, rheaders
            )
        elif rcode == 409:
            return error.TryAgain(
                error_data.get("message"), rbody, rcode, resp, rheaders
            )
        elif stream_error:
            # TODO: we will soon attach status codes to stream errors
            parts = [error_data.get("message"), "(Error occurred while streaming.)"]
            message = " ".join([p for p in parts if p is not None])
            return error.APIError(message, rbody, rcode, resp, rheaders)
        else:
            return error.APIError(
                f"{error_data.get('message')} {rbody} {rcode} {resp} {rheaders}",
                rbody,
                rcode,
                resp,
                rheaders,
            )

    def request_headers(
        self, method: str, extra, request_id: Optional[str]
    ) -> Dict[str, str]:
        """Build the complete header dict for a request.

        Adds User-Agent / client-info headers, auth headers for the configured
        API type, and optional organization, version, request-id and debug
        headers. ``extra`` is applied last, so its entries override the
        defaults. ``method`` is currently unused.
        """
        user_agent = "OpenAI/v1 PythonBindings/%s" % (version.VERSION,)
        if openai.app_info:
            user_agent += " " + self.format_app_info(openai.app_info)

        uname_without_node = " ".join(
            v for k, v in platform.uname()._asdict().items() if k != "node"
        )
        ua = {
            "bindings_version": version.VERSION,
            "httplib": "requests",
            "lang": "python",
            "lang_version": platform.python_version(),
            "platform": platform.platform(),
            "publisher": "openai",
            "uname": uname_without_node,
        }
        if openai.app_info:
            ua["application"] = openai.app_info

        headers = {
            "X-OpenAI-Client-User-Agent": json.dumps(ua),
            "User-Agent": user_agent,
        }

        headers.update(util.api_key_to_header(self.api_type, self.api_key))

        if self.organization:
            headers["OpenAI-Organization"] = self.organization

        if self.api_version is not None and self.api_type == ApiType.OPEN_AI:
            headers["OpenAI-Version"] = self.api_version
        if request_id is not None:
            headers["X-Request-Id"] = request_id
        if openai.debug:
            headers["OpenAI-Debug"] = "true"
        headers.update(extra)

        return headers

    def _validate_headers(
        self, supplied_headers: Optional[Dict[str, str]]
    ) -> Dict[str, str]:
        """Return a copy of ``supplied_headers`` after type-checking it.

        Raises:
            TypeError: if the headers are not a dict, or any key or value is
                not a ``str``.
        """
        headers: Dict[str, str] = {}
        if supplied_headers is None:
            return headers

        if not isinstance(supplied_headers, dict):
            raise TypeError("Headers must be a dictionary")

        for k, v in supplied_headers.items():
            if not isinstance(k, str):
                raise TypeError("Header keys must be strings")
            if not isinstance(v, str):
                raise TypeError("Header values must be strings")
            headers[k] = v

        # NOTE: It is possible to do more validation of the headers, but a request could always
        # be made to the API manually with invalid headers, so we need to handle them server side.

        return headers

    def _prepare_request_raw(
        self,
        url,
        supplied_headers,
        method,
        params,
        files,
        request_id: Optional[str],
    ) -> Tuple[str, Dict[str, str], Optional[bytes]]:
        """Resolve the absolute URL, headers and body for a request.

        GET/DELETE encode non-``None`` ``params`` into the query string;
        POST/PUT JSON-encode ``params`` into the body. ``files`` is not
        encoded here.

        Returns:
            ``(abs_url, headers, data)`` where ``data`` is the JSON body or
            ``None``.

        Raises:
            ValueError: if both ``params`` and ``files`` are given for
                POST/PUT.
            error.APIConnectionError: for any other HTTP method.
        """
        abs_url = "%s%s" % (self.api_base, url)
        headers = self._validate_headers(supplied_headers)

        data = None
        if method == "get" or method == "delete":
            if params:
                encoded_params = urlencode(
                    [(k, v) for k, v in params.items() if v is not None]
                )
                abs_url = _build_api_url(abs_url, encoded_params)
        elif method in {"post", "put"}:
            if params and files:
                raise ValueError("At most one of params and files may be specified.")
            if params:
                data = json.dumps(params).encode()
                headers["Content-Type"] = "application/json"
        else:
            raise error.APIConnectionError(
                "Unrecognized HTTP method %r. This may indicate a bug in the "
                "OpenAI bindings. Please contact support@openai.com for "
                "assistance." % (method,)
            )

        headers = self.request_headers(method, headers, request_id)

        util.log_info("Request to OpenAI API", method=method, path=abs_url)
        util.log_debug("Post details", data=data, api_version=self.api_version)

        return abs_url, headers, data

    def request_raw(
        self,
        method,
        url,
        *,
        params=None,
        supplied_headers: Optional[Dict[str, str]] = None,
        files=None,
        stream: bool = False,
        request_id: Optional[str] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
    ) -> requests.Response:
        """Send a request through this thread's ``requests.Session``.

        The session is created lazily, once per thread. A falsy
        ``request_timeout`` falls back to ``TIMEOUT_SECS``.

        Raises:
            error.Timeout: if requests reports a timeout.
            error.APIConnectionError: for any other requests failure.
        """
        abs_url, headers, data = self._prepare_request_raw(
            url, supplied_headers, method, params, files, request_id
        )

        if not hasattr(_thread_context, "session"):
            _thread_context.session = _make_session()
        try:
            result = _thread_context.session.request(
                method,
                abs_url,
                headers=headers,
                data=data,
                files=files,
                stream=stream,
                timeout=request_timeout if request_timeout else TIMEOUT_SECS,
            )
        except requests.exceptions.Timeout as e:
            raise error.Timeout("Request timed out: {}".format(e)) from e
        except requests.exceptions.RequestException as e:
            raise error.APIConnectionError("Error communicating with OpenAI: {}".format(e)) from e
        util.log_info(
            "OpenAI API response",
            path=abs_url,
            response_code=result.status_code,
            processing_ms=result.headers.get("OpenAI-Processing-Ms"),
            request_id=result.headers.get("X-Request-Id"),
        )
        # Don't read the whole stream for debug logging unless necessary.
        if openai.log == "debug":
            util.log_debug(
                "API response body", body=result.content, headers=result.headers
            )
        return result

    async def arequest_raw(
        self,
        method,
        url,
        session,
        *,
        params=None,
        supplied_headers: Optional[Dict[str, str]] = None,
        files=None,
        request_id: Optional[str] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
    ) -> aiohttp.ClientResponse:
        """Send a request through the given aiohttp ``session``.

        A tuple ``request_timeout`` is mapped to aiohttp's ``(connect,
        total)`` timeouts; a number sets the total timeout; a falsy value
        falls back to ``TIMEOUT_SECS``. Multipart ``files`` are encoded with
        a private ``requests`` helper.

        Raises:
            error.Timeout: on aiohttp/asyncio timeouts.
            error.APIConnectionError: on other aiohttp client errors.
        """
        abs_url, headers, data = self._prepare_request_raw(
            url, supplied_headers, method, params, files, request_id
        )

        if isinstance(request_timeout, tuple):
            timeout = aiohttp.ClientTimeout(
                connect=request_timeout[0],
                total=request_timeout[1],
            )
        else:
            timeout = aiohttp.ClientTimeout(
                total=request_timeout if request_timeout else TIMEOUT_SECS
            )

        if files:
            # TODO: Use `aiohttp.MultipartWriter` to create the multipart form data here.
            # For now we use the private `requests` method that is known to have worked so far.
            data, content_type = requests.models.RequestEncodingMixin._encode_files(  # type: ignore
                files, data
            )
            headers["Content-Type"] = content_type
        request_kwargs = {
            "method": method,
            "url": abs_url,
            "headers": headers,
            "data": data,
            "proxy": _aiohttp_proxies_arg(openai.proxy),
            "timeout": timeout,
        }
        try:
            result = await session.request(**request_kwargs)
            util.log_info(
                "OpenAI API response",
                path=abs_url,
                response_code=result.status,
                processing_ms=result.headers.get("OpenAI-Processing-Ms"),
                request_id=result.headers.get("X-Request-Id"),
            )
            # Don't read the whole stream for debug logging unless necessary.
            if openai.log == "debug":
                util.log_debug(
                    "API response body", body=result.content, headers=result.headers
                )
            return result
        except (aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
            raise error.Timeout("Request timed out") from e
        except aiohttp.ClientError as e:
            raise error.APIConnectionError("Error communicating with OpenAI") from e

    def _interpret_response(
        self, result: requests.Response, stream: bool
    ) -> Tuple[Union[OpenAIResponse, Iterator[OpenAIResponse]], bool]:
        """Returns the response(s) and a bool indicating whether it is a stream."""
        if stream and "text/event-stream" in result.headers.get("Content-Type", ""):
            return (
                self._interpret_response_line(
                    line, result.status_code, result.headers, stream=True
                )
                for line in parse_stream(result.iter_lines())
            ), True
        else:
            return (
                self._interpret_response_line(
                    result.content, result.status_code, result.headers, stream=False
                ),
                False,
            )

    async def _interpret_async_response(
        self, result: aiohttp.ClientResponse, stream: bool
    ) -> Tuple[Union[OpenAIResponse, AsyncGenerator[OpenAIResponse, None]], bool]:
        """Returns the response(s) and a bool indicating whether it is a stream.

        For non-streamed responses the body is read exactly once. Failures
        while reading are mapped to the same ``openai.error`` types that
        ``arequest_raw`` uses, instead of leaking raw aiohttp/asyncio errors.
        """
        if stream and "text/event-stream" in result.headers.get("Content-Type", ""):
            return (
                self._interpret_response_line(
                    line, result.status, result.headers, stream=True
                )
                async for line in parse_stream_async(result.content)
            ), True

        try:
            rbody = await result.read()
        except (aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
            raise error.Timeout("Request timed out") from e
        except aiohttp.ClientError as e:
            util.log_warn(e, body=result.content)
            raise error.APIConnectionError("Error communicating with OpenAI") from e
        return (
            self._interpret_response_line(
                rbody, result.status, result.headers, stream=False
            ),
            False,
        )

    def _interpret_response_line(
        self, rbody, rcode, rheaders, stream: bool
    ) -> OpenAIResponse:
        """Decode one response body (or one streamed event) into an ``OpenAIResponse``.

        Returns an empty response for HTTP 204. Raises
        ``error.ServiceUnavailableError`` for 503, ``error.APIError`` when the
        body is not UTF-8 JSON, and the exception built by
        :meth:`handle_error_response` for non-2xx codes or for streamed
        events that contain an ``"error"`` key.
        """
        # HTTP 204 response code does not have any content in the body.
        if rcode == 204:
            return OpenAIResponse(None, rheaders)

        if rcode == 503:
            raise error.ServiceUnavailableError(
                "The server is overloaded or not ready yet.",
                rbody,
                rcode,
                headers=rheaders,
            )
        try:
            if hasattr(rbody, "decode"):
                rbody = rbody.decode("utf-8")
            data = json.loads(rbody)
        except (JSONDecodeError, UnicodeDecodeError):
            raise error.APIError(
                f"HTTP code {rcode} from API ({rbody})", rbody, rcode, headers=rheaders
            )
        resp = OpenAIResponse(data, rheaders)
        # In the future, we might add a "status" parameter to errors
        # to better handle the "error while streaming" case.
        stream_error = stream and "error" in resp.data
        if stream_error or not 200 <= rcode < 300:
            raise self.handle_error_response(
                rbody, rcode, resp.data, rheaders, stream_error=stream_error
            )
        return resp
659
660
@asynccontextmanager
async def aiohttp_session() -> AsyncIterator[aiohttp.ClientSession]:
    """Provide an aiohttp session for a single request.

    Yields the session stored in ``openai.aiosession`` when one is set (and
    leaves it open). Otherwise opens a fresh ``aiohttp.ClientSession`` that
    is closed when the context exits.
    """
    session = openai.aiosession.get()
    if session:
        yield session
        return

    async with aiohttp.ClientSession() as owned_session:
        yield owned_session
669