decoder history

This commit is contained in:
Austin 2024-07-31 11:59:20 -04:00
parent 609f5853ce
commit a1b9e97221
22 changed files with 768 additions and 1 deletion

View File

@@ -0,0 +1,47 @@
name: dbt_run_streamline_decoder
run-name: dbt_run_streamline_decoder
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: true
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "berachain_models,tag:decoded_logs"

View File

@@ -0,0 +1,48 @@
name: dbt_run_streamline_decoder_history
run-name: dbt_run_streamline_decoder_history
on:
workflow_dispatch:
schedule:
# Runs “At minute 32 past every 12th hour.” (see https://crontab.guru)
- cron: '32 */12 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: true
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":240}' -m "berachain_models,tag:streamline_decoded_logs_history" "berachain_models,tag:streamline_decoded_logs_complete"
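
For reference, dbt exposes the --vars payload above to models and macros through its var() function; a minimal sketch of how these flags might be read inside the project (the defaults shown are assumptions, not values taken from this repo):

{# Hypothetical consumer of the workflow's --vars payload; defaults are assumptions. #}
{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
    {# e.g. gate streamline-related logic behind the flag #}
{% endif %}
{% set wait_seconds = var("WAIT", 0) %}  {# seconds to wait, per the WAIT: 240 var above #}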

View File

@@ -1,4 +1,5 @@
workflow_name,workflow_schedule
dbt_run_streamline_chainhead,"3,33 * * * *"
dbt_run_scheduled_non_realtime,"15,45 * * * *"
dbt_test_tasks,"8 * * * *"
dbt_test_tasks,"8 * * * *"
dbt_run_streamline_decoder,"23,53 * * * *"

View File

@@ -65,3 +65,67 @@ WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro decode_logs_history(
start,
stop
) %}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 1
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver_testnet__logs") }}
l
INNER JOIN {{ ref("silver_testnet__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]:: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND l.block_number <= (
SELECT
block_number
FROM
look_back
)
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__testnet_complete_decode_logs") }}
WHERE
(
block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND block_number <= (
SELECT
block_number
FROM
look_back
)
)
{% endmacro %}

View File

@@ -0,0 +1,246 @@
-- depends_on: {{ ref('bronze_testnet__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
cluster_by = ["modified_timestamp::date","block_timestamp::date"],
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
merge_exclude_columns = ["inserted_timestamp"],
full_refresh = false,
tags = ['decoded_logs','reorg']
) }}
WITH base_data AS (
SELECT
block_number :: INTEGER AS block_number,
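-- id is of the form '<tx_hash>-<event_index>'; split it into its two components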
SPLIT(
id,
'-'
) [0] :: STRING AS tx_hash,
SPLIT(
id,
'-'
) [1] :: INTEGER AS event_index,
DATA :name :: STRING AS event_name,
LOWER(
DATA :address :: STRING
) :: STRING AS contract_address,
DATA AS decoded_data,
id :: STRING AS _log_id,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze_testnet__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '2 hours'
FROM
{{ this }}
)
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% else %}
{{ ref('bronze_testnet__fr_decoded_logs') }}
WHERE
DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC, _partition_by_created_date DESC)) = 1
),
transformed_logs AS (
SELECT
block_number,
tx_hash,
event_index,
contract_address,
event_name,
decoded_data,
_inserted_timestamp,
_log_id,
utils.udf_transform_logs(decoded_data) AS transformed
FROM
base_data
),
FINAL AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp,
OBJECT_AGG(
DISTINCT CASE
WHEN v.value :name = '' THEN CONCAT(
'anonymous_',
v.index
)
ELSE v.value :name
END,
v.value :value
) AS decoded_flat
FROM
transformed_logs b,
LATERAL FLATTEN(
input => transformed :data
) v
GROUP BY
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp
),
new_records AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
b.transformed,
b._log_id,
b._inserted_timestamp,
b.decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed :: STRING AS event_removed,
tx_status,
CASE
WHEN block_timestamp IS NULL THEN TRUE
ELSE FALSE
END AS is_pending
FROM
FINAL b
LEFT JOIN {{ ref('silver_testnet__logs') }} USING (
block_number,
_log_id
)
)
{% if is_incremental() %},
missing_data AS (
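-- backfill rows previously inserted while pending (no block_timestamp yet), now that the matching log row is available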
SELECT
t.tx_hash,
t.block_number,
t.event_index,
t.event_name,
t.contract_address,
t.decoded_data,
t.transformed,
t._log_id,
GREATEST(
TO_TIMESTAMP_NTZ(
t._inserted_timestamp
),
TO_TIMESTAMP_NTZ(
l._inserted_timestamp
)
) AS _inserted_timestamp,
t.decoded_flat,
l.block_timestamp,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.topics,
l.data,
l.event_removed :: STRING AS event_removed,
l.tx_status,
FALSE AS is_pending
FROM
{{ this }}
t
INNER JOIN {{ ref('silver_testnet__logs') }}
l USING (
block_number,
_log_id
)
WHERE
t.is_pending
AND l.block_timestamp IS NOT NULL
)
{% endif %},
complete_data AS (
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
missing_data
{% endif %}
)
SELECT
*
FROM
complete_data qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC, is_pending ASC)) = 1
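
As an aside, the LATERAL FLATTEN + OBJECT_AGG pattern in the FINAL CTE above collapses the decoded parameter array into one flat object, keying unnamed parameters as anonymous_<index>. A minimal, self-contained sketch against a hard-coded sample payload (the sample values are illustrative only, not real log data):

SELECT
    OBJECT_AGG(
        CASE
            WHEN v.value :name = '' THEN CONCAT('anonymous_', v.index)
            ELSE v.value :name :: STRING
        END,
        v.value :value
    ) AS decoded_flat
FROM
    (
        SELECT
            PARSE_JSON('{"data": [{"name": "from", "value": "0xabc"}, {"name": "", "value": "42"}]}') AS transformed
    ) t,
    LATERAL FLATTEN(
        input => t.transformed :data
    ) v;
-- yields an object like {"anonymous_1": "42", "from": "0xabc"}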

View File

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver_testnet__decoded_logs') }}

View File

@@ -0,0 +1,52 @@
version: 2
models:
- name: test_silver_testnet__decoded_logs_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR

View File

@@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver_testnet__decoded_logs') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)

View File

@@ -0,0 +1,46 @@
version: 2
models:
- name: test_silver_testnet__decoded_logs_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}
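
Note that the block range is not hard-coded: start and stop are parsed from the model's identifier, so each of the otherwise identical history views in this commit covers whatever range its file name encodes. A worked sketch with a hypothetical identifier (the actual range boundaries are not shown in this diff):

{# Hypothetical file name: streamline__decode_logs_history_0_1000000.sql #}
{% set identifier = "streamline__decode_logs_history_0_1000000" %}
{% set start = identifier.split("_") [-2] %}  {# -> "0" #}
{% set stop = identifier.split("_") [-1] %}   {# -> "1000000" #}
{# decode_logs_history(start, stop) then renders a BETWEEN 0 AND 1000000 block filter #}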

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}