|
1 #!python |
|
2 # -*- Python -*- |
|
3 # |
|
4 # Copyright 2014-2015 Brian Olson |
|
5 # |
|
6 # Licensed under the Apache License, Version 2.0 (the "License"); |
|
7 # you may not use this file except in compliance with the License. |
|
8 # You may obtain a copy of the License at |
|
9 # |
|
10 # http://www.apache.org/licenses/LICENSE-2.0 |
|
11 # |
|
12 # Unless required by applicable law or agreed to in writing, software |
|
13 # distributed under the License is distributed on an "AS IS" BASIS, |
|
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
15 # See the License for the specific language governing permissions and |
|
16 # limitations under the License. |
|
17 |
|
18 import datetime |
|
19 import re |
|
20 import struct |
|
21 import sys |
|
22 |
|
# True when running on Python 3 or later; selects the bytes/text helpers below.
_IS_PY3 = sys.version_info[0] >= 3

# Bind the name StringIO to an in-memory *byte* buffer on every interpreter.
if _IS_PY3:
    from io import BytesIO as StringIO
else:
    try:
        from cStringIO import StringIO
    except ImportError:
        # Only a missing cStringIO should trigger the pure-Python fallback;
        # anything else (e.g. KeyboardInterrupt) must propagate.
        from StringIO import StringIO
|
32 |
|
33 |
|
# Layout of the initial byte of every CBOR data item.
CBOR_TYPE_MASK = 0xE0  # top 3 bits
CBOR_INFO_BITS = 0x1F  # low 5 bits


# Major types, already shifted into the top 3 bits.
CBOR_UINT = 0x00
CBOR_NEGINT = 0x20
CBOR_BYTES = 0x40
CBOR_TEXT = 0x60
CBOR_ARRAY = 0x80
CBOR_MAP = 0xA0
CBOR_TAG = 0xC0
CBOR_7 = 0xE0  # float and other types

# Values of the low 5 bits that announce a following argument.
CBOR_UINT8_FOLLOWS = 24  # 0x18
CBOR_UINT16_FOLLOWS = 25  # 0x19
CBOR_UINT32_FOLLOWS = 26  # 0x1a
CBOR_UINT64_FOLLOWS = 27  # 0x1b
CBOR_VAR_FOLLOWS = 31  # 0x1f

CBOR_BREAK = 0xFF

CBOR_FALSE = (CBOR_7 | 20)
CBOR_TRUE = (CBOR_7 | 21)
CBOR_NULL = (CBOR_7 | 22)
CBOR_UNDEFINED = (CBOR_7 | 23)  # js 'undefined' value

CBOR_FLOAT16 = (CBOR_7 | 25)
CBOR_FLOAT32 = (CBOR_7 | 26)
CBOR_FLOAT64 = (CBOR_7 | 27)

CBOR_TAG_DATE_STRING = 0  # RFC3339
CBOR_TAG_DATE_ARRAY = 1  # any number type follows, seconds since 1970-01-01T00:00:00 UTC
CBOR_TAG_BIGNUM = 2  # big endian byte string follows
CBOR_TAG_NEGBIGNUM = 3  # big endian byte string follows
CBOR_TAG_DECIMAL = 4  # [ 10^x exponent, number ]
CBOR_TAG_BIGFLOAT = 5  # [ 2^x exponent, number ]
# Tags 21-23 are "expected conversion" hints.  Tags 21 and 22 used to be
# assigned to CBOR_TAG_BASE64URL / CBOR_TAG_BASE64 and were then silently
# overwritten by the 33 / 34 assignments further down, so they get their own
# names here.  The surviving values of the old names are unchanged.
CBOR_TAG_EXPECT_BASE64URL = 21
CBOR_TAG_EXPECT_BASE64 = 22
CBOR_TAG_BASE16 = 23
CBOR_TAG_EXPECT_BASE16 = CBOR_TAG_BASE16  # alias matching the two names above
CBOR_TAG_CBOR = 24  # following byte string is embedded CBOR data

CBOR_TAG_URI = 32
CBOR_TAG_BASE64URL = 33
CBOR_TAG_BASE64 = 34
CBOR_TAG_REGEX = 35
CBOR_TAG_MIME = 36  # following text is MIME message, headers, separators and all
CBOR_TAG_CBOR_FILEHEADER = 55799  # can open a file with 0xd9d9f7

# Pre-packed initial byte for a positive bignum tag.
_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)
|
83 |
|
84 |
|
def dumps_int(val):
    """Return the CBOR encoding of the integer val as bytes.

    Non-negative values up to 64 bits use the unsigned-integer major type,
    larger ones are written as a tagged big-endian bignum.  Negative values
    are delegated to _encode_type_num with the negative-integer major type.
    """
    if not val >= 0:
        # A negative n is carried as the unsigned number -1 - n.
        return _encode_type_num(CBOR_NEGINT, -1 - val)
    # CBOR_UINT is 0, so the major type never needs to be OR-ed in below.
    if val <= 23:
        return struct.pack('B', val)
    for ceiling, fmt, follows in (
            (0x0ff, 'BB', CBOR_UINT8_FOLLOWS),
            (0x0ffff, '!BH', CBOR_UINT16_FOLLOWS),
            (0x0ffffffff, '!BI', CBOR_UINT32_FOLLOWS),
            (0x0ffffffffffffffff, '!BQ', CBOR_UINT64_FOLLOWS)):
        if val <= ceiling:
            return struct.pack(fmt, follows, val)
    # Too large for 64 bits: bignum tag + byte string of the magnitude.
    magnitude = _dumps_bignum_to_bytearray(val)
    return _CBOR_TAG_BIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(magnitude)) + magnitude
|
103 |
|
104 |
|
def _dumps_bignum_to_bytearray(val):
    """Return the big-endian byte string of the non-negative integer val.

    No leading zero bytes are produced; a value that is not greater than
    zero yields an empty byte string.  The same code runs on Python 2 and 3:
    bytes(bytearray) is a byte string on both.
    """
    out = bytearray()
    while val > 0:
        # Collect least-significant byte first (O(1) append) and flip once
        # at the end instead of inserting at the front on every step.
        out.append(val & 0x0ff)
        val = val >> 8
    out.reverse()
    return bytes(out)
|
119 |
|
120 |
|
def dumps_float(val):
    """Return val encoded as CBOR: the CBOR_FLOAT64 initial byte followed by
    the value packed as a big-endian 8-byte double."""
    encoded = struct.pack("!Bd", CBOR_FLOAT64, val)
    return encoded
|
123 |
|
124 |
|
# Pre-packed initial byte for a negative bignum tag.
_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)


def _encode_type_num(cbor_type, val):
    """Return the CBOR head bytes for a major type and an unsigned argument.

    cbor_type is one of the CBOR_* major type constants (already positioned
    in the top bits of the initial byte); val is the non-negative number to
    attach.  The shortest of the inline / 1 / 2 / 4 / 8 byte forms is used.
    For CBOR_NEGINT the 8-byte form is only used up to 0x7fffffffffffffff;
    beyond that a negative-bignum tag plus byte string is emitted.  For any
    other type a value that does not fit in 8 bytes raises Exception.
    """
    assert val >= 0
    if val <= 23:
        return struct.pack('B', cbor_type | val)
    for ceiling, fmt, follows in (
            (0x0ff, 'BB', CBOR_UINT8_FOLLOWS),
            (0x0ffff, '!BH', CBOR_UINT16_FOLLOWS),
            (0x0ffffffff, '!BI', CBOR_UINT32_FOLLOWS)):
        if val <= ceiling:
            return struct.pack(fmt, cbor_type | follows, val)
    is_negint = (cbor_type == CBOR_NEGINT)
    if is_negint:
        ceiling64 = 0x07fffffffffffffff
    else:
        ceiling64 = 0x0ffffffffffffffff
    if val <= ceiling64:
        return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
    if not is_negint:
        raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
    magnitude = _dumps_bignum_to_bytearray(val)
    return _CBOR_TAG_NEGBIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(magnitude)) + magnitude
|
146 |
|
147 |
|
def _is_unicode(val):
    """Return True if val is a text string: str on Python 3, unicode on Python 2."""
    # The name `unicode` is only evaluated when not running on Python 3.
    text_type = str if _IS_PY3 else unicode
    return isinstance(val, text_type)
|
154 |
|
155 |
|
def dumps_string(val, is_text=None, is_bytes=None):
    """Return the CBOR encoding of a string.

    Text strings (see _is_unicode) are UTF-8 encoded and always written as
    CBOR text, regardless of the flags.  Any other value is written as a
    CBOR byte string unless is_text is True and is_bytes is falsy.
    """
    if _is_unicode(val):
        val = val.encode('utf8')
        major = CBOR_TEXT
    elif is_bytes or not (is_text == True):
        major = CBOR_BYTES
    else:
        major = CBOR_TEXT
    return _encode_type_num(major, len(val)) + val
|
164 |
|
165 |
|
def dumps_array(arr, sort_keys=False):
    """Return the CBOR encoding of arr as a definite-length array.

    The array head carries len(arr); each element is encoded with dumps,
    passing sort_keys through.
    """
    chunks = [_encode_type_num(CBOR_ARRAY, len(arr))]
    for item in arr:
        chunks.append(dumps(item, sort_keys=sort_keys))
    return b''.join(chunks)
|
170 |
|
171 |
|
def dumps_dict(d, sort_keys=False):
    """Return the CBOR encoding of the mapping d as a definite-length map.

    With sort_keys the entries are written in sorted key order; otherwise in
    the mapping's own iteration order.  Keys and values are encoded with
    dumps, passing sort_keys through.
    """
    parts = [_encode_type_num(CBOR_MAP, len(d))]
    if sort_keys:
        keys = d.keys() if _IS_PY3 else d.iterkeys()
        # Lazily look each value up just before its pair is encoded.
        pairs = ((k, d[k]) for k in sorted(keys))
    else:
        pairs = d.items() if _IS_PY3 else d.iteritems()
    for k, v in pairs:
        parts.append(dumps(k, sort_keys=sort_keys))
        parts.append(dumps(v, sort_keys=sort_keys))
    return b''.join(parts)
|
200 |
|
201 |
|
def dumps_bool(b):
    """Return the single CBOR byte for true when b is truthy, else the byte for false."""
    simple = CBOR_TRUE if b else CBOR_FALSE
    return struct.pack('B', simple)
|
206 |
|
207 |
|
def dumps_tag(t, sort_keys=False):
    """Return the CBOR encoding of a tagged value.

    t must provide .tag (the tag number) and .value (the tagged object,
    encoded with dumps).
    """
    head = _encode_type_num(CBOR_TAG, t.tag)
    payload = dumps(t.value, sort_keys=sort_keys)
    return head + payload
|
210 |
|
211 |
|
def _is_stringish(x):
    """Return True if x is a text or byte string on the running interpreter."""
    if _IS_PY3:
        return isinstance(x, (str, bytes))
    return isinstance(x, (str, basestring, bytes, unicode))


def _is_intish(x):
    """Return True if x is an integer (int, or also long on Python 2)."""
    if _IS_PY3:
        return isinstance(x, int)
    return isinstance(x, (int, long))
|
222 |
|
223 |
|
def dumps(ob, sort_keys=False):
    """Return the CBOR encoding of ob as bytes.

    Handles None, bool, text/byte strings, list/tuple, dict, float, integers
    and Tag instances.  sort_keys is passed down to the container encoders.

    Raises Exception for any other type; the message names the type.
    """
    if ob is None:
        return struct.pack('B', CBOR_NULL)
    # bool is tested before the integer check because bool is a subclass of int.
    if isinstance(ob, bool):
        return dumps_bool(ob)
    if _is_stringish(ob):
        return dumps_string(ob)
    if isinstance(ob, (list, tuple)):
        return dumps_array(ob, sort_keys=sort_keys)
    # TODO: accept other enumerables and emit a variable length array
    if isinstance(ob, dict):
        return dumps_dict(ob, sort_keys=sort_keys)
    if isinstance(ob, float):
        return dumps_float(ob)
    if _is_intish(ob):
        return dumps_int(ob)
    if isinstance(ob, Tag):
        return dumps_tag(ob, sort_keys=sort_keys)
    # Format the type into the message; it used to be passed as a second
    # Exception argument, leaving a literal "%s" in the text.
    raise Exception("don't know how to cbor serialize object of type %s" % (type(ob),))
|
243 |
|
244 |
|
# same basic signature as json.dump, but with no options (yet)
def dump(obj, fp, sort_keys=False):
    """
    Serialize obj to CBOR and write the result to fp with a single write.

    obj: Python object to serialize
    fp: file-like object capable of .write(bytes)
    sort_keys: passed through to dumps
    """
    # The whole object is encoded in memory first.
    # TODO: .write() to fp as we go as each inner object is serialized
    fp.write(dumps(obj, sort_keys=sort_keys))
|
255 |
|
256 |
|
class Tag(object):
    """A CBOR tag number paired with the value it tags.

    Two Tag objects are equal when both tag and value are equal; a Tag never
    equals a non-Tag object.  Because __eq__ is defined without __hash__,
    instances are unhashable on Python 3.
    """

    def __init__(self, tag=None, value=None):
        self.tag = tag      # tag number
        self.value = value  # the tagged object

    def __repr__(self):
        return "Tag({0!r}, {1!r})".format(self.tag, self.value)

    def __eq__(self, other):
        if not isinstance(other, Tag):
            return False
        return (self.tag == other.tag) and (self.value == other.value)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this, two equal
        # Tags would still compare unequal with != there.
        return not self.__eq__(other)
|
269 |
|
270 |
|
def loads(data):
    """
    Decode the first CBOR item found in the byte string data and return it
    as a Python object.

    Raises ValueError when data is None.
    """
    if data is None:
        raise ValueError("got None for buffer to decode in loads")
    # _loads returns (object, bytes read); only the object is handed back.
    decoded = _loads(StringIO(data))
    return decoded[0]
|
279 |
|
280 |
|
def load(fp):
    """
    Read one CBOR item from fp, a file-like object supporting .read(n),
    and return it as a Python object.
    """
    # _loads returns (object, bytes read); only the object is handed back.
    decoded = _loads(fp)
    return decoded[0]
|
286 |
|
287 |
|
# Largest depth value _loads accepts before raising.
_MAX_DEPTH = 100


def _tag_aux(fp, tb):
    """Split the initial byte tb and read the argument that follows it.

    Returns (tag, tag_aux, aux, bytes_read):
      tag        -- the major type bits of tb (tb & CBOR_TYPE_MASK)
      tag_aux    -- the low 5 bits of tb
      aux        -- the decoded argument: tag_aux itself when <= 23, the
                    following 1/2/4/8 byte big-endian unsigned integer for
                    the *_FOLLOWS values, or None for CBOR_VAR_FOLLOWS
      bytes_read -- 1 for tb plus the number of argument bytes read from fp
    Any other tag_aux value fails the assertion.
    """
    tag = tb & CBOR_TYPE_MASK
    tag_aux = tb & CBOR_INFO_BITS
    if tag_aux <= 23:
        # The argument is stored inline in the initial byte.
        return tag, tag_aux, tag_aux, 1
    for marker, fmt, width in (
            (CBOR_UINT8_FOLLOWS, "!B", 1),
            (CBOR_UINT16_FOLLOWS, "!H", 2),
            (CBOR_UINT32_FOLLOWS, "!I", 4),
            (CBOR_UINT64_FOLLOWS, "!Q", 8)):
        if tag_aux == marker:
            data = fp.read(width)
            aux = struct.unpack_from(fmt, data, 0)[0]
            return tag, tag_aux, aux, 1 + width
    assert tag_aux == CBOR_VAR_FOLLOWS, "bogus tag {0:02x}".format(tb)
    return tag, tag_aux, None, 1
|
318 |
|
319 |
|
def _read_byte(fp):
    """Read exactly one byte from fp and return it as an integer.

    Raises EOFError when fp.read(1) returns nothing.
    """
    first = fp.read(1)
    if not len(first):
        # Not every file-like object signals end of input by itself.
        raise EOFError()
    return ord(first)
|
326 |
|
327 |
|
def _loads_var_array(fp, limit, depth, returntags, bytes_read):
    """Decode the elements of an indefinite-length array up to the break byte.

    bytes_read is the number of bytes already consumed for the array head.
    Returns (list, bytes_read) where bytes_read additionally covers every
    element and the terminating break byte.
    """
    ob = []
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        # Elements sit one nesting level below the array itself.
        (subob, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already includes the initial byte tb, so it must not be
        # counted a second time.
        bytes_read += sub_len
        ob.append(subob)
        tb = _read_byte(fp)
    # + 1 for the break byte that ended the loop
    return (ob, bytes_read + 1)
|
337 |
|
338 |
|
def _loads_var_map(fp, limit, depth, returntags, bytes_read):
    """Decode the key/value pairs of an indefinite-length map up to the break byte.

    bytes_read is the number of bytes already consumed for the map head.
    Returns (dict, bytes_read) where bytes_read additionally covers every
    key, every value and the terminating break byte.
    """
    ob = {}
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        # Keys and values sit one nesting level below the map itself.
        (subk, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already includes the initial byte tb, so it must not be
        # counted a second time.
        bytes_read += sub_len
        (subv, sub_len) = _loads(fp, limit, depth + 1, returntags)
        bytes_read += sub_len
        ob[subk] = subv
        tb = _read_byte(fp)
    # + 1 for the break byte that ended the loop
    return (ob, bytes_read + 1)
|
350 |
|
351 |
|
def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
    """Decode a definite-length array of aux elements from fp.

    bytes_read is the number of bytes already consumed for the array head.
    limit and returntags are handed on to every element, and elements are
    decoded one nesting level deeper than the array.
    Returns (list, total bytes read).
    """
    ob = []
    # A counting loop avoids materialising range(aux) on Python 2 without
    # needing a separate xrange variant of this function.
    done = 0
    while done < aux:
        subob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob.append(subob)
        done += 1
    return ob, bytes_read


def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
    """Decode a definite-length map of aux key/value pairs from fp.

    bytes_read is the number of bytes already consumed for the map head.
    limit and returntags are handed on to every key and value, which are
    decoded one nesting level deeper than the map.
    Returns (dict, total bytes read).
    """
    ob = {}
    done = 0
    while done < aux:
        subk, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        subv, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob[subk] = subv
        done += 1
    return ob, bytes_read
|
386 |
|
def _loads(fp, limit=None, depth=0, returntags=False):
    """Decode one CBOR item from fp; return (object, bytes read).

    Raises Exception when depth is greater than _MAX_DEPTH; otherwise reads
    the initial byte and lets _loads_tb decode the rest.
    """
    if depth <= _MAX_DEPTH:
        first = _read_byte(fp)
        return _loads_tb(fp, first, limit, depth, returntags)
    raise Exception("hit CBOR loads recursion depth limit")
|
395 |
|
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
    """Decode the CBOR item whose initial byte tb was already read from fp.

    Returns (object, bytes read); the count includes tb itself.
    With returntags, tagged items come back as Tag(number, value); otherwise
    tagify() is asked to turn them into a Python object.

    Raises Exception when depth is greater than _MAX_DEPTH and ValueError
    for an unsupported major-type-7 byte.
    """
    # This function is also entered directly (with an already-read byte) by
    # the indefinite-length container loaders, without passing through
    # _loads, so the depth guard has to be applied here as well.
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")

    # Some special cases of CBOR_7 best handled by special struct.unpack logic here
    if tb == CBOR_FLOAT16:
        data = fp.read(2)
        hibyte, lowbyte = struct.unpack_from("BB", data, 0)
        # half precision layout: 1 sign bit, 5 exponent bits, 10 mantissa bits
        exp = (hibyte >> 2) & 0x1F
        mant = ((hibyte & 0x03) << 8) | lowbyte
        if exp == 0:
            # zero / subnormal: no implicit leading one
            val = mant * (2.0 ** -24)
        elif exp == 31:
            if mant == 0:
                val = float('Inf')
            else:
                val = float('NaN')
        else:
            # normal number: add the implicit leading one (1024 == 1 << 10)
            val = (mant + 1024.0) * (2 ** (exp - 25))
        if hibyte & 0x80:
            val = -1.0 * val
        return (val, 3)
    elif tb == CBOR_FLOAT32:
        data = fp.read(4)
        pf = struct.unpack_from("!f", data, 0)
        return (pf[0], 5)
    elif tb == CBOR_FLOAT64:
        data = fp.read(8)
        pf = struct.unpack_from("!d", data, 0)
        return (pf[0], 9)

    tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)

    if tag == CBOR_UINT:
        return (aux, bytes_read)
    elif tag == CBOR_NEGINT:
        return (-1 - aux, bytes_read)
    elif tag == CBOR_BYTES:
        ob, subpos = loads_bytes(fp, aux)
        return (ob, bytes_read + subpos)
    elif tag == CBOR_TEXT:
        raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
        ob = raw.decode('utf8')
        return (ob, bytes_read + subpos)
    elif tag == CBOR_ARRAY:
        if aux is None:
            return _loads_var_array(fp, limit, depth, returntags, bytes_read)
        return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_MAP:
        if aux is None:
            return _loads_var_map(fp, limit, depth, returntags, bytes_read)
        return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_TAG:
        # Hand limit/returntags on to the tagged item (they used to be
        # dropped here) and count the tagged item as one level deeper.
        ob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        if returntags:
            # Don't interpret the tag, return it and the tagged object.
            ob = Tag(aux, ob)
        else:
            # attempt to interpret the tag and the value into a Python object.
            ob = tagify(ob, aux)
        return ob, bytes_read
    elif tag == CBOR_7:
        if tb == CBOR_TRUE:
            return (True, bytes_read)
        if tb == CBOR_FALSE:
            return (False, bytes_read)
        if tb == CBOR_NULL:
            return (None, bytes_read)
        if tb == CBOR_UNDEFINED:
            # 'undefined' has no Python counterpart; it is mapped to None too
            return (None, bytes_read)
        raise ValueError("unknown cbor tag 7 byte: {:02x}".format(tb))
|
465 |
|
466 |
|
def loads_bytes(fp, aux, btag=CBOR_BYTES):
    """Read the payload of a byte or text string from fp.

    aux is the decoded length from the string head, or None for an
    indefinite-length string, in which case definite-length chunks of major
    type btag are read and joined until the break byte.

    Returns (bytes, bytes read after the string head).
    Raises EOFError if the input ends where a chunk head or break byte is
    expected, and ValueError if a chunk is itself indefinite-length.
    """
    # TODO: limit to some maximum number of chunks and some maximum total bytes
    if aux is not None:
        # simple case
        ob = fp.read(aux)
        return (ob, aux)
    # read chunks of bytes
    chunklist = []
    total_bytes_read = 0
    while True:
        # Same single-byte read as the rest of the decoder: an int on both
        # Python 2 and 3, EOFError instead of IndexError at end of input.
        tb = _read_byte(fp)
        if tb == CBOR_BREAK:
            total_bytes_read += 1
            break
        tag, tag_aux, chunk_len, bytes_read = _tag_aux(fp, tb)
        assert tag == btag, 'variable length value contains unexpected component'
        if chunk_len is None:
            # fp.read(None) would swallow the rest of the stream.
            raise ValueError("indefinite-length chunk inside variable length value")
        ob = fp.read(chunk_len)
        chunklist.append(ob)
        total_bytes_read += bytes_read + chunk_len
    return (b''.join(chunklist), total_bytes_read)
|
489 |
|
490 |
|
def _bytes_to_biguint(bs):
    """Interpret bs as a big-endian unsigned integer and return it.

    An empty bs gives 0.
    """
    total = 0
    for ch in bs:
        if not _IS_PY3:
            # Python 2 yields 1-character strings rather than ints.
            ch = ord(ch)
        total = (total << 8) | ch
    return total
|
505 |
|
506 |
|
def tagify(ob, aux):
    """Turn the tagged value ob with tag number aux into a Python object.

    CBOR_TAG_DATE_ARRAY -> datetime.datetime.utcfromtimestamp(ob)
    CBOR_TAG_BIGNUM     -> unsigned integer built from the bytes ob
    CBOR_TAG_NEGBIGNUM  -> -1 minus that unsigned integer
    CBOR_TAG_REGEX      -> re.compile(ob)
    Every other tag number, including CBOR_TAG_DATE_STRING, is returned as
    Tag(aux, ob).
    """
    # TODO: make this extensible?
    # cbor.register_tag_handler(tagnumber, tag_handler)
    # where tag_handler takes (tagnumber, tagged_object)
    if aux == CBOR_TAG_DATE_ARRAY:
        result = datetime.datetime.utcfromtimestamp(ob)
    elif aux == CBOR_TAG_BIGNUM:
        result = _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_NEGBIGNUM:
        result = -1 - _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_REGEX:
        # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
        result = re.compile(ob)
    else:
        # TODO: parse RFC3339 date string for CBOR_TAG_DATE_STRING
        result = Tag(aux, ob)
    return result