Source code for genlayer.py.eth._internal.codecs

import abc
import typing
import collections.abc

from functools import partial
from dataclasses import dataclass

from genlayer.py.types import Address, SizedArray

type Tails = list[typing.Callable[[EncodeState], None]]


[docs] @dataclass class EncodeState: current_off: int result: bytearray tails: Tails def put_iloc(self): put_at = len(self.result) self.result.extend(b'\x00' * 32) off0 = self.current_off def putter(state): to_put = len(state.result) - off0 self.result[put_at : put_at + 32] = int.to_bytes(to_put, 32, 'big') self.tails.append(putter) def derived(self) -> 'EncodeState': return EncodeState( current_off=len(self.result), result=self.result, tails=[], ) def run_tails(self): current_tails = self.tails self.tails = [] while len(current_tails) != 0: for i in current_tails: i(self) current_tails = self.tails self.tails = []
@dataclass
class DecoderState:
    """Read cursor over an encoded buffer."""

    # Buffer being decoded; shared by every state produced by ``indirected``.
    mem: memoryview
    # Position of the next byte ``fetch_head`` will return.
    current_off: int
    # Base position that offsets read by ``indirected`` are added to.
    current_off_0: int

    def fetch_head(self, le: int) -> memoryview:
        """Return the next ``le`` bytes as a view and advance the cursor.

        No bounds check is made: a read that runs past the end of ``mem``
        yields a shorter (possibly empty) view, because slices are clipped,
        while the cursor still advances by the full ``le``.
        """
        start = self.current_off
        self.current_off = start + le
        return self.mem[start : self.current_off]

    def indirected(self) -> 'DecoderState':
        """Follow a 32-byte offset word and return a state positioned there.

        The word is read from this state (advancing it by 32 bytes) as an
        unsigned big-endian integer and added to ``current_off_0``. The
        returned state uses that position as both its cursor and its base.
        """
        relative = int.from_bytes(self.fetch_head(32), 'big', signed=False)
        target = self.current_off_0 + relative
        return DecoderState(
            mem=self.mem,
            current_off=target,
            current_off_0=target,
        )
[docs] class Codec[T](metaclass=abc.ABCMeta): @property @abc.abstractmethod def is_dynamic(self) -> bool: raise NotImplementedError() @property @abc.abstractmethod def name(self) -> str: raise NotImplementedError() @property @abc.abstractmethod def size_here(self) -> int: raise NotImplementedError() @abc.abstractmethod def encode(self, state: EncodeState, val: T): raise NotImplementedError() @abc.abstractmethod def decode(self, state: DecoderState) -> T: raise NotImplementedError()
[docs] def __repr__(self): return self.name
[docs] class IntCodec[T: int](Codec[T]): @property def is_dynamic(self) -> bool: return False @property def size_here(self) -> int: return 32 @property def name(self) -> str: return self._name
[docs] def __init__(self, bits: int, signed: bool): if signed: self._name = f'int{bits}' else: self._name = f'uint{bits}' self.signed = signed
def encode(self, state: EncodeState, val: T): state.result.extend(val.to_bytes(32, 'big', signed=self.signed)) def decode(self, state: DecoderState) -> int: # type: ignore return int.from_bytes(state.fetch_head(32), 'big', signed=self.signed)
class BoolCodec(Codec[bool]):
    """Codec for booleans stored as one 32-byte word (0 or 1)."""

    @property
    def name(self) -> str:
        return 'bool'

    @property
    def is_dynamic(self) -> bool:
        return False

    @property
    def size_here(self) -> int:
        return 32

    def encode(self, state: EncodeState, val: bool):
        """Append 1 for a truthy ``val`` and 0 otherwise, as a 32-byte word."""
        flag = 1 if val else 0
        state.result.extend(flag.to_bytes(32, 'big'))

    def decode(self, state: DecoderState) -> bool:
        """Consume 32 bytes; anything other than 32 zero bytes is ``True``."""
        all_zero = bytes(32)
        return state.fetch_head(32) != all_zero
class AddressCodec(Codec[Address]):
    """Codec for ``Address`` values stored in the low 20 bytes of a word."""

    @property
    def name(self) -> str:
        return 'address'

    @property
    def is_dynamic(self) -> bool:
        return False

    @property
    def size_here(self) -> int:
        return 32

    def encode(self, state: EncodeState, val: Address):
        """Append 12 zero bytes followed by ``val.as_bytes``."""
        out = state.result
        out.extend(bytes(12))
        out.extend(val.as_bytes)

    def decode(self, state: DecoderState) -> Address:
        """Skip the 12 leading bytes, then build an ``Address`` from the next 20."""
        state.fetch_head(12)  # leading bytes are discarded without inspection
        raw = state.fetch_head(20)
        return Address(raw)
class BytesNCodec(Codec):
    """Codec for fixed-length byte strings, right-padded to a 32-byte word."""

    def __init__(self, bytes: int):
        """Build the codec for values exactly ``bytes`` bytes long."""
        self.bytes = bytes

    @property
    def name(self) -> str:
        return f'bytes{self.bytes}'

    @property
    def is_dynamic(self) -> bool:
        return False

    @property
    def size_here(self) -> int:
        return 32

    def encode(self, state: EncodeState, val):
        """Append ``val`` followed by zero bytes up to 32 bytes in total.

        The length of ``val`` is checked with an ``assert``.
        """
        assert len(val) == self.bytes
        pad_len = 32 - self.bytes
        out = state.result
        out.extend(val)
        out.extend(b'\x00' * pad_len)

    def decode(self, state: DecoderState):
        """Consume one word and return its first ``self.bytes`` bytes.

        The value is returned as the view produced by ``fetch_head``; the
        padding that follows is consumed and dropped.
        """
        payload = state.fetch_head(self.bytes)
        state.fetch_head(32 - self.bytes)
        return payload
[docs] class BytesStrCodec[T: str | bytes](Codec[T]):
[docs] def __init__(self, t: typing.Type[T]): self.type = t
@property def is_dynamic(self) -> bool: return True @property def size_here(self) -> int: return 32 @property def name(self) -> str: if issubclass(self.type, str): return 'string' else: return 'bytes' def encode(self, state: EncodeState, val: T): state.put_iloc() as_bytes: bytes if issubclass(self.type, str): as_bytes = val.encode('utf-8') # type: ignore else: as_bytes = val # type: ignore def put_bytes(state): state.result.extend(int.to_bytes(len(as_bytes), 32, 'big')) state.result.extend(as_bytes) state.result.extend(b'\x00' * ((32 - len(as_bytes) % 32) % 32)) state.tails.append(put_bytes) def decode(self, state: DecoderState) -> T: state = state.indirected() le = int.from_bytes(state.fetch_head(32), 'big', signed=False) as_bytes = state.fetch_head(le) if issubclass(self.type, str): return str(as_bytes, 'utf-8') # type: ignore else: return bytes(as_bytes) # type: ignore
[docs] class DynArrayCodec[T](Codec[collections.abc.Sequence[T]]):
[docs] def __init__(self, elem_encoder: Codec[T]): self.elem_encoder = elem_encoder
@property def is_dynamic(self) -> bool: return True @property def size_here(self) -> int: return 32 @property def name(self) -> str: return self.elem_encoder.name + '[]' def _encode_now(self, state: EncodeState, val: collections.abc.Sequence[T]): state.result.extend(int.to_bytes(len(val), 32, 'big')) der = state.derived() for v in val: der.tails.append(partial(self.elem_encoder.encode, val=v)) der.run_tails() def encode(self, state: EncodeState, val: collections.abc.Sequence[T]): state.put_iloc() state.tails.append(partial(self._encode_now, val=val)) def decode(self, state: DecoderState) -> collections.abc.Sequence[T]: state = state.indirected() le = int.from_bytes(state.fetch_head(32), 'big', signed=False) state.current_off_0 += 32 res = [] for i in range(le): res.append(self.elem_encoder.decode(state)) return res
[docs] class ArrayCodec[T, S: int](Codec[SizedArray[T, S]]):
[docs] def __init__(self, elem_encoder: Codec[T], elem_count: S): self.elem_encoder = elem_encoder self.elem_count = elem_count
@property def is_dynamic(self) -> bool: return self.elem_encoder.is_dynamic @property def size_here(self) -> int: if self.is_dynamic: return 32 return self.elem_encoder.size_here * self.elem_count @property def name(self) -> str: return self.elem_encoder.name + f'[{self.elem_count}]' def _encode_now(self, state: EncodeState, val: SizedArray[T, S]): if self.is_dynamic: state = state.derived() for v in val: self.elem_encoder.encode(state, v) if self.is_dynamic: state.run_tails() def encode(self, state: EncodeState, val: SizedArray[T, S]): if self.is_dynamic: state.put_iloc() state.tails.append(partial(self._encode_now, val=val)) else: self._encode_now(state, val) def decode(self, state: DecoderState) -> SizedArray[T, S]: res: list[T] = [] if self.is_dynamic: state = state.indirected() for i in range(self.elem_count): res.append(self.elem_encoder.decode(state)) return res # type: ignore
[docs] class TupleCodec[*T](Codec[tuple[*T]]):
[docs] def __init__(self, elem_encoders: tuple[Codec, ...], force_inplace: bool): self.elem_encoders = elem_encoders if force_inplace: self._is_dynamic = False else: self._is_dynamic = any(e.is_dynamic for e in elem_encoders) if self._is_dynamic: self._size_here = 32 else: self._size_here = sum(e.size_here for e in elem_encoders) self._name = '(' + ','.join(e.name for e in elem_encoders) + ')'
@property def is_dynamic(self) -> bool: return self._is_dynamic @property def size_here(self) -> int: return self._size_here @property def name(self) -> str: return self._name def _encode_now(self, state: EncodeState, val: tuple[*T]): if self._is_dynamic: der = state.derived() else: der = state for p, a in zip(self.elem_encoders, val): der.tails.append(partial(p.encode, val=a)) if self._is_dynamic: der.run_tails() def encode(self, state: EncodeState, val: tuple[*T]): assert len(val) == len(self.elem_encoders) if self._is_dynamic: state.put_iloc() state.tails.append(partial(self._encode_now, val=val)) else: self._encode_now(state, val) def decode(self, state: DecoderState) -> tuple[*T]: res = [] if self._is_dynamic: state = state.indirected() for enc in self.elem_encoders: res.append(enc.decode(state)) return tuple(res)