# Source code for genlayer.calldata

"""
GenVM calldata encoding and decoding module.

This module provides:

* ``encode``: Encode Python objects to calldata bytes
* ``decode``: Decode calldata bytes to Python objects
* ``to_str``: Human-readable string representation
* ``CalldataEncodable``: ABC for custom encoding
* Type aliases: ``Encodable``, ``Decoded``, ``EncodableWithDefault``

Calldata natively supports the following types:

#. Primitive types:

	#. python built-in: :py:class:`bool`, :py:obj:`None`, :py:class:`int`, :py:class:`str`, :py:class:`bytes`
	#. :py:class:`~genlayer.types.Address` type

#. Composite types:

	#. :py:class:`list` (and any other :py:class:`collections.abc.Sequence`)
	#. :py:class:`dict` with :py:class:`str` keys (and any other :py:class:`collections.abc.Mapping` with :py:class:`str` keys)

For the full calldata specification, see the `genvm repo <https://github.com/yeagerai/genvm/blob/main/doc/calldata.md>`_
"""

# Public API of this module: the names bound by ``from genlayer.calldata import *``.
__all__ = (
	'encode',
	'decode',
	'to_str',
	'Encodable',
	'EncodableWithDefault',
	'Decoded',
	'CalldataEncodable',
	'DecodingError',
)

from genlayer.types import Address
import typing
import collections.abc
import dataclasses
import abc
import json

import genlayer._internal.reflect as reflect

# Number of low-order bits of every header value that hold the type tag;
# the remaining high bits hold the payload (integer value, length or special id).
BITS_IN_TYPE = 3

# Type tags stored in the low ``BITS_IN_TYPE`` bits.
TYPE_SPECIAL = 0  # null / false / true / address, selected by the high bits
TYPE_PINT = 1  # non-negative integer, value kept in the high bits
TYPE_NINT = 2  # negative integer ``v``, kept in the high bits as ``-v - 1``
TYPE_BYTES = 3  # byte string, high bits are its length
TYPE_STR = 4  # utf-8 string, high bits are its length in bytes
TYPE_ARR = 5  # array, high bits are the number of elements
TYPE_MAP = 6  # map with string keys, high bits are the number of entries

# Complete header values of the ``TYPE_SPECIAL`` family.
SPECIAL_NULL = TYPE_SPECIAL | (0 << BITS_IN_TYPE)
SPECIAL_FALSE = TYPE_SPECIAL | (1 << BITS_IN_TYPE)
SPECIAL_TRUE = TYPE_SPECIAL | (2 << BITS_IN_TYPE)
SPECIAL_ADDR = TYPE_SPECIAL | (3 << BITS_IN_TYPE)


class CalldataEncodable(metaclass=abc.ABCMeta):
	"""
	Abstract class to support calldata encoding for custom types

	Subclasses implement :py:meth:`__to_calldata__`; the value it returns is
	what gets encoded in place of the instance. Can be used to simplify code
	"""

	@abc.abstractmethod
	def __to_calldata__(self) -> 'Encodable':
		"""
		Override this method to return calldata-compatible type

		:returns: value that is encoded instead of ``self``

		.. warning::
			returning ``self`` may lead to an infinite loop or an exception
		"""
		raise NotImplementedError()
type Decoded = None | int | str | bytes | list[Decoded] | dict[str, Decoded] """ Type that represents what type is coerced to after ``decode . encode`` """ type Encodable = ( None | int | str | Address | bool | bytes | collections.abc.Sequence[Encodable] | collections.abc.Mapping[str, Encodable] | CalldataEncodable ) """ Type that can be encoded into calldata """ type EncodableWithDefault[T] = Encodable | T """ Type that can be encoded into calldata, provided ``default`` function ``T -> Encodable`` """ def encode_default_parameter(b): if not dataclasses.is_dataclass(b): return b if isinstance(b, type): raise TypeError(f'expected dataclass instance, got type {b!r}') return {field.name: getattr(b, field.name) for field in dataclasses.fields(b)}
[docs] def encode[T]( x: EncodableWithDefault[T], /, *, default: typing.Callable[ [EncodableWithDefault[T]], Encodable ] = encode_default_parameter, ) -> bytes: """ Encodes python object into calldata bytes :param default: function to be applied to each object recursively, it must return object encodable to calldata .. warning:: All composite types in the end are coerced to :py:class:`dict` and :py:class:`list`, so custom type information is *not* be preserved. Such types include: #. :py:class:`CalldataEncodable` #. :py:mod:`dataclasses` """ mem = bytearray() def append_uleb128(i): if i < 0: raise ValueError(f'uleb128 requires non-negative integer, got {i}') if i == 0: mem.append(0) while i > 0: cur = i & 0x7F i = i >> 7 if i > 0: cur |= 0x80 mem.append(cur) def impl_dict(b: collections.abc.Mapping): keys = list(b.keys()) keys.sort() le = len(keys) le = (le << 3) | TYPE_MAP append_uleb128(le) for k in keys: with reflect.context_notes(f'key {k!r}'): if not isinstance(k, str): raise TypeError(f'key is not string {reflect.repr_type(type(k))}') bts = k.encode('utf-8') append_uleb128(len(bts)) mem.extend(bts) impl(b[k]) def impl(b: EncodableWithDefault[T]): b = default(b) if isinstance(b, CalldataEncodable): b = b.__to_calldata__() if b is None: mem.append(SPECIAL_NULL) elif b is True: mem.append(SPECIAL_TRUE) elif b is False: mem.append(SPECIAL_FALSE) elif isinstance(b, int): if b >= 0: b = (b << 3) | TYPE_PINT append_uleb128(b) else: b = -b - 1 b = (b << 3) | TYPE_NINT append_uleb128(b) elif isinstance(b, Address): mem.append(SPECIAL_ADDR) mem.extend(b.as_bytes) elif isinstance(b, (bytes, bytearray)): lb = len(b) lb = (lb << 3) | TYPE_BYTES append_uleb128(lb) mem.extend(b) elif isinstance(b, memoryview): mem.extend(b.tolist()) elif isinstance(b, str): b = b.encode('utf-8') lb = len(b) lb = (lb << 3) | TYPE_STR append_uleb128(lb) mem.extend(b) elif isinstance(b, collections.abc.Sequence): lb = len(b) lb = (lb << 3) | TYPE_ARR append_uleb128(lb) for x in b: impl(x) elif 
isinstance(b, collections.abc.Mapping): impl_dict(b) else: raise TypeError(f'not calldata encodable {b!r}: {reflect.repr_type(type(b))}') impl(x) return bytes(mem)
class DecodingError(ValueError):
	"""
	Raised by :py:func:`decode` when its input is not valid calldata

	:py:func:`to_str` raises it as well, for a value it can not render
	"""
def decode(
	mem0: collections.abc.Buffer,
	/,
	*,
	memview2bytes: typing.Callable[[memoryview], typing.Any] = bytes,
) -> Decoded:
	"""
	Decodes calldata encoded bytes into python DSL

	Out of composite types it will contain only :py:class:`dict` and :py:class:`list`

	:param mem0: buffer holding exactly one encoded value
	:param memview2bytes: function applied to the :py:class:`memoryview` of every bytes value, its result is put into the output
	:returns: decoded value
	:raises DecodingError: if input ends prematurely, an uleb128 number has a zero most significant octet,
		special or type tag is unknown, a string or a key is not valid utf-8,
		map keys are not strictly ascending, or bytes are left after the value
	"""
	mem: memoryview = memoryview(mem0)

	def fetch_mem(cnt: int) -> memoryview:
		# consumes ``cnt`` items from the front of the input and returns them
		nonlocal mem
		if len(mem) < cnt:
			raise DecodingError('unexpected end of memory')
		ret = mem[:cnt]
		mem = mem[cnt:]
		return ret

	def read_uleb128() -> int:
		ret = 0
		off = 0
		while True:
			m = fetch_mem(1)[0]
			ret |= (m & 0x7F) << off
			if (m & 0x80) == 0:
				# a zero final octet after the first one adds nothing to the value,
				# such padded encodings are rejected
				if m == 0 and off != 0:
					raise DecodingError('most significant octet can not be zero')
				return ret
			off += 7

	def read_str(size: int) -> str:
		raw = fetch_mem(size)
		try:
			return str(raw, encoding='utf-8')
		except UnicodeDecodeError as exc:
			# report malformed text like every other malformed input
			raise DecodingError(f'invalid utf-8 string: {exc}') from exc

	def impl() -> typing.Any:
		code = read_uleb128()
		typ = code & ((1 << BITS_IN_TYPE) - 1)
		if typ == TYPE_SPECIAL:
			# specials are matched against the whole header value
			if code == SPECIAL_NULL:
				return None
			if code == SPECIAL_FALSE:
				return False
			if code == SPECIAL_TRUE:
				return True
			if code == SPECIAL_ADDR:
				return Address(fetch_mem(Address.SIZE))
			raise DecodingError(f'Unknown special {bin(code)} {hex(code)}')
		# for every other type the high bits are a value, a length or a count
		code >>= BITS_IN_TYPE
		if typ == TYPE_PINT:
			return code
		if typ == TYPE_NINT:
			return -code - 1
		if typ == TYPE_BYTES:
			return memview2bytes(fetch_mem(code))
		if typ == TYPE_STR:
			return read_str(code)
		if typ == TYPE_ARR:
			return [impl() for _ in range(code)]
		if typ == TYPE_MAP:
			ret_dict: dict[str, typing.Any] = {}
			prev = None
			for _ in range(code):
				key = read_str(read_uleb128())
				# strictly ascending keys also rule out duplicates
				if prev is not None and prev >= key:
					raise DecodingError(f'unordered calldata keys: `{prev}` >= `{key}`')
				prev = key
				ret_dict[key] = impl()
			return ret_dict
		raise DecodingError(f'invalid type {typ}')

	res = impl()
	if len(mem) != 0:
		raise DecodingError(f'unparsed end {bytes(mem[:5])!r}... (decoded {res})')
	return res
def to_str(d: Encodable, /) -> str:
	"""
	Transforms calldata DSL into human readable json-like format, should be used for debug purposes only

	:param d: value to render
	:returns: rendered text: ``null``/``true``/``false``, decimal integers, json-quoted strings,
		``b#<hex>`` for bytes, ``addr#<hex>`` for addresses, ``{...}`` for mappings and ``[...]`` for sequences
	:raises DecodingError: if ``d`` (or a value nested in it) is of an unsupported type
	"""
	buf: list[str] = []

	def impl(d: Encodable, /) -> None:
		if isinstance(d, CalldataEncodable):
			# tested first, as in ``encode``, so that the output shows what would
			# be encoded even when the object is also a sequence or a mapping
			impl(d.__to_calldata__())
		elif d is None:
			buf.append('null')
		elif d is True:
			buf.append('true')
		elif d is False:
			buf.append('false')
		elif isinstance(d, str):
			buf.append(json.dumps(d))
		elif isinstance(d, (bytes, bytearray, memoryview)):
			# must precede the ``Sequence`` test: all three are sequences of ints
			buf.append('b#')
			buf.append(d.hex())
		elif isinstance(d, int):
			buf.append(str(d))
		elif isinstance(d, Address):
			buf.append('addr#')
			buf.append(d.as_bytes.hex())
		elif isinstance(d, collections.abc.Mapping):
			buf.append('{')
			for idx, (k, v) in enumerate(d.items()):
				if idx > 0:
					buf.append(',')
				buf.append(json.dumps(k))
				buf.append(':')
				impl(v)
			buf.append('}')
		elif isinstance(d, collections.abc.Sequence):
			buf.append('[')
			for idx, v in enumerate(d):
				if idx > 0:
					buf.append(',')
				impl(v)
			buf.append(']')
		else:
			raise DecodingError(f"can't encode {d} to calldata")

	impl(d)
	return ''.join(buf)