# Source code for genlayer.std.runner
"""
Module that is used to run Python contracts in the default way
"""
__all__ = ('run',)
import typing
import genlayer.std._wasi as wasi
import genlayer.py.calldata
from ..py.types import Rollback
from ..py.storage._internal.generate import _known_descs
def _give_result(res_fn: typing.Callable[[], typing.Any]):
    """
    Call *res_fn* and report its outcome through ``wasi``.

    A :class:`Rollback` raised by *res_fn* is reported with
    ``wasi.rollback`` using the exception's ``msg``; any other exception
    propagates to the caller. On success the value is handed to
    ``wasi.contract_return``: unchanged when it is an
    ``AlreadySerializedResult``, otherwise after being encoded with
    ``genlayer.py.calldata.encode``.

    :param res_fn: zero-argument callable that produces the value to return
    """
    try:
        res = res_fn()
    except Rollback as r:
        wasi.rollback(r.msg)
        # ``res`` is never bound on this path, so falling through would fail
        # with UnboundLocalError if ``wasi.rollback`` ever returned.
        # NOTE(review): rollback presumably terminates execution -- confirm.
        return

    # Function-scope import (presumably to avoid an import cycle -- confirm).
    from .advanced import AlreadySerializedResult

    if isinstance(res, AlreadySerializedResult):
        # Already in wire format: pass it through without re-encoding.
        wasi.contract_return(res)
    else:
        wasi.contract_return(genlayer.py.calldata.encode(res))
def _is_public(meth) -> bool:
if meth is None:
return False
return getattr(meth, '__public__', False)
# [docs] -- presumably a documentation-link marker left by the rendered source listing, not code
def run(mod):
    """
    Dispatch the current entrypoint to the contract found in *mod*.

    The entrypoint bytes obtained from ``wasi.get_entrypoint()`` select one
    of three modes by prefix:

    * ``call!`` -- the remainder is calldata; the contract constructor or a
      public method is invoked (see :func:`_run_call`).
    * ``nondet!`` -- the remainder carries a leader's result and a pickled
      callable (see :func:`_run_nondet`).
    * ``sandbox!`` -- the remainder is a pickled callable
      (see :func:`_run_sandbox`).

    :param mod: module that contains a contract
    :raises Exception: when the entrypoint has none of the known prefixes
    """
    contract: type = getattr(mod, '__KNOWN_CONTRACT')
    entrypoint: bytes = wasi.get_entrypoint()
    # A memoryview lets the prefix be dropped without copying the payload.
    mem = memoryview(entrypoint)
    CALL = b'call!'
    NONDET = b'nondet!'
    SANDBOX = b'sandbox!'
    if entrypoint.startswith(CALL):
        _run_call(contract, mem[len(CALL) :])
    elif entrypoint.startswith(NONDET):
        _run_nondet(mem[len(NONDET) :])
    elif entrypoint.startswith(SANDBOX):
        _run_sandbox(mem[len(SANDBOX) :])
    else:
        raise Exception(f'unknown entrypoint {entrypoint}')


def _run_call(contract: type, payload: memoryview) -> None:
    """
    Handle a ``call!`` entrypoint.

    :param contract: contract class to operate on
    :param payload: entrypoint bytes after the ``call!`` prefix (encoded calldata)
    :raises Exception: when the constructor is marked public, or when the
        requested method is not marked public
    """
    calldata = genlayer.py.calldata.decode(payload)
    from . import message

    if message.is_init:
        meth = getattr(contract, '__init__')
        if _is_public(meth):
            raise Exception('constructor must be private')
    else:
        meth_name = calldata['method']
        if meth_name == '__get_schema__':
            # A contract-provided ``__get_schema__`` takes precedence over
            # the generated schema.
            if hasattr(contract, '__get_schema__'):
                _give_result(contract.__get_schema__)
                return
            from ..py.get_schema import get_schema
            import json

            # Compact separators: no whitespace in the emitted JSON.
            _give_result(lambda: json.dumps(get_schema(contract), separators=(',', ':')))
            return
        meth = getattr(contract, meth_name)
        if not _is_public(meth):
            raise Exception("can't call non-public methods")
    from ._internal.storage import STORAGE_MAN, ROOT_STORAGE_ADDRESS

    # Build the contract instance as a view over the root storage slot at
    # offset 0, then call the selected method with the decoded arguments.
    top_slot = STORAGE_MAN.get_store_slot(ROOT_STORAGE_ADDRESS)
    contract_instance = _known_descs[contract].get(top_slot, 0)
    _give_result(
        lambda: meth(
            contract_instance, *calldata.get('args', []), **calldata.get('kwargs', {})
        )
    )


def _run_nondet(payload: memoryview) -> None:
    """
    Handle a ``nondet!`` entrypoint.

    Payload layout: a 4-byte little-endian length ``le``, then ``le`` bytes
    holding the leader's result, then a cloudpickle-serialized callable.
    With ``le == 0`` the callable is invoked without arguments; otherwise
    it receives the decoded leader's result.

    :param payload: entrypoint bytes after the ``nondet!`` prefix
    """
    # fetch leader's result length
    le = int.from_bytes(payload[:4], 'little')
    payload = payload[4:]
    leaders_res_mem = payload[:le]
    payload = payload[le:]
    import cloudpickle

    # NOTE(review): unpickling can execute arbitrary code; this relies on the
    # entrypoint bytes coming from a trusted producer -- confirm.
    runner = cloudpickle.loads(payload)
    if le == 0:
        _give_result(runner)
    else:
        from ._internal import decode_sub_vm_result_retn

        leaders_res = decode_sub_vm_result_retn(leaders_res_mem)
        _give_result(lambda: runner(leaders_res))


def _run_sandbox(payload: memoryview) -> None:
    """
    Handle a ``sandbox!`` entrypoint.

    The payload is a cloudpickle-serialized zero-argument callable; the
    value it returns is serialized with ``cloudpickle.dumps`` before being
    given back.

    :param payload: entrypoint bytes after the ``sandbox!`` prefix
    """
    import cloudpickle

    # NOTE(review): unpickling can execute arbitrary code; this relies on the
    # entrypoint bytes coming from a trusted producer -- confirm.
    runner = cloudpickle.loads(payload)
    _give_result(lambda: cloudpickle.dumps(runner()))