Austin 2025-06-17 12:03:47 -04:00
parent 9cd6fd257b
commit e307841e91
27 changed files with 483 additions and 132 deletions

View File

@@ -45,7 +45,7 @@ jobs:
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__blocks_realtime.sql 1+models/silver/streamline/realtime/streamline__transactions_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime
- name: Store logs
uses: actions/upload-artifact@v4
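
The job now selects models by dbt tag rather than by explicit file path, so new models join the run simply by declaring the tag. A minimal sketch of a model opting into that selector (the model body is a hypothetical placeholder; the tag name comes from this commit):

{{ config (
    materialized = 'view',
    tags = ['streamline_core_evm_realtime']
) }}

SELECT
    1 AS placeholder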

View File

@@ -1,5 +1,5 @@
name: dbt_run_streamline_realtime_tx_receipts
run-name: dbt_run_streamline_realtime_tx_receipts
name: dbt_run_streamline_realtime_step_2
run-name: dbt_run_streamline_realtime_step_2
on:
workflow_dispatch:
@@ -45,7 +45,7 @@ jobs:
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime_step_2
- name: Store logs
uses: actions/upload-artifact@v4

View File

@@ -46,7 +46,6 @@ jobs:
- name: Run DBT Jobs
run: |
dbt test --exclude tag:full_test --models tag:recent_test
continue-on-error: true
notify-failure:
needs: [run_dbt_jobs]

View File

@@ -1,51 +0,0 @@
name: dbt_test_tasks
on:
workflow_call:
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
SCHEMA: "${{ vars.SCHEMA }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
concurrency:
group: ${{ github.workflow }}
cancel-in-progress: false
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt test -m models/github_actions/github_actions__current_task_status.sql
notify-failure:
needs: [run_dbt_jobs]
if: failure()
uses: ./.github/workflows/slack_notify.yml
secrets:
EVM_SLACK_WEBHOOK_URL: ${{ secrets.EVM_SLACK_WEBHOOK_URL }}

View File

@@ -46,7 +46,6 @@ jobs:
- name: Run DBT Jobs
run: |
dbt test --exclude tag:recent_test --models tag:full_test
continue-on-error: true
notify-failure:
needs: [run_dbt_jobs]

View File

@@ -1,5 +1,5 @@
workflow_name,workflow_schedule
dbt_run_streamline_realtime_blocks_transactions,"12,42 * * * *"
dbt_run_streamline_realtime_tx_receipts,"17,47 * * * *"
dbt_run_streamline_realtime_step_2,"20,52 * * * *"
dbt_run_scheduled,"25,55 * * * *"
dbt_test_tasks,"28,58 * * * *"
dbt_test_tasks,"28,58 * * * *"
1 workflow_name workflow_schedule
2 dbt_run_streamline_realtime_blocks_transactions 12,42 * * * *
3 dbt_run_streamline_realtime_tx_receipts dbt_run_streamline_realtime_step_2 17,47 * * * * 20,52 * * * *
4 dbt_run_scheduled 25,55 * * * *
5 dbt_test_tasks 28,58 * * * *

View File

@@ -0,0 +1,40 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number,
s.value: "TX_HASH" :: STRING AS tx_hash
FROM
{{ source( "bronze_streamline", "receipts_by_hash") }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
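
The partition_key above is parsed out of the external file path. A toy check of that expression, assuming a layout like bucket/chain/table/<partition>_<file>.json (the sample path is illustrative only, not taken from this repo):

SELECT
    CAST(
        SPLIT_PART(
            SPLIT_PART('streamline/aurora/receipts_by_hash/151000000_0001.json', '/', 4),
            '_',
            1
        ) AS INTEGER
    ) AS partition_key;
-- returns 151000000: the fourth path segment, truncated at the first underscore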

View File

@@ -0,0 +1,28 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
SELECT
partition_key,
block_number,
tx_hash,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v2') }}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
tx_hash,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v1') }}
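
The union view papers over the schema drift between the two sources: the v1 table exposes _partition_by_block_id, which is aliased to partition_key so downstream models see a single column name. A hypothetical downstream read (the qualified view name is an assumption, since file names are not shown in this diff):

SELECT
    partition_key,
    block_number,
    tx_hash
FROM
    bronze.bronze__receipts  -- assumed name of the union view above
WHERE
    partition_key = 151000000;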

View File

@@ -0,0 +1,40 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number,
s.value: "TX_HASH" :: STRING AS tx_hash
FROM
{{ source( "bronze_streamline", "receipts_by_hash") }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@@ -0,0 +1,40 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number,
s.value: "TX_HASH" :: STRING AS tx_hash
FROM
{{ source( "bronze_streamline", "traces_by_hash") }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@@ -0,0 +1,28 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
SELECT
partition_key,
block_number,
tx_hash,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v2') }}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
tx_hash,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v1') }}

View File

@@ -0,0 +1,40 @@
{{ config (
materialized = 'view',
tags = ['bronze','core','streamline_v1','phase_1']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number,
s.value: "TX_HASH" :: STRING AS tx_hash
FROM
{{ source( "bronze_streamline", "traces_by_hash") }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@@ -4,7 +4,7 @@
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = "block_timestamp::date",
tags = ['core']
tags = ['core','streamline_core_evm_realtime']
) }}
SELECT
@@ -49,7 +49,7 @@ SELECT
DATA :result :logsBloom :: STRING AS logs_bloom,
DATA :result :stateRoot :: STRING AS state_root,
DATA :result :transactionsRoot :: STRING AS transactions_root,
_partition_by_block_id,
partition_key as _partition_by_block_id,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@@ -4,7 +4,7 @@
incremental_strategy = 'delete+insert',
unique_key = "tx_id",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
tags = ['core']
tags = ['core','streamline_core_evm_realtime']
) }}
WITH base AS (

View File

@@ -3,11 +3,16 @@
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_evm_realtime']
) }}
SELECT
id,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number,
_inserted_timestamp,
DATA :result :transactions AS transactions
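
The new surrogate id is an MD5 of the block number, replacing the id that previously arrived from the source. A worked example of the expression, using a block number that appears elsewhere in this commit:

SELECT
    MD5(
        CAST(
            COALESCE(CAST(150815373 AS text), '' :: STRING) AS text
        )
    ) AS id;
-- Equal block_numbers always hash to the same id, which is what the
-- incremental unique_key = "id" relies on for idempotent loads.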

View File

@@ -3,11 +3,16 @@
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_evm_realtime_step_2']
) }}
SELECT
id,
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text
)
) AS id,
block_number,
tx_hash,
_inserted_timestamp
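
Here the key covers a (block_number, tx_hash) pair, so the two values are joined with a '_-_' delimiter before hashing. A worked example, using the block number and tx hash from the removed test rows later in this commit:

SELECT
    MD5(
        CAST(
            COALESCE(
                CAST(
                    CONCAT(
                        150949168,
                        '_-_',
                        COALESCE('0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7', '')
                    ) AS text
                ),
                '' :: STRING
            ) AS text
        )
    ) AS id;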

View File

@@ -3,13 +3,17 @@
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_evm_realtime']
) }}
SELECT
id,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number,
tx_hash,
_inserted_timestamp
FROM

View File

@@ -2,11 +2,17 @@
{{ config (
materialized = "incremental",
unique_key = "id",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_evm_realtime_step_2']
) }}
SELECT
id,
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text
)
) AS id,
block_number,
tx_hash,
_inserted_timestamp
FROM

View File

@@ -0,0 +1,48 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :'blocks_transactions',
"sql_limit" :"120000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}',
"exploded_key": tojson(['result', 'result.transactions']) }
),
tags = ['streamline_core_evm_history']
) }}
with blocks as (
select
block_number
from {{ ref('streamline__blocks') }}
except
select
block_number
from {{ ref('streamline__complete_blocks') }}
inner join {{ ref('streamline__complete_transactions') }} using (block_number)
)
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{URL}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'eth_getBlockByNumber',
'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)
),
'Vault/prod/evm/aurora/mainnet'
) AS request
from blocks
order by block_number desc
limit 120000
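
The blocks CTE above uses EXCEPT to find work: any block present in the expected set but missing from the completed set is re-requested. A self-contained toy of the pattern:

WITH expected AS (
    SELECT column1 AS block_number FROM VALUES (1), (2), (3)
),
completed AS (
    SELECT column1 AS block_number FROM VALUES (1), (3)
)
SELECT block_number FROM expected
EXCEPT
SELECT block_number FROM completed;  -- returns 2, the unfetched block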

View File

@@ -0,0 +1,53 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :'receipts_by_hash',
"sql_limit" :"120000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}' }
),
tags = ['streamline_core_evm_history']
) }}
with txs as (
select
block_number,
tx_hash
from {{ ref('silver__transactions') }}
except
select
block_number,
tx_hash
from {{ ref('streamline__complete_tx_receipts') }}
)
SELECT
block_number,
tx_hash,
ROUND(
block_number,
-3
) AS partition_key,
live.udf_api(
'POST',
'{URL}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'eth_getTransactionReceipt',
'params', ARRAY_CONSTRUCT(tx_hash)
),
'Vault/prod/evm/aurora/mainnet'
) AS request
from txs
order by block_number desc
limit 120000

View File

@@ -0,0 +1,52 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={
"external_table" :'traces_by_hash',
"sql_limit" :"120000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}'
}
),
tags = ['streamline_core_evm_history']
) }}
with txs as (
select
block_number,
tx_hash
from {{ ref('silver__transactions') }}
except
select
block_number,
tx_hash
from {{ ref('streamline__complete_traces') }}
)
SELECT
block_number,
tx_hash,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{URL}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'debug_traceTransaction',
'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s'))
),
'Vault/prod/evm/aurora/mainnet'
) AS request
from txs
order by block_number desc
limit 120000

View File

@@ -1,40 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'producer_batch_size',20000, 'producer_limit_size',5000000, 'worker_batch_size',200))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH tbl AS (
SELECT
block_number,
tx_hash
FROM
{{ ref("silver__transactions") }}
WHERE
block_number IS NOT NULL
AND tx_hash IS NOT NULL
EXCEPT
SELECT
block_number,
tx_hash
FROM
{{ ref("streamline__complete_traces") }}
WHERE
block_number IS NOT NULL
AND tx_hash IS NOT NULL
)
SELECT
block_number,
'debug_traceTransaction' AS method,
CONCAT(
tx_hash,
'_-_',
'{"tracer": "callTracer","timeout": "30s"}'
) AS params
FROM
tbl
ORDER BY
block_number ASC
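
This removed model encoded its trace request as a delimited params string for the v1 UDF; its replacements build a full JSON-RPC body instead. A side-by-side of the two encodings, using the tx hash and tracer settings that appear in this commit:

SELECT
    CONCAT(
        '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7',
        '_-_',
        '{"tracer": "callTracer","timeout": "30s"}'
    ) AS v1_params,
    OBJECT_CONSTRUCT(
        'id', 150949168,
        'jsonrpc', '2.0',
        'method', 'debug_traceTransaction',
        'params', ARRAY_CONSTRUCT(
            '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7',
            OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')
        )
    ) AS v2_body;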

View File

@@ -4,17 +4,29 @@
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :'blocks_transactions',
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_limit" :"12000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}',
"exploded_key": tojson(['result', 'result.transactions']) }
),
tags = ['streamline_core_evm_realtime']
) }}
with blocks as (
select
block_number
from {{ ref('streamline__blocks') }}
where block_number >= (select block_number from {{ ref('_block_lookback') }})
except
select
block_number
from {{ ref('streamline__complete_blocks') }}
inner join {{ ref('streamline__complete_transactions') }} using (block_number)
where block_number >= (select block_number from {{ ref('_block_lookback') }})
)
SELECT
150815373 as block_number,
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
@@ -30,4 +42,9 @@ SELECT
'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)
),
'Vault/prod/evm/aurora/mainnet'
) AS request
) AS request
from blocks
order by block_number asc
limit 12000
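
Unlike the history model, the realtime variant bounds its gap scan with _block_lookback, so it only reconciles recent blocks and leaves the deep backlog to the history workflow. A minimal sketch of the window predicate, with a hypothetical lookback value standing in for {{ ref('_block_lookback') }}:

WITH _block_lookback AS (
    SELECT 150800000 AS block_number  -- hypothetical floor of the realtime window
)
SELECT column1 AS block_number
FROM VALUES (150799999), (150815373)
WHERE column1 >= (SELECT block_number FROM _block_lookback);
-- keeps only 150815373; the older gap is out of scope for the realtime run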

View File

@@ -4,17 +4,31 @@
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :'receipts_by_hash',
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_limit" :"12000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}' }
),
tags = ['streamline','core','realtime','phase_1']
tags = ['streamline_core_evm_realtime_step_2']
) }}
with txs as (
select
block_number,
tx_hash
from {{ ref('silver__transactions') }}
where block_number >= (select block_number from {{ ref('_block_lookback') }})
except
select
block_number,
tx_hash
from {{ ref('streamline__complete_tx_receipts') }}
where block_number >= (select block_number from {{ ref('_block_lookback') }})
)
SELECT
150949168 as block_number,
'0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash,
block_number,
tx_hash,
ROUND(
block_number,
-3
@@ -33,4 +47,9 @@ SELECT
'params', ARRAY_CONSTRUCT(tx_hash)
),
'Vault/prod/evm/aurora/mainnet'
) AS request
) AS request
from txs
order by block_number asc
limit 12000

View File

@@ -5,18 +5,32 @@
target = "{{this.schema}}.{{this.identifier}}",
params ={
"external_table" :'traces_by_hash',
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_limit" :"12000",
"producer_batch_size" :"12000",
"worker_batch_size" :"4000",
"sql_source" :'{{this.identifier}}'
}
),
tags = ['streamline_core_evm_realtime']
tags = ['streamline_core_evm_realtime_step_2']
) }}
with txs as (
select
block_number,
tx_hash
from {{ ref('silver__transactions') }}
where block_number >= (select block_number from {{ ref('_block_lookback') }})
except
select
block_number,
tx_hash
from {{ ref('streamline__complete_traces') }}
where block_number >= (select block_number from {{ ref('_block_lookback') }})
)
SELECT
150949168 as block_number,
'0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash,
block_number,
tx_hash,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
@@ -32,4 +46,9 @@ SELECT
'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s'))
),
'Vault/prod/evm/aurora/mainnet'
) AS request
) AS request
from txs
order by block_number asc
limit 12000

View File

@@ -1,6 +1,6 @@
{{ config (
materialized = "view",
tags = ['streamline','core','chainhead','phase_1']
tags = ['streamline_core_evm_realtime']
) }}
SELECT

View File

@@ -1,6 +1,6 @@
{{ config (
materialized = 'table',
tags = ['streamline','core','chainhead','phase_1']
tags = ['streamline_core_evm_realtime']
) }}
SELECT