upgrade decoding (#68)

* upgrade decoding

* complete
Author: Austin, 2024-11-04 15:50:47 -05:00 (committed by GitHub)
parent 72e03e0670
commit e256ba655e
20 changed files with 260 additions and 333 deletions

View File

@@ -1,12 +1,11 @@
name: dbt_run_streamline_decoder_history
run-name: dbt_run_streamline_decoder_history
name: dbt_run_scheduled_decoded_logs_history_user_abis
run-name: dbt_run_scheduled_decoded_logs_history_user_abis
on:
workflow_dispatch:
schedule:
# Runs daily at 07:03 UTC (see https://crontab.guru)
- cron: '3 7 * * *'
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
@@ -22,12 +21,10 @@ env:
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@@ -42,6 +39,7 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Kick off decoded logs history, if there are new ABIs from users
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":240}' -m "sei_models,tag:streamline_decoded_logs_history" "sei_models,tag:streamline_decoded_logs_complete"
dbt run-operation run_decoded_logs_history
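
The retagged step no longer runs the history models directly: it calls the run_decoded_logs_history operation (added below), which dispatches the heavy backfill workflow only when user-submitted ABIs arrived today. A minimal sketch of the freshness check that operation performs, assuming the ref resolves to a silver_evm.user_verified_abis table (schema qualification may differ per deployment):

    -- Returns a row only if users submitted new ABIs today (illustrative rendering of the macro's check)
    select 1
    from silver_evm.user_verified_abis
    where _inserted_timestamp::date = sysdate()::date;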

View File

@@ -0,0 +1,49 @@
name: dbt_run_streamline_decoded_logs_history
run-name: dbt_run_streamline_decoded_logs_history
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Update complete table
run: |
dbt run -m "sei_models,tag:streamline_decoded_logs_complete"
- name: Decode historical logs
run: |
dbt run-operation decoded_logs_history --args '{"backfill_mode": true}' --vars '{"STREAMLINE_INVOKE_STREAMS":True}'
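
Passing --args '{"backfill_mode": true}' switches the macro into full-history mode: the generated monthly views consider every known ABI instead of only those inserted in the last 30 days. Under that flag the new_abis CTE renders roughly as below, assuming the ref resolves to silver_evm.complete_event_abis:

    -- backfill_mode = true: the 30-day inserted_timestamp filter is omitted entirely
    select abi, parent_contract_address, event_signature, start_block, end_block
    from silver_evm.complete_event_abis;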

View File

@@ -4,4 +4,6 @@ dbt_run_non_core,"17 * * * *"
dbt_run_streamline_blocks_txcount_realtime,"24,54 * * * *"
dbt_run_streamline_transactions_realtime,"29,59 * * * *"
dbt_test_tasks,"5,35 * * * *"
dbt_run_streamline_evm_chainhead,"9,39 * * * *"
dbt_run_streamline_evm_chainhead,"9,39 * * * *"
dbt_run_streamline_decoded_logs_history,"5 23 * * 6"
dbt_run_scheduled_decoded_logs_history_user_abis,"8 23 * * *"
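
For reference, '5 23 * * 6' fires Saturdays at 23:05 UTC (the weekly full-history sweep), and '8 23 * * *' fires daily at 23:08 UTC to pick up newly submitted user ABIs.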

View File

@@ -0,0 +1,123 @@
{% macro decoded_logs_history(backfill_mode=false) %}
{%- set params = {
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
"producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
} -%}
{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}
{% set find_months_query %}
SELECT
DISTINCT date_trunc('month', block_timestamp)::date as month
FROM {{ ref('core_evm__fact_blocks') }}
ORDER BY month ASC
{% endset %}
{% set results = run_query(find_months_query) %}
{% if execute %}
{% set months = results.columns[0].values() %}
{% for month in months %}
{% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}
{% set create_view_query %}
create or replace view streamline.{{view_name}} as (
WITH target_blocks AS (
SELECT
block_number
FROM {{ ref('core_evm__fact_blocks') }}
WHERE date_trunc('month', block_timestamp) = '{{month}}'::timestamp
),
new_abis AS (
SELECT
abi,
parent_contract_address,
event_signature,
start_block,
end_block
FROM {{ ref('silver_evm__complete_event_abis') }}
{% if not backfill_mode %}
WHERE inserted_timestamp > dateadd('day', -30, sysdate())
{% endif %}
),
existing_logs_to_exclude AS (
SELECT _log_id
FROM {{ ref('streamline__complete_decode_logs') }} l
INNER JOIN target_blocks b using (block_number)
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
concat(l.tx_hash::string, '-', l.event_index::string) as _log_id
FROM target_blocks b
INNER JOIN {{ ref('core_evm__fact_event_logs') }} l using (block_number)
WHERE l.tx_status = 'SUCCESS' and date_trunc('month', l.block_timestamp) = '{{month}}'::timestamp
)
SELECT
l.block_number,
l._log_id,
A.abi,
OBJECT_CONSTRUCT(
'topics', l.topics,
'data', l.data,
'address', l.contract_address
) AS data
FROM candidate_logs l
INNER JOIN new_abis A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]::STRING
AND l.block_number BETWEEN A.start_block AND A.end_block
WHERE NOT EXISTS (
SELECT 1
FROM existing_logs_to_exclude e
WHERE e._log_id = l._log_id
)
LIMIT {{ params.sql_limit }}
)
{% endset %}
{# Create the view #}
{% do run_query(create_view_query) %}
{{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}
{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
{# Invoke streamline, if rows exist to decode #}
{% set decode_query %}
SELECT
streamline.udf_bulk_decode_logs_v2(
PARSE_JSON(
$${ "external_table": "decoded_logs",
"producer_batch_size": {{ params.producer_batch_size }},
"sql_limit": {{ params.sql_limit }},
"sql_source": "{{view_name}}",
"worker_batch_size": {{ params.worker_batch_size }} }$$
)
)
WHERE
EXISTS(
SELECT 1
FROM streamline.{{view_name}}
LIMIT 1
);
{% endset %}
{% do run_query(decode_query) %}
{{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{# Call wait to avoid queueing up too many jobs #}
{% do run_query("call system$wait(" ~ wait_time ~ ")") %}
{{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}
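
The macro replaces the fifteen fixed block-range models deleted below with a single month-partitioned loop: for each distinct month in core_evm__fact_blocks it creates a view of not-yet-decoded logs joined to eligible ABIs, invokes the bulk decoder against that view if it is non-empty, then waits before queueing the next month. As an illustration, for a hypothetical month of 2024-01 the loop would render a view shaped roughly like this (refs resolved to plausible schema-qualified names; the already-decoded exclusion is omitted for brevity):

    -- Sketch of one generated monthly view; illustrative, not the exact DDL
    create or replace view streamline.decoded_logs_history_2024_01 as (
        with target_blocks as (
            select block_number
            from core_evm.fact_blocks
            where date_trunc('month', block_timestamp) = '2024-01-01'::timestamp
        )
        select
            l.block_number,
            concat(l.tx_hash::string, '-', l.event_index::string) as _log_id,
            a.abi,
            object_construct('topics', l.topics, 'data', l.data, 'address', l.contract_address) as data
        from target_blocks b
        inner join core_evm.fact_event_logs l using (block_number)
        inner join silver_evm.complete_event_abis a
            on a.parent_contract_address = l.contract_address
            and a.event_signature = l.topics[0]::string
            and l.block_number between a.start_block and a.end_block
        where l.tx_status = 'SUCCESS'
        limit 7500000
    );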

View File

@@ -0,0 +1,28 @@
{% macro run_decoded_logs_history() %}
{% set check_for_new_user_abis_query %}
select 1
from {{ ref('silver_evm__user_verified_abis') }}
where _inserted_timestamp::date = sysdate()::date
{% endset %}
{% set results = run_query(check_for_new_user_abis_query) %}
{% if execute %}
{% set new_user_abis = results.columns[0].values()[0] %}
{% if new_user_abis %}
{% set invoke_workflow_query %}
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'sei-models',
'dbt_run_streamline_decoded_logs_history.yml',
NULL
)
{% endset %}
{% do run_query(invoke_workflow_query) %}
{% endif %}
{% endif %}
{% endmacro %}
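
github_actions.workflow_dispatches appears to be an external function that fires a GitHub Actions workflow_dispatch event for the named workflow file in the given repo; the trailing NULL carries no workflow inputs. When new user ABIs exist, the operation effectively runs:

    -- Illustrative manual trigger; assumes the github_actions.workflow_dispatches
    -- external function is deployed exactly as used in the macro above
    select github_actions.workflow_dispatches(
        'FlipsideCrypto',
        'sei-models',
        'dbt_run_streamline_decoded_logs_history.yml',
        NULL
    );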

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}
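
This model, and the fourteen identical ones deleted below (one per fixed block range), derived its range from its own filename: an identifier ending in, say, _10000000_15000000 (hypothetical) yields start = '10000000' and stop = '15000000', which the old decode_logs_history macro turned into a static block window, with the post-hook invoking the decoder whenever the view returned rows. The month-driven decoded_logs_history macro above supersedes all of them.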

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -1,21 +0,0 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@@ -12,12 +12,50 @@
tags = ['streamline_decoded_logs_realtime']
) }}
WITH look_back AS (
WITH target_blocks AS (
SELECT
SELECT
block_number
FROM
{{ ref('core_evm__fact_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
{{ ref("_evm_block_lookback") }}
)
),
existing_logs_to_exclude AS (
SELECT
_log_id
FROM
{{ ref("_evm_block_lookback") }}
{{ ref('streamline__complete_decode_logs') }}
l
INNER JOIN target_blocks b USING (block_number)
WHERE
l._inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE())
),
candidate_logs AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
target_blocks b
INNER JOIN {{ ref('core_evm__fact_event_logs') }}
l USING (block_number)
WHERE
l.tx_status = 'SUCCESS'
AND l.inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE())
)
SELECT
l.block_number,
@@ -32,35 +70,18 @@ SELECT
l.contract_address
) AS DATA
FROM
{{ ref("silver_evm__logs") }}
l
INNER JOIN {{ ref("silver_evm__complete_event_abis") }} A
candidate_logs l
INNER JOIN {{ ref('silver_evm__complete_event_abis') }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number >= (
SELECT
block_number
FROM
look_back
)
)
AND l.block_number IS NOT NULL
AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
AND _log_id NOT IN (
NOT EXISTS (
SELECT
_log_id
1
FROM
{{ ref("streamline__complete_decode_logs") }}
existing_logs_to_exclude e
WHERE
block_number >= (
SELECT
block_number
FROM
look_back
)
AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE())
)
e._log_id = l._log_id
)
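
The realtime model now mirrors the history macro's shape: a target_blocks driver CTE replaces the old look_back subquery, the two-day recency filters are pushed down into the CTEs, and the NOT IN anti-join becomes NOT EXISTS, which sidesteps the NOT IN pitfall where a single NULL in the subquery empties the result and which query planners typically execute as a hash anti-join. The pattern in isolation, with hypothetical relation names:

    -- Anti-join sketch: keep candidate rows that have no match in the exclusion set
    select c.*
    from candidate_logs c                    -- hypothetical relation
    where not exists (
        select 1
        from existing_logs_to_exclude e      -- hypothetical relation
        where e._log_id = c._log_id
    );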