Silver events (#12)

* wip

* events model + dependent udf

* docs and tests

* add inner_instructions

* change column to inner_index

---------

Co-authored-by: desmond-hui <desmond@flipsidecryto.com>
This commit is contained in:
desmond-hui 2024-09-09 06:44:22 -07:00 committed by GitHub
parent 130c66c58e
commit 09b0d04ff2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 359 additions and 0 deletions

View File

@ -9,6 +9,7 @@
{{ create_udf_get_compute_units_consumed(schema = "silver") }}
{{ create_udf_get_compute_units_total(schema = "silver") }}
{{ create_udf_get_tx_size(schema = "silver") }}
{{ create_udf_get_all_inner_instruction_program_ids(schema = "silver") }}
{% endset %}
{% do run_query(sql) %}
{% endif %}

View File

@ -174,3 +174,24 @@ def get_tx_size(accts, instructions, version, addr_lookups, signers) -> int:
$$;
{% endmacro %}
{% macro create_udf_get_all_inner_instruction_program_ids(schema) %}
create or replace function {{ schema }}.udf_get_all_inner_instruction_program_ids(inner_instruction variant)
returns array
language python
runtime_version = '3.8'
handler = 'get_all_inner_instruction_program_ids'
as
$$
def get_all_inner_instruction_program_ids(inner_instruction) -> list:
program_ids = []
if inner_instruction:
for v in inner_instruction.get('instructions',[]):
if type(v) is dict and v.get("programId"):
program_ids.append(v.get("programId"))
else:
program_ids.append(None)
return program_ids
$$;
{% endmacro %}

View File

@ -0,0 +1,5 @@
{% docs event_index %}
Location of the instruction (event) within a transaction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs event_type %}
The type of event (i.e. "delegate", "withdraw") that is occurring
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs inner_instruction_index %}
Location of the instruction within an instruction's (event) inner instruction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs inner_instruction_program_ids %}
List of program ids that is called by the instruction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs program_id %}
An address that identifies the program that is being interacted with. I.E. which DEX for a swap or marketplace for an NFT sale.
{% enddocs %}

View File

@ -0,0 +1,94 @@
{{ config(
materialized = 'incremental',
unique_key = ['block_id','tx_id','index'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_core']
) }}
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query)[0][0] %}
{% endif %}
{% endif %}
WITH base_transactions AS (
SELECT
block_timestamp,
block_id,
tx_id,
succeeded,
signers,
instructions,
inner_instructions,
_inserted_timestamp
FROM
{{ ref('silver__transactions') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_inserted_timestamp }}'
{% else %}
WHERE
_inserted_timestamp::date = '2024-08-30' /* TODO replace with whenever we start getting data in PROD */
{% endif %}
),
base_instructions AS (
SELECT
block_timestamp,
block_id,
tx_id,
succeeded,
signers,
i.index::integer AS index,
i.value:parsed:type::string AS event_type,
i.value:programId::string AS program_id,
i.value AS instruction,
_inserted_timestamp
FROM
base_transactions t,
table(flatten(instructions)) i
),
base_inner_instructions AS (
SELECT
block_id,
tx_id,
ii.value:index::integer AS mapped_instruction_index,
ii.value AS inner_instruction,
silver.udf_get_all_inner_instruction_program_ids(ii.value) AS inner_instruction_program_ids,
FROM
base_transactions t,
table(flatten(inner_instructions)) ii
)
SELECT
i.block_timestamp,
i.block_id,
i.tx_id,
i.signers,
i.succeeded,
i.index,
i.program_id AS program_id,
i.event_type AS event_type,
i.instruction,
ii.inner_instruction,
ii.inner_instruction_program_ids,
i._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['i.block_id', 'i.tx_id', 'i.index']
) }} AS events_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base_instructions i
LEFT OUTER JOIN
base_inner_instructions ii
ON ii.block_id = i.block_id
AND ii.tx_id = i.tx_id
AND ii.mapped_instruction_index = i.index

View File

@ -0,0 +1,75 @@
version: 2
models:
- name: silver__events
recent_date_filter: &recent_date_filter
config:
where: _inserted_timestamp >= current_date - 7
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INDEX
where: block_timestamp::date > current_date - 30
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null: *recent_date_filter
# TODO, add this once we understand usage a bit more
# - dbt_expectations.expect_row_values_to_have_recent_data:
# datepart: day
# interval: 2
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null: *recent_date_filter
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null: *recent_date_filter
- name: SIGNERS
description: "{{ doc('signers') }}"
tests:
- not_null: *recent_date_filter
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null: *recent_date_filter
- name: INDEX
description: "{{ doc('event_index') }}"
tests:
- not_null: *recent_date_filter
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
- name: EVENT_TYPE
description: "{{ doc('event_type') }}"
- name: INSTRUCTION
description: "{{ doc('instruction') }}"
tests:
- not_null: *recent_date_filter
- name: INNER_INSTRUCTION
description: "{{ doc('inner_instruction') }}"
- name: INNER_INSTRUCTION_PROGRAM_IDS
description: "{{ doc('inner_instruction_program_ids') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- name: EVENTS_ID
description: '{{ doc("pk") }}'
data_tests:
- not_null: *recent_date_filter
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
data_tests:
- not_null:
name: test_silver__not_null_events_invocation_id
<<: *recent_date_filter

View File

@ -0,0 +1,67 @@
{{ config(
materialized = 'incremental',
unique_key = ['block_id','tx_id','instruction_index','inner_index'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_core']
) }}
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query)[0][0] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
block_timestamp,
block_id,
tx_id,
e.index AS instruction_index,
ii.index::integer AS inner_index,
succeeded,
signers,
e.program_id AS instruction_program_id,
ii.value:programId::string AS program_id,
ii.value:parsed:type::string AS event_type,
ii.value AS instruction,
_inserted_timestamp
FROM
{{ ref('silver__events') }} e,
table(flatten(inner_instruction:instructions)) ii
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_inserted_timestamp }}'
{% else %}
WHERE
_inserted_timestamp::date = '2024-08-30' /* TODO replace with whenever we start getting data in PROD */
{% endif %}
)
SELECT
block_timestamp,
block_id,
tx_id,
signers,
succeeded,
instruction_index,
inner_index,
instruction_program_id,
program_id,
event_type,
instruction,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id', 'tx_id', 'instruction_index', 'inner_index']
) }} AS inner_instructions_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final

View File

@ -0,0 +1,76 @@
version: 2
models:
- name: silver__inner_instructions
recent_date_filter: &recent_date_filter
config:
where: _inserted_timestamp >= current_date - 7
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- INSTRUCTION_INDEX
- INNER_INDEX
where: block_timestamp::date > current_date - 30
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null: *recent_date_filter
# TODO, add this once we understand usage a bit more
# - dbt_expectations.expect_row_values_to_have_recent_data:
# datepart: day
# interval: 2
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null: *recent_date_filter
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null: *recent_date_filter
- name: SIGNERS
description: "{{ doc('signers') }}"
tests:
- not_null: *recent_date_filter
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null: *recent_date_filter
- name: INSTRUCTION_INDEX
description: "{{ doc('event_index') }}"
- name: INNER_INDEX
description: "{{ doc('inner_instruction_index') }}"
tests:
- not_null: *recent_date_filter
- name: INSTRUCTION_PROGRAM_ID
description: "{{ doc('program_id') }}. For the instruction calling this inner instruction."
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
- name: EVENT_TYPE
description: "{{ doc('event_type') }}"
- name: INSTRUCTION
description: "{{ doc('instruction') }}"
tests:
- not_null: *recent_date_filter
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- name: INNER_INSTRUCTIONS_ID
description: '{{ doc("pk") }}'
data_tests:
- not_null: *recent_date_filter
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
data_tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
data_tests:
- not_null:
name: test_silver__not_null_inner_instructions_invocation_id
<<: *recent_date_filter