--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/hgext3rd/evolve/thirdparty/cbor.py Fri Jan 12 19:23:29 2018 +0530
@@ -0,0 +1,523 @@
+#!python
+# -*- Python -*-
+#
+# Copyright 2014-2015 Brian Olson
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+import struct
+import sys
+
+_IS_PY3 = sys.version_info[0] >= 3
+
+if _IS_PY3:
+ from io import BytesIO as StringIO
+else:
+ try:
+ from cStringIO import StringIO
+ except:
+ from StringIO import StringIO
+
+
+CBOR_TYPE_MASK = 0xE0 # top 3 bits
+CBOR_INFO_BITS = 0x1F # low 5 bits
+
+
+CBOR_UINT = 0x00
+CBOR_NEGINT = 0x20
+CBOR_BYTES = 0x40
+CBOR_TEXT = 0x60
+CBOR_ARRAY = 0x80
+CBOR_MAP = 0xA0
+CBOR_TAG = 0xC0
+CBOR_7 = 0xE0 # float and other types
+
+CBOR_UINT8_FOLLOWS = 24 # 0x18
+CBOR_UINT16_FOLLOWS = 25 # 0x19
+CBOR_UINT32_FOLLOWS = 26 # 0x1a
+CBOR_UINT64_FOLLOWS = 27 # 0x1b
+CBOR_VAR_FOLLOWS = 31 # 0x1f
+
+CBOR_BREAK = 0xFF
+
+CBOR_FALSE = (CBOR_7 | 20)
+CBOR_TRUE = (CBOR_7 | 21)
+CBOR_NULL = (CBOR_7 | 22)
+CBOR_UNDEFINED = (CBOR_7 | 23) # js 'undefined' value
+
+CBOR_FLOAT16 = (CBOR_7 | 25)
+CBOR_FLOAT32 = (CBOR_7 | 26)
+CBOR_FLOAT64 = (CBOR_7 | 27)
+
+CBOR_TAG_DATE_STRING = 0 # RFC3339
+CBOR_TAG_DATE_ARRAY = 1 # any number type follows, seconds since 1970-01-01T00:00:00 UTC
+CBOR_TAG_BIGNUM = 2 # big endian byte string follows
+CBOR_TAG_NEGBIGNUM = 3 # big endian byte string follows
+CBOR_TAG_DECIMAL = 4 # [ 10^x exponent, number ]
+CBOR_TAG_BIGFLOAT = 5 # [ 2^x exponent, number ]
+CBOR_TAG_BASE64URL = 21
+CBOR_TAG_BASE64 = 22
+CBOR_TAG_BASE16 = 23
+CBOR_TAG_CBOR = 24 # following byte string is embedded CBOR data
+
+CBOR_TAG_URI = 32
+CBOR_TAG_BASE64URL = 33
+CBOR_TAG_BASE64 = 34
+CBOR_TAG_REGEX = 35
+CBOR_TAG_MIME = 36 # following text is MIME message, headers, separators and all
+CBOR_TAG_CBOR_FILEHEADER = 55799 # can open a file with 0xd9d9f7
+
+_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)
+
+
+def dumps_int(val):
+ "return bytes representing int val in CBOR"
+ if val >= 0:
+ # CBOR_UINT is 0, so I'm lazy/efficient about not OR-ing it in.
+ if val <= 23:
+ return struct.pack('B', val)
+ if val <= 0x0ff:
+ return struct.pack('BB', CBOR_UINT8_FOLLOWS, val)
+ if val <= 0x0ffff:
+ return struct.pack('!BH', CBOR_UINT16_FOLLOWS, val)
+ if val <= 0x0ffffffff:
+ return struct.pack('!BI', CBOR_UINT32_FOLLOWS, val)
+ if val <= 0x0ffffffffffffffff:
+ return struct.pack('!BQ', CBOR_UINT64_FOLLOWS, val)
+ outb = _dumps_bignum_to_bytearray(val)
+ return _CBOR_TAG_BIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
+ val = -1 - val
+ return _encode_type_num(CBOR_NEGINT, val)
+
+
+if _IS_PY3:
+ def _dumps_bignum_to_bytearray(val):
+ out = []
+ while val > 0:
+ out.insert(0, val & 0x0ff)
+ val = val >> 8
+ return bytes(out)
+else:
+ def _dumps_bignum_to_bytearray(val):
+ out = []
+ while val > 0:
+ out.insert(0, chr(val & 0x0ff))
+ val = val >> 8
+ return b''.join(out)
+
+
+def dumps_float(val):
+ return struct.pack("!Bd", CBOR_FLOAT64, val)
+
+
+_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)
+
+
+def _encode_type_num(cbor_type, val):
+ """For some CBOR primary type [0..7] and an auxiliary unsigned number, return CBOR encoded bytes"""
+ assert val >= 0
+ if val <= 23:
+ return struct.pack('B', cbor_type | val)
+ if val <= 0x0ff:
+ return struct.pack('BB', cbor_type | CBOR_UINT8_FOLLOWS, val)
+ if val <= 0x0ffff:
+ return struct.pack('!BH', cbor_type | CBOR_UINT16_FOLLOWS, val)
+ if val <= 0x0ffffffff:
+ return struct.pack('!BI', cbor_type | CBOR_UINT32_FOLLOWS, val)
+ if (((cbor_type == CBOR_NEGINT) and (val <= 0x07fffffffffffffff)) or
+ ((cbor_type != CBOR_NEGINT) and (val <= 0x0ffffffffffffffff))):
+ return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
+ if cbor_type != CBOR_NEGINT:
+ raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
+ outb = _dumps_bignum_to_bytearray(val)
+ return _CBOR_TAG_NEGBIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
+
+
+if _IS_PY3:
+ def _is_unicode(val):
+ return isinstance(val, str)
+else:
+ def _is_unicode(val):
+ return isinstance(val, unicode)
+
+
+def dumps_string(val, is_text=None, is_bytes=None):
+ if _is_unicode(val):
+ val = val.encode('utf8')
+ is_text = True
+ is_bytes = False
+ if (is_bytes) or not (is_text == True):
+ return _encode_type_num(CBOR_BYTES, len(val)) + val
+ return _encode_type_num(CBOR_TEXT, len(val)) + val
+
+
+def dumps_array(arr, sort_keys=False):
+ head = _encode_type_num(CBOR_ARRAY, len(arr))
+ parts = [dumps(x, sort_keys=sort_keys) for x in arr]
+ return head + b''.join(parts)
+
+
+if _IS_PY3:
+ def dumps_dict(d, sort_keys=False):
+ head = _encode_type_num(CBOR_MAP, len(d))
+ parts = [head]
+ if sort_keys:
+ for k in sorted(d.keys()):
+ v = d[k]
+ parts.append(dumps(k, sort_keys=sort_keys))
+ parts.append(dumps(v, sort_keys=sort_keys))
+ else:
+ for k,v in d.items():
+ parts.append(dumps(k, sort_keys=sort_keys))
+ parts.append(dumps(v, sort_keys=sort_keys))
+ return b''.join(parts)
+else:
+ def dumps_dict(d, sort_keys=False):
+ head = _encode_type_num(CBOR_MAP, len(d))
+ parts = [head]
+ if sort_keys:
+ for k in sorted(d.iterkeys()):
+ v = d[k]
+ parts.append(dumps(k, sort_keys=sort_keys))
+ parts.append(dumps(v, sort_keys=sort_keys))
+ else:
+ for k,v in d.iteritems():
+ parts.append(dumps(k, sort_keys=sort_keys))
+ parts.append(dumps(v, sort_keys=sort_keys))
+ return b''.join(parts)
+
+
+def dumps_bool(b):
+ if b:
+ return struct.pack('B', CBOR_TRUE)
+ return struct.pack('B', CBOR_FALSE)
+
+
+def dumps_tag(t, sort_keys=False):
+ return _encode_type_num(CBOR_TAG, t.tag) + dumps(t.value, sort_keys=sort_keys)
+
+
+if _IS_PY3:
+ def _is_stringish(x):
+ return isinstance(x, (str, bytes))
+ def _is_intish(x):
+ return isinstance(x, int)
+else:
+ def _is_stringish(x):
+ return isinstance(x, (str, basestring, bytes, unicode))
+ def _is_intish(x):
+ return isinstance(x, (int, long))
+
+
+def dumps(ob, sort_keys=False):
+ if ob is None:
+ return struct.pack('B', CBOR_NULL)
+ if isinstance(ob, bool):
+ return dumps_bool(ob)
+ if _is_stringish(ob):
+ return dumps_string(ob)
+ if isinstance(ob, (list, tuple)):
+ return dumps_array(ob, sort_keys=sort_keys)
+ # TODO: accept other enumerables and emit a variable length array
+ if isinstance(ob, dict):
+ return dumps_dict(ob, sort_keys=sort_keys)
+ if isinstance(ob, float):
+ return dumps_float(ob)
+ if _is_intish(ob):
+ return dumps_int(ob)
+ if isinstance(ob, Tag):
+ return dumps_tag(ob, sort_keys=sort_keys)
+ raise Exception("don't know how to cbor serialize object of type %s", type(ob))
+
+
+# same basic signature as json.dump, but with no options (yet)
+def dump(obj, fp, sort_keys=False):
+ """
+ obj: Python object to serialize
+ fp: file-like object capable of .write(bytes)
+ """
+ # this is kinda lame, but probably not inefficient for non-huge objects
+ # TODO: .write() to fp as we go as each inner object is serialized
+ blob = dumps(obj, sort_keys=sort_keys)
+ fp.write(blob)
+
+
+class Tag(object):
+ def __init__(self, tag=None, value=None):
+ self.tag = tag
+ self.value = value
+
+ def __repr__(self):
+ return "Tag({0!r}, {1!r})".format(self.tag, self.value)
+
+ def __eq__(self, other):
+ if not isinstance(other, Tag):
+ return False
+ return (self.tag == other.tag) and (self.value == other.value)
+
+
+def loads(data):
+ """
+ Parse CBOR bytes and return Python objects.
+ """
+ if data is None:
+ raise ValueError("got None for buffer to decode in loads")
+ fp = StringIO(data)
+ return _loads(fp)[0]
+
+
+def load(fp):
+ """
+ Parse and return object from fp, a file-like object supporting .read(n)
+ """
+ return _loads(fp)[0]
+
+
+_MAX_DEPTH = 100
+
+
+def _tag_aux(fp, tb):
+ bytes_read = 1
+ tag = tb & CBOR_TYPE_MASK
+ tag_aux = tb & CBOR_INFO_BITS
+ if tag_aux <= 23:
+ aux = tag_aux
+ elif tag_aux == CBOR_UINT8_FOLLOWS:
+ data = fp.read(1)
+ aux = struct.unpack_from("!B", data, 0)[0]
+ bytes_read += 1
+ elif tag_aux == CBOR_UINT16_FOLLOWS:
+ data = fp.read(2)
+ aux = struct.unpack_from("!H", data, 0)[0]
+ bytes_read += 2
+ elif tag_aux == CBOR_UINT32_FOLLOWS:
+ data = fp.read(4)
+ aux = struct.unpack_from("!I", data, 0)[0]
+ bytes_read += 4
+ elif tag_aux == CBOR_UINT64_FOLLOWS:
+ data = fp.read(8)
+ aux = struct.unpack_from("!Q", data, 0)[0]
+ bytes_read += 8
+ else:
+ assert tag_aux == CBOR_VAR_FOLLOWS, "bogus tag {0:02x}".format(tb)
+ aux = None
+
+ return tag, tag_aux, aux, bytes_read
+
+
+def _read_byte(fp):
+ tb = fp.read(1)
+ if len(tb) == 0:
+ # I guess not all file-like objects do this
+ raise EOFError()
+ return ord(tb)
+
+
+def _loads_var_array(fp, limit, depth, returntags, bytes_read):
+ ob = []
+ tb = _read_byte(fp)
+ while tb != CBOR_BREAK:
+ (subob, sub_len) = _loads_tb(fp, tb, limit, depth, returntags)
+ bytes_read += 1 + sub_len
+ ob.append(subob)
+ tb = _read_byte(fp)
+ return (ob, bytes_read + 1)
+
+
+def _loads_var_map(fp, limit, depth, returntags, bytes_read):
+ ob = {}
+ tb = _read_byte(fp)
+ while tb != CBOR_BREAK:
+ (subk, sub_len) = _loads_tb(fp, tb, limit, depth, returntags)
+ bytes_read += 1 + sub_len
+ (subv, sub_len) = _loads(fp, limit, depth, returntags)
+ bytes_read += sub_len
+ ob[subk] = subv
+ tb = _read_byte(fp)
+ return (ob, bytes_read + 1)
+
+
+if _IS_PY3:
+ def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
+ ob = []
+ for i in range(aux):
+ subob, subpos = _loads(fp)
+ bytes_read += subpos
+ ob.append(subob)
+ return ob, bytes_read
+ def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
+ ob = {}
+ for i in range(aux):
+ subk, subpos = _loads(fp)
+ bytes_read += subpos
+ subv, subpos = _loads(fp)
+ bytes_read += subpos
+ ob[subk] = subv
+ return ob, bytes_read
+else:
+ def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
+ ob = []
+ for i in xrange(aux):
+ subob, subpos = _loads(fp)
+ bytes_read += subpos
+ ob.append(subob)
+ return ob, bytes_read
+ def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
+ ob = {}
+ for i in xrange(aux):
+ subk, subpos = _loads(fp)
+ bytes_read += subpos
+ subv, subpos = _loads(fp)
+ bytes_read += subpos
+ ob[subk] = subv
+ return ob, bytes_read
+
+def _loads(fp, limit=None, depth=0, returntags=False):
+ "return (object, bytes read)"
+ if depth > _MAX_DEPTH:
+ raise Exception("hit CBOR loads recursion depth limit")
+
+ tb = _read_byte(fp)
+
+ return _loads_tb(fp, tb, limit, depth, returntags)
+
+def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
+ # Some special cases of CBOR_7 best handled by special struct.unpack logic here
+ if tb == CBOR_FLOAT16:
+ data = fp.read(2)
+ hibyte, lowbyte = struct.unpack_from("BB", data, 0)
+ exp = (hibyte >> 2) & 0x1F
+ mant = ((hibyte & 0x03) << 8) | lowbyte
+ if exp == 0:
+ val = mant * (2.0 ** -24)
+ elif exp == 31:
+ if mant == 0:
+ val = float('Inf')
+ else:
+ val = float('NaN')
+ else:
+ val = (mant + 1024.0) * (2 ** (exp - 25))
+ if hibyte & 0x80:
+ val = -1.0 * val
+ return (val, 3)
+ elif tb == CBOR_FLOAT32:
+ data = fp.read(4)
+ pf = struct.unpack_from("!f", data, 0)
+ return (pf[0], 5)
+ elif tb == CBOR_FLOAT64:
+ data = fp.read(8)
+ pf = struct.unpack_from("!d", data, 0)
+ return (pf[0], 9)
+
+ tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
+
+ if tag == CBOR_UINT:
+ return (aux, bytes_read)
+ elif tag == CBOR_NEGINT:
+ return (-1 - aux, bytes_read)
+ elif tag == CBOR_BYTES:
+ ob, subpos = loads_bytes(fp, aux)
+ return (ob, bytes_read + subpos)
+ elif tag == CBOR_TEXT:
+ raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
+ ob = raw.decode('utf8')
+ return (ob, bytes_read + subpos)
+ elif tag == CBOR_ARRAY:
+ if aux is None:
+ return _loads_var_array(fp, limit, depth, returntags, bytes_read)
+ return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
+ elif tag == CBOR_MAP:
+ if aux is None:
+ return _loads_var_map(fp, limit, depth, returntags, bytes_read)
+ return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
+ elif tag == CBOR_TAG:
+ ob, subpos = _loads(fp)
+ bytes_read += subpos
+ if returntags:
+ # Don't interpret the tag, return it and the tagged object.
+ ob = Tag(aux, ob)
+ else:
+ # attempt to interpet the tag and the value into a Python object.
+ ob = tagify(ob, aux)
+ return ob, bytes_read
+ elif tag == CBOR_7:
+ if tb == CBOR_TRUE:
+ return (True, bytes_read)
+ if tb == CBOR_FALSE:
+ return (False, bytes_read)
+ if tb == CBOR_NULL:
+ return (None, bytes_read)
+ if tb == CBOR_UNDEFINED:
+ return (None, bytes_read)
+ raise ValueError("unknown cbor tag 7 byte: {:02x}".format(tb))
+
+
+def loads_bytes(fp, aux, btag=CBOR_BYTES):
+ # TODO: limit to some maximum number of chunks and some maximum total bytes
+ if aux is not None:
+ # simple case
+ ob = fp.read(aux)
+ return (ob, aux)
+ # read chunks of bytes
+ chunklist = []
+ total_bytes_read = 0
+ while True:
+ tb = fp.read(1)[0]
+ if not _IS_PY3:
+ tb = ord(tb)
+ if tb == CBOR_BREAK:
+ total_bytes_read += 1
+ break
+ tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
+ assert tag == btag, 'variable length value contains unexpected component'
+ ob = fp.read(aux)
+ chunklist.append(ob)
+ total_bytes_read += bytes_read + aux
+ return (b''.join(chunklist), total_bytes_read)
+
+
+if _IS_PY3:
+ def _bytes_to_biguint(bs):
+ out = 0
+ for ch in bs:
+ out = out << 8
+ out = out | ch
+ return out
+else:
+ def _bytes_to_biguint(bs):
+ out = 0
+ for ch in bs:
+ out = out << 8
+ out = out | ord(ch)
+ return out
+
+
+def tagify(ob, aux):
+ # TODO: make this extensible?
+ # cbor.register_tag_handler(tagnumber, tag_handler)
+ # where tag_handler takes (tagnumber, tagged_object)
+ if aux == CBOR_TAG_DATE_STRING:
+ # TODO: parse RFC3339 date string
+ pass
+ if aux == CBOR_TAG_DATE_ARRAY:
+ return datetime.datetime.utcfromtimestamp(ob)
+ if aux == CBOR_TAG_BIGNUM:
+ return _bytes_to_biguint(ob)
+ if aux == CBOR_TAG_NEGBIGNUM:
+ return -1 - _bytes_to_biguint(ob)
+ if aux == CBOR_TAG_REGEX:
+ # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
+ return re.compile(ob)
+ return Tag(aux, ob)