Source code for genlayer.py.storage.vec

__all__ = ('DynArray', 'Array')

import typing
import math

from ._internal.core import *
from ._internal.core import _WithStorageSlot
from ._internal.desc_base_types import _u32_desc


[docs] class DynArray[T](_WithStorageSlot, collections.abc.MutableSequence[T]): """ Represents exponentially growing array (:py:class:`list` in python terms) that can be persisted on the blockchain """ _item_desc: TypeDesc
[docs] def __init__(self): """ This class can't be created with ``DynArray()`` :raises TypeError: always """ raise TypeError("this class can't be instantiated by user")
@staticmethod def _view_at(item_desc: TypeDesc, slot: StorageSlot, off: int) -> 'DynArray': slf = DynArray.__new__(DynArray) slf._item_desc = item_desc slf._storage_slot = slot slf._off = off return slf def __len__(self) -> int: return _u32_desc.get(self._storage_slot, self._off) def _map_index(self, idx: int) -> int: le = len(self) if idx < 0: idx += le if idx < 0 or idx >= le: raise IndexError(f'index out of range {idx} not in 0..<{le}') return idx @typing.overload def __getitem__(self, idx: int) -> T: ... @typing.overload def __getitem__(self, idx: slice) -> list[T]: ... def __getitem__(self, idx: int | slice) -> T | list[T]: if isinstance(idx, int): idx = self._map_index(idx) items_at = self._storage_slot.indirect(self._off) return self._item_desc.get(items_at, idx * self._item_desc.size) else: start, stop, step = idx.indices(len(self)) ret = [] step_sign = 1 if step >= 0 else -1 while start * step_sign < stop * step_sign: ret.append(self[start]) start += step return ret @typing.overload def __setitem__(self, idx: int, val: T) -> None: ... @typing.overload def __setitem__(self, idx: slice, val: collections.abc.Sequence[T]) -> None: ... def __setitem__(self, idx: int | slice, val: T | collections.abc.Sequence[T]) -> None: if isinstance(idx, int): idx = self._map_index(idx) items_at = self._storage_slot.indirect(self._off) self._item_desc.set(items_at, idx * self._item_desc.size, val) return else: start, stop, step = self._slice_to_idx(idx) new_val = typing.cast(collections.abc.Sequence[T], val) left_in_new = len(new_val) if isinstance(idx.step, int) and idx.step < 0: new_val = reversed(new_val) left_in_range = (stop - start) // step new_it = iter(new_val) # just reassign existing values common_values_cnt = min(left_in_new, left_in_range) for i in range(common_values_cnt): self[start + i * step] = next(new_it) start += common_values_cnt left_in_range -= common_values_cnt left_in_new -= common_values_cnt # if we have other values we must remove them if left_in_range > 0: del self[start:stop:step] # if we have some unassigned we must insert it here elif left_in_new > 0: # move current to the right items_at = self._storage_slot.indirect(self._off) for i in range(len(self) - 1, start - 1, -1): self._item_desc.set( items_at, (i + left_in_new) * self._item_desc.size, self[i] ) for i in range(left_in_new): self._item_desc.set( items_at, (start + i) * self._item_desc.size, next(new_it) ) _u32_desc.set(self._storage_slot, self._off, len(self) + left_in_new) def _slice_to_idx(self, s: slice) -> tuple[int, int, int]: start, stop, step = s.indices(len(self)) if step < 0: step *= -1 start, stop = stop, start # stop += (step - (stop - start) % step) % step start = stop - (stop - start - 1) // step * step stop += 1 return start, stop, step @typing.overload def __delitem__(self, idx: int) -> None: ... @typing.overload def __delitem__(self, idx: slice) -> None: ... def __delitem__(self, idx: int | slice) -> None: if isinstance(idx, int): start = self._map_index(idx) stop = start + 1 step = 1 else: start, stop, step = self._slice_to_idx(idx) if stop <= start: return next_deletion = start insert_idx = start for i in range(start, len(self)): if i == next_deletion: next_deletion = i + step if next_deletion >= stop: next_deletion = -1 continue self[insert_idx] = self[i] insert_idx += 1 _u32_desc.set(self._storage_slot, self._off, insert_idx)
[docs] def insert(self, index: int, value: T) -> None: index = self._map_index(index) old_len = len(self) _u32_desc.set(self._storage_slot, self._off, old_len + 1) for i in range(old_len, index, -1): self[i] = self[i - 1] self[index] = value
def __iter__(self) -> typing.Any: for i in range(len(self)): yield self[i]
[docs] def append(self, value: T) -> None: le = len(self) _u32_desc.set(self._storage_slot, self._off, le + 1) items_at = self._storage_slot.indirect(self._off) return self._item_desc.set(items_at, le * self._item_desc.size, value)
[docs] def append_new_get(self) -> T: le = len(self) _u32_desc.set(self._storage_slot, self._off, le + 1) items_at = self._storage_slot.indirect(self._off) return self._item_desc.get(items_at, le * self._item_desc.size)
[docs] def pop(self) -> None: le = len(self) if le == 0: raise Exception("can't pop from empty array") _u32_desc.set(self._storage_slot, self._off, le - 1)
[docs] def __repr__(self) -> str: ret: list[str] = [] ret.append('[') comma = False for x in self: if comma: ret.append(',') comma = True ret.append(repr(x)) ret.append(']') return ''.join(ret)
class _VecCopyAction(ComplexCopyAction): def __init__(self, item_desc: TypeDesc): self._item_desc = item_desc def copy(self, frm: StorageSlot, frm_off: int, to: StorageSlot, to_off: int) -> int: le = _u32_desc.get(frm, frm_off) _u32_desc.set(to, to_off, le) cop = self._item_desc.copy_actions to_indirect = to.indirect(to_off) frm_indirect = frm.indirect(frm_off) if len(cop) == 1 and isinstance(cop[0], int): to_indirect.write(0, frm_indirect.read(0, cop[0] * le)) else: cum_off = 0 for _i in range(le): cum_off += actions_apply_copy(cop, to_indirect, cum_off, frm_indirect, cum_off) return _u32_desc.size def __repr__(self): return '_VecCopyAction' class _DynArrayDesc(TypeDesc): _item_desc: TypeDesc def __init__(self, it_desc: TypeDesc): self._item_desc = it_desc self._cop = _VecCopyAction(it_desc) TypeDesc.__init__(self, _u32_desc.size, [self._cop]) def get(self, slot: StorageSlot, off: int) -> DynArray: return DynArray._view_at(self._item_desc, slot, off) def set(self, slot: StorageSlot, off: int, val: DynArray | list) -> None: if isinstance(val, list): _u32_desc.set(slot, off, len(val)) indirect_slot = slot.indirect(off) for i in range(len(val)): self._item_desc.set(indirect_slot, i * self._item_desc.size, val[i]) return if val._item_desc is not self: raise Exception('incompatible vector type') self._cop.copy(val._storage_slot, val._off, slot, off) def __eq__(self, r): if not isinstance(r, _DynArrayDesc): return False return self._item_desc == r._item_desc def __hash__(self): return hash(('_VecDesc', hash(self._item_desc))) def __repr__(self): return f'_VecDesc[{self._item_desc!r}]'
[docs] class Array[T, S: int](_WithStorageSlot, collections.abc.Sequence): """ Constantly sized array that can be persisted on the blockchain """ _item_desc: TypeDesc _len: int
[docs] def __init__(self): """ This class can't be created with ``Array()`` :raises TypeError: always """ raise TypeError("this class can't be instantiated by user")
def __len__(self) -> int: return self._len @staticmethod def _view_at(item_desc: TypeDesc, le: int, slot: StorageSlot, off: int) -> 'Array': slf = Array.__new__(Array) slf._item_desc = item_desc slf._len = le slf._storage_slot = slot slf._off = off return slf def _map_index(self, idx: int) -> int: le = len(self) if idx < 0: idx += le if idx < 0 or idx >= le: raise IndexError(f'index out of range {idx} not in 0..<{le}') return idx def __getitem__(self, idx: int) -> T: idx = self._map_index(idx) return self._item_desc.get( self._storage_slot, self._off + idx * self._item_desc.size ) def __setitem__(self, idx: int, val: T) -> None: idx = self._map_index(idx) self._item_desc.set(self._storage_slot, self._off + idx * self._item_desc.size, val) def __iter__(self): for i in range(len(self)): yield self[i]
class _ArrayDesc(TypeDesc): _item_desc: TypeDesc _len: int def __init__(self, it_desc: TypeDesc, le: int): self._item_desc = it_desc self._len = le cop: list[CopyAction] = [] for _i in range(le): actions_append(cop, it_desc.copy_actions) TypeDesc.__init__(self, self._item_desc.size * le, cop) def get(self, slot: StorageSlot, off: int) -> Array: return Array._view_at(self._item_desc, self._len, slot, off) def set(self, slot: StorageSlot, off: int, val: Array | list) -> None: assert len(val) == self._len if isinstance(val, list): for i in range(self._len): self._item_desc.set(slot, off + i * self._item_desc.size, val[i]) else: actions_apply_copy(self.copy_actions, slot, off, val._storage_slot, val._off) def __eq__(self, r): if not isinstance(r, Array): return False return self._item_desc == r._item_desc and self._len == r._len def __hash__(self): return hash(('_FixedVecDesc', hash(self._item_desc), self._len)) def __repr__(self): return f'_FixedVecDesc[{self._item_desc!r}, {self._len}]'