* history

* typo
This commit is contained in:
Austin 2024-06-21 11:33:44 -04:00 committed by GitHub
parent 46eddba5fa
commit 1d03556a04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 431 additions and 0 deletions

View File

@ -0,0 +1,45 @@
name: dbt_run_streamline_decoder_history
run-name: dbt_run_streamline_decoder_history
on:
workflow_dispatch:
schedule:
# Runs “at 7:03” (see https://crontab.guru)
- cron: '3 7 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":240}' -m "sei_models,tag:streamline_decoded_logs_history" "sei_models,tag:streamline_decoded_logs_complete"

View File

@ -182,4 +182,69 @@ FROM
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro decode_logs_history(
start,
stop
) %}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_max_evm_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 1
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver_evm__logs") }}
l
INNER JOIN {{ ref("silver_evm__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]:: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND l.block_number <= (
SELECT
block_number
FROM
look_back
)
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__complete_decode_logs") }}
WHERE
(
block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND block_number <= (
SELECT
block_number
FROM
look_back
)
)
{% endmacro %}

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "ephemeral",
unique_key = "block_number",
) }}
WITH base AS (
SELECT
block_timestamp :: DATE AS block_date,
MAX(block_number) as block_number
FROM
{{ ref("silver_evm__blocks") }}
GROUP BY
block_timestamp :: DATE
)
SELECT
block_date,
block_number
FROM
base
WHERE
block_date <> (
SELECT
MAX(block_date)
FROM
base
)

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}

View File

@ -0,0 +1,21 @@
{{ config (
materialized = "view",
post_hook = [fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"7500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" } ),
if_data_call_wait()
],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ decode_logs_history(
start,
stop
) }}