AN-5526/log program events (#43)

* log events table and macro

* add signers

* update test

* revert

* filter unnecessary records

* update qualify

* comment
This commit is contained in:
tarikceric 2025-01-14 11:02:16 -08:00 committed by GitHub
parent 58d39f7c37
commit 03de3d114b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 308 additions and 0 deletions

View File

@ -10,6 +10,7 @@
{{ create_udf_get_compute_units_total(schema = "silver") }}
{{ create_udf_get_tx_size(schema = "silver") }}
{{ create_udf_get_all_inner_instruction_program_ids(schema = "silver") }}
{{ create_udf_get_logs_program_data(schema = "silver") }}
{% endset %}
{% do run_query(sql) %}
{% endif %}

View File

@ -194,4 +194,162 @@ def get_all_inner_instruction_program_ids(inner_instruction) -> list:
return program_ids
$$;
{% endmacro %}
{% macro create_udf_get_logs_program_data(schema) %}
-- Python UDF that extracts per-program event payloads ("Program data:" lines
-- and base64-encoded "Program log:" lines) from a transaction's log_messages
-- array, attributing each payload to the program/instruction that emitted it.
create or replace function {{ schema }}.udf_get_logs_program_data(logs array)
returns array
language python
runtime_version = '3.8'
handler = 'get_logs_program_data'
AS
$$
import re
import base64
def get_logs_program_data(logs) -> list:
    """Parse a transaction's log messages.

    Returns a list of dicts with keys (data, program_id, index, inner_index,
    event_type), or None when no useful entries are found. On any parsing
    failure a single-element list [{"error": message}] is returned so the
    SQL caller can surface the problem instead of the query failing.
    """
    def base58_decode(s):
        # Minimal base58 decoder; returns b"" when s contains a character
        # outside the base58 alphabet. Used only to validate program ids.
        base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
        base58_base = len(base58_chars)
        num = 0
        for char in s:
            num *= base58_base
            if char not in base58_chars:
                return b""
            num += base58_chars.index(char)
        result = bytearray()
        while num > 0:
            num, mod = divmod(num, 256)
            result.insert(0, mod)
        # Handle leading zeros in Base58 string
        for char in s:
            if char == '1':
                result.insert(0, 0)
            else:
                break
        return bytes(result)
    def is_eclipse_program_id(s):
        # A valid program id decodes to exactly 32 bytes.
        return len(base58_decode(s)) == 32
    def is_base64(s):
        # True only for well-formed base64 whose decoded bytes look like
        # binary data (<= 90% printable ASCII); mostly-printable payloads
        # are treated as plain text messages, not event data.
        if not re.match(r'^[A-Za-z0-9+/]*={0,2}$', s):
            return False
        if len(s) % 4 != 0:
            return False
        try:
            decoded = base64.b64decode(s)
            printable_ratio = sum(32 <= byte <= 126 for byte in decoded) / len(decoded)
            if printable_ratio > 0.9:
                return False
            return True
        except:
            # Also swallows ZeroDivisionError when the decoded payload is empty.
            return False
    program_data = []
    parent_event_type = ""
    child_event_type = ""
    parent_index = -1          # 0-based index of the current top-level instruction
    child_index = None         # 0-based index of the current inner instruction, None before the first one
    current_ancestry = []      # stack of (program_id, inner_index, event_type) frames for nested invokes
    program_end_pattern = re.compile(r"Program ([a-zA-Z0-9]+) success")
    pattern = re.compile(r'invoke \[(?!1\])\d+\]')  # matches nested invokes only (depth > 1)
    if len(logs) == 1:
        # A single log line can never carry attributable program data.
        return None
    try:
        for i, log in enumerate(logs):
            if log == "Log truncated":
                break
            if log.endswith(" invoke [1]"):
                # New top-level instruction: reset inner-instruction state
                # and restart the ancestry stack with this program.
                child_index = None
                program = log.replace("Program ","").replace(" invoke [1]","")
                parent_index += 1
                # The immediately following line, when present, names the instruction.
                if i+1 < len(logs) and logs[i+1].startswith("Program log: Instruction: "):
                    parent_event_type = logs[i+1].replace("Program log: Instruction: ","")
                elif i+1 < len(logs) and logs[i+1].startswith("Program log: IX: "):
                    parent_event_type = logs[i+1].replace("Program log: IX: ","")
                else:
                    parent_event_type = "UNKNOWN"
                current_ancestry = [(program,None,parent_event_type)]
            elif log.startswith("Call BPF program ") or log.startswith("Upgraded program "): # handle legacy BPF log format
                # remove legacy log parsing code it is not compatible w/ new
                continue
            elif bool(pattern.search(log)):
                # Nested invoke (depth >= 2): record it on the ancestry stack.
                child_index = child_index+1 if child_index is not None else 0
                current_program = pattern.sub('', log.replace("Program ","")).strip()
                current_node = int(pattern.search(log)[0].replace("invoke [","").replace("]",""))
                if i+1 < len(logs) and logs[i+1].startswith("Program log: Instruction: "):
                    child_event_type = logs[i+1].replace("Program log: Instruction: ","")
                elif i+1 < len(logs) and logs[i+1].startswith("Program log: IX: "):
                    child_event_type = logs[i+1].replace("Program log: IX: ","")
                else:
                    child_event_type = "UNKNOWN"
                if len(current_ancestry) >= current_node:
                    # Re-entering a depth we have already seen: overwrite that slot.
                    current_ancestry[current_node-1] = (current_program, child_index, child_event_type)
                else:
                    current_ancestry.append((current_program, child_index, child_event_type))
            # Resolve which program/event the current log line belongs to.
            maybe_program_end = program_end_pattern.search(log)
            if maybe_program_end:
                maybe_program_id = maybe_program_end.group(1)
            else:
                maybe_program_id = ""
            if is_eclipse_program_id(maybe_program_id):
                # "Program <id> success": capture the finishing frame, then pop it.
                current_program_id = current_ancestry[-1][0]
                current_event_type = current_ancestry[-1][2]
                current_index = parent_index
                current_inner_index = current_ancestry[-1][1]
                current_ancestry.pop()
            else:
                current_program_id = current_ancestry[-1][0]
                current_event_type = current_ancestry[-1][2]
                current_index = parent_index
                current_inner_index = current_ancestry[-1][1]
            if log.startswith("Program data: "):
                data = log.replace("Program data: ", "")
                if program_data and program_data[-1]["data"] is None:
                    # Update the most recent entry
                    program_data[-1]["data"] = data
                else:
                    # Append new entry if not found
                    program_data.append({
                        "data": data,
                        "program_id": current_program_id,
                        "index": current_index,
                        "inner_index": current_inner_index,
                        "event_type": current_event_type
                    })
            elif log.startswith("Program log: "):
                data = log.replace("Program log: ", "")
                if is_base64(data):
                    # Some programs emit binary event data via "Program log:".
                    program_data.append({"data": data,
                        "program_id": current_program_id,
                        "index": current_index,
                        "inner_index": current_inner_index,
                        "event_type": current_event_type})
                    # NOTE(review): this stops scanning the remaining logs after
                    # the first base64 "Program log:" payload — confirm intentional.
                    break
            elif current_program_id != "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" and current_program_id != "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb" and current_event_type != "UNKNOWN":
                # Placeholder (data=None) so a later "Program data:" line can
                # attach to the correct program/event; Token-program and
                # UNKNOWN-event lines are skipped to limit noise.
                program_data.append({"data": None,
                    "program_id": current_program_id,
                    "index": current_index,
                    "inner_index": current_inner_index,
                    "event_type": current_event_type})
    except Exception as e:
        # Return the error to SQL rather than failing the whole query.
        message = f"error trying to parse logs {e}"
        return [{"error": message}]
    return program_data if len(program_data) > 0 else None
$$;
{% endmacro %}

View File

@ -0,0 +1,5 @@
{% docs log_index %}
Position of the program data log within a specific transaction; used to differentiate logs that share the same (index, inner_index) pair.
{% enddocs %}

View File

@ -0,0 +1,76 @@
{{ config(
    materialized = 'incremental',
    unique_key = "transaction_logs_program_data_id",
    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
    cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
    post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(tx_id, program_id)'),
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['scheduled_core'],
) }}

-- Parse program event payloads out of each successful transaction's log messages.
WITH base AS (
    SELECT
        block_timestamp,
        block_id,
        tx_id,
        succeeded,
        signers,
        silver.udf_get_logs_program_data(log_messages) AS program_data_logs,
        _inserted_timestamp
    FROM
        {{ ref('silver__transactions') }}
    WHERE
        succeeded
        AND log_messages IS NOT NULL

{% if is_incremental() %}
        AND _inserted_timestamp >= (
            SELECT
                MAX(_inserted_timestamp)
            FROM
                {{ this }}
        )
{% endif %}
),
-- One row per parsed log entry (log_index is the position within the array).
flattened AS (
    SELECT
        t.block_timestamp,
        t.block_id,
        t.tx_id,
        t.succeeded,
        t.signers,
        l.value:index::int AS index,
        l.value:inner_index::int AS inner_index,
        l.index AS log_index,
        l.value:program_id::string AS program_id,
        l.value:event_type::string AS event_type,
        l.value:data::string AS data,
        l.value:error::string AS _udf_error,
        _inserted_timestamp
    FROM
        base t
        JOIN table(flatten(program_data_logs)) l
),
-- for logs with null 'data', we only need to return a single log per
-- tx/program_id/event_type since additional entries are not useful
null_data_deduped AS (
    SELECT
        *
    FROM
        flattened
    WHERE
        data IS NULL
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY tx_id, program_id, event_type
        ORDER BY log_index DESC
    ) = 1
),
-- if the log does have 'data', we always return it
with_data AS (
    SELECT
        *
    FROM
        flattened
    WHERE
        data IS NOT NULL
),
combined AS (
    SELECT * FROM null_data_deduped
    UNION ALL
    SELECT * FROM with_data
)
SELECT
    *,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','index','log_index']
    ) }} AS transaction_logs_program_data_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    combined

View File

@ -0,0 +1,68 @@
# Schema tests for silver__transaction_logs_program_data
# (program event payloads flattened out of transaction log messages).
version: 2
models:
  - name: silver__transaction_logs_program_data
    # Anchor reused by the column tests below to limit test scans to
    # the last 7 days of inserted data.
    recent_date_filter: &recent_date_filter
      config:
        where: >
          _inserted_timestamp >= current_date - 7
    columns:
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp') }}"
        tests:
          # Extra block_id bound: only enforce not_null past this block height.
          - not_null:
              where: _inserted_timestamp >= current_date - 7 AND block_id > 39824213
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
      - name: BLOCK_ID
        description: "{{ doc('block_id') }}"
        tests:
          - not_null: *recent_date_filter
      - name: TX_ID
        description: "{{ doc('tx_id') }}"
        tests:
          - not_null: *recent_date_filter
      - name: SUCCEEDED
        description: "{{ doc('tx_succeeded') }}"
        tests:
          - not_null: *recent_date_filter
      - name: INDEX
        description: "{{ doc('event_index') }}"
        tests:
          - not_null: *recent_date_filter
      - name: LOG_INDEX
        description: "{{ doc('log_index') }}"
        tests:
          - not_null: *recent_date_filter
      - name: DATA
        description: >
          The encoded program data
      - name: PROGRAM_ID
        description: "{{ doc('program_id') }}"
        tests:
          - not_null: *recent_date_filter
      - name: _UDF_ERROR
        description: >
          Error from `udf_get_logs_program_data` if unsuccessful
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp') }}"
        tests:
          - not_null
      - name: TRANSACTION_LOGS_PROGRAM_DATA_ID
        description: '{{ doc("pk") }}'
        tests:
          - unique: *recent_date_filter
      - name: INSERTED_TIMESTAMP
        description: '{{ doc("inserted_timestamp") }}'
        tests:
          - not_null: *recent_date_filter
      - name: MODIFIED_TIMESTAMP
        description: '{{ doc("modified_timestamp") }}'
        tests:
          - not_null: *recent_date_filter
      - name: _INVOCATION_ID
        description: '{{ doc("_invocation_id") }}'
        tests:
          # Named test with the recent-data filter merged into its config.
          - not_null:
              name: test_silver__not_null_transaction_logs_program_data__invocation_id
              <<: *recent_date_filter