Source code for genlayer.py.calldata

"""
This module is responsible for working with genvm calldata

Calldata supports following types:

#. Primitive types:

	#. python built-in: :py:class:`bool`, :py:obj:`None`, :py:class:`int`, :py:class:`str`, :py:class:`bytes`
	#. :py:meth:`~genlayer.py.types.Address` type

#. Composite types:

	#. :py:class:`list` and :py:class:`dict`, as well as :py:class:`collections.abc.Sequence` and :py:class:`collections.abc.Mapping`
	#. :py:class:`CalldataEncodable`
	# :py:mod:`dataclasses`

For full calldata specification see `genvm repo <https://github.com/yeagerai/genvm/blob/main/doc/calldata.md>`_
"""

__all__ = ('encode', 'decode', 'to_str', 'CalldataEncodable')

from .types import Address
import typing
import collections.abc
import dataclasses
import abc
import json

BITS_IN_TYPE = 3

TYPE_SPECIAL = 0
TYPE_PINT = 1
TYPE_NINT = 2
TYPE_BYTES = 3
TYPE_STR = 4
TYPE_ARR = 5
TYPE_MAP = 6

SPECIAL_NULL = (0 << BITS_IN_TYPE) | TYPE_SPECIAL
SPECIAL_FALSE = (1 << BITS_IN_TYPE) | TYPE_SPECIAL
SPECIAL_TRUE = (2 << BITS_IN_TYPE) | TYPE_SPECIAL
SPECIAL_ADDR = (3 << BITS_IN_TYPE) | TYPE_SPECIAL


[docs] class CalldataEncodable(metaclass=abc.ABCMeta): """ Abstract class to support calldata encoding for custom types Can be used to simplify code """
[docs] @abc.abstractmethod def __to_calldata__(self) -> typing.Any: """ Override this method to return calldata-compatible type .. warning:: returning ``self`` may lead to an infinite loop """ ...
[docs] def encode( x: typing.Any, *, default: typing.Callable[[typing.Any], typing.Any] = lambda x: x ) -> bytes: """ Encodes python object into calldata bytes :param default: function to be applied to each object recursively, it must return object encodable to calldata .. warning:: All composite types will be coerced to :py:class:`dict` and :py:class:`list`, so custom type information won't be preserved """ mem = bytearray() def append_uleb128(i): assert i >= 0 if i == 0: mem.append(0) while i > 0: cur = i & 0x7F i = i >> 7 if i > 0: cur |= 0x80 mem.append(cur) def impl_dict(b: collections.abc.Mapping): keys = list(b.keys()) keys.sort() le = len(keys) le = (le << 3) | TYPE_MAP append_uleb128(le) for k in keys: if not isinstance(k, str): raise Exception(f'key is not string {type(k)}') bts = k.encode('utf-8') append_uleb128(len(bts)) mem.extend(bts) impl(b[k]) def impl(b: typing.Any): b = default(b) if isinstance(b, CalldataEncodable): b = b.__to_calldata__() if b is None: mem.append(SPECIAL_NULL) elif b is True: mem.append(SPECIAL_TRUE) elif b is False: mem.append(SPECIAL_FALSE) elif isinstance(b, int): if b >= 0: b = (b << 3) | TYPE_PINT append_uleb128(b) else: b = -b - 1 b = (b << 3) | TYPE_NINT append_uleb128(b) elif isinstance(b, Address): mem.append(SPECIAL_ADDR) mem.extend(b.as_bytes) elif isinstance(b, bytes): lb = len(b) lb = (lb << 3) | TYPE_BYTES append_uleb128(lb) mem.extend(b) elif isinstance(b, str): b = b.encode('utf-8') lb = len(b) lb = (lb << 3) | TYPE_STR append_uleb128(lb) mem.extend(b) elif isinstance(b, collections.abc.Sequence): lb = len(b) lb = (lb << 3) | TYPE_ARR append_uleb128(lb) for x in b: impl(x) elif isinstance(b, collections.abc.Mapping): impl_dict(b) elif dataclasses.is_dataclass(b): assert not isinstance(b, type) try: impl_dict(dataclasses.asdict(b)) except TypeError as e: raise TypeError(f'not calldata encodable', type(b)) from e else: raise TypeError(f'not calldata encodable', type(b)) impl(x) return bytes(mem)
[docs] def decode(mem0: collections.abc.Buffer) -> typing.Any: """ Decodes calldata encoded bytes into python DSL Out of composite types it will contain only :py:class:`dict` and :py:class:`list` """ mem: memoryview = memoryview(mem0) def read_uleb128() -> int: nonlocal mem ret = 0 off = 0 while True: m = mem[0] ret = ret | ((m & 0x7F) << off) off += 7 mem = mem[1:] if (m & 0x80) == 0: break return ret def impl() -> typing.Any: nonlocal mem code = read_uleb128() typ = code & 0x7 if typ == TYPE_SPECIAL: if code == SPECIAL_NULL: return None if code == SPECIAL_FALSE: return False if code == SPECIAL_TRUE: return True if code == SPECIAL_ADDR: ret_addr = mem[: Address.SIZE] mem = mem[Address.SIZE :] return Address(ret_addr) raise Exception(f'Unknown special {bin(code)} {hex(code)}') code = code >> 3 if typ == TYPE_PINT: return code elif typ == TYPE_NINT: return -code - 1 elif typ == TYPE_BYTES: ret_bytes = mem[:code] mem = mem[code:] return ret_bytes elif typ == TYPE_STR: ret_str = mem[:code] mem = mem[code:] return str(ret_str, encoding='utf-8') elif typ == TYPE_ARR: ret_arr = [] for _i in range(code): ret_arr.append(impl()) return ret_arr elif typ == TYPE_MAP: ret_dict: dict[str, typing.Any] = {} prev = None for _i in range(code): le = read_uleb128() key = str(mem[:le], encoding='utf-8') mem = mem[le:] if prev is not None: assert prev < key prev = key assert key not in ret_dict ret_dict[key] = impl() return ret_dict raise Exception(f'invalid type {typ}') res = impl() if len(mem) != 0: raise Exception(f'unparsed end {bytes(mem[:5])!r}... (decoded {res})') return res
[docs] def to_str(d: typing.Any) -> str: """ Transforms calldata DSL into human readable json-like format, should be used for debug purposes only """ buf: list[str] = [] def impl(d: typing.Any) -> None: if d is None: buf.append('null') elif d is True: buf.append('true') elif d is False: buf.append('false') elif isinstance(d, str): buf.append(json.dumps(d)) elif isinstance(d, bytes): buf.append('b#') buf.append(d.hex()) elif isinstance(d, int): buf.append(str(d)) elif isinstance(d, Address): buf.append('addr#') buf.append(d.as_bytes.hex()) elif isinstance(d, collections.abc.Mapping): buf.append('{') comma = False for k, v in d.items(): if comma: buf.append(',') comma = True buf.append(json.dumps(k)) buf.append(':') impl(v) buf.append('}') elif isinstance(d, collections.abc.Sequence): buf.append('[') comma = False for v in d: if comma: buf.append(',') comma = True impl(v) buf.append(']') else: raise Exception(f"can't encode {d} to calldata") impl(d) return ''.join(buf)