Merge pull request #26 from FlipsideCrypto/add/base-58-decoder

Add/non-evm-decode-udfs
This commit is contained in:
drethereum 2023-12-11 10:24:33 -07:00 committed by GitHub
commit 7bc119a94e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 212 additions and 1 deletions

View File

@ -157,5 +157,52 @@
HANDLER = 'transform'
sql: |
{{ fsc_utils.create_udf_transform_logs() | indent(4) }}
- name: {{ schema }}.udf_hex_to_base58
signature:
- [input, STRING]
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'transform_hex_to_base58'
sql: |
{{ fsc_utils.create_udf_hex_to_base58() | indent(4) }}
- name: {{ schema }}.udf_hex_to_bech32
signature:
- [input, STRING]
- [hrp, STRING]
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'transform_hex_to_bech32'
sql: |
{{ fsc_utils.create_udf_hex_to_bech32() | indent(4) }}
- name: {{ schema }}.udf_hex_to_algorand
signature:
- [input, STRING]
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'transform_hex_to_algorand'
sql: |
{{ fsc_utils.create_udf_hex_to_algorand() | indent(4) }}
- name: {{ schema }}.udf_hex_to_tezos
signature:
- [input, STRING]
- [prefix, STRING]
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'transform_hex_to_tezos'
sql: |
{{ fsc_utils.create_udf_hex_to_tezos() | indent(4) }}
{% endmacro %}

View File

@ -177,4 +177,168 @@ def transform(events: dict):
except:
return events
{% endmacro %}
{% endmacro %}
{% macro create_udf_hex_to_base58() %}
def transform_hex_to_base58(input):
if input is None or not input.startswith('0x'):
return 'Invalid input'
input = input[2:]
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
byte_array = bytes.fromhex(input)
num = int.from_bytes(byte_array, 'big')
encoded = ''
while num > 0:
num, remainder = divmod(num, 58)
encoded = ALPHABET[remainder] + encoded
for byte in byte_array:
if byte == 0:
encoded = '1' + encoded
else:
break
return encoded
{% endmacro %}
{% macro create_udf_hex_to_bech32() %}
def transform_hex_to_bech32(input, hrp=''):
CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
def bech32_polymod(values):
generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
checksum = 1
for value in values:
top = checksum >> 25
checksum = ((checksum & 0x1ffffff) << 5) ^ value
for i in range(5):
checksum ^= generator[i] if ((top >> i) & 1) else 0
return checksum
def bech32_hrp_expand(hrp):
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_create_checksum(hrp, data):
values = bech32_hrp_expand(hrp) + data
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_convertbits(data, from_bits, to_bits, pad=True):
acc = 0
bits = 0
ret = []
maxv = (1 << to_bits) - 1
max_acc = (1 << (from_bits + to_bits - 1)) - 1
for value in data:
acc = ((acc << from_bits) | value) & max_acc
bits += from_bits
while bits >= to_bits:
bits -= to_bits
ret.append((acc >> bits) & maxv)
if pad and bits:
ret.append((acc << (to_bits - bits)) & maxv)
return ret
if input is None or not input.startswith('0x'):
return 'Invalid input'
input = input[2:]
data = bytes.fromhex(input)
data5bit = bech32_convertbits(list(data), 8, 5)
if data5bit is None:
return 'Data conversion failed'
checksum = bech32_create_checksum(hrp, data5bit)
return hrp + '1' + ''.join([CHARSET[d] for d in data5bit + checksum])
{% endmacro %}
{% macro create_udf_hex_to_algorand() %}
import hashlib
import base64
def transform_hex_to_algorand(input):
if input is None or not input.startswith('0x'):
return 'Invalid input'
input = input[2:]
public_key_bytes = bytearray.fromhex(input)
sha512_256_hash = hashlib.new('sha512_256', public_key_bytes).digest()
checksum = sha512_256_hash[-4:]
algorand_address = base64.b32encode(public_key_bytes + checksum).decode('utf-8').rstrip('=')
return algorand_address
{% endmacro %}
{% macro create_udf_hex_to_tezos() %}
import hashlib
def transform_hex_to_tezos(input, prefix):
if input is None or not input.startswith('0x'):
return 'Invalid input'
input = input[2:]
if len(input) != 40:
return 'Invalid length'
hash_bytes = bytes.fromhex(input)
prefixes = {
'tz1': '06a19f', # Ed25519
'tz2': '06a1a1', # Secp256k1
'tz3': '06a1a4' # P-256
}
if prefix not in prefixes:
return 'Invalid prefix: Must be tz1, tz2, or tz3'
prefix_bytes = bytes.fromhex(prefixes[prefix])
prefixed_hash = prefix_bytes + hash_bytes
checksum = hashlib.sha256(hashlib.sha256(prefixed_hash).digest()).digest()[:4]
full_hash = prefixed_hash + checksum
tezos_address = transform_hex_to_base58(full_hash.hex())
return tezos_address
def transform_hex_to_base58(input):
if input is None:
return None
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
byte_array = bytes.fromhex(input)
num = int.from_bytes(byte_array, 'big')
encoded = ''
while num > 0:
num, remainder = divmod(num, 58)
encoded = ALPHABET[remainder] + encoded
for byte in byte_array:
if byte == 0:
encoded = '1' + encoded
else:
break
return encoded
{% endmacro %}