import re
import base64


def get_logs_program_data(logs) -> list:
    """Handler for the Snowflake UDF ``silver.udf_get_logs_program_data``.

    Walks a transaction's log-message array and extracts "Program data:"
    payloads (and base64-looking "Program log:" payloads), attributing each
    one to the program invocation that was active when the line was emitted.

    Parameters
    ----------
    logs : list[str] | None
        Raw log messages for one transaction (Snowflake ARRAY; may be NULL).

    Returns
    -------
    list | None
        One dict per extracted payload with keys ``data``, ``program_id``,
        ``index``, ``inner_index`` and ``event_type``; ``None`` when nothing
        was extracted; ``[{"error": ...}]`` when parsing raised.
    """

    def base58_decode(s):
        # Minimal base58 decoder; returns b"" as soon as a non-base58
        # character is seen.
        base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
        base58_base = len(base58_chars)
        num = 0
        for char in s:
            num *= base58_base
            if char not in base58_chars:
                return b""
            num += base58_chars.index(char)
        result = bytearray()
        while num > 0:
            num, mod = divmod(num, 256)
            result.insert(0, mod)
        # Each leading '1' in a base58 string encodes a leading zero byte.
        for char in s:
            if char == '1':
                result.insert(0, 0)
            else:
                break
        return bytes(result)

    def is_eclipse_program_id(s):
        # Program ids are 32-byte keys; decoding to exactly 32 bytes is
        # used as the validity test.
        return len(base58_decode(s)) == 32

    def is_base64(s):
        # Heuristic: well-formed base64 whose decoded bytes are NOT mostly
        # printable (mostly-printable payloads are treated as plain text).
        if not re.match(r'^[A-Za-z0-9+/]*={0,2}$', s):
            return False
        if len(s) % 4 != 0:
            return False
        try:
            decoded = base64.b64decode(s)
            if not decoded:
                # Empty payload is not useful; explicit guard instead of
                # relying on ZeroDivisionError in the ratio below.
                return False
            printable_ratio = sum(32 <= byte <= 126 for byte in decoded) / len(decoded)
            return printable_ratio <= 0.9
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # still propagate.
            return False

    program_data = []
    parent_event_type = ""
    child_event_type = ""
    parent_index = -1
    child_index = None
    current_ancestry = []  # stack of (program_id, inner_index, event_type)

    program_end_pattern = re.compile(r"Program ([a-zA-Z0-9]+) success")
    # Nested invocations only: matches "invoke [2]", "invoke [3]", ... but not
    # "invoke [1]" (the lookahead rejects only the literal "1]", so "invoke [10]"
    # and deeper still match).
    invoke_pattern = re.compile(r'invoke \[(?!1\])\d+\]')

    # A NULL array (previously raised TypeError) or a single log line cannot
    # contain program data.
    if not logs or len(logs) == 1:
        return None

    try:
        for i, log in enumerate(logs):
            if log == "Log truncated":
                break

            # Hoisted: one regex scan reused by both the branch test and the
            # depth extraction below.
            invoke_match = invoke_pattern.search(log)

            if log.endswith(" invoke [1]"):
                # New top-level invocation: reset inner index and ancestry.
                child_index = None
                program = log.replace("Program ", "").replace(" invoke [1]", "")
                parent_index += 1

                if i + 1 < len(logs) and logs[i + 1].startswith("Program log: Instruction: "):
                    parent_event_type = logs[i + 1].replace("Program log: Instruction: ", "")
                elif i + 1 < len(logs) and logs[i + 1].startswith("Program log: IX: "):
                    parent_event_type = logs[i + 1].replace("Program log: IX: ", "")
                else:
                    parent_event_type = "UNKNOWN"

                current_ancestry = [(program, None, parent_event_type)]
            elif log.startswith("Call BPF program ") or log.startswith("Upgraded program "):
                # Legacy BPF log format is not compatible with this parser; skip.
                continue
            elif invoke_match:
                # Nested invocation: bump the running inner-instruction index.
                child_index = child_index + 1 if child_index is not None else 0
                current_program = invoke_pattern.sub('', log.replace("Program ", "")).strip()
                current_node = int(invoke_match.group(0).replace("invoke [", "").replace("]", ""))

                if i + 1 < len(logs) and logs[i + 1].startswith("Program log: Instruction: "):
                    child_event_type = logs[i + 1].replace("Program log: Instruction: ", "")
                elif i + 1 < len(logs) and logs[i + 1].startswith("Program log: IX: "):
                    child_event_type = logs[i + 1].replace("Program log: IX: ", "")
                else:
                    child_event_type = "UNKNOWN"

                # Place this program at its depth in the ancestry stack.
                if len(current_ancestry) >= current_node:
                    current_ancestry[current_node - 1] = (current_program, child_index, child_event_type)
                else:
                    current_ancestry.append((current_program, child_index, child_event_type))

            # "Program <id> success" marks the end of the innermost invocation.
            maybe_program_end = program_end_pattern.search(log)
            maybe_program_id = maybe_program_end.group(1) if maybe_program_end else ""

            # Attribute this line to the innermost program on the stack.
            # NOTE(review): raises IndexError (surfaced as an "error" row via
            # the outer except) when a data/success line appears before any
            # invoke — confirm that is the intended failure mode.
            current_program_id = current_ancestry[-1][0]
            current_inner_index = current_ancestry[-1][1]
            current_event_type = current_ancestry[-1][2]
            current_index = parent_index
            if is_eclipse_program_id(maybe_program_id):
                # The innermost program finished; pop it off the stack.
                current_ancestry.pop()

            if log.startswith("Program data: "):
                data = log.replace("Program data: ", "")
                if program_data and program_data[-1]["data"] is None:
                    # Fill the placeholder entry created for this invocation.
                    program_data[-1]["data"] = data
                else:
                    # Append new entry if not found
                    program_data.append({
                        "data": data,
                        "program_id": current_program_id,
                        "index": current_index,
                        "inner_index": current_inner_index,
                        "event_type": current_event_type
                    })
            elif log.startswith("Program log: "):
                data = log.replace("Program log: ", "")
                if is_base64(data):
                    program_data.append({
                        "data": data,
                        "program_id": current_program_id,
                        "index": current_index,
                        "inner_index": current_inner_index,
                        "event_type": current_event_type
                    })
                    # NOTE(review): stops after the first base64 "Program log:"
                    # payload; later logs in the transaction are not scanned —
                    # confirm this is intended.
                    break
            elif (current_program_id not in ("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
                                             "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb")
                    and current_event_type != "UNKNOWN"):
                # Placeholder row (its data is filled in later by a
                # "Program data:" line); token-program and unknown-event rows
                # are skipped as not useful.
                program_data.append({
                    "data": None,
                    "program_id": current_program_id,
                    "index": current_index,
                    "inner_index": current_inner_index,
                    "event_type": current_event_type
                })
    except Exception as e:
        # Surface parse failures as a single error row instead of failing
        # the whole UDF call.
        message = f"error trying to parse logs {e}"
        return [{"error": message}]

    return program_data if len(program_data) > 0 else None
Used to differentiate logs with the same (index, inner_index)

{% enddocs %}

-- models/silver/program_logs/silver__transaction_logs_program_data.sql
{{ config(
    materialized = 'incremental',
    unique_key = "transaction_logs_program_data_id",
    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
    cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
    post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(tx_id, program_id)'),
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['scheduled_core'],
) }}

WITH base AS (
    -- run the log-parsing UDF once per successful transaction
    SELECT
        block_timestamp,
        block_id,
        tx_id,
        succeeded,
        signers,
        silver.udf_get_logs_program_data(log_messages) AS program_data_logs,
        _inserted_timestamp
    FROM
        {{ ref('silver__transactions') }}
    WHERE
        succeeded
        AND log_messages IS NOT NULL
    {% if is_incremental() %}
        AND _inserted_timestamp >= (SELECT max(_inserted_timestamp) FROM {{ this }})
    {% endif %}
),
prefinal as (
    -- explode the UDF's array output into one row per parsed log entry
    SELECT
        t.block_timestamp,
        t.block_id,
        t.tx_id,
        t.succeeded,
        t.signers,
        l.value:index::int AS index,
        l.value:inner_index::int AS inner_index,
        l.index AS log_index,
        l.value:program_id::string AS program_id,
        l.value:event_type::string AS event_type,
        l.value:data::string AS data,
        l.value:error::string AS _udf_error,
        _inserted_timestamp
    FROM
        base t
    JOIN
        table(flatten(program_data_logs)) l
)
-- for logs with null 'data', we only need to return a single log per tx/program_id/event_type since additional entries are not useful
-- if the log does have 'data', we always return it
SELECT
    *,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','index','log_index']
    ) }} AS transaction_logs_program_data_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    prefinal
WHERE
    data IS NULL
QUALIFY (ROW_NUMBER() OVER (PARTITION BY tx_id, program_id, event_type ORDER BY log_index DESC)) = 1
UNION ALL
SELECT
    *,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','index','log_index']
    ) }} AS transaction_logs_program_data_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    prefinal
WHERE
    data IS NOT NULL

# models/silver/program_logs/silver__transaction_logs_program_data.yml
version: 2
models:
  - name: silver__transaction_logs_program_data
    recent_date_filter: &recent_date_filter
      config:
        where: >
          _inserted_timestamp >= current_date - 7
    columns:
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp') }}"
        tests:
          - not_null:
              where: _inserted_timestamp >= current_date - 7 AND block_id > 39824213
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
      - name: BLOCK_ID
        description: "{{ doc('block_id') }}"
        tests:
          - not_null: *recent_date_filter
      - name: TX_ID
        description: "{{ doc('tx_id') }}"
        tests:
          - not_null: *recent_date_filter
      - name: SUCCEEDED
        description: "{{ doc('tx_succeeded') }}"
        tests:
          - not_null: *recent_date_filter
      - name: INDEX
        description: "{{ doc('event_index') }}"
        tests:
          - not_null: *recent_date_filter
      - name: LOG_INDEX
        description: "{{ doc('log_index') }}"
        tests:
          - not_null: *recent_date_filter
      - name: DATA
        description: >
          The encoded program data
      - name: PROGRAM_ID
        description: "{{ doc('program_id') }}"
        tests:
not_null: *recent_date_filter + - name: _UDF_ERROR + description: > + Error from `udf_get_logs_program_data` if unsuccessful + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + tests: + - not_null + - name: TRANSACTION_LOGS_PROGRAM_DATA_ID + description: '{{ doc("pk") }}' + tests: + - unique: *recent_date_filter + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: _INVOCATION_ID + description: '{{ doc("_invocation_id") }}' + tests: + - not_null: + name: test_silver__not_null_transaction_logs_program_data__invocation_id + <<: *recent_date_filter \ No newline at end of file