Update/tx size udf optimization (#863)

* update tx_size and compute unit udf; update version

* revert temp source

* fix udf param
tarikceric 2025-08-06 11:01:03 -07:00 committed by GitHub
parent a027f34090
commit 00f87a344e
2 changed files with 113 additions and 60 deletions
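
Note on the main change in the first file: the rewritten udf_get_compute_units_total stops regex-scanning log_messages and instead decodes the base58 data of ComputeBudget111111111111111111111111111111 instructions, treating a payload whose first byte is 0x02 as a compute-unit limit encoded as a little-endian u32. The following is a minimal standalone sketch of that decoding, assuming the standard ComputeBudget SetComputeUnitLimit layout; decode_compute_unit_limit is an illustrative helper, not the UDF body itself.

def decode_compute_unit_limit(data_base58):
    # Base58 alphabet used by Solana (same table as in the UDF below).
    alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    index = {c: i for i, c in enumerate(alphabet)}
    value = 0
    for c in data_base58:
        if c not in index:
            return None  # not valid base58
        value = value * 58 + index[c]
    raw = value.to_bytes((value.bit_length() + 7) // 8, 'big')
    # Leading '1' characters encode leading zero bytes.
    raw = b'\x00' * (len(data_base58) - len(data_base58.lstrip('1'))) + raw
    # SetComputeUnitLimit: 1-byte discriminator 0x02 + little-endian u32 limit.
    if len(raw) >= 5 and raw[0] == 0x02:
        return int.from_bytes(raw[1:5], 'little')
    return None

# 'Fj2Eoy' decodes to bytes 02 40 0d 03 00 under this scheme, i.e. a 200_000 CU limit.
print(decode_compute_unit_limit('Fj2Eoy'))  # 200000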

View File

@@ -2,7 +2,7 @@
create or replace function {{ schema }}.udf_ordered_signers(accts array)
returns array
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'ordered_signers'
as
$$
@@ -20,7 +20,7 @@ $$;
create or replace function {{ schema }}.udf_get_all_inner_instruction_events(inner_instruction array)
returns array
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_all_inner_instruction_events'
as
$$
@@ -41,7 +41,7 @@ $$;
create or replace function {{ schema }}.udf_get_account_balances_index(account string, account_keys array)
returns int
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_account_balances_index'
as
$$
@@ -58,7 +58,7 @@ $$;
create or replace function {{ schema }}.udf_get_all_inner_instruction_program_ids(inner_instruction variant)
returns array
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_all_inner_instruction_program_ids'
as
$$
@@ -79,7 +79,7 @@ $$;
create or replace function {{ schema }}.udf_get_multi_signers_swapper(tx_to array, tx_from array, signers array)
returns string
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_multi_signers_swapper'
as
$$
@@ -104,7 +104,7 @@ $$;
create or replace function {{ schema }}.udf_get_jupv4_inner_programs(inner_instruction array)
returns array
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_jupv4_inner_programs'
as
$$
@@ -126,7 +126,7 @@ $$;
create or replace function {{ schema }}.udf_get_compute_units_consumed(log_messages array, instructions array)
returns int
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_compute_units_consumed'
as
$$
@@ -152,27 +152,60 @@ $$;
{% endmacro %}
{% macro create_udf_get_compute_units_total(schema) %}
create or replace function {{ schema }}.udf_get_compute_units_total(log_messages array, instructions array)
create or replace function {{ schema }}.udf_get_compute_units_total(log_messages array)
returns int
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_compute_units_total'
as
$$
def get_compute_units_total(log_messages, instructions):
import re
if log_messages is None:
def get_compute_units_total(instructions):
def base58_decode(s):
base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
base58_map = {c: i for i, c in enumerate(base58_chars)}
bi = 0
for c in s:
if c not in base58_map:
return None
bi = bi * 58 + base58_map[c]
hex_str = hex(bi)[2:]
if len(hex_str) % 2 != 0:
hex_str = '0' + hex_str
# Count leading zeros
leading_zeros = 0
for c in s:
if c == '1':
leading_zeros += 1
else:
break
return '0' * (leading_zeros * 2) + hex_str
if instructions is None:
return None
match = None
# Use ComputeBudget program instruction decoding approach
for instr in instructions:
program_id = instr['programId']
for logs in log_messages:
match = re.search(f"Program {program_id} consumed \d+ of (\d+) compute units", logs)
if match:
total_units = int(match.group(1))
return total_units
if match is None:
return None
if instr.get('programId') == 'ComputeBudget111111111111111111111111111111':
data = instr.get('data')
if data:
hex_data = base58_decode(data)
if hex_data and len(hex_data) >= 2:
# Check if this is a compute unit limit instruction (starts with '02')
if hex_data[:2] == '02':
if len(hex_data) >= 10: # Need at least 10 chars for full instruction
# Extract 4-byte little-endian value (bytes 1-4 after the '02' prefix)
# Reverse byte order for little-endian
byte_str = hex_data[2:10] # Get 4 bytes (8 hex chars)
if len(byte_str) == 8:
# Convert little-endian hex to integer
reversed_bytes = byte_str[6:8] + byte_str[4:6] + byte_str[2:4] + byte_str[0:2]
return int(reversed_bytes, 16)
return None
$$;
{% endmacro %}
@@ -180,28 +213,67 @@ $$;
create or replace function {{ schema }}.udf_get_tx_size(accts array, instructions array, version string, addr_lookups array, signers array)
returns int
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_tx_size'
AS
$$
def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
def compact_array_size(length):
return 1 if length <= 127 else (2 if length <= 16383 else 3)
def base58_decode_size(data_base58):
base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
base58_map = {c: i for i, c in enumerate(base58_chars)}
bi = 0
leading_zeros = 0
# Decode base58 to integer (break on invalid, don't return 0)
for c in data_base58:
if c not in base58_map:
break
bi = bi * 58 + base58_map[c]
# Create hex string
hex_str = hex(bi)[2:]
if len(hex_str) % 2 != 0:
hex_str = '0' + hex_str
# Count leading zeros (each '1' adds 2 to leading_zeros)
for c in data_base58:
if c == '1':
leading_zeros += 2
else:
break
# Calculate temp_data_size: len('0' * leading_zeros + hex_str)
temp_data_size = len('0' * leading_zeros + hex_str)
# Return compact size + actual size (using division)
return (1 if temp_data_size / 2 <= 127 else (2 if temp_data_size / 2 <= 16383 else 3)) + (temp_data_size / 2)
header_size = 3
n_signers = len(signers)
n_pubkeys = len(accts)
n_instructions = len(instructions)
signature_size = (1 if n_signers <= 127 else (2 if n_signers <= 16383 else 3)) + (n_signers * 64)
# Signature size calculation (optimized with helper function)
signature_size = compact_array_size(n_signers) + (n_signers * 64)
# Version and account handling
if version == '0':
version_size = 1
v0_non_lut_accts_size = len([acct for acct in accts if acct.get('source') == 'transaction'])
account_pubkeys_size = (1 if n_pubkeys <= 127 else (2 if n_pubkeys <= 16383 else 3)) + (v0_non_lut_accts_size * 32)
account_pubkeys_size = compact_array_size(n_pubkeys) + (v0_non_lut_accts_size * 32)
else:
version_size = 0
account_pubkeys_size = (1 if n_pubkeys <= 127 else (2 if n_pubkeys <= 16383 else 3)) + (n_pubkeys * 32)
blockhash_size = 32
program_id_index_size = (1 if n_instructions <= 127 else (2 if n_instructions <= 16383 else 3)) + (n_instructions)
accounts_index_size = sum((1 if len(instruction.get('accounts', [])) <= 127 else (2 if len(instruction.get('accounts', [])) <= 16383 else 3)) + len(instruction.get('accounts', [])) for instruction in instructions)
account_pubkeys_size = compact_array_size(n_pubkeys) + (n_pubkeys * 32)
blockhash_size = 32
program_id_index_size = compact_array_size(n_instructions) + n_instructions
accounts_index_size = sum(compact_array_size(len(instruction.get('accounts', []))) + len(instruction.get('accounts', [])) for instruction in instructions)
# Address lookup size (exactly like original)
address_lookup_size = 0
if version == '0' and addr_lookups:
total_items = len(addr_lookups)
@@ -210,34 +282,15 @@ def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
address_lookup_size = (total_items * 34) + readonly_items + writeable_items
address_lookup_size = (1 if address_lookup_size <= 127 else (2 if address_lookup_size <= 16383 else 3)) + address_lookup_size
# Data size calculation (process ALL instructions in first loop)
data_size = 0
base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
base58_map = {c: i for i, c in enumerate(base58_chars)}
# First loop: process ALL instructions (not just those with data)
for instruction in instructions:
bi = 0
leading_zeros = 0
data_base58 = instruction.get('data', b'')
for c in data_base58:
if c not in base58_map:
raise ValueError('Invalid character in Base58 string')
bi = bi * 58 + base58_map[c]
hex_str = hex(bi)[2:]
if len(hex_str) % 2 != 0:
hex_str = '0' + hex_str
for c in data_base58:
if c == '1':
leading_zeros += 2
else:
break
temp_data_size = len('0' * leading_zeros + hex_str)
data_size += (1 if temp_data_size / 2 <= 127 else (2 if temp_data_size / 2 <= 16383 else 3)) + (temp_data_size / 2)
data_base58 = instruction.get('data', b'') # Original gets b'' for missing data
data_size += base58_decode_size(data_base58) # Process even empty data
# Second loop: parsed instructions
for instruction in instructions:
if 'data' not in instruction:
parsed = instruction.get('parsed')
@@ -264,16 +317,15 @@ def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
elif type_ == 'write' and instruction.get('program') == 'bpf-upgradeable-loader':
info = parsed.get('info')
if info:
bytes_data = info.get('bytes')
if bytes_data:
data_size += len(bytes_data) / 2
final_data_size = data_size
bytes_data = info.get('bytes')
if bytes_data:
data_size += len(bytes_data) / 2
# Final calculation
transaction_size = (
header_size + account_pubkeys_size + blockhash_size +
signature_size + program_id_index_size + accounts_index_size +
final_data_size + address_lookup_size + version_size
data_size + address_lookup_size + version_size
)
return transaction_size
@@ -281,11 +333,12 @@ def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
$$;
{% endmacro %}
{% macro create_udf_get_account_pubkey_by_name(schema) %}
create or replace function {{ schema }}.udf_get_account_pubkey_by_name(name string, accounts array)
returns string
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_account_pubkey_by_name'
as
$$
@@ -304,7 +357,7 @@ $$;
create or replace function {{ schema }}.udf_get_logs_program_data(logs array)
returns array
language python
runtime_version = '3.8'
runtime_version = '3.9'
handler = 'get_logs_program_data'
AS
$$
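
For context on the size math in udf_get_tx_size above: every variable-length field in a Solana transaction carries a compact-u16 ("shortvec") length prefix, which is what the new compact_array_size helper models (1 byte up to 127 items, 2 up to 16383, 3 beyond). The following rough standalone sanity check of that bookkeeping assumes a minimal legacy single-signer transfer (3 account keys, one instruction with 2 account indexes and 12 bytes of data); it is illustrative only and not part of the macro.

def compact_array_size(length):
    # compact-u16 length prefix: 1 byte <= 127, 2 bytes <= 16383, else 3
    return 1 if length <= 127 else (2 if length <= 16383 else 3)

signatures   = compact_array_size(1) + 1 * 64   # 65: one 64-byte signature
header       = 3                                 # message header
account_keys = compact_array_size(3) + 3 * 32    # 97: from, to, system program
blockhash    = 32
instructions  = compact_array_size(1) + 1        # instruction count + program id index
instructions += compact_array_size(2) + 2        # account index list
instructions += compact_array_size(12) + 12      # data: u32 instruction index + u64 lamports

print(signatures + header + account_keys + blockhash + instructions)  # 215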

View File

@@ -210,7 +210,7 @@ SELECT
WHEN block_id > 204777016 THEN compute_units_consumed
ELSE silver.udf_get_compute_units_consumed(log_messages, instructions)
END AS units_consumed,
silver.udf_get_compute_units_total(log_messages, instructions) as units_limit,
silver.udf_get_compute_units_total(instructions) as units_limit,
silver.udf_get_tx_size(account_keys,instructions,version,address_table_lookups,signers) as tx_size,
version,
tx_index,