# Source code for genlayer.py.keccak

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# NOTE: source code from https://github.com/ctz/keccak
# some code was truncated for use in this project
"""
Implements keccak hash

Source code is taken from `https://github.com/ctz/keccak <https://github.com/ctz/keccak>`_ (Apache 2.0 license)
"""

__all__ = ('Keccak256', 'KeccakHash')

from math import log
from operator import xor
from copy import deepcopy
from functools import reduce
import collections.abc

# The Keccak-f round constants: 24 entries, each written as a 64-bit value.
# keccak_f() indexes this table by round number and XORs the entry into
# lane [0][0] of the state (the "iota" step).
RoundConstants = [
	0x0000000000000001,
	0x0000000000008082,
	0x800000000000808A,
	0x8000000080008000,
	0x000000000000808B,
	0x0000000080000001,
	0x8000000080008081,
	0x8000000000008009,
	0x000000000000008A,
	0x0000000000000088,
	0x0000000080008009,
	0x000000008000000A,
	0x000000008000808B,
	0x800000000000008B,
	0x8000000000008089,
	0x8000000000008003,
	0x8000000000008002,
	0x8000000000000080,
	0x000000000000800A,
	0x800000008000000A,
	0x8000000080008081,
	0x8000000000008080,
	0x0000000080000001,
	0x8000000080008008,
]

# Left-rotation amounts applied to each lane in the "rho and pi" step of
# keccak_f(), which reads them as RotationConstants[y][x].
RotationConstants = [
	[0, 1, 62, 28, 27],
	[36, 44, 6, 55, 20],
	[3, 10, 43, 25, 39],
	[41, 45, 15, 21, 8],
	[18, 2, 61, 56, 14],
]

# Bit-mask lookup table: Masks[i] has the i lowest bits set, for i in 0..64.
Masks = [(1 << i) - 1 for i in range(65)]


def bits2bytes(x):
	"""
	Return how many whole bytes are needed to hold 'x' bits,
	i.e. 'x' (converted with int()) divided by 8 and rounded up.
	"""
	whole, leftover = divmod(int(x), 8)
	if leftover:
		return whole + 1
	return whole


def rol(value, left, bits):
	"""
	Circularly rotate 'value' to the left by 'left' positions,
	treating it as a quantity of the given size in bits.

	The rotation count is reduced modulo 'bits', so counts equal to or
	larger than the width are accepted (and a negative count rotates to
	the right).  'value' is expected to be non-negative and to fit in
	'bits' bits.
	"""
	left %= bits
	keep = bits - left
	# the top 'left' bits fall off the high end and re-enter at the bottom
	wrapped = value >> keep
	# the low 'keep' bits move up; the mask is computed inline so the
	# supported width is not capped by the size of a lookup table
	shifted = (value & ((1 << keep) - 1)) << left
	return shifted | wrapped


def ror(value, right, bits):
	"""
	Circularly rotate 'value' to the right by 'right' positions,
	treating it as a quantity of the given size in bits.

	The rotation count is reduced modulo 'bits', so counts equal to or
	larger than the width are accepted (and a negative count rotates to
	the left).  'value' is expected to be non-negative and to fit in
	'bits' bits.
	"""
	right %= bits
	# everything above the low 'right' bits simply moves down
	lowered = value >> right
	# the low 'right' bits wrap around to the top; the mask is computed
	# inline so the supported width is not capped by a lookup table
	wrapped = (value & ((1 << right) - 1)) << (bits - right)
	return wrapped | lowered


def multirate_padding(used_bytes, align_bytes):
	"""
	The Keccak padding function.

	Returns the list of byte values to append after 'used_bytes' bytes of
	data so that the total length becomes a multiple of 'align_bytes'.
	At least one byte is always returned: an already aligned length gets
	a full extra block of padding.
	"""
	# Reduce the length modulo the block size, as sha_padding and
	# shake_padding do, so a length beyond one block cannot yield a
	# negative pad length.  An aligned length gives padlen == align_bytes.
	padlen = align_bytes - (used_bytes % align_bytes)
	# note: padding done in 'internal bit ordering', wherein LSB is leftmost
	if padlen == 1:
		# the first and the last padding bit share a single byte
		return [0x81]
	return [0x01] + ([0x00] * (padlen - 2)) + [0x80]


def sha_padding(used_bytes, align_bytes):
	"""
	The SHA3 padding function

	Returns the byte values that bring 'used_bytes' up to the next
	multiple of 'align_bytes': 0x06 first, 0x80 last, zeros in between
	(a single 0x86 when only one byte is missing).
	"""
	remainder = used_bytes % align_bytes
	gap = align_bytes - remainder
	if gap == 1:
		# opening and closing markers land in the same byte
		return [0x06 | 0x80]
	filler = [0x00] * (gap - 2)
	return [0x06, *filler, 0x80]


def shake_padding(used_bytes, align_bytes):
	"""
	The SHAKE padding function

	Returns the byte values that bring 'used_bytes' up to the next
	multiple of 'align_bytes': 0x1F first, 0x80 last, zeros in between
	(a single 0x9F when only one byte is missing).
	"""
	remainder = used_bytes % align_bytes
	gap = align_bytes - remainder
	if gap == 1:
		# opening and closing markers land in the same byte
		return [0x1F | 0x80]
	filler = [0x00] * (gap - 2)
	return [0x1F, *filler, 0x80]


def keccak_f(state):
	"""
	This is Keccak-f permutation.  It operates on and
	mutates the passed-in KeccakState.  It returns nothing.

	The lane width is read from 'state.lanew'.  Rotation offsets are
	reduced modulo that width and round constants are truncated to it, so
	the tables (written for 64-bit lanes) are also usable with narrower
	lanes.  The number of rounds is 12 + 2 * floor(log2(lane width)).
	"""
	lanew = int(state.lanew)
	lane_mask = (1 << lanew) - 1

	def keccak_round(a, rc):
		w, h = state.W, state.H
		rangew, rangeh = state.rangeW, state.rangeH
		zero = state.zero

		# theta: c[x] is the xor of all lanes in column x; every lane of
		# column x is then xored with a mix of the two neighbouring columns
		c = [reduce(xor, a[x]) for x in rangew]
		for x in rangew:
			d = c[(x - 1) % w] ^ rol(c[(x + 1) % w], 1, lanew)
			for y in rangeh:
				a[x][y] ^= d

		# rho and pi: rotate each lane and move it to its new position
		b = zero()
		for x in rangew:
			for y in rangeh:
				# offsets are tabulated for 64-bit lanes; wrap them to the lane width
				shift = RotationConstants[y][x] % lanew
				b[y % w][(2 * x + 3 * y) % h] = rol(a[x][y], shift, lanew)

		# chi: non-linear mix along x; '~' gives a negative int, but the
		# '&' with a non-negative lane brings the result back in range
		for x in rangew:
			for y in rangeh:
				a[x][y] = b[x][y] ^ ((~b[(x + 1) % w][y]) & b[(x + 2) % w][y])

		# iota: fold in the round constant, truncated to the lane width
		a[0][0] ^= rc & lane_mask

	# exact integer floor(log2(lanew)), avoiding float rounding in log()
	nr = 12 + 2 * (lanew.bit_length() - 1)

	for ir in range(nr):
		keccak_round(state.s, RoundConstants[ir])


class KeccakState:
	"""
	A keccak state container.

	The state is stored as a 5x5 table of integers, addressed as s[x][y].

	Attributes:
		bitrate: the rate in bits (must be a multiple of 8).
		bitrate_bytes: the rate in bytes.
		b: the total state width in bits (must be a multiple of 25).
		lanew: the width of one lane in bits (b // 25).
		s: the 5x5 table of lanes.

	Note: absorb(), get_bytes() and set_bytes() move through the byte
	sequence 8 bytes per lane, i.e. they are written for 64-bit lanes.
	"""

	__slots__ = ('lanew', 'bitrate', 'b', 's', 'bitrate_bytes')

	W = 5
	H = 5

	rangeW = range(W)
	rangeH = range(H)

	@staticmethod
	def zero():
		"""
		Returns a zero state table.
		"""
		return [[0] * KeccakState.W for _ in KeccakState.rangeH]

	@staticmethod
	def format(st):
		"""
		Formats the given state as hex, in natural byte order:
		one text line per y, lanes separated by spaces, each lane
		printed as 16 zero-padded hex digits.
		"""
		return '\n'.join(
			' '.join('%016x' % st[x][y] for x in KeccakState.rangeW)
			for y in KeccakState.rangeH
		)

	@staticmethod
	def lane2bytes(s, w):
		"""
		Converts the lane s to a sequence of byte values,
		assuming a lane is w bits.  The least significant byte comes first.
		"""
		return [(s >> shift) & 0xFF for shift in range(0, w, 8)]

	@staticmethod
	def bytes2lane(bb):
		"""
		Converts a sequence of byte values to a lane.
		The first byte becomes the least significant one.
		"""
		r = 0
		for b in reversed(bb):
			r = r << 8 | b
		return r

	@staticmethod
	def ilist2bytes(bb):
		"""
		Converts a sequence of byte values to a bytestring.
		"""
		return bytes(bb)

	@staticmethod
	def bytes2ilist(ss):
		"""
		Converts a string or bytestring to a list of byte values.

		A str is converted character by character with ord(); anything
		else is passed to list().
		"""
		# always hand back a real list (not a lazy map object) so callers
		# can take len(), slice and concatenate regardless of input type
		if isinstance(ss, str):
			return [ord(ch) for ch in ss]
		return list(ss)

	def __init__(self, bitrate, b):
		"""
		Creates an all-zero state with the given rate and total width,
		both in bits.
		"""
		self.bitrate = bitrate
		self.b = b

		# only byte-aligned
		assert self.bitrate % 8 == 0
		self.bitrate_bytes = bits2bytes(self.bitrate)

		assert self.b % 25 == 0
		self.lanew = self.b // 25

		self.s = KeccakState.zero()

	def __str__(self):
		return KeccakState.format(self.s)

	def absorb(self, bb):
		"""
		Mixes in the given bitrate-length string to the state.

		'bb' is a sequence of exactly bitrate_bytes byte values; it is
		xored into the state lane by lane and is not modified.
		"""
		assert len(bb) == self.bitrate_bytes

		# zero-extend to the full state width on a copy, so the caller's
		# block is not extended as a side effect
		padded = list(bb) + [0] * bits2bytes(self.b - self.bitrate)
		i = 0

		for y in self.rangeH:
			for x in self.rangeW:
				self.s[x][y] ^= KeccakState.bytes2lane(padded[i : i + 8])
				i += 8

	def squeeze(self):
		"""
		Returns the bitrate-length prefix of the state to be output.
		"""
		return self.get_bytes()[: self.bitrate_bytes]

	def get_bytes(self):
		"""
		Convert whole state to a list of byte values.
		"""
		out = [0] * bits2bytes(self.b)
		i = 0
		for y in self.rangeH:
			for x in self.rangeW:
				v = KeccakState.lane2bytes(self.s[x][y], self.lanew)
				out[i : i + 8] = v
				i += 8
		return out

	def set_bytes(self, bb):
		"""
		Set whole state from byte string, which is assumed
		to be the correct length.
		"""
		i = 0
		for y in self.rangeH:
			for x in self.rangeW:
				self.s[x][y] = KeccakState.bytes2lane(bb[i : i + 8])
				i += 8


class KeccakSponge:
	"""
	A sponge built on a KeccakState.

	Input is collected in 'buffer' (a list of byte values) and absorbed
	one bitrate-sized block at a time; 'permfn' is applied to the state
	after every absorbed block and after every squeezed block.  'padfn'
	is called as padfn(used_bytes, block_bytes) to pad the final block.
	"""

	__slots__ = ('state', 'padfn', 'permfn', 'buffer')

	def __init__(self, bitrate, width, padfn, permfn):
		self.state = KeccakState(bitrate, width)
		self.padfn = padfn
		self.permfn = permfn
		self.buffer = []

	def copy(self):
		"""
		Returns an independent deep copy of this sponge.
		"""
		return deepcopy(self)

	def absorb_block(self, bb):
		"""
		Absorbs exactly one bitrate-sized block and permutes the state.
		"""
		assert len(bb) == self.state.bitrate_bytes
		self.state.absorb(bb)
		self.permfn(self.state)

	def absorb(self, s):
		"""
		Appends the byte values of 's' to the buffer and absorbs every
		complete block; a trailing partial block stays buffered.
		"""
		self.buffer += s

		block_len = self.state.bitrate_bytes
		# number of buffered bytes that form complete blocks
		whole = len(self.buffer) - len(self.buffer) % block_len
		# walk the buffer by offset and trim it once afterwards, instead
		# of copying the whole remainder after every single block
		for start in range(0, whole, block_len):
			self.absorb_block(self.buffer[start : start + block_len])
		if whole:
			del self.buffer[:whole]

	def absorb_final(self):
		"""
		Pads whatever is left in the buffer to a full block, absorbs it
		and empties the buffer.
		"""
		padded = self.buffer + self.padfn(len(self.buffer), self.state.bitrate_bytes)
		self.absorb_block(padded)
		self.buffer = []

	def squeeze_once(self):
		"""
		Returns one bitrate-sized block of output, then permutes the state.
		"""
		rc = self.state.squeeze()
		self.permfn(self.state)
		return rc

	def squeeze(self, l):
		"""
		Returns 'l' byte values of output, squeezing as many blocks as
		needed (always at least one).
		"""
		z = self.squeeze_once()
		while len(z) < l:
			z += self.squeeze_once()
		return z[:l]


[docs] class KeccakHash: """ The Keccak hash function, with a hashlib-compatible interface. """ __slots__ = ('sponge', 'digest_size', 'block_size')
[docs] def __init__(self, bitrate_bits, capacity_bits, output_bits): # our in-absorption sponge. this is never given padding assert bitrate_bits + capacity_bits in (25, 50, 100, 200, 400, 800, 1600) self.sponge = KeccakSponge( bitrate_bits, bitrate_bits + capacity_bits, multirate_padding, keccak_f ) # hashlib interface members assert output_bits % 8 == 0 self.digest_size = bits2bytes(output_bits) self.block_size = bits2bytes(bitrate_bits)
[docs] def __repr__(self): inf = ( self.sponge.state.bitrate, self.sponge.state.b - self.sponge.state.bitrate, self.digest_size * 8, ) return '<KeccakHash with r=%d, c=%d, image=%d>' % inf
def copy(self): return deepcopy(self) def update(self, s: collections.abc.Buffer) -> None: self.sponge.absorb(s) def digest(self) -> bytes: finalised = self.sponge.copy() finalised.absorb_final() digest = finalised.squeeze(self.digest_size) return KeccakState.ilist2bytes(digest) def hexdigest(self) -> str: return self.digest().hex()
[docs] @staticmethod def preset(bitrate_bits, capacity_bits, output_bits): """ Returns a factory function for the given bitrate, sponge capacity and output length. The function accepts an optional initial input, ala hashlib. """ def create(initial_input: collections.abc.Buffer | None = None): h = KeccakHash(bitrate_bits, capacity_bits, output_bits) if initial_input is not None: h.update(initial_input) return h return create
Keccak256 = KeccakHash.preset(1088, 512, 256)
"""
Default preset for Keccak hash that is the same as one used in eth
"""