hgext3rd/evolve/thirdparty/cbor.py
author Martin von Zweigbergk <martinvonz@google.com>
Wed, 03 Jul 2019 11:13:47 -0700
changeset 4714 c51fc0ae7a7e
parent 3390 b3dbba6e34c9
child 4814 48b30ff742cb
permissions -rw-r--r--
py3: switch from iteritems() to items()

#!python
# -*- Python -*-
#
# Copyright 2014-2015 Brian Olson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import re
import struct
import sys

# True when running under Python 3; used to pick between the py2 and py3
# variants of the helpers defined in this module.
_IS_PY3 = sys.version_info[0] >= 3

# Bind ``StringIO`` to an in-memory buffer type that holds encoded bytes.
if _IS_PY3:
    from io import BytesIO as StringIO
else:
    try:
        from cStringIO import StringIO
    except ImportError:
        # Only a missing cStringIO module should trigger the fallback; any
        # other error (including KeyboardInterrupt) must propagate.
        from StringIO import StringIO


# Masks used to split the initial byte of an item into its major type
# (top 3 bits) and its additional-information field (low 5 bits).
CBOR_TYPE_MASK = 0xE0  # top 3 bits
CBOR_INFO_BITS = 0x1F  # low 5 bits


# Major types, already positioned in the top 3 bits of the initial byte.
CBOR_UINT    = 0x00
CBOR_NEGINT  = 0x20
CBOR_BYTES   = 0x40
CBOR_TEXT    = 0x60
CBOR_ARRAY   = 0x80
CBOR_MAP     = 0xA0
CBOR_TAG     = 0xC0
CBOR_7       = 0xE0  # float and other types

# Additional-information values above 23: the argument follows the initial
# byte as a 1/2/4/8-byte big-endian unsigned integer, or (31) no argument is
# given and the item runs until a CBOR_BREAK byte.
CBOR_UINT8_FOLLOWS  = 24  # 0x18
CBOR_UINT16_FOLLOWS = 25  # 0x19
CBOR_UINT32_FOLLOWS = 26  # 0x1a
CBOR_UINT64_FOLLOWS = 27  # 0x1b
CBOR_VAR_FOLLOWS    = 31  # 0x1f

# Initial byte that ends an item opened with CBOR_VAR_FOLLOWS.
CBOR_BREAK  = 0xFF

# Complete initial bytes of the simple values in major type 7.
CBOR_FALSE  = (CBOR_7 | 20)
CBOR_TRUE   = (CBOR_7 | 21)
CBOR_NULL   = (CBOR_7 | 22)
CBOR_UNDEFINED   = (CBOR_7 | 23)  # js 'undefined' value

# Complete initial bytes of the floating-point encodings in major type 7.
CBOR_FLOAT16 = (CBOR_7 | 25)
CBOR_FLOAT32 = (CBOR_7 | 26)
CBOR_FLOAT64 = (CBOR_7 | 27)

# Tag numbers (the argument carried by a CBOR_TAG item).
CBOR_TAG_DATE_STRING = 0 # RFC3339
CBOR_TAG_DATE_ARRAY = 1 # any number type follows, seconds since 1970-01-01T00:00:00 UTC
CBOR_TAG_BIGNUM = 2 # big endian byte string follows
CBOR_TAG_NEGBIGNUM = 3 # big endian byte string follows
CBOR_TAG_DECIMAL = 4 # [ 10^x exponent, number ]
CBOR_TAG_BIGFLOAT = 5 # [ 2^x exponent, number ]
# NOTE(review): CBOR_TAG_BASE64URL and CBOR_TAG_BASE64 are assigned again a few
# lines below (33 and 34), so the values 21 and 22 set here are overwritten at
# import time. In RFC 7049, tags 21-23 mean "expected conversion to
# base64url/base64/base16", which is a different concept from the
# base64url/base64 text tags 33/34 -- these three probably want distinct names.
CBOR_TAG_BASE64URL = 21
CBOR_TAG_BASE64 = 22
CBOR_TAG_BASE16 = 23
CBOR_TAG_CBOR = 24 # following byte string is embedded CBOR data

CBOR_TAG_URI = 32
CBOR_TAG_BASE64URL = 33  # final value of this name (overrides 21 above)
CBOR_TAG_BASE64 = 34  # final value of this name (overrides 22 above)
CBOR_TAG_REGEX = 35
CBOR_TAG_MIME = 36 # following text is MIME message, headers, separators and all
CBOR_TAG_CBOR_FILEHEADER = 55799 # can open a file with 0xd9d9f7

# Pre-encoded one-byte tag head that precedes the byte string of a positive bignum.
_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)


def dumps_int(val):
    """Return the CBOR encoding of the integer ``val`` as bytes.

    Non-negative values that fit in 64 bits are written as unsigned integers
    using the narrowest argument width that holds them; larger values become
    a tagged positive bignum. Negative values are handed to
    ``_encode_type_num`` as ``-1 - val`` under the negative-integer type.
    """
    if not (val >= 0):
        # A negative n is stored as the unsigned quantity -1 - n.
        return _encode_type_num(CBOR_NEGINT, -1 - val)

    # CBOR_UINT is 0, so it never needs OR-ing into the initial byte.
    if val <= 23:
        encoded = struct.pack('B', val)
    elif val <= 0x0ff:
        encoded = struct.pack('BB', CBOR_UINT8_FOLLOWS, val)
    elif val <= 0x0ffff:
        encoded = struct.pack('!BH', CBOR_UINT16_FOLLOWS, val)
    elif val <= 0x0ffffffff:
        encoded = struct.pack('!BI', CBOR_UINT32_FOLLOWS, val)
    elif val <= 0x0ffffffffffffffff:
        encoded = struct.pack('!BQ', CBOR_UINT64_FOLLOWS, val)
    else:
        # Too wide for 64 bits: bignum tag, then the magnitude as a byte string.
        magnitude = _dumps_bignum_to_bytearray(val)
        length_head = _encode_type_num(CBOR_BYTES, len(magnitude))
        encoded = _CBOR_TAG_BIGNUM_BYTES + length_head + magnitude
    return encoded


if _IS_PY3:
    def _dumps_bignum_to_bytearray(val):
        """Return the minimal big-endian byte string for the integer ``val``.

        Values that are zero or negative produce ``b''``.
        """
        if val <= 0:
            return b''
        # int.to_bytes does the conversion in one step instead of prepending
        # one byte at a time (which is quadratic in the number of bytes).
        return val.to_bytes((val.bit_length() + 7) // 8, 'big')
else:
    def _dumps_bignum_to_bytearray(val):
        """Return the minimal big-endian byte string for the integer ``val``.

        Values that are zero or negative produce ``b''``.
        """
        out = []
        while val > 0:
            # Collect least-significant byte first; appending is O(1) where
            # inserting at index 0 would shift the whole list every time.
            out.append(chr(val & 0x0ff))
            val = val >> 8
        out.reverse()
        return b''.join(out)


def dumps_float(val):
    """Return ``val`` encoded as a CBOR float64: the CBOR_FLOAT64 initial
    byte followed by the 8-byte big-endian double."""
    layout = struct.Struct("!Bd")
    return layout.pack(CBOR_FLOAT64, val)


# Pre-encoded one-byte tag head that precedes the byte string of a negative bignum.
_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)


def _encode_type_num(cbor_type, val):
    """Encode the head of a CBOR item and return it as bytes.

    cbor_type: a major-type constant (CBOR_UINT ... CBOR_7), i.e. the value
        already placed in the top 3 bits of the initial byte.
    val: the non-negative argument (a value, length or tag number).

    The argument is written in the narrowest form that holds it. For
    CBOR_NEGINT, arguments above 0x7fffffffffffffff are emitted as a tagged
    negative bignum instead; for every other type an argument that does not
    fit in 64 bits raises Exception.
    """
    assert val >= 0
    if val <= 23:
        # Small arguments live directly in the low 5 bits.
        head = struct.pack('B', cbor_type | val)
    elif val <= 0x0ff:
        head = struct.pack('BB', cbor_type | CBOR_UINT8_FOLLOWS, val)
    elif val <= 0x0ffff:
        head = struct.pack('!BH', cbor_type | CBOR_UINT16_FOLLOWS, val)
    elif val <= 0x0ffffffff:
        head = struct.pack('!BI', cbor_type | CBOR_UINT32_FOLLOWS, val)
    else:
        is_negint = (cbor_type == CBOR_NEGINT)
        widest_inline = 0x07fffffffffffffff if is_negint else 0x0ffffffffffffffff
        if val <= widest_inline:
            head = struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
        elif not is_negint:
            raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
        else:
            # Negative-bignum tag, then the magnitude as a byte string.
            magnitude = _dumps_bignum_to_bytearray(val)
            length_head = _encode_type_num(CBOR_BYTES, len(magnitude))
            head = _CBOR_TAG_NEGBIGNUM_BYTES + length_head + magnitude
    return head


if not _IS_PY3:
    def _is_unicode(val):
        """Return True when ``val`` is a Python 2 ``unicode`` object."""
        return isinstance(val, unicode)
else:
    def _is_unicode(val):
        """Return True when ``val`` is a Python 3 ``str`` object."""
        return isinstance(val, str)


def dumps_string(val, is_text=None, is_bytes=None):
    """Return the CBOR encoding of a text or byte string.

    Unicode input is UTF-8 encoded and always written as a text string,
    whatever the flags say. For anything else a text string is written only
    when ``is_text`` equals True and ``is_bytes`` is falsy; otherwise the
    value is written as a byte string.
    """
    if _is_unicode(val):
        val = val.encode('utf8')
        as_text = True
    else:
        as_text = (is_text == True) and not is_bytes
    major_type = CBOR_TEXT if as_text else CBOR_BYTES
    return _encode_type_num(major_type, len(val)) + val


def dumps_array(arr, sort_keys=False):
    """Return the CBOR encoding of the sequence ``arr`` as a definite-length
    array: a head carrying ``len(arr)`` followed by each encoded element.

    ``sort_keys`` is forwarded to ``dumps`` for every element.
    """
    pieces = [_encode_type_num(CBOR_ARRAY, len(arr))]
    for item in arr:
        pieces.append(dumps(item, sort_keys=sort_keys))
    return b''.join(pieces)


def dumps_dict(d, sort_keys=False):
    """Return the CBOR encoding of the mapping ``d`` as a definite-length map.

    The head carries ``len(d)`` and is followed by each key and its value in
    turn. With ``sort_keys`` true the entries are written in sorted key order,
    otherwise in the mapping's own iteration order. ``sort_keys`` is also
    forwarded to ``dumps`` for every key and value.
    """
    parts = [_encode_type_num(CBOR_MAP, len(d))]
    if sort_keys:
        entries = ((key, d[key]) for key in sorted(d.keys()))
    else:
        entries = d.items()
    for key, value in entries:
        parts.append(dumps(key, sort_keys=sort_keys))
        parts.append(dumps(value, sort_keys=sort_keys))
    return b''.join(parts)


def dumps_bool(b):
    """Return the one-byte CBOR encoding of the truth value of ``b``."""
    initial_byte = CBOR_TRUE if b else CBOR_FALSE
    return struct.pack('B', initial_byte)


def dumps_tag(t, sort_keys=False):
    """Return the CBOR encoding of the tagged value ``t``: a tag head carrying
    ``t.tag`` followed by the encoding of ``t.value``."""
    tag_head = _encode_type_num(CBOR_TAG, t.tag)
    payload = dumps(t.value, sort_keys=sort_keys)
    return tag_head + payload


if not _IS_PY3:
    def _is_stringish(x):
        """Return True for any Python 2 text or byte string."""
        return isinstance(x, (str, basestring, bytes, unicode))
    def _is_intish(x):
        """Return True for a Python 2 ``int`` or ``long`` (``bool`` included)."""
        return isinstance(x, (int, long))
else:
    def _is_stringish(x):
        """Return True for a Python 3 ``str`` or ``bytes`` object."""
        return isinstance(x, (str, bytes))
    def _is_intish(x):
        """Return True for a Python 3 ``int`` (``bool`` included)."""
        return isinstance(x, int)


def dumps(ob, sort_keys=False):
    """Serialize ``ob`` to CBOR and return the encoded bytes.

    Handles None, bool, text and byte strings, list/tuple, dict, float,
    integers and ``Tag`` instances, dispatching to the matching ``dumps_*``
    helper.

    sort_keys: when true, dict entries are written in sorted key order; the
        flag is passed down to nested containers.

    Raises Exception when ``ob`` is of a type this module cannot serialize.
    """
    if ob is None:
        return struct.pack('B', CBOR_NULL)
    # bool has to be tested before the integer check further down, because
    # bool is a subclass of int and would otherwise be written as 0/1.
    if isinstance(ob, bool):
        return dumps_bool(ob)
    if _is_stringish(ob):
        return dumps_string(ob)
    if isinstance(ob, (list, tuple)):
        return dumps_array(ob, sort_keys=sort_keys)
    # TODO: accept other enumerables and emit a variable length array
    if isinstance(ob, dict):
        return dumps_dict(ob, sort_keys=sort_keys)
    if isinstance(ob, float):
        return dumps_float(ob)
    if _is_intish(ob):
        return dumps_int(ob)
    if isinstance(ob, Tag):
        return dumps_tag(ob, sort_keys=sort_keys)
    # Interpolate the type into the message; passing it as a second argument
    # to Exception would leave the "%s" placeholder unformatted.
    raise Exception("don't know how to cbor serialize object of type %s" % (type(ob),))


# same basic signature as json.dump, but with no options (yet)
def dump(obj, fp, sort_keys=False):
    """
    Serialize obj to CBOR and write the result to fp in a single call.

    obj: Python object to serialize
    fp: file-like object capable of .write(bytes)
    sort_keys: forwarded to dumps()
    """
    # The whole encoding is built in memory before it is written out; this is
    # kinda lame, but probably not inefficient for non-huge objects.
    # TODO: .write() to fp as we go as each inner object is serialized
    fp.write(dumps(obj, sort_keys=sort_keys))


class Tag(object):
    """A CBOR tagged item: a tag number together with the value it annotates.

    Instances are produced by the decoder for tags it does not interpret and
    can be passed to ``dumps`` to emit a tagged value.
    """

    def __init__(self, tag=None, value=None):
        # tag: the tag number; value: the Python object the tag applies to.
        self.tag = tag
        self.value = value

    def __repr__(self):
        return "Tag({0!r}, {1!r})".format(self.tag, self.value)

    def __eq__(self, other):
        """Two tags are equal when both tag number and value are equal."""
        if not isinstance(other, Tag):
            return False
        return (self.tag == other.tag) and (self.value == other.value)

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``; without this method
        two equal tags would also compare as unequal there.
        """
        return not self.__eq__(other)


def loads(data):
    """
    Decode the first CBOR item found in the bytes ``data`` and return it as a
    Python object.

    Raises ValueError when ``data`` is None.
    """
    if data is None:
        raise ValueError("got None for buffer to decode in loads")
    # _loads also reports how many bytes it consumed; only the object is returned.
    ob, _consumed = _loads(StringIO(data))
    return ob


def load(fp):
    """
    Decode one CBOR item read from fp, a file-like object supporting
    .read(n), and return it as a Python object.
    """
    ob, _consumed = _loads(fp)
    return ob


# Nesting depth above which _loads() raises instead of decoding further.
_MAX_DEPTH = 100


def _tag_aux(fp, tb):
    """Split the initial byte ``tb`` and read its argument from ``fp``.

    Returns a 4-tuple ``(tag, tag_aux, aux, bytes_read)``:
      tag: the major-type bits of ``tb`` (``tb & CBOR_TYPE_MASK``)
      tag_aux: the low 5 bits of ``tb``
      aux: the argument value -- ``tag_aux`` itself when it is at most 23, the
          big-endian unsigned integer read from ``fp`` for the 1/2/4/8-byte
          forms, or None for CBOR_VAR_FOLLOWS
      bytes_read: 1 for ``tb`` plus the number of argument bytes read
    """
    tag = tb & CBOR_TYPE_MASK
    tag_aux = tb & CBOR_INFO_BITS
    bytes_read = 1  # counts the initial byte itself

    if tag_aux <= 23:
        return tag, tag_aux, tag_aux, bytes_read

    # Pick the struct layout and width of the argument that follows.
    if tag_aux == CBOR_UINT8_FOLLOWS:
        fmt, width = "!B", 1
    elif tag_aux == CBOR_UINT16_FOLLOWS:
        fmt, width = "!H", 2
    elif tag_aux == CBOR_UINT32_FOLLOWS:
        fmt, width = "!I", 4
    elif tag_aux == CBOR_UINT64_FOLLOWS:
        fmt, width = "!Q", 8
    else:
        assert tag_aux == CBOR_VAR_FOLLOWS, "bogus tag {0:02x}".format(tb)
        return tag, tag_aux, None, bytes_read

    payload = fp.read(width)
    aux = struct.unpack_from(fmt, payload, 0)[0]
    return tag, tag_aux, aux, bytes_read + width


def _read_byte(fp):
    """Read a single byte from ``fp`` and return it as an int.

    Raises EOFError when ``fp.read(1)`` comes back empty.
    """
    chunk = fp.read(1)
    if len(chunk) > 0:
        return ord(chunk)
    # An empty read is how end-of-input shows up here; not every file-like
    # object raises on its own.
    raise EOFError()


def _loads_var_array(fp, limit, depth, returntags, bytes_read):
    """Decode the elements of an indefinite-length array up to its break byte.

    fp: file-like object positioned just after the array's initial byte.
    limit, returntags: passed through to the element decoder.
    depth: nesting level of the array; its elements are decoded one level deeper.
    bytes_read: bytes already consumed for the array head.

    Returns (list, total bytes consumed including the break byte).
    """
    ob = []
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subob, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already counts the element's initial byte (tb), so adding
        # another 1 here would overstate the number of bytes consumed.
        bytes_read += sub_len
        ob.append(subob)
        tb = _read_byte(fp)
    # + 1 accounts for the break byte that ended the loop.
    return (ob, bytes_read + 1)


def _loads_var_map(fp, limit, depth, returntags, bytes_read):
    """Decode the entries of an indefinite-length map up to its break byte.

    fp: file-like object positioned just after the map's initial byte.
    limit, returntags: passed through to the key and value decoders.
    depth: nesting level of the map; keys and values are decoded one level deeper.
    bytes_read: bytes already consumed for the map head.

    Returns (dict, total bytes consumed including the break byte).
    """
    ob = {}
    tb = _read_byte(fp)
    while tb != CBOR_BREAK:
        (subk, sub_len) = _loads_tb(fp, tb, limit, depth + 1, returntags)
        # sub_len already counts the key's initial byte (tb), so adding
        # another 1 here would overstate the number of bytes consumed.
        bytes_read += sub_len
        (subv, sub_len) = _loads(fp, limit, depth + 1, returntags)
        bytes_read += sub_len
        ob[subk] = subv
        tb = _read_byte(fp)
    # + 1 accounts for the break byte that ended the loop.
    return (ob, bytes_read + 1)


def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
    """Decode ``aux`` consecutive items from ``fp`` into a list.

    limit, returntags: passed through to the element decoder.
    depth: nesting level of the array; its elements are decoded one level
        deeper so the depth guard in _loads() can take effect.
    bytes_read: bytes already consumed for the array head.

    Returns (list, total bytes consumed).
    """
    ob = []
    # A plain countdown behaves the same on Python 2 and 3 and never builds a
    # list of ``aux`` numbers, however large the declared length is.
    remaining = aux
    while remaining > 0:
        subob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob.append(subob)
        remaining -= 1
    return ob, bytes_read


def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
    """Decode ``aux`` consecutive key/value pairs from ``fp`` into a dict.

    limit, returntags: passed through to the key and value decoders.
    depth: nesting level of the map; keys and values are decoded one level
        deeper so the depth guard in _loads() can take effect.
    bytes_read: bytes already consumed for the map head.

    Returns (dict, total bytes consumed).
    """
    ob = {}
    remaining = aux
    while remaining > 0:
        subk, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        subv, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        ob[subk] = subv
        remaining -= 1
    return ob, bytes_read

def _loads(fp, limit=None, depth=0, returntags=False):
    """Read one item from ``fp`` and return (object, bytes read).

    Raises Exception when ``depth`` is greater than _MAX_DEPTH; otherwise
    reads the initial byte and lets _loads_tb decode the rest.
    """
    if depth <= _MAX_DEPTH:
        initial_byte = _read_byte(fp)
        return _loads_tb(fp, initial_byte, limit, depth, returntags)
    raise Exception("hit CBOR loads recursion depth limit")

def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
    """Decode the item whose initial byte ``tb`` was already read from ``fp``.

    fp: file-like object supporting .read(n), positioned just after ``tb``.
    tb: the initial byte, as an int.
    limit: passed through to nested decoders; not otherwise used here.
    depth: nesting level of this item; a value above _MAX_DEPTH raises Exception.
    returntags: when true, a tagged item is returned as Tag(number, value)
        instead of being interpreted by tagify().

    Returns (object, bytes consumed including ``tb``).
    Raises ValueError for an unrecognised major-type-7 initial byte.
    """
    # The indefinite-length readers call this function directly because they
    # have already consumed the initial byte, so the guard in _loads() alone
    # does not cover items nested inside them.
    if depth > _MAX_DEPTH:
        raise Exception("hit CBOR loads recursion depth limit")

    # Some special cases of CBOR_7 best handled by special struct.unpack logic here
    if tb == CBOR_FLOAT16:
        data = fp.read(2)
        hibyte, lowbyte = struct.unpack_from("BB", data, 0)
        # Half precision: 1 sign bit, 5 exponent bits, 10 mantissa bits.
        exp = (hibyte >> 2) & 0x1F
        mant = ((hibyte & 0x03) << 8) | lowbyte
        if exp == 0:
            # zero / subnormal
            val = mant * (2.0 ** -24)
        elif exp == 31:
            if mant == 0:
                val = float('Inf')
            else:
                val = float('NaN')
        else:
            # normal number: implicit leading 1 (the 1024) plus the mantissa
            val = (mant + 1024.0) * (2 ** (exp - 25))
        if hibyte & 0x80:
            val = -1.0 * val
        return (val, 3)
    elif tb == CBOR_FLOAT32:
        data = fp.read(4)
        pf = struct.unpack_from("!f", data, 0)
        return (pf[0], 5)
    elif tb == CBOR_FLOAT64:
        data = fp.read(8)
        pf = struct.unpack_from("!d", data, 0)
        return (pf[0], 9)

    tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)

    if tag == CBOR_UINT:
        return (aux, bytes_read)
    elif tag == CBOR_NEGINT:
        return (-1 - aux, bytes_read)
    elif tag == CBOR_BYTES:
        ob, subpos = loads_bytes(fp, aux)
        return (ob, bytes_read + subpos)
    elif tag == CBOR_TEXT:
        raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
        ob = raw.decode('utf8')
        return (ob, bytes_read + subpos)
    elif tag == CBOR_ARRAY:
        # aux is None for the indefinite-length form.
        if aux is None:
            return _loads_var_array(fp, limit, depth, returntags, bytes_read)
        return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_MAP:
        if aux is None:
            return _loads_var_map(fp, limit, depth, returntags, bytes_read)
        return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
    elif tag == CBOR_TAG:
        # Decode the tagged value one level deeper and with the caller's
        # options, so nested tags honour returntags and count towards the
        # depth limit.
        ob, subpos = _loads(fp, limit, depth + 1, returntags)
        bytes_read += subpos
        if returntags:
            # Don't interpret the tag, return it and the tagged object.
            ob = Tag(aux, ob)
        else:
            # attempt to interpret the tag and the value into a Python object.
            ob = tagify(ob, aux)
        return ob, bytes_read
    elif tag == CBOR_7:
        if tb == CBOR_TRUE:
            return (True, bytes_read)
        if tb == CBOR_FALSE:
            return (False, bytes_read)
        if tb == CBOR_NULL:
            return (None, bytes_read)
        if tb == CBOR_UNDEFINED:
            # 'undefined' has no Python counterpart; it decodes to None too.
            return (None, bytes_read)
        raise ValueError("unknown cbor tag 7 byte: {:02x}".format(tb))


def loads_bytes(fp, aux, btag=CBOR_BYTES):
    """Read the payload of a byte or text string whose head is already consumed.

    fp: file-like object supporting .read(n), positioned at the payload.
    aux: the payload length, or None for an indefinite-length string made of
        chunks terminated by a break byte.
    btag: major type every chunk of an indefinite-length string must have.

    Returns (payload bytes, number of bytes consumed from fp).
    Raises EOFError when the input ends where a chunk head or break byte is
    expected, and ValueError when a chunk is itself indefinite-length.
    """
    # TODO: limit to some maximum number of chunks and some maximum total bytes
    if aux is not None:
        # simple case
        ob = fp.read(aux)
        return (ob, aux)
    # read chunks of bytes
    chunklist = []
    total_bytes_read = 0
    while True:
        # _read_byte works on both Python 2 and 3 and reports truncated input
        # as EOFError, like the rest of the decoder.
        tb = _read_byte(fp)
        if tb == CBOR_BREAK:
            total_bytes_read += 1
            break
        tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
        assert tag == btag, 'variable length value contains unexpected component'
        if aux is None:
            # A chunk without a length would make fp.read(None) swallow the
            # rest of the stream; chunks have to be definite-length.
            raise ValueError("indefinite-length chunk inside variable length value")
        ob = fp.read(aux)
        chunklist.append(ob)
        total_bytes_read += bytes_read + aux
    return (b''.join(chunklist), total_bytes_read)


if not _IS_PY3:
    def _bytes_to_biguint(bs):
        """Interpret the byte string ``bs`` as a big-endian unsigned integer
        (0 for an empty string)."""
        total = 0
        for ch in bs:
            # Iterating a py2 str yields 1-char strings, hence ord().
            total = (total << 8) | ord(ch)
        return total
else:
    def _bytes_to_biguint(bs):
        """Interpret the bytes ``bs`` as a big-endian unsigned integer
        (0 for empty input)."""
        total = 0
        for octet in bs:
            total = (total << 8) | octet
        return total


def tagify(ob, aux):
    """Turn the decoded value ``ob`` carrying tag number ``aux`` into a Python
    object.

    Epoch timestamps become naive ``datetime`` objects (via
    ``utcfromtimestamp``), bignum tags become ints and the regex tag becomes a
    compiled pattern. Every other tag, including the date-string tag, is
    returned wrapped as ``Tag(aux, ob)``.
    """
    # TODO: make this extensible?
    # cbor.register_tag_handler(tagnumber, tag_handler)
    # where tag_handler takes (tagnumber, tagged_object)
    if aux == CBOR_TAG_DATE_ARRAY:
        converted = datetime.datetime.utcfromtimestamp(ob)
    elif aux == CBOR_TAG_BIGNUM:
        converted = _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_NEGBIGNUM:
        converted = -1 - _bytes_to_biguint(ob)
    elif aux == CBOR_TAG_REGEX:
        # Is this actually a good idea? Should we just return the tag and the raw value to the user somehow?
        converted = re.compile(ob)
    else:
        # Also reached for CBOR_TAG_DATE_STRING.
        # TODO: parse RFC3339 date string
        converted = Tag(aux, ob)
    return converted