{% macro python_hex_to_int() %}

def hex_to_int(hex) -> str:
    """
    Converts hex (of any size) to int, returned as a string. Snowflake NUMBER tops out at 38 digits
    of precision and JavaScript UDFs at 64-bit values, so the result is kept as a string.

    hex_to_int('200000000000000000000000000000211');
    >> 680564733841876926926749214863536423441

    hex_to_int('0x200000000000000000000000000000211');
    >> 680564733841876926926749214863536423441

    hex_to_int(NULL);
    >> NULL
    """
    return (str(int(hex, 16)) if hex and hex != "0x" else None)

{% endmacro %}

{% macro python_udf_hex_to_int_with_encoding() %}

def hex_to_int(encoding, hex) -> str:
    """
    Converts hex (of any size) to int, returned as a string. Snowflake NUMBER tops out at 38 digits
    of precision and JavaScript UDFs at 64-bit values, so the result is kept as a string.

    hex_to_int('hex', '200000000000000000000000000000211');
    >> 680564733841876926926749214863536423441

    hex_to_int('hex', '0x200000000000000000000000000000211');
    >> 680564733841876926926749214863536423441

    hex_to_int('hex', NULL);
    >> NULL

    hex_to_int('s2c', 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffe5b83acf');
    >> -440911153
    """
    if not hex:
        return None
    if encoding.lower() == 's2c':
        if hex[0:2].lower() != '0x':
            hex = f'0x{hex}'

        bits = len(hex[2:]) * 4
        value = int(hex, 0)
        if value & (1 << (bits - 1)):
            value -= 1 << bits
        return str(value)
    else:
        return str(int(hex, 16))

{% endmacro %}

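{# Added note (not part of the original macro): the 's2c' branch treats the hex string as a
   two's-complement value whose width is inferred from its length. In the docstring example the
   input is 64 hex characters, so bits = 64 * 4 = 256; the raw value has its top bit set, so the
   code subtracts 1 << 256, yielding -440911153. #}
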
{% macro create_udf_keccak256() %}

from Crypto.Hash import keccak

def udf_encode(event_name):
    keccak_hash = keccak.new(digest_bits=256)
    keccak_hash.update(event_name.encode('utf-8'))
    return '0x' + keccak_hash.hexdigest()

{% endmacro %}

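{# Illustrative only: this is the same hashing used for EVM event topics, so
   udf_encode('Transfer(address,address,uint256)') should return the familiar ERC-20 Transfer
   topic0, 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. #}
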
{% macro create_udf_evm_text_signature() %}

def get_simplified_signature(abi):
    def generate_signature(inputs):
        signature_parts = []
        for input_data in inputs:
            if 'components' in input_data:
                component_signature_parts = []
                components = input_data['components']
                component_signature_parts.extend(generate_signature(components))
                component_signature_parts[-1] = component_signature_parts[-1].rstrip(",")
                if input_data['type'].endswith('[]'):
                    signature_parts.append("(" + "".join(component_signature_parts) + ")[],")
                else:
                    signature_parts.append("(" + "".join(component_signature_parts) + "),")
            else:
                signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",")
        return signature_parts

    signature_parts = [abi['name'] + "("]
    signature_parts.extend(generate_signature(abi['inputs']))
    signature_parts[-1] = signature_parts[-1].rstrip(",") + ")"
    return "".join(signature_parts)

{% endmacro %}

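{# Illustrative only: for an ABI entry such as
   {"name": "transfer", "inputs": [{"type": "address"}, {"type": "uint256"}]}
   get_simplified_signature returns 'transfer(address,uint256)'; a single tuple input whose
   components are address and uint256 renders as 'fn((address,uint256))'. #}
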
{% macro create_udf_decimal_adjust() %}

from decimal import Decimal, ROUND_DOWN

def custom_divide(input, adjustment):
    try:
        if adjustment is None or input is None:
            return None

        # Perform the division using Decimal type
        result = Decimal(input) / pow(10, Decimal(adjustment))

        # Determine the number of decimal places in the result
        decimal_places = max(0, -result.as_tuple().exponent)

        # Convert the result to a string representation without scientific notation and with dynamic decimal precision
        result_str = "{:.{prec}f}".format(result, prec=decimal_places)

        return result_str
    except Exception as e:
        return None

{% endmacro %}

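{# Illustrative only: custom_divide(1234567, 6) divides by 10^6 and returns the string '1.234567',
   while custom_divide(1000000, 6) returns '1'; NULL inputs or any conversion error return NULL
   rather than raising. #}
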
{% macro create_udf_cron_to_prior_timestamps() %}

import croniter
import datetime

class TimestampGenerator:

    def __init__(self):
        pass

    def process(self, workflow_name, workflow_schedule):
        for timestamp in self.generate_timestamps(workflow_name, workflow_schedule):
            yield (workflow_name, workflow_schedule, timestamp)

    def generate_timestamps(self, workflow_name, workflow_schedule):
        # Create a cron iterator object
        cron = croniter.croniter(workflow_schedule)

        # Generate timestamps for the previous 10 runs
        timestamps = []
        for i in range(10):
            prev_run = cron.get_prev(datetime.datetime)
            timestamps.append(prev_run)

        return timestamps

{% endmacro %}

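{# Illustrative only: croniter starts from the current time when no start time is given, so with a
   schedule of '0 * * * *', TimestampGenerator().process('my_workflow', '0 * * * *') yields ten
   rows, one for each of the ten most recent top-of-hour run times, newest first. #}
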
{% macro create_udf_transform_logs() %}

from copy import deepcopy

def transform_tuple(components: list, values: list):
    transformed_values = []
    for i, component in enumerate(components):
        if i < len(values):
            if component["type"] == "tuple":
                transformed_values.append({"value": transform_tuple(component["components"], values[i]), **component})
            elif component["type"] == "tuple[]":
                if not values[i]:
                    transformed_values.append({"value": [], **component})
                    continue
                sub_values = [transform_tuple(component["components"], v) for v in values[i]]
                transformed_values.append({"value": sub_values, **component})
            else:
                transformed_values.append({"value": values[i], **component})
    return {item["name"]: item["value"] for item in transformed_values}

def transform_event(event: dict):
    new_event = deepcopy(event)
    if new_event.get("components"):
        components = new_event.get("components")

        if not new_event["value"]:
            return new_event

        if isinstance(new_event["value"][0], list):
            result_list = []
            for value_set in new_event["value"]:
                result_list.append(transform_tuple(components, value_set))
            new_event["value"] = result_list

        else:
            new_event["value"] = transform_tuple(components, new_event["value"])

        return new_event

    else:
        return event

def transform(events: dict):
    try:
        results = [
            transform_event(event) if event.get("decoded") else event
            for event in events["data"]
        ]
        events["data"] = results
        return events
    except:
        return events

{% endmacro %}

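{# Illustrative only: given a decoded event parameter with
   components = [{"name": "from", "type": "address"}, {"name": "value", "type": "uint256"}]
   and value = ["0xabc...", 100], transform_event rewrites value as the keyed object
   {"from": "0xabc...", "value": 100}; tuple[] components are mapped element by element. #}
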
{% macro create_udf_base58_to_hex() %}

def transform_base58_to_hex(base58):
    if base58 is None:
        return 'Invalid input'

    ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
    base_count = len(ALPHABET)

    num = 0
    leading_zeros = 0

    for char in base58:
        if char == '1':
            leading_zeros += 1
        else:
            break

    for char in base58:
        num *= base_count
        if char in ALPHABET:
            num += ALPHABET.index(char)
        else:
            return 'Invalid character in input'

    hex_string = hex(num)[2:]

    if len(hex_string) % 2 != 0:
        hex_string = '0' + hex_string

    hex_leading_zeros = '00' * leading_zeros

    return '0x' + hex_leading_zeros + hex_string

{% endmacro %}

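{# Illustrative only: in this Base58 alphabet '2' has index 1, so transform_base58_to_hex('2')
   returns '0x01', and each leading '1' contributes a zero byte, so transform_base58_to_hex('12')
   returns '0x0001'. #}
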
{% macro create_udf_hex_to_base58() %}

def transform_hex_to_base58(input):
    if input is None or not input.startswith('0x'):
        return 'Invalid input'

    input = input[2:]

    ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
    byte_array = bytes.fromhex(input)
    num = int.from_bytes(byte_array, 'big')

    encoded = ''
    while num > 0:
        num, remainder = divmod(num, 58)
        encoded = ALPHABET[remainder] + encoded

    for byte in byte_array:
        if byte == 0:
            encoded = '1' + encoded
        else:
            break

    return encoded

{% endmacro %}

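{# Illustrative only: this is the inverse of the macro above for simple cases, e.g.
   transform_hex_to_base58('0x01') returns '2' and transform_hex_to_base58('0x0001') returns '12',
   with leading zero bytes coming back as leading '1' characters. #}
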
{% macro create_udf_hex_to_bech32() %}

def transform_hex_to_bech32(input, hrp=''):
    CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"

    def bech32_polymod(values):
        generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
        checksum = 1
        for value in values:
            top = checksum >> 25
            checksum = ((checksum & 0x1ffffff) << 5) ^ value
            for i in range(5):
                checksum ^= generator[i] if ((top >> i) & 1) else 0
        return checksum

    def bech32_hrp_expand(hrp):
        return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]

    def bech32_create_checksum(hrp, data):
        values = bech32_hrp_expand(hrp) + data
        polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
        return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]

    def bech32_convertbits(data, from_bits, to_bits, pad=True):
        acc = 0
        bits = 0
        ret = []
        maxv = (1 << to_bits) - 1
        max_acc = (1 << (from_bits + to_bits - 1)) - 1
        for value in data:
            acc = ((acc << from_bits) | value) & max_acc
            bits += from_bits
            while bits >= to_bits:
                bits -= to_bits
                ret.append((acc >> bits) & maxv)
        if pad and bits:
            ret.append((acc << (to_bits - bits)) & maxv)
        return ret

    if input is None or not input.startswith('0x'):
        return 'Invalid input'

    input = input[2:]

    data = bytes.fromhex(input)
    data5bit = bech32_convertbits(list(data), 8, 5)

    if data5bit is None:
        return 'Data conversion failed'

    checksum = bech32_create_checksum(hrp, data5bit)

    return hrp + '1' + ''.join([CHARSET[d] for d in data5bit + checksum])

{% endmacro %}

{% macro create_udf_hex_to_algorand() %}

import hashlib
import base64

def transform_hex_to_algorand(input):
    if input is None or not input.startswith('0x'):
        return 'Invalid input'

    input = input[2:]
    public_key_bytes = bytearray.fromhex(input)

    sha512_256_hash = hashlib.new('sha512_256', public_key_bytes).digest()

    checksum = sha512_256_hash[-4:]

    algorand_address = base64.b32encode(public_key_bytes + checksum).decode('utf-8').rstrip('=')

    return algorand_address

{% endmacro %}

{% macro create_udf_hex_to_tezos() %}

import hashlib

def transform_hex_to_tezos(input, prefix):
    if input is None or not input.startswith('0x'):
        return 'Invalid input'

    input = input[2:]

    if len(input) != 40:
        return 'Invalid length'

    hash_bytes = bytes.fromhex(input)

    prefixes = {
        'tz1': '06a19f',  # Ed25519
        'tz2': '06a1a1',  # Secp256k1
        'tz3': '06a1a4'   # P-256
    }

    if prefix not in prefixes:
        return 'Invalid prefix: Must be tz1, tz2, or tz3'

    prefix_bytes = bytes.fromhex(prefixes[prefix])

    prefixed_hash = prefix_bytes + hash_bytes

    checksum = hashlib.sha256(hashlib.sha256(prefixed_hash).digest()).digest()[:4]

    full_hash = prefixed_hash + checksum

    tezos_address = transform_hex_to_base58(full_hash.hex())

    return tezos_address

def transform_hex_to_base58(input):
    if input is None:
        return None

    ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
    byte_array = bytes.fromhex(input)
    num = int.from_bytes(byte_array, 'big')

    encoded = ''
    while num > 0:
        num, remainder = divmod(num, 58)
        encoded = ALPHABET[remainder] + encoded

    for byte in byte_array:
        if byte == 0:
            encoded = '1' + encoded
        else:
            break

    return encoded

{% endmacro %}

{% macro create_udf_detect_overflowed_responses() %}

import pandas as pd
from snowflake.snowpark.files import SnowflakeFile

VARCHAR_MAX = 16_777_216

def main(file_url, index_cols):
    with SnowflakeFile.open(file_url, 'rb') as f:
        df = pd.read_json(f, lines=True, compression='gzip')
    data_length = df["data"].astype(str).apply(len)
    return df[data_length > VARCHAR_MAX][index_cols].values.tolist()

{% endmacro %}

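{# Hypothetical usage sketch (the registered UDF name and signature below are assumptions, not
   part of this macro): once wired to a scoped file URL it could be invoked as
   select my_schema.udf_detect_overflowed_responses(
       build_scoped_file_url(@my_stage, 'responses/000123.json.gz'),
       ['block_number', 'request_id']
   );
   returning the index values of any row whose serialized "data" exceeds Snowflake's
   16,777,216-character VARCHAR limit. #}
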
{% macro create_udtf_flatten_overflowed_responses() %}

import logging
import simplejson as json

import numpy as np
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile

VARCHAR_MAX = 16_777_216

logger = logging.getLogger("udtf_flatten_overflowed_responses")

class Flatten:
    """
    Recursive helper that flattens a nested JSON response
    """

    def __init__(self, mode: str, exploded_key: list) -> None:
        self.mode = mode
        self.exploded_key = exploded_key

    def _flatten_response(
        self,
        response_key: str,
        responses: str,
        block_number: int,
        metadata: dict,
        seq_index: int = 0,
        path: str = "",
    ):
        """
        Example:

        input: {"a":1, "b":[77,88], "c": {"d":"X"}}

        output:
        - SEQ: A unique sequence number associated with the input record; the sequence is not guaranteed to be gap-free or ordered in any particular way.
        - KEY: For maps or objects, this column contains the key to the exploded value.
        - PATH: The path to the element within a data structure which needs to be flattened.
        - INDEX: The index of the element, if it is an array; otherwise NULL.
        - VALUE_: The value of the element of the flattened array/object.
        """
        exploded_data = []
        if self.mode == "array":
            check_mode = isinstance(responses, list)
        elif self.mode == "dict":
            check_mode = isinstance(responses, dict)
        elif self.mode == "both":
            check_mode = isinstance(responses, list) or isinstance(responses, dict)

        if check_mode:
            if isinstance(responses, dict):
                looped_keys = responses.keys()
                for key in looped_keys:
                    next_path = f"{path}.{key}" if path else key
                    index = None
                    exploded_data.append(
                        {
                            "block_number": block_number,
                            "metadata": metadata,
                            "seq": seq_index,
                            "key": key,
                            "path": next_path,
                            "index": index,
                            "value_": responses[key],
                        }
                    )
                    exploded_data.extend(
                        self._flatten_response(
                            key,
                            responses[key],
                            block_number,
                            metadata,
                            seq_index,
                            next_path,
                        )
                    )

            elif isinstance(responses, list):
                looped_keys = range(len(responses))
                if response_key in self.exploded_key or len(self.exploded_key) == 0:
                    for item_i, item in enumerate(responses):
                        if response_key == "result":
                            seq_index += 1
                        index = item_i
                        exploded_data.append(
                            {
                                "block_number": block_number,
                                "metadata": metadata,
                                "seq": seq_index,
                                "key": None,
                                "path": f"{path}[{item_i}]",
                                "index": index,
                                "value_": item,
                            }
                        )
                        exploded_data.extend(
                            self._flatten_response(
                                item_i,
                                item,
                                block_number,
                                metadata,
                                seq_index,
                                f"{path}[{item_i}]",
                            )
                        )

        return exploded_data

class FlattenRows:
    """
    UDTF handler that flattens a given JSON file from a Snowflake stage
    """

    def process(self, file_url: str, index_cols: list, index_vals: list):
        with SnowflakeFile.open(file_url, 'rb') as f:
            df = pd.read_json(f, lines=True, compression='gzip')

        df.set_index(index_cols, inplace=True, drop=False)
        df = df.loc[index_vals]

        flattener = Flatten(mode="both", exploded_key=[])

        df["value_"] = df.apply(
            lambda x: flattener._flatten_response(
                block_number=x["block_number"], metadata=x["metadata"], responses=x["data"], response_key=None
            ),
            axis="columns",
        )
        df["value_"] = df["value_"].apply(pd.DataFrame.from_records)
        df["index_cols"] = df.index
        df = df[["index_cols", "value_"]]
        flattened = pd.concat(
            df["value_"].values.tolist(), keys=df["index_cols"].values.tolist()
        ).droplevel(-1)

        cleansed = flattened.replace({np.nan: None})

        overflow = cleansed["value_"].astype(str).apply(len) > VARCHAR_MAX

        cleansed.loc[overflow, ["value_"]] = None
        temp_index_cols = list(range(len(index_cols)))
        cleansed = cleansed.reset_index(names=temp_index_cols, drop=False)
        cleansed["index_cols"] = cleansed[temp_index_cols].apply(list, axis=1)
        cleansed.drop(columns=temp_index_cols, inplace=True, errors="ignore")
        return list(cleansed[np.roll(cleansed.columns.values, 1).tolist()].itertuples(index=False, name=None))

{% endmacro %}

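{# Added illustration (not in the original): for the docstring input
   {"a": 1, "b": [77, 88], "c": {"d": "X"}}, _flatten_response emits one row per exploded element,
   e.g. (key 'a', path 'a', value 1), (path 'b[0]', index 0, value 77),
   (path 'b[1]', index 1, value 88) and (key 'd', path 'c.d', value 'X'),
   each carrying the row's block_number and metadata. #}
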
{% macro create_udf_decompress_zlib() %}

import zlib
import codecs

def decompress_zlib(compressed_string):
    try:
        if not compressed_string:
            return None

        # Remove b prefix and suffix if present
        if compressed_string.startswith("b'") and compressed_string.endswith("'"):
            compressed_string = compressed_string[2:-1]
        elif compressed_string.startswith('b"') and compressed_string.endswith('"'):
            compressed_string = compressed_string[2:-1]

        # Decode the escaped string to bytes
        compressed_bytes = codecs.decode(compressed_string, 'unicode_escape')

        # Convert to bytes if string
        if isinstance(compressed_bytes, str):
            compressed_bytes = compressed_bytes.encode('latin-1')

        # Decompress the zlib data
        decompressed = zlib.decompress(compressed_bytes)

        # Return as UTF-8 string
        return decompressed.decode('utf-8')
    except Exception as e:
        return f"Error decompressing: {str(e)}"

{% endmacro %}

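{# Illustrative only: the input is expected to be the stringified repr of Python bytes as stored in
   a VARCHAR column, so a value produced by str(zlib.compress('{"a": 1}'.encode())) — something
   like "b'x\\x9c...'" — should round-trip back to the text '{"a": 1}' via the
   unicode_escape/latin-1 path above. #}
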
{% macro create_udf_stablecoin_data_parse() %}

import re

class udf_stablecoin_data_parse:
    def process(self, peggeddata_content):
        """Main parsing function"""

        def extract_field_value(obj_text, field_name):
            """Extract field value from object text using regex patterns"""

            # Handle different field patterns
            patterns = [
                rf'{field_name}\s*:\s*"([^"]*)"',
                rf"{field_name}\s*:\s*'([^']*)'",
                rf'{field_name}\s*:\s*`([^`]*)`',
                rf'{field_name}\s*:\s*(true|false|null|undefined)',
                rf'{field_name}\s*:\s*([^,}}\n]+)'
            ]

            for pattern in patterns:
                match = re.search(pattern, obj_text, re.IGNORECASE | re.DOTALL)
                if match:
                    value = match.group(1).strip()

                    # Clean up the value
                    value = re.sub(r'[,}}\n]', '', value).strip()

                    if value.lower() in ('null', 'undefined', ''):
                        return None

                    # Handle boolean values
                    if value.lower() == 'true':
                        return True
                    if value.lower() == 'false':
                        return False

                    return value

            return None

        def convert_value(value, expected_type):
            """Convert value to appropriate type"""
            if value is None:
                return None

            if expected_type == 'BOOLEAN':
                if isinstance(value, bool):
                    return value
                if isinstance(value, str):
                    lower = value.lower()
                    if lower == 'true':
                        return True
                    if lower == 'false':
                        return False
                return None

            return str(value) if value is not None else None

        try:
            # Find the main array content - make the regex non-greedy but capture everything
            array_match = re.search(r'export\s+default\s*\[(.*)\];?\s*$', peggeddata_content, re.DOTALL)
            if not array_match:
                raise Exception('Could not find exported array in peggedData content')

            array_content = array_match.group(1).strip()

            # Use a simpler regex-based approach to split objects
            # Remove comments and clean up the array content first
            # Instead of removing line comments entirely, just remove the // markers but keep the content
            clean_content = re.sub(r'^\s*//\s*', '', array_content, flags=re.MULTILINE)  # Remove // at start of lines
            clean_content = re.sub(r'\n\s*//\s*', '\n', clean_content)  # Remove // from middle of lines
            # Instead of removing block comments entirely, just remove the comment markers but keep the content
            clean_content = re.sub(r'/\*', '', clean_content)  # Remove opening block comment markers
            clean_content = re.sub(r'\*/', '', clean_content)  # Remove closing block comment markers

            # Find all objects using regex - look for {...} patterns
            # This is more reliable than manual parsing
            object_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
            matches = re.finditer(object_pattern, clean_content, re.DOTALL)

            objects = []
            for match in matches:
                obj_text = match.group(0).strip()
                if obj_text and len(obj_text) > 10:  # Filter out small matches
                    objects.append(obj_text)

            # If the simple regex didn't work, try a more complex nested approach
            if not objects:
                # More complex regex for nested objects
                nested_pattern = r'\{(?:[^{}]|(?:\{[^{}]*\}))*\}'
                nested_matches = re.findall(nested_pattern, clean_content, re.DOTALL)
                objects = [obj.strip() for obj in nested_matches if len(obj.strip()) > 20]

            # Still no objects? Try manual parsing with better logic
            if not objects:
                objects = []
                current_object = ''
                brace_count = 0
                in_string = False
                string_char = ''
                i = 0

                while i < len(clean_content):
                    char = clean_content[i]

                    # Handle string literals
                    if not in_string and char in ('"', "'", '`'):
                        in_string = True
                        string_char = char
                    elif in_string and char == string_char:
                        # Check if it's escaped
                        if i > 0 and clean_content[i-1] != '\\':
                            in_string = False
                            string_char = ''

                    # Handle braces only when not in string
                    if not in_string:
                        if char == '{':
                            if brace_count == 0:
                                current_object = '{'  # Start new object
                            else:
                                current_object += char
                            brace_count += 1
                        elif char == '}':
                            current_object += char
                            brace_count -= 1
                            if brace_count == 0 and current_object.strip():
                                # Complete object found
                                objects.append(current_object.strip())
                                current_object = ''
                        elif brace_count > 0:
                            current_object += char
                    else:
                        if brace_count > 0:
                            current_object += char

                    i += 1

            if not objects:
                # Last resort: try splitting on id: pattern
                id_splits = re.split(r'\n\s*id:\s*["\']', clean_content)
                if len(id_splits) > 1:
                    objects = []
                    for i, part in enumerate(id_splits[1:], 1):  # Skip first empty part
                        # Try to reconstruct the object
                        obj_start = clean_content.find(f'id:', clean_content.find(part))
                        if obj_start > 0:
                            # Look backwards for opening brace
                            brace_start = clean_content.rfind('{', 0, obj_start)
                            if brace_start >= 0:
                                # Look forward for matching closing brace
                                brace_count = 0
                                for j in range(brace_start, len(clean_content)):
                                    if clean_content[j] == '{':
                                        brace_count += 1
                                    elif clean_content[j] == '}':
                                        brace_count -= 1
                                        if brace_count == 0:
                                            obj_text = clean_content[brace_start:j+1].strip()
                                            if len(obj_text) > 20:
                                                objects.append(obj_text)
                                            break

            if not objects:
                raise Exception(f'No objects found after all parsing attempts. Sample content: {clean_content[:500]}...')

            # Process each object and extract the required fields
            for i, obj_text in enumerate(objects):
                try:
                    data = {
                        'id': extract_field_value(obj_text, 'id'),
                        'name': extract_field_value(obj_text, 'name'),
                        'address': extract_field_value(obj_text, 'address'),
                        'symbol': extract_field_value(obj_text, 'symbol'),
                        'onCoinGecko': extract_field_value(obj_text, 'onCoinGecko'),
                        'gecko_id': extract_field_value(obj_text, 'gecko_id'),
                        'cmcId': extract_field_value(obj_text, 'cmcId'),
                        'pegType': extract_field_value(obj_text, 'pegType'),
                        'pegMechanism': extract_field_value(obj_text, 'pegMechanism'),
                        'priceSource': extract_field_value(obj_text, 'priceSource'),
                        'deadFrom': extract_field_value(obj_text, 'deadFrom'),
                        'delisted': extract_field_value(obj_text, 'delisted'),
                        'deprecated': extract_field_value(obj_text, 'deprecated'),
                        'doublecounted': extract_field_value(obj_text, 'doublecounted')
                    }

                    # Only include objects that have at least id and name
                    if data['id'] and data['name']:
                        yield (
                            convert_value(data['id'], 'STRING'),
                            convert_value(data['name'], 'STRING'),
                            convert_value(data['address'], 'STRING'),
                            convert_value(data['symbol'], 'STRING'),
                            convert_value(data['onCoinGecko'], 'BOOLEAN'),
                            convert_value(data['gecko_id'], 'STRING'),
                            convert_value(data['cmcId'], 'STRING'),
                            convert_value(data['pegType'], 'STRING'),
                            convert_value(data['pegMechanism'], 'STRING'),
                            convert_value(data['priceSource'], 'STRING'),
                            convert_value(data['deadFrom'], 'STRING'),
                            convert_value(data['delisted'], 'BOOLEAN'),
                            convert_value(data['deprecated'], 'BOOLEAN'),
                            convert_value(data['doublecounted'], 'BOOLEAN')
                        )

                except Exception as obj_error:
                    # Skip malformed objects but continue processing
                    continue

        except Exception as error:
            raise Exception(f'Error parsing peggedData content: {str(error)}')

{% endmacro %}

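{# Illustrative only: for a peggedData.ts export such as
   export default [{ id: "1", name: "Tether", symbol: "USDT", pegType: "peggedUSD", onCoinGecko: "true" }];
   the UDTF yields one tuple per object, here ('1', 'Tether', None, 'USDT', True, ...), with
   boolean-like strings coerced and missing fields returned as NULL. #}
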
{% macro create_udf_encode_contract_call() %}

def encode_call(function_abi, input_values):
    """
    Encodes EVM contract function calls into ABI-encoded calldata.

    This function generates complete calldata (selector + encoded params) that can be
    used directly in eth_call JSON-RPC requests to query contract state.
    """
    import eth_abi
    from eth_hash.auto import keccak
    import json

    def get_function_signature(abi):
        """
        Generate function signature using the same logic as utils.udf_evm_text_signature.

        Examples:
            balanceOf(address)
            transfer(address,uint256)
            swap((address,address,uint256))
        """
        def generate_signature(inputs):
            signature_parts = []
            for input_data in inputs:
                if 'components' in input_data:
                    # Handle nested tuples
                    component_signature_parts = []
                    components = input_data['components']
                    component_signature_parts.extend(generate_signature(components))
                    component_signature_parts[-1] = component_signature_parts[-1].rstrip(",")
                    if input_data['type'].endswith('[]'):
                        signature_parts.append("(" + "".join(component_signature_parts) + ")[],")
                    else:
                        signature_parts.append("(" + "".join(component_signature_parts) + "),")
                else:
                    # Clean up Solidity-specific modifiers
                    signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",")
            return signature_parts

        signature_parts = [abi['name'] + "("]
        signature_parts.extend(generate_signature(abi.get('inputs', [])))
        if len(signature_parts) > 1:
            signature_parts[-1] = signature_parts[-1].rstrip(",") + ")"
        else:
            signature_parts.append(")")
        return "".join(signature_parts)

    def function_selector(abi):
        """Calculate 4-byte function selector using Keccak256 hash."""
        signature = get_function_signature(abi)
        hash_bytes = keccak(signature.encode('utf-8'))
        return hash_bytes[:4].hex(), signature

    def get_canonical_type(input_spec):
        """
        Convert ABI input spec to canonical type string for eth_abi encoding.

        Handles tuple expansion: tuple -> (address,uint256,bytes)
        """
        param_type = input_spec['type']

        if param_type.startswith('tuple'):
            components = input_spec.get('components', [])
            component_types = ','.join([get_canonical_type(comp) for comp in components])
            canonical = f"({component_types})"

            # Preserve array suffixes: tuple[] -> (address,uint256)[]
            if param_type.endswith('[]'):
                array_suffix = param_type[5:]  # Everything after 'tuple'
                canonical += array_suffix

            return canonical

        return param_type

    def prepare_value(value, param_type, components=None):
        """
        Convert Snowflake values to Python types suitable for eth_abi encoding.

        Handles type coercion and format normalization for all Solidity types.
        """
        # Handle null/None values with sensible defaults
        if value is None:
            if param_type.startswith('uint') or param_type.startswith('int'):
                return 0
            elif param_type == 'address':
                return '0x' + '0' * 40
            elif param_type == 'bool':
                return False
            elif param_type.startswith('bytes'):
                return b''
            else:
                return value

        # CRITICAL: Check arrays FIRST (before base types)
        # This prevents bytes[] from matching the bytes check
        if param_type.endswith('[]'):
            base_type = param_type[:-2]
            if not isinstance(value, list):
                return []

            # Special handling for tuple arrays
            if base_type == 'tuple' and components:
                return [prepare_tuple(v, components) for v in value]
            else:
                return [prepare_value(v, base_type) for v in value]

        # Base type conversions
        if param_type == 'address':
            addr = str(value).lower()
            if not addr.startswith('0x'):
                addr = '0x' + addr
            return addr

        if param_type.startswith('uint') or param_type.startswith('int'):
            return int(value)

        if param_type == 'bool':
            if isinstance(value, str):
                return value.lower() in ('true', '1', 'yes')
            return bool(value)

        if param_type.startswith('bytes'):
            if isinstance(value, str):
                if value.startswith('0x'):
                    value = value[2:]
                return bytes.fromhex(value)
            return value

        if param_type == 'string':
            return str(value)

        return value

    def prepare_tuple(value, components):
        """
        Recursively prepare tuple values, handling nested structures.

        Tuples can contain other tuples, arrays, or tuple arrays.
        """
        if not isinstance(value, (list, tuple)):
            # Support dict-style input (by component name)
            if isinstance(value, dict):
                value = [value.get(comp.get('name', f'field_{i}'))
                         for i, comp in enumerate(components)]
            else:
                return value

        result = []
        for i, comp in enumerate(components):
            if i >= len(value):
                result.append(None)
                continue

            comp_type = comp['type']
            val = value[i]

            # Handle tuple arrays within tuples
            if comp_type.endswith('[]') and comp_type.startswith('tuple'):
                sub_components = comp.get('components', [])
                result.append(prepare_value(val, comp_type, sub_components))
            elif comp_type.startswith('tuple'):
                # Single tuple (not array)
                sub_components = comp.get('components', [])
                result.append(prepare_tuple(val, sub_components))
            else:
                result.append(prepare_value(val, comp_type))

        return tuple(result)

    try:
        inputs = function_abi.get('inputs', [])

        # Calculate selector using battle-tested signature generation
        selector_hex, signature = function_selector(function_abi)

        # Functions with no inputs only need the selector
        if not inputs:
            return '0x' + selector_hex

        # Prepare values for encoding
        prepared_values = []
        for i, inp in enumerate(inputs):
            if i >= len(input_values):
                prepared_values.append(None)
                continue

            value = input_values[i]
            param_type = inp['type']

            # Handle tuple arrays at top level
            if param_type.endswith('[]') and param_type.startswith('tuple'):
                components = inp.get('components', [])
                prepared_values.append(prepare_value(value, param_type, components))
            elif param_type.startswith('tuple'):
                # Single tuple (not array)
                components = inp.get('components', [])
                prepared_values.append(prepare_tuple(value, components))
            else:
                prepared_values.append(prepare_value(value, param_type))

        # Get canonical type strings for eth_abi (expands tuples)
        types = [get_canonical_type(inp) for inp in inputs]

        # Encode parameters using eth_abi
        encoded_params = eth_abi.encode(types, prepared_values).hex()

        # Return complete calldata: selector + encoded params
        return '0x' + selector_hex + encoded_params

    except Exception as e:
        # Return structured error for debugging
        import traceback
        return json.dumps({
            'error': str(e),
            'traceback': traceback.format_exc(),
            'function': function_abi.get('name', 'unknown'),
            'signature': signature if 'signature' in locals() else 'not computed',
            'selector': '0x' + selector_hex if 'selector_hex' in locals() else 'not computed',
            'types': types if 'types' in locals() else 'not computed'
        })

{% endmacro %}

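{# Illustrative only: for
   encode_call({"name": "balanceOf", "inputs": [{"name": "account", "type": "address"}]},
               ['0xdAC17F958D2ee523a2206206994597C13D831ec7'])
   the result should be the 4-byte selector 0x70a08231 followed by the address left-padded to
   32 bytes, i.e. calldata suitable for the "data" field of an eth_call request. #}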