{% macro python_hex_to_int() %} def hex_to_int(hex) -> str: """ Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) hex_to_int('200000000000000000000000000000211'); >> 680564733841876926926749214863536423441 hex_to_int('0x200000000000000000000000000000211'); >> 680564733841876926926749214863536423441 hex_to_int(NULL); >> NULL """ return (str(int(hex, 16)) if hex and hex != "0x" else None) {% endmacro %} {% macro python_udf_hex_to_int_with_encoding() %} def hex_to_int(encoding, hex) -> str: """ Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) hex_to_int('hex', '200000000000000000000000000000211'); >> 680564733841876926926749214863536423441 hex_to_int('hex', '0x200000000000000000000000000000211'); >> 680564733841876926926749214863536423441 hex_to_int('hex', NULL); >> NULL hex_to_int('s2c', 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffe5b83acf'); >> -440911153 """ if not hex: return None if encoding.lower() == 's2c': if hex[0:2].lower() != '0x': hex = f'0x{hex}' bits = len(hex[2:])*4 value = int(hex, 0) if value & (1 << (bits-1)): value -= 1 << bits return str(value) else: return str(int(hex, 16)) {% endmacro %} {% macro create_udf_keccak256() %} from Crypto.Hash import keccak def udf_encode(event_name): keccak_hash = keccak.new(digest_bits=256) keccak_hash.update(event_name.encode('utf-8')) return '0x' + keccak_hash.hexdigest() {% endmacro %} {% macro create_udf_evm_text_signature() %} def get_simplified_signature(abi): def generate_signature(inputs): signature_parts = [] for input_data in inputs: if 'components' in input_data: component_signature_parts = [] components = input_data['components'] component_signature_parts.extend(generate_signature(components)) component_signature_parts[-1] = component_signature_parts[-1].rstrip(",") if input_data['type'].endswith('[]'): signature_parts.append("(" + "".join(component_signature_parts) + ")[],") else: signature_parts.append("(" + "".join(component_signature_parts) + "),") else: signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",") return signature_parts signature_parts = [abi['name'] + "("] signature_parts.extend(generate_signature(abi['inputs'])) signature_parts[-1] = signature_parts[-1].rstrip(",") + ")" return "".join(signature_parts) {% endmacro %} {% macro create_udf_decimal_adjust() %} from decimal import Decimal, ROUND_DOWN def custom_divide(input, adjustment): try: if adjustment is None or input is None: return None # Perform the division using Decimal type result = Decimal(input) / pow(10, Decimal(adjustment)) # Determine the number of decimal places in the result decimal_places = max(0, -result.as_tuple().exponent) # Convert the result to a string representation without scientific notation and with dynamic decimal precision result_str = "{:.{prec}f}".format(result, prec=decimal_places) return result_str except Exception as e: return None {% endmacro %} {% macro create_udf_cron_to_prior_timestamps() %} import croniter import datetime class TimestampGenerator: def __init__(self): pass def process(self, workflow_name, workflow_schedule): for timestamp in self.generate_timestamps(workflow_name, workflow_schedule): yield (workflow_name, workflow_schedule, timestamp) def generate_timestamps(self, workflow_name, workflow_schedule): # Create a cron iterator object cron = croniter.croniter(workflow_schedule) # Generate timestamps for the prev 10 runs timestamps = [] for i in range(10): prev_run = cron.get_prev(datetime.datetime) timestamps.append(prev_run) return timestamps {% endmacro %}