-
Notifications
You must be signed in to change notification settings - Fork 4
/
Zlib.py
executable file
·156 lines (133 loc) · 5.52 KB
/
Zlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import sys
import ctypes as C
import os
from ctypes import util
from Peach.transformer import Transformer
if sys.platform == "linux2" or sys.platform == "darwin":
_zlib = C.cdll.LoadLibrary(util.find_library('libz'))
elif sys.platform == "win32":
_zlib = C.cdll.LoadLibrary(os.path.join(os.path.dirname(__file__), "zlib1.dll"))
else:
raise NotImplementedError
class _z_stream(C.Structure):
_fields_ = [
("next_in", C.POINTER(C.c_ubyte)),
("avail_in", C.c_uint),
("total_in", C.c_ulong),
("next_out", C.POINTER(C.c_ubyte)),
("avail_out", C.c_uint),
("total_out", C.c_ulong),
("msg", C.c_char_p),
("state", C.c_void_p),
("zalloc", C.c_void_p),
("zfree", C.c_void_p),
("opaque", C.c_void_p),
("data_type", C.c_int),
("adler", C.c_ulong),
("reserved", C.c_ulong),
]
ZLIB_VERSION = C.c_char_p("1.2.3")
Z_NULL = 0x00
Z_OK = 0x00
Z_STREAM_END = 0x01
Z_NEED_DICT = 0x02
Z_BUF_ERR = -0x05
Z_NO_FLUSH = 0x00
Z_SYNC_FLUSH = 0x02
Z_FINISH = 0x04
CHUNK = 1024 * 128
class Compressor(object):
def __init__(self, level=-1, dictionary=None):
self.level = level
self.st = _z_stream()
err = _zlib.deflateInit_(C.byref(self.st), self.level, ZLIB_VERSION,
C.sizeof(self.st))
assert err == Z_OK, err
if dictionary:
err = _zlib.deflateSetDictionary(
C.byref(self.st),
C.cast(C.c_char_p(dictionary), C.POINTER(C.c_ubyte)),
len(dictionary)
)
assert err == Z_OK, err
def __call__(self, input):
outbuf = C.create_string_buffer(CHUNK)
self.st.avail_in = len(input)
self.st.next_in = C.cast(C.c_char_p(input), C.POINTER(C.c_ubyte))
self.st.next_out = C.cast(outbuf, C.POINTER(C.c_ubyte))
self.st.avail_out = CHUNK
err = _zlib.deflate(C.byref(self.st), Z_SYNC_FLUSH)
if err in [Z_OK, Z_STREAM_END]:
x = outbuf[:CHUNK - self.st.avail_out]
return x
else:
raise AssertionError(err)
def __del__(self):
err = _zlib.deflateEnd(C.byref(self.st))
class Decompressor(object):
def __init__(self, dictionary=None):
self.dictionary = dictionary
self.st = _z_stream()
err = _zlib.inflateInit2_(C.byref(self.st), 15, ZLIB_VERSION,
C.sizeof(self.st))
assert err == Z_OK, err
def __call__(self, input):
outbuf = C.create_string_buffer(CHUNK)
self.st.avail_in = len(input)
self.st.next_in = C.cast(C.c_char_p(input), C.POINTER(C.c_ubyte))
self.st.avail_out = CHUNK
self.st.next_out = C.cast(outbuf, C.POINTER(C.c_ubyte))
err = _zlib.inflate(C.byref(self.st), Z_SYNC_FLUSH)
if err == Z_NEED_DICT:
assert self.dictionary, "no dictionary provided"
dict_id = _zlib.adler32(0,
C.cast(C.c_char_p(self.dictionary), C.POINTER(C.c_ubyte)),
len(self.dictionary))
err = _zlib.inflateSetDictionary(
C.byref(self.st),
C.cast(C.c_char_p(self.dictionary), C.POINTER(C.c_ubyte)),
len(self.dictionary)
)
assert err == Z_OK, err
err = _zlib.inflate(C.byref(self.st), Z_SYNC_FLUSH)
if err in [Z_OK, Z_STREAM_END]:
return outbuf[:CHUNK - self.st.avail_out]
else:
raise AssertionError(err)
def __del__(self):
err = _zlib.inflateEnd(C.byref(self.st))
class DictZLib(Transformer):
def __init__(self, level=-1, dicto=None):
Transformer.__init__(self)
self._level = level
self._dicto = dicto
def realEncode(self, data):
_compress = Compressor(self._level, self._dicto)
return _compress(data)
def realDecode(self, data):
_decompress = Decompressor(self._dicto)
return _decompress(data)
class SPDYZLib(DictZLib):
def __init__(self):
_dicto = \
"optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encod"\
"ingaccept-languageauthorizationexpectfromhostif-modified-sinceif"\
"-matchif-none-matchif-rangeif-unmodifiedsincemax-forwardsproxy-a"\
"uthorizationrangerefererteuser-agent1001012002012022032042052063"\
"0030130230330430530630740040140240340440540640740840941041141241"\
"3414415416417500501502503504505accept-rangesageetaglocationproxy"\
"-authenticatepublicretry-afterservervarywarningwww-authenticatea"\
"llowcontent-basecontent-encodingcache-controlconnectiondatetrail"\
"ertransfer-encodingupgradeviawarningcontent-languagecontent-leng"\
"thcontent-locationcontent-md5content-rangecontent-typeetagexpire"\
"slast-modifiedset-cookieMondayTuesdayWednesdayThursdayFridaySatu"\
"rdaySundayJanFebMarAprMayJunJulAugSepOctNovDecchunkedtext/htmlim"\
"age/pngimage/jpgimage/gifapplication/xmlapplication/xhtmltext/pl"\
"ainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1.1status"\
"versionurl\0"
DictZLib.__init__(self, dicto=_dicto)
if __name__ == "__main__":
print(SPDYZLib().realDecode(DictZLib().realEncode('big ba\x45ng')))