Source code for genlayer.storage._internal.generate

"""
Module that uses reflections that generates python-friendly views to GenVM storage format (mapping from slot addresses to linear memories)
"""

__all__ = ('generate_storage',)

from genlayer.types import *
from genlayer.types import StaticIntMeta

import typing
import sys
import struct

from genlayer.storage.core import *

from .desc_base_types import (
	AddrDesc,
	IntDesc,
	StrDesc,
	BytesDesc,
	BoolDesc,
	NoneDesc,
	_BigIntDesc,
)
from .desc_record import _RecordDesc, RecordExtraFields
from ..dyn_array import DynArray, _DynArrayDesc
from ..array import Array, _ArrayDesc

import genlayer._internal.reflect as reflect

STORAGE_PATCHED_ATTR = '__gl_storage_patched__'
ORIGINAL_INIT_ATTR = '__gl_original_init__'
ALLOW_STORAGE_ATTR = '__gl_allow_storage__'


[docs] def allow[T: type](cls: T) -> T: """ Marks class as allowed to be used within storage. Without this annotation, storage builder will raise an exception when trying to generate description for the class. This behavior is required to prevent accidental usage of classes that are not designed to be used in storage, because storage-generated class is modified and starts to behave differently from regular python class) """ setattr(cls, ALLOW_STORAGE_ATTR, True) return cls
def generate_storage[T: type](cls: T) -> T: populate_np_descs_if_loaded() cls = allow(cls) ctx = _BuilderCtx({}, ()) desc = _storage_build(ctx, cls) _known_descs[cls] = desc return cls _none_desc = NoneDesc() _bigint_desc = _BigIntDesc() _all_int_types: list = [ u8, u16, u24, u32, u40, u48, u56, u64, u72, u80, u88, u96, u104, u112, u120, u128, u136, u144, u152, u160, u168, u176, u184, u192, u200, u208, u216, u224, u232, u240, u248, u256, i8, i16, i24, i32, i40, i48, i56, i64, i72, i80, i88, i96, i104, i112, i120, i128, i136, i144, i152, i160, i168, i176, i184, i192, i200, i208, i216, i224, i232, i240, i248, i256, ] _int_descs: dict[StaticIntMeta, tuple[type, IntDesc]] = {} for t in _all_int_types: sim: StaticIntMeta = t.__metadata__[0] _int_descs[sim] = (t, IntDesc(sim.size, signed=sim.signed)) _known_descs: dict[typing.Any, TypeDesc] = { Address: AddrDesc(), str: StrDesc(), bytes: BytesDesc(), bool: BoolDesc(), type(None): _none_desc, None: _none_desc, # type: ignore bigint: _bigint_desc, } for v in _int_descs.values(): _known_descs[v[0]] = v[1] class _FloatDesc(TypeDesc[float]): __slots__ = ('_type',) def __init__(self): TypeDesc.__init__(self, 8, [8]) self._type = type def get(self, slot: Slot, off: int) -> float: dat = slot.read(off, self.size) return struct.unpack('d', dat)[0] def set(self, slot: Slot, off: int, val: float): slot.write(off, struct.pack('d', val)) _known_descs[float] = _FloatDesc() class _DeferredBuilder: __slots__ = ('_ctx', '_raw_type', '_cached', '_evaluated_at_ctx') def __init__(self, ctx: '_BuilderCtx', raw_type): self._ctx = ctx self._raw_type = raw_type self._cached: TypeDesc | None = None self._evaluated_at_ctx: '_BuilderCtx | None' = None @property def raw(self): return self._raw_type def get(self, instantiated_at_ctx: '_BuilderCtx') -> TypeDesc: if self._cached is None: self._evaluated_at_ctx = instantiated_at_ctx self._cached = _storage_build(self._ctx, self._raw_type) return self._cached class GenerationError(TypeError): pass class _BuilderCtx(typing.NamedTuple): generic_vars: dict[str, _DeferredBuilder] trace: tuple[str, ...] def with_trace(self, msg: str) -> '_BuilderCtx': new_val = self._replace(trace=self.trace + (msg,)) return new_val def type_err(self, msg: str) -> GenerationError: exc = GenerationError(msg) for t in self.trace: exc.add_note(t) return exc @staticmethod def empty(): return _BuilderCtx({}, ()) def _resolve_raw_type(ctx: _BuilderCtx, tp): """Substitute TypeVars in a type using ctx.generic_vars, returning the raw type""" if isinstance(tp, typing.TypeVar): data = ctx.generic_vars.get(tp.__name__) if data is None: raise ctx.type_err(f'Unbound generic type variable `{tp.__name__}`') return _resolve_raw_type(ctx, data.raw) origin = typing.get_origin(tp) if origin is None: return tp args = typing.get_args(tp) new_args = tuple(_resolve_raw_type(ctx, a) for a in args) if new_args == args: return tp return origin[new_args] if len(new_args) > 1 else origin[new_args[0]] def _storage_build(ctx: _BuilderCtx, cls) -> TypeDesc: if cached := _known_descs.get(cls): return cached trace_note = ['during building type `', reflect.repr_type(cls), '`'] if len(ln := reflect.try_get_lineno(cls)) != 0: trace_note.append(' (declared at ') trace_note.append(str(ln)) trace_note.append(')') ctx = ctx.with_trace(''.join(trace_note)) return _storage_build_impl(ctx, cls) def _check_forbidden_origin(ctx: _BuilderCtx, cls): if cls is int: raise ctx.type_err('use `bigint` or one of sized integers') if cls is dict: raise ctx.type_err('use `TreeMap` instead of a dict') if cls is list: raise ctx.type_err('use `DynArray` instead of a list') def _storage_build_impl(ctx: _BuilderCtx, cls) -> TypeDesc: if s := _known_descs.get(cls): return s if isinstance(cls, typing.TypeVar): data = ctx.generic_vars.get(cls.__name__) if data is None: raise ctx.type_err( f'Unbound generic type variable `{cls.__name__}`, known variables: {",".join(ctx.generic_vars.keys())}' ) try: return data.get(ctx) except BaseException as e: e.add_note(f'Instantiated generic variable `{cls.__name__}`') for t in ctx.trace: e.add_note(t) raise origin = typing.get_origin(cls) if origin is None: _check_forbidden_origin(ctx, cls) return _storage_build_struct(ctx._replace(generic_vars={}), cls) if 'numpy' in sys.modules and origin is sys.modules['numpy'].dtype: args = typing.get_args(cls) if len(args) != 1: raise ctx.type_err( f'Expected exactly one argument for numpy dtype, got {len(args)}' ) return _storage_build(ctx, args[0]) _check_forbidden_origin(ctx, origin) args = typing.get_args(cls) if origin is typing.Annotated: return _storage_build_annotated(ctx, cls) if origin is typing.Literal: raise ctx.type_err('Literal types are not supported in storage') if origin is tuple or origin is typing.Tuple: raise ctx.type_err( 'Tuple types are not supported in storage, use a custom class instead' ) if (as_np := try_handle_np(ctx, origin, args)) is not None: return as_np generic_params = origin.__type_params__ if len(generic_params) != len(args): raise ctx.type_err( f'incorrect number of generic arguments for {origin} parameters={generic_params}, args={args}' ) if origin is Array: return _storage_build_array(ctx, cls) if origin is DynArray: if len(typing.get_args(cls)) != 1: raise ctx.type_err( f'Expected exactly one argument for DynArray, got {len(typing.get_args(cls))}' ) elem_type = typing.get_args(cls)[0] elem_desc = _storage_build( ctx.with_trace('during processing DynArray element type'), elem_type ) return _DynArrayDesc(elem_desc) if origin is Indirection: if len(typing.get_args(cls)) != 1: raise ctx.type_err( f'Expected exactly one argument for Indirection, got {len(typing.get_args(cls))}' ) elem_type = typing.get_args(cls)[0] elem_desc = _storage_build( ctx.with_trace('during processing Indirection element type'), elem_type ) return IndirectionTypeDesc(elem_desc) if origin is VLA: if len(typing.get_args(cls)) != 1: raise ctx.type_err( f'Expected exactly one argument for VLA, got {len(typing.get_args(cls))}' ) elem_type = typing.get_args(cls)[0] elem_desc = _storage_build( ctx.with_trace('during processing VLA element type'), elem_type ) return VLATypeDesc(elem_desc) args = typing.get_args(cls) new_generics = {} for idx, (k, v) in enumerate(zip(generic_params, args)): trace_line = f'during processing generic argument of `{reflect.repr_generic(origin, args)}`, argument `{reflect.repr_type(v)}`, at index `{idx}`' var_ctx = ctx.with_trace(trace_line) new_generics[k.__name__] = _DeferredBuilder(var_ctx, v) ctx = ctx._replace(generic_vars=new_generics).with_trace( f'declared generic variables: {", ".join(new_generics.keys())}' ) return _storage_build_struct(ctx, origin) def _storage_build_array(ctx: _BuilderCtx, cls) -> TypeDesc: args = typing.get_args(cls) if len(args) != 2: raise ctx.type_err(f'Expected exactly two arguments for Array, got {len(args)}') type_arg, size_arg = args size_arg = _resolve_raw_type(ctx, size_arg) if typing.get_origin(size_arg) is not typing.Literal: raise ctx.type_err(f'Expected Literal for Array size, got {size_arg}') lit_args = typing.get_args(size_arg) if len(lit_args) != 1 or not isinstance(lit_args[0], int): raise ctx.type_err(f'Expected single int Literal for Array size, got {lit_args}') size = lit_args[0] child_type_desc = _storage_build( ctx.with_trace('during processing Array element type'), type_arg ) res = _ArrayDesc(child_type_desc, size) return res def _storage_build_annotated(ctx: _BuilderCtx, cls) -> TypeDesc: origin = getattr(cls, '__origin__', None) if origin is None: raise ctx.type_err('typing.Annotated should have __origin__') meta: tuple = getattr(cls, '__metadata__', ()) if origin is int: for m in meta: if m == 'bigint': return _bigint_desc if isinstance(m, StaticIntMeta): return _int_descs[m][1] return _storage_build(ctx.with_trace('during processing discarded annotated'), origin) def _storage_build_struct(ctx: _BuilderCtx, cls) -> TypeDesc: if not hasattr(cls, ALLOW_STORAGE_ATTR): raise ctx.type_err( f'class is not marked for usage within storage, please, annotate it with @allow_storage', ) size: int = 0 copy_actions: list[CopyAction] = [] props: dict[str, tuple[TypeDesc, int]] = {} for prop_name, prop_value in typing.get_type_hints(cls, include_extras=True).items(): if typing.get_origin(prop_value) is typing.ClassVar: continue cur_offset: int = size note = f'during processing field `{prop_name}: {prop_value}`' try: prop_desc = _storage_build(ctx.with_trace(note), prop_value) assert isinstance(prop_desc, TypeDesc) except GenerationError: raise except BaseException as e: e.add_note(note) raise props[prop_name] = (prop_desc, cur_offset) if not getattr(cls, STORAGE_PATCHED_ATTR, False): def getter(s: RecordExtraFields, prop_name=prop_name): prop_desc, off = s.__type_desc__.props[prop_name] return prop_desc.get(s._storage_slot, s._off + off) def setter(s: RecordExtraFields, v, prop_name=prop_name): prop_desc, off = s.__type_desc__.props[prop_name] prop_desc.set(s._storage_slot, s._off + off, v) setattr(cls, prop_name, property(getter, setter)) size += prop_desc.size actions_append(copy_actions, prop_desc.copy_actions) description = _RecordDesc(size, copy_actions, props, cls) old_init = cls.__init__ if not hasattr(cls, '__gl_contract__') and not getattr( old_init, STORAGE_PATCHED_ATTR, False ): # here we may want to patch __init__ to allocate in storage gen_var_name = None for generic_name, generic_info in ctx.generic_vars.items(): if generic_info._evaluated_at_ctx is not None: gen_var_name = (generic_name, generic_info._evaluated_at_ctx) break if gen_var_name is not None: def new_init_generic(self, *args, **kwargs): if hasattr(self, '_storage_slot'): old_init(self, *args, **kwargs) return exc = gen_var_name[1].type_err( 'generic storage classes can not be instantiated with __init__, please, use gl.storage.inmem_allocate' ) exc.add_note(f'due to usage of `{gen_var_name[0]}`') exc.add_note(f'in class `{reflect.repr_type(cls)}`') raise exc new_init = new_init_generic else: def new_init_no_generic(self, *args, **kwargs): if not hasattr(self, '_storage_slot'): self._storage_slot = InmemManager().get_store_slot(ROOT_SLOT_ID) self._off = 0 self.__type_desc__ = description old_init(self, *args, **kwargs) new_init = new_init_no_generic setattr(new_init, STORAGE_PATCHED_ATTR, True) setattr(new_init, ORIGINAL_INIT_ATTR, old_init) cls.__init__ = new_init return description from .numpy import try_handle_np, populate_np_descs_if_loaded @generate_storage class _DateTime: seconds: u64 micros: u32 has_tz: bool off_days: i32 off_seconds: i32 off_micros: i32 import datetime, time _dt_desc: TypeDesc[_DateTime] = _known_descs[_DateTime] class _DateTimeDesc(TypeDesc[datetime.datetime]): __slots__ = () def __init__(self): super().__init__(_dt_desc.size, _dt_desc.copy_actions) def get(self, slot: Slot, off: int) -> datetime.datetime: dt = _dt_desc.get(slot, off) def make_date(dt_tuple: time.struct_time, tzinfo): return datetime.datetime( year=dt_tuple.tm_year, month=dt_tuple.tm_mon, day=dt_tuple.tm_mday, hour=dt_tuple.tm_hour, minute=dt_tuple.tm_min, second=dt_tuple.tm_sec, microsecond=dt.micros, tzinfo=tzinfo, ) if dt.has_tz: tz = datetime.timezone( datetime.timedelta( days=dt.off_days, seconds=dt.off_seconds, microseconds=dt.off_micros ) ) dt_tuple = time.gmtime(dt.seconds) return make_date(dt_tuple, datetime.UTC).astimezone(tz) else: tz = None dt_tuple = time.localtime(dt.seconds) return make_date(dt_tuple, tzinfo=tz) def set(self, slot: Slot, off: int, val: datetime.datetime) -> None: dt = _dt_desc.get(slot, off) tz = val.tzinfo dt.seconds = int(val.timestamp()) dt.micros = val.microsecond if tz is None: dt.has_tz = False else: dt.has_tz = True tz_off = tz.utcoffset(None) assert tz_off is not None dt.off_days = tz_off.days dt.off_seconds = tz_off.seconds dt.off_micros = tz_off.microseconds _known_descs[datetime.datetime] = _DateTimeDesc()