Silver transactions (#9)

* transctions model and depedent udfs

* docs and tests

* add tx index

* remove block_id from cluster keys

* add and use description macros

---------

Co-authored-by: desmond-hui <desmond@flipsidecryto.com>
This commit is contained in:
desmond-hui 2024-09-05 07:42:27 -07:00 committed by GitHub
parent c8d1c1079b
commit 8a3bf60802
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 556 additions and 0 deletions

View File

@ -4,6 +4,11 @@
{% if target.database != "ECLIPSE_COMMUNITY_DEV" %}
{{ create_udf_bulk_rest_api_v2() }};
{% endif %}
{{ create_udf_ordered_signers(schema = "silver") }}
{{ create_udf_get_compute_units_consumed(schema = "silver") }}
{{ create_udf_get_compute_units_total(schema = "silver") }}
{{ create_udf_get_tx_size(schema = "silver") }}
{% endset %}
{% do run_query(sql) %}
{% endif %}

176
macros/python/udfs.sql Normal file
View File

@ -0,0 +1,176 @@
{% macro create_udf_ordered_signers(schema) %}
create or replace function {{ schema }}.udf_ordered_signers(accts array)
returns array
language python
runtime_version = '3.8'
handler = 'ordered_signers'
as
$$
def ordered_signers(accts) -> list:
    # Walk the account keys in their original order and keep the pubkey of
    # every account flagged as a signer, preserving that order.
    return [acct["pubkey"] for acct in accts if acct["signer"]]
$$;
{% endmacro %}
{% macro create_udf_get_compute_units_consumed(schema) %}
create or replace function {{ schema }}.udf_get_compute_units_consumed(log_messages array, instructions array)
returns int
language python
runtime_version = '3.8'
handler = 'get_compute_units_consumed'
as
$$
def get_compute_units_consumed(log_messages, instructions):
    """Sum compute units consumed across top-level instructions.

    For each instruction's programId, find the first not-yet-claimed log line
    reporting that program's consumption and add its unit count. Returns None
    when inputs are NULL or when no consumption was logged (total == 0).
    """
    import re
    # Snowflake passes SQL NULL as Python None; guard both arguments so the
    # UDF returns NULL instead of raising a TypeError.
    if log_messages is None or instructions is None:
        return None
    units_consumed_list = []
    # Each log line may only be attributed to one instruction (programs can
    # appear multiple times in a transaction).
    selected_logs = set()
    for instr in instructions:
        program_id = instr['programId']
        # re.escape guards against regex metacharacters in the program id;
        # capturing the digits in the same pattern avoids a separate findall
        # that could IndexError on a malformed log line.
        pattern = re.compile(rf"Program {re.escape(program_id)} consumed (\d+)")
        for logs in log_messages:
            if logs in selected_logs:
                continue
            match = pattern.search(logs)
            if match:
                units_consumed_list.append(int(match.group(1)))
                selected_logs.add(logs)
                break
    total_units_consumed = sum(units_consumed_list)
    return None if total_units_consumed == 0 else total_units_consumed
$$;
{% endmacro %}
{% macro create_udf_get_compute_units_total(schema) %}
create or replace function {{ schema }}.udf_get_compute_units_total(log_messages array, instructions array)
returns int
language python
runtime_version = '3.8'
handler = 'get_compute_units_total'
as
$$
def get_compute_units_total(log_messages, instructions):
    """Return the compute unit limit reported in the transaction logs.

    Scans the logs for the first "Program <id> consumed X of Y compute units"
    line matching any instruction's programId and returns Y (the total/limit).
    Returns None when inputs are NULL or no matching log line exists.
    """
    import re
    # Snowflake passes SQL NULL as Python None; guard both arguments.
    if log_messages is None or instructions is None:
        return None
    for instr in instructions:
        program_id = instr['programId']
        # Raw f-string: `\d` in a plain string is an invalid escape sequence
        # (a warning/error on newer Pythons). re.escape guards against regex
        # metacharacters in the program id.
        pattern = re.compile(rf"Program {re.escape(program_id)} consumed \d+ of (\d+) compute units")
        for logs in log_messages:
            match = pattern.search(logs)
            if match:
                return int(match.group(1))
    # No instruction produced a matching log line.
    return None
$$;
{% endmacro %}
{% macro create_udf_get_tx_size(schema) %}
{# Estimates the serialized size (in bytes) of a Solana transaction from its
   parsed components. Handles both legacy and v0 (address lookup table)
   transactions. The base58 instruction data is decoded manually so the UDF
   needs no third-party packages. #}
create or replace function {{ schema }}.udf_get_tx_size(accts array, instructions array, version string, addr_lookups array, signers array)
returns int
language python
runtime_version = '3.8'
handler = 'get_tx_size'
AS
$$
def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
    # Fixed 3-byte message header.
    header_size = 3
    n_signers = len(signers)
    n_pubkeys = len(accts)
    n_instructions = len(instructions)
    # 1-3 byte compact length prefix (thresholds 127/16383 match Solana's
    # compact-u16 encoding) + 64 bytes per signature.
    signature_size = (1 if n_signers <= 127 else (2 if n_signers <= 16383 else 3)) + (n_signers * 64)
    if version == '0':
        # v0 transactions carry a 1-byte version prefix, and only accounts
        # sourced from the transaction itself (not loaded via lookup tables)
        # are serialized as 32-byte static keys.
        version_size = 1
        v0_non_lut_accts_size = len([acct for acct in accts if acct.get('source') == 'transaction'])
        # NOTE(review): the compact length prefix here is sized from n_pubkeys
        # (all accounts) while only v0_non_lut_accts_size keys are counted --
        # confirm the prefix should not use the static-key count instead.
        account_pubkeys_size = (1 if n_pubkeys <= 127 else (2 if n_pubkeys <= 16383 else 3)) + (v0_non_lut_accts_size * 32)
    else:
        version_size = 0
        account_pubkeys_size = (1 if n_pubkeys <= 127 else (2 if n_pubkeys <= 16383 else 3)) + (n_pubkeys * 32)
    # Recent blockhash is a fixed 32-byte field.
    blockhash_size = 32
    # Compact length prefix + 1-byte program-id index per instruction.
    program_id_index_size = (1 if n_instructions <= 127 else (2 if n_instructions <= 16383 else 3)) + (n_instructions)
    # Per instruction: compact length prefix + 1 byte per account index.
    accounts_index_size = sum((1 if len(instruction.get('accounts', [])) <= 127 else (2 if len(instruction.get('accounts', [])) <= 16383 else 3)) + len(instruction.get('accounts', [])) for instruction in instructions)
    address_lookup_size = 0
    if version == '0' and addr_lookups:
        total_items = len(addr_lookups)
        readonly_items = sum(len(item.get('readonlyIndexes', [])) for item in addr_lookups)
        writeable_items = sum(len(item.get('writableIndexes', [])) for item in addr_lookups)
        # 34 bytes per lookup entry plus 1 byte per readonly/writable index
        # -- presumably 32-byte table key + two 1-byte prefixes; TODO confirm.
        address_lookup_size = (total_items * 34) + readonly_items + writeable_items
        address_lookup_size = (1 if address_lookup_size <= 127 else (2 if address_lookup_size <= 16383 else 3)) + address_lookup_size
    # Decode each instruction's base58 `data` payload to count its bytes.
    data_size = 0
    base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    base58_map = {c: i for i, c in enumerate(base58_chars)}
    for instruction in instructions:
        bi = 0
        leading_zeros = 0
        # NOTE(review): instructions without a 'data' key fall through with an
        # empty payload, which still contributes 2 to data_size below
        # (hex(0) -> '0' padded to '00') -- confirm this is intended, since
        # data-less parsed instructions also get estimates in the next loop.
        data_base58 = instruction.get('data', b'')
        for c in data_base58:
            if c not in base58_map:
                raise ValueError('Invalid character in Base58 string')
            bi = bi * 58 + base58_map[c]
        hex_str = hex(bi)[2:]
        if len(hex_str) % 2 != 0:
            hex_str = '0' + hex_str
        # Each leading '1' in base58 encodes a zero byte (= 2 hex chars).
        for c in data_base58:
            if c == '1':
                leading_zeros += 2
            else:
                break
        temp_data_size = len('0' * leading_zeros + hex_str)
        # Compact length prefix + decoded byte count. NOTE(review): `/ 2` is
        # float division, so the running total (and the returned value) can be
        # a float; Snowflake coerces it to the declared INT return type.
        data_size += (1 if temp_data_size / 2 <= 127 else (2 if temp_data_size / 2 <= 16383 else 3)) + (temp_data_size / 2)
    # Instructions returned fully parsed by the RPC have no raw 'data' field;
    # add fixed size estimates for the common parsed program/instruction types.
    for instruction in instructions:
        if 'data' not in instruction:
            parsed = instruction.get('parsed')
            if isinstance(parsed, dict):
                type_ = parsed.get('type')
            else:
                # spl-memo parses to a plain string, not a dict.
                type_ = None
            if type_ == 'transfer' and instruction.get('program') == 'spl-token':
                data_size += 7
                accounts_index_size += 4
            elif instruction.get('program') == 'spl-memo' and instruction.get('programId') == 'MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr':
                data_size += 30
                accounts_index_size += 0
            elif type_ == 'transfer' and instruction.get('program') == 'system':
                data_size += 9
                accounts_index_size += 3
            elif instruction.get('program') == 'spl-memo' and instruction.get('programId') == 'Memo1UhkJRfHyvLMcVucJwxXeuD728EqVDDwQDxFMNo':
                data_size += 43
                accounts_index_size += 0
            elif type_ == 'transferChecked' and instruction.get('program') == 'spl-token':
                data_size += 8
                accounts_index_size += 5
            elif type_ == 'write' and instruction.get('program') == 'bpf-upgradeable-loader':
                info = parsed.get('info')
                if info:
                    bytes_data = info.get('bytes')
                    if bytes_data:
                        # 'bytes' is hex-encoded: 2 chars per byte.
                        data_size += len(bytes_data) / 2
    final_data_size = data_size
    transaction_size = (
        header_size + account_pubkeys_size + blockhash_size +
        signature_size + program_id_index_size + accounts_index_size +
        final_data_size + address_lookup_size + version_size
    )
    return transaction_size
$$;
{% endmacro %}

View File

@ -0,0 +1,5 @@
{% docs address_table_lookups %}
Array of lookup keys used to associate additional account keys with the transaction. Only populated for versioned (v0) transactions.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs inner_instruction %}
A call from one smart contract program to another
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs instruction %}
Specifies which program it is calling, which accounts it wants to read or modify, and additional data that serves as auxiliary input to the program
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs log_messages %}
Array of log messages written by the program for this transaction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs partition_key %}
Partition used by the upstream bronze model
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs post_balances %}
List of post-transaction balances for different accounts
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs post_token_balances %}
List of post-transaction token balances for different token accounts
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs pre_balances %}
List of pre-transaction balances for different accounts
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs pre_token_balances %}
List of pre-transaction token balances for different token accounts
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs previous_block_hash %}
Previous block's hash value
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs signers %}
List of accounts that signed the transaction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_account_keys %}
List of accounts that are referenced by pre/post sol/token balances objects
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_fee %}
Transaction fee (in lamports)
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_id %}
A unique key that identifies a transaction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_index %}
The order in which the transaction was executed in a given slot
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_size %}
The size of the transaction in bytes.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_succeeded %}
True when a transaction is successful, otherwise false.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_unit_limit %}
The max number of compute units that can be consumed by the program.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_units_consumed %}
The number of compute units consumed by the program.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_version %}
Transaction version; legacy transactions are listed as NULL or 'legacy'
{% enddocs %}

View File

@ -0,0 +1,178 @@
{{ config(
    materialized = 'incremental',
    unique_key = "tx_id",
    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
    cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['scheduled_core']
) }}

{# Incremental high-water mark: on incremental runs only bronze rows with
   _inserted_timestamp at or after the max already loaded are reprocessed. #}
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
SELECT
    MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
    {{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query)[0][0] %}
{% endif %}
{% endif %}

-- Parse raw bronze transaction JSON into typed columns, excluding
-- vote-only transactions.
WITH pre_final AS (
    SELECT
        b.block_timestamp AS block_timestamp,
        t.block_id,
        t.tx_id,
        t.index,
        t.data:transaction:message:recentBlockhash::string AS recent_block_hash,
        t.data:meta:fee::number AS fee,
        -- a null meta.err means the transaction succeeded
        CASE
            WHEN is_null_value(t.data:meta:err) THEN
                TRUE
            ELSE
                FALSE
        END AS succeeded,
        t.data:transaction:message:accountKeys::array AS account_keys,
        t.data:meta:preBalances::array AS pre_balances,
        t.data:meta:postBalances::array AS post_balances,
        t.data:meta:preTokenBalances::array AS pre_token_balances,
        t.data:meta:postTokenBalances::array AS post_token_balances,
        t.data:transaction:message:instructions::array AS instructions,
        t.data:meta:innerInstructions::array AS inner_instructions,
        t.data:meta:logMessages::array AS log_messages,
        t.data:transaction:message:addressTableLookups::array as address_table_lookups,
        t.data:version::string as version,
        t.partition_key,
        t._inserted_timestamp
    FROM
        {{ ref('bronze__transactions') }} t
    -- LEFT JOIN: the block may not have landed in silver__blocks yet, which
    -- leaves block_timestamp NULL; those rows are repaired on later runs via
    -- prev_null_block_timestamp_txs below.
    LEFT OUTER JOIN
        {{ ref('silver__blocks') }} b
        ON b.block_id = t.block_id
    WHERE
        tx_id IS NOT NULL
        -- drop pure vote transactions (single Vote program instruction)
        AND (
            coalesce(t.data:transaction:message:instructions[0]:programId::STRING,'') <> 'Vote111111111111111111111111111111111111111'
            OR array_size(t.data:transaction:message:instructions) > 1
        )
    {% if is_incremental() %}
        AND t._inserted_timestamp >= '{{ max_inserted_timestamp }}'
    {% else %}
        AND t._inserted_timestamp >= '2024-08-01' /* TODO replace with whenever we start getting data in PROD */
    {% endif %}
),
{% if is_incremental() %}
-- Rows previously loaded with a NULL block_timestamp whose block has since
-- arrived in silver__blocks: re-emit them with the timestamp filled in so the
-- merge on tx_id overwrites the incomplete row.
prev_null_block_timestamp_txs AS (
    SELECT
        b.block_timestamp,
        t.block_id,
        t.tx_id,
        t.index,
        t.recent_block_hash,
        t.signers,
        t.fee,
        t.succeeded,
        t.account_keys,
        t.pre_balances,
        t.post_balances,
        t.pre_token_balances,
        t.post_token_balances,
        t.instructions,
        t.inner_instructions,
        t.log_messages,
        t.address_table_lookups,
        t.units_consumed,
        t.units_limit,
        t.tx_size,
        t.version,
        t.partition_key,
        GREATEST(
            t._inserted_timestamp,
            b._inserted_timestamp
        ) AS _inserted_timestamp
    FROM
        {{ this }} t
    INNER JOIN
        {{ ref('silver__blocks') }} b
        ON b.block_id = t.block_id
    WHERE
        t.block_timestamp::DATE IS NULL
),
{% endif %}
-- Transactions that do meaningful (non-vote, non-compute-budget-only) work,
-- used as a semi-join filter on the final select.
qualifying_transactions AS (
    SELECT
        tx_id,
        array_agg(i.value:programId::string) WITHIN GROUP (ORDER BY i.index) AS program_ids
    FROM
        pre_final
    JOIN
        table(flatten(instructions)) i
    WHERE
        (
            coalesce(instructions[0]:programId::string,'') <> 'Vote111111111111111111111111111111111111111'
            /* small amount of txs have non-compute instructions after the vote */
            OR i.value:programId::string NOT IN ('Vote111111111111111111111111111111111111111','ComputeBudget111111111111111111111111111111')
        )
    GROUP BY 1
    HAVING array_size(program_ids) > 0
    UNION ALL
    /* some txs have no instructions at all, this is being filtered out above so we need to make sure we grab these */
    SELECT
        tx_id,
        null
    FROM
        pre_final
    WHERE
        array_size(instructions) = 0
)
SELECT
    block_timestamp,
    block_id,
    tx_id,
    index,
    recent_block_hash,
    -- signers/compute/size are derived by the Python UDFs defined in
    -- macros/python/udfs.sql
    silver.udf_ordered_signers(account_keys) AS signers,
    fee,
    succeeded,
    account_keys,
    pre_balances,
    post_balances,
    pre_token_balances,
    post_token_balances,
    instructions,
    inner_instructions,
    log_messages,
    address_table_lookups,
    silver.udf_get_compute_units_consumed(log_messages, instructions) as units_consumed,
    silver.udf_get_compute_units_total(log_messages, instructions) as units_limit,
    silver.udf_get_tx_size(account_keys,instructions,version,address_table_lookups,signers) as tx_size,
    version,
    partition_key,
    _inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id']
    ) }} AS transactions_id,
    sysdate() AS inserted_timestamp,
    sysdate() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    pre_final b
JOIN
    qualifying_transactions
    USING(tx_id)
-- keep only the most recently inserted version of each transaction.
-- NOTE(review): the partition includes block_id while unique_key is tx_id
-- only -- confirm a tx_id can never appear under two block_ids.
QUALIFY
    row_number() OVER (PARTITION BY block_id, tx_id ORDER BY _inserted_timestamp DESC) = 1
{% if is_incremental() %}
-- NOTE(review): UNION (with dedup) rather than UNION ALL -- the repaired rows
-- differ from the first branch (filled block_timestamp), so the dedup likely
-- never fires; confirm UNION ALL wasn't intended for cost reasons.
UNION
SELECT
    *,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id']
    ) }} AS transactions_id,
    sysdate() AS inserted_timestamp,
    sysdate() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    prev_null_block_timestamp_txs
{% endif %}

View File

@ -0,0 +1,97 @@
version: 2

models:
  - name: silver__transactions
    # Anchor reused by the column tests below to restrict them to recently
    # inserted rows, keeping test runtime bounded.
    recent_date_filter: &recent_date_filter
      config:
        where: _inserted_timestamp >= current_date - 7
    columns:
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: BLOCK_ID
        description: "{{ doc('block_id') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: RECENT_BLOCK_HASH
        description: "{{ doc('previous_block_hash') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: TX_ID
        description: "{{ doc('tx_id') }}"
        data_tests:
          - not_null: *recent_date_filter
          - unique:
              config:
                where: >
                  block_timestamp::date > current_date - 30
      - name: INDEX
        description: "{{ doc('tx_index') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: FEE
        description: "{{ doc('tx_fee') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: SUCCEEDED
        description: "{{ doc('tx_succeeded') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: SIGNERS
        description: "{{ doc('signers') }}"
      - name: ACCOUNT_KEYS
        description: "{{ doc('tx_account_keys') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: PRE_BALANCES
        description: "{{ doc('pre_balances') }}"
      - name: POST_BALANCES
        description: "{{ doc('post_balances') }}"
      - name: PRE_TOKEN_BALANCES
        description: "{{ doc('pre_token_balances') }}"
      - name: POST_TOKEN_BALANCES
        description: "{{ doc('post_token_balances') }}"
      - name: INSTRUCTIONS
        description: "{{ doc('instruction') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: INNER_INSTRUCTIONS
        description: "{{ doc('inner_instruction') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: LOG_MESSAGES
        description: "{{ doc('log_messages') }}"
      - name: ADDRESS_TABLE_LOOKUPS
        description: "{{ doc('address_table_lookups') }}"
      - name: UNITS_CONSUMED
        description: "{{ doc('tx_units_consumed') }}"
      # Renamed from UNIT_LIMIT: the model emits this column as UNITS_LIMIT,
      # so the previous entry never attached to a real column.
      - name: UNITS_LIMIT
        description: "{{ doc('tx_unit_limit') }}"
      - name: TX_SIZE
        description: "{{ doc('tx_size') }}"
      - name: VERSION
        description: "{{ doc('tx_version') }}"
      - name: PARTITION_KEY
        description: "{{ doc('partition_key') }}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: TRANSACTIONS_ID
        description: "{{ doc('pk') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: INSERTED_TIMESTAMP
        description: "{{ doc('inserted_timestamp') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: MODIFIED_TIMESTAMP
        description: "{{ doc('modified_timestamp') }}"
        data_tests:
          - not_null: *recent_date_filter
      - name: _INVOCATION_ID
        description: "{{ doc('_invocation_id') }}"
        data_tests:
          - not_null:
              name: test_silver__not_null_transactions__invocation_id
              <<: *recent_date_filter