openai/openai-python

Public

mirrored from https://github.com/openai/openai-pythonAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
v0.8.0

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

openai/gzip_stream.py

83lines · modecode

# Vendored from https://github.com/leenr/gzip-stream
import gzip
import io
4
5
class GZIPCompressedStream(io.RawIOBase):
    """Read-only raw stream that yields the gzip-compressed form of another stream.

    Data is pulled lazily from ``stream`` (anything exposing ``read(size)``
    that returns bytes), fed through a ``gzip.GzipFile`` writing into an
    in-memory buffer, and handed out through ``readinto``.  The buffer is
    emptied before each new chunk is compressed, so only the compressed
    output of the most recent chunk is held at any time.
    """

    def __init__(self, stream, compression_level: int) -> None:
        """Wrap ``stream`` and start a gzip member at ``compression_level``.

        Args:
            stream: Source object; its ``read(size)`` is called to obtain the
                uncompressed bytes.  An empty/falsy result is treated as
                end of input.
            compression_level: Integer from 1 to 9 (inclusive), passed to
                ``gzip.GzipFile`` as ``compresslevel``.

        Raises:
            ValueError: If ``compression_level`` is outside 1..9.
        """
        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O`` and would let an invalid level through silently.
        if not 1 <= compression_level <= 9:
            raise ValueError(
                "compression_level must be between 1 and 9, got {!r}".format(
                    compression_level
                )
            )

        super().__init__()

        self._compression_level = compression_level
        self._stream = stream

        self._compressed_stream = io.BytesIO()
        self._compressor = gzip.GzipFile(
            mode="wb", fileobj=self._compressed_stream, compresslevel=compression_level
        )

        # because of the GZIP header written by `GzipFile.__init__`:
        # rewind so the first read hands that header out to the consumer.
        self._compressed_stream.seek(0)

    @property
    def compression_level(self) -> int:
        """Compression level this stream was created with."""
        return self._compression_level

    @property
    def stream(self):
        """The wrapped source stream the uncompressed data is read from."""
        return self._stream

    def readable(self) -> bool:
        """Report that this stream supports reading (always ``True``)."""
        return True

    def _read_compressed_into(self, b) -> int:
        """Copy already-compressed bytes from the internal buffer into ``b``.

        Returns the number of bytes copied, which may be 0 when the internal
        buffer has been fully consumed.
        """
        buf = self._compressed_stream.read(len(b))
        b[: len(buf)] = buf
        return len(buf)

    def readinto(self, b) -> int:
        """Fill the writable buffer ``b`` with compressed bytes.

        Keeps compressing further input until ``b`` is full or the source is
        exhausted and the compressor has been closed.

        Returns:
            Number of bytes written into ``b``; 0 signals end of stream.
        """
        b = memoryview(b)

        offset = 0
        size = len(b)
        while offset < size:
            offset += self._read_compressed_into(b[offset:])
            if offset < size:
                # the internal compressed buffer is now empty
                if self._compressor.closed:
                    # nothing to compress anymore
                    break
                # compress next bytes
                self._read_n_compress(size)

        return offset

    def _read_n_compress(self, size: int) -> None:
        """Read up to ``size`` bytes from the source and compress them.

        The internal buffer is reset first, then either the new data is
        written to the compressor or, when the source returned nothing, the
        compressor is closed so the final gzip data is emitted.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        # A non-positive size would make ``read`` return nothing and be
        # mistaken for end of input, so reject it explicitly (an ``assert``
        # would vanish under ``python -O``).
        if size <= 0:
            raise ValueError("size must be positive, got {!r}".format(size))

        data = self._stream.read(size)

        # rewind buffer to the start to free up memory
        # (because anything currently in the buffer should be already
        # streamed off the object)
        self._compressed_stream.seek(0)
        self._compressed_stream.truncate(0)

        if data:
            self._compressor.write(data)
        else:
            # this will write final data (will flush zlib with Z_FINISH)
            self._compressor.close()

        # rewind to the buffer start
        self._compressed_stream.seek(0)

    def __repr__(self) -> str:
        """Return a constructor-style representation for debugging."""
        return (
            "{self.__class__.__name__}("
            "{self.stream!r}, "
            "compression_level={self.compression_level!r}"
            ")"
        ).format(self=self)
81
82
# Public API of this module: only the stream class is exported.
__all__ = ("GZIPCompressedStream",)
84