AN-5797/sl-upgrade-eth (#1030)

* sl2 upgrade models

* updates for blocks txns and silver

* external table vars and blocks structure

* integration test

* remove
This commit is contained in:
drethereum 2025-03-17 10:16:53 -06:00 committed by GitHub
parent 84ff78815a
commit 27f1bdc15d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
133 changed files with 3441 additions and 717 deletions

View File

@ -43,4 +43,8 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_core_realtime" "ethereum_models,tag:streamline_core_complete"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_core_complete" "ethereum_models,tag:streamline_core_realtime" "ethereum_models,tag:streamline_core_complete_receipts" "ethereum_models,tag:streamline_core_realtime_receipts" "ethereum_models,tag:streamline_core_complete_confirm_blocks" "ethereum_models,tag:streamline_core_realtime_confirm_blocks"
- name: Run Chainhead Tests
run: |
dbt test -m "ethereum_models,tag:chainhead"

View File

@ -29,8 +29,7 @@ on:
description: DBT Run Command
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_core_history" "ethereum_models,tag:streamline_core_complete"
- dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2000000}' -m "ethereum_models,tag:streamline_decoded_logs_history" "ethereum_models,tag:streamline_decoded_logs_complete"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_core_complete" "ethereum_models,tag:streamline_core_history" "ethereum_models,tag:streamline_core_complete_receipts" "ethereum_models,tag:streamline_core_history_receipts" "ethereum_models,tag:streamline_core_complete_confirm_blocks" "ethereum_models,tag:streamline_core_history_confirm_blocks"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_beacon_history" "ethereum_models,tag:streamline_beacon_complete"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_balances_history" "ethereum_models,tag:streamline_balances_complete"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_abis_history" "ethereum_models,tag:streamline_abis_complete"
@ -50,8 +49,6 @@ env:
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest

View File

@ -99,3 +99,66 @@ vars:
- DBT_CLOUD_ETHEREUM
#### STREAMLINE 2.0 END ####
#### FSC_EVM BEGIN ####
# Visit https://github.com/FlipsideCrypto/fsc-evm/wiki for more information on required and optional variables
### GLOBAL VARIABLES BEGIN ###
## REQUIRED
GLOBAL_PROD_DB_NAME: 'ethereum'
GLOBAL_NODE_SECRET_PATH: 'Vault/prod/ethereum/quicknode/ethereum_mainnet'
GLOBAL_BLOCKS_PER_HOUR: 300
GLOBAL_USES_STREAMLINE_V1: True
### GLOBAL VARIABLES END ###
### MAIN_PACKAGE VARIABLES BEGIN ###
### CORE ###
## REQUIRED
BLOCKS_TRANSACTIONS_REALTIME_EXTERNAL_TABLE: 'blocks_v2'
BLOCKS_TRANSACTIONS_HISTORY_EXTERNAL_TABLE: 'blocks_v2'
TRACES_REALTIME_EXTERNAL_TABLE: 'traces_v2'
TRACES_HISTORY_EXTERNAL_TABLE: 'traces_v2'
RECEIPTS_REALTIME_EXTERNAL_TABLE: 'receipts_v2'
RECEIPTS_HISTORY_EXTERNAL_TABLE: 'receipts_v2'
CONFIRM_BLOCKS_REALTIME_EXTERNAL_TABLE: 'confirm_blocks_v2'
CONFIRM_BLOCKS_HISTORY_EXTERNAL_TABLE: 'confirm_blocks_v2'
## OPTIONAL
# GOLD_FULL_REFRESH: True
# SILVER_FULL_REFRESH: True
# BRONZE_FULL_REFRESH: True
# BLOCKS_COMPLETE_FULL_REFRESH: True
# CONFIRM_BLOCKS_COMPLETE_FULL_REFRESH: True
# TRACES_COMPLETE_FULL_REFRESH: True
# RECEIPTS_COMPLETE_FULL_REFRESH: True
# TRANSACTIONS_COMPLETE_FULL_REFRESH: True
# BLOCKS_TRANSACTIONS_REALTIME_TESTING_LIMIT: 3
# BLOCKS_TRANSACTIONS_HISTORY_TESTING_LIMIT: 3
# TRACES_REALTIME_TESTING_LIMIT: 3
# TRACES_HISTORY_TESTING_LIMIT: 3
# RECEIPTS_REALTIME_TESTING_LIMIT: 3
# RECEIPTS_HISTORY_TESTING_LIMIT: 3
# CONFIRM_BLOCKS_REALTIME_TESTING_LIMIT: 3
# CONFIRM_BLOCKS_HISTORY_TESTING_LIMIT: 3
# ### MAIN_PACKAGE VARIABLES END ###
# ### DECODER_PACKAGE VARIABLES BEGIN ###
# ## REQUIRED
# ## OPTIONAL
# DECODED_LOGS_COMPLETE_FULL_REFRESH: True
# DECODED_LOGS_REALTIME_TESTING_LIMIT: 3
# DECODED_LOGS_HISTORY_SQL_LIMIT: 1 #limit per monthly range
### DECODER_PACKAGE VARIABLES END ###
#### FSC_EVM END ####

View File

@ -1,28 +1,26 @@
{% macro decoded_logs_history(backfill_mode=false) %}
{%- set params = {
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 8000000),
"producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
} -%}
{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}
{% set find_months_query %}
SELECT
DISTINCT date_trunc('month', block_timestamp)::date as month
FROM {{ ref('core__fact_blocks') }}
ORDER BY month ASC
{% endset %}
{% set results = run_query(find_months_query) %}
{% if execute %}
{% set months = results.columns[0].values() %}
{% for month in months %}
{% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}
{% set create_view_query %}
create or replace view streamline.{{view_name}} as (
WITH target_blocks AS (
@ -45,7 +43,7 @@
),
existing_logs_to_exclude AS (
SELECT _log_id
FROM {{ ref('streamline__complete_decoded_logs') }} l
FROM {{ ref('streamline__decoded_logs_complete') }} l
INNER JOIN target_blocks b using (block_number)
),
candidate_logs AS (
@ -83,11 +81,9 @@
LIMIT {{ params.sql_limit }}
)
{% endset %}
{# Create the view #}
{% do run_query(create_view_query) %}
{{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}
{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
{# Check if rows exist first #}
{% set check_rows_query %}
@ -98,7 +94,7 @@
{% set has_rows = results.columns[0].values()[0] %}
{% if has_rows %}
{# Invoke streamline since rows exist to decode #}
{# Invoke streamline, if rows exist to decode #}
{% set decode_query %}
SELECT
streamline.udf_bulk_decode_logs_v2(
@ -114,7 +110,6 @@
{% do run_query(decode_query) %}
{{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
{# Call wait since we actually did some decoding #}
{% do run_query("call system$wait(" ~ wait_time ~ ")") %}
{{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
@ -122,7 +117,7 @@
{{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }}
{% endif %}
{% endif %}
{% endfor %}
{% endif %}

View File

@ -15,7 +15,7 @@ WHERE
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'ethereum-models',
'{{ blockchain }}' || '-models',
'dbt_run_streamline_decoded_logs_history.yml',
NULL
) {% endset %}
@ -23,7 +23,7 @@ WHERE
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'ethereum-models',
'{{ blockchain }}' || '-models',
'dbt_run_streamline_decoded_traces_history.yml',
NULL
) {% endset %}

View File

@ -0,0 +1,49 @@
{% macro v0_streamline_decoded_complete(
model
) %}
SELECT
block_number,
file_name,
id AS {% if model == 'decoded_logs' %}
_log_id {% elif model == 'decoded_traces' %}
_call_id
{% endif %},
{{ dbt_utils.generate_surrogate_key(
['id']
) }} AS {% if model == 'decoded_logs' %}
complete_decoded_logs_id {% elif model == 'decoded_traces' %}
complete_decoded_traces_id
{% endif %},
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{% if model == 'decoded_logs' %}
{{ ref('bronze__streamline_decoded_logs') }}
{% elif model == 'decoded_traces' %}
{{ ref('bronze__streamline_decoded_traces') }}
{% endif %}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
{% if model == 'decoded_logs' %}
{{ ref('bronze__streamline_fr_decoded_logs') }}
{% elif model == 'decoded_traces' %}
{{ ref('bronze__streamline_fr_decoded_traces') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1
{% endmacro %}

View File

@ -0,0 +1,210 @@
{% macro v0_streamline_external_table_query(
model,
partition_function,
balances = false,
block_number = true
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.metadata :request :"data" :id :: INT,
PARSE_JSON(
s.metadata :request :"data"
) :id :: INT
) AS block_number
{% endif %}
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro v0_streamline_external_table_fr_query(
model,
partition_function,
partition_join_key = "partition_key",
balances = false,
block_number = true
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT,
s.metadata :request :"data" :id :: INT,
PARSE_JSON(
s.metadata :request :"data"
) :id :: INT
) AS block_number
{% endif %}
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.{{ partition_join_key }}
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro v0_streamline_external_table_query_decoder(
model
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
metadata,
b.file_name,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro v0_streamline_external_table_fr_query_decoder(
model
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
metadata,
b.file_name,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}

View File

@ -0,0 +1,101 @@
{% macro streamline_external_table_query_decoder(
source_name,
source_version
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
metadata,
b.file_name,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_query_decoder_fr(
source_name,
source_version
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
metadata,
b.file_name,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}

View File

@ -0,0 +1,141 @@
{% macro streamline_external_table_query(
source_name,
source_version,
partition_function,
balances,
block_number,
uses_receipts_by_hash
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error :code IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_query_fr(
source_name,
source_version,
partition_function,
partition_join_key,
balances,
block_number,
uses_receipts_by_hash
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.{{ partition_join_key }}
AND DATA :error :code IS NULL
AND DATA IS NOT NULL
{% endmacro %}

View File

@ -0,0 +1,36 @@
{% macro log_bronze_details(source_name, source_version, model_type, partition_function, partition_join_key, block_number, uses_receipts_by_hash) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
{% if model_type != '' %}
{% set model_type = '_' ~ model_type %}
{% endif %}
{%- if flags.WHICH == 'compile' and execute -%}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log(source_name ~ model_type ~ '_PARTITION_FUNCTION: ' ~ partition_function, info=True) }}
{{ log(source_name ~ model_type ~ '_PARTITION_JOIN_KEY: ' ~ partition_join_key, info=True) }}
{{ log(source_name ~ model_type ~ '_BLOCK_NUMBER: ' ~ block_number, info=True) }}
{% if uses_receipts_by_hash %}
{{ log("USES_RECEIPTS_BY_HASH: " ~ uses_receipts_by_hash, info=True) }}
{% endif %}
{{ log("", info=True) }}
{{ log("=== Source Details ===", info=True) }}
{{ log("Source: " ~ source('bronze_streamline', source_name.lower() ~ source_version.lower()), info=True) }}
{{ log("", info=True) }}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,29 @@
{% macro log_complete_details(post_hook, full_refresh_type, uses_receipts_by_hash) %}
{%- if flags.WHICH == 'compile' and execute -%}
{% if uses_receipts_by_hash %}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log("USES_RECEIPTS_BY_HASH: " ~ uses_receipts_by_hash, info=True) }}
{% endif %}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' unique_key = "' ~ config.get('unique_key') ~ '",\n' %}
{% set config_log = config_log ~ ' cluster_by = "' ~ config.get('cluster_by') ~ '",\n' %}
{% set config_log = config_log ~ ' merge_update_columns = ' ~ config.get('merge_update_columns') | tojson ~ ',\n' %}
{% set config_log = config_log ~ ' post_hook = "' ~ post_hook ~ '",\n' %}
{% set config_log = config_log ~ ' incremental_predicates = ' ~ config.get('incremental_predicates') | tojson ~ ',\n' %}
{% set config_log = config_log ~ ' full_refresh = ' ~ full_refresh_type ~ ',\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,36 @@
{% macro log_model_details(vars=false, params=false) %}
{%- if execute -%}
/*
DBT Model Config:
{{ model.config | tojson(indent=2) }}
*/
{% if vars is not false %}
{% if var('LOG_MODEL_DETAILS', false) %}
{{ log( vars | tojson(indent=2), info=True) }}
{% endif %}
/*
Variables:
{{ vars | tojson(indent=2) }}
*/
{% endif %}
{% if params is not false %}
{% if var('LOG_MODEL_DETAILS', false) %}
{{ log( params | tojson(indent=2), info=True) }}
{% endif %}
/*
Parameters:
{{ params | tojson(indent=2) }}
*/
{% endif %}
/*
Raw Code:
{{ model.raw_code }}
*/
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,55 @@
{% macro log_streamline_details(model_name, model_type, node_url, model_quantum_state, sql_limit, testing_limit, order_by_clause, new_build, streamline_params, uses_receipts_by_hash, method, method_params, min_block=0) %}
{%- if flags.WHICH == 'compile' and execute -%}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log("START_UP_BLOCK: " ~ min_block, info=True) }}
{{ log("", info=True) }}
{{ log("=== API Details ===", info=True) }}
{{ log("NODE_URL: " ~ node_url, info=True) }}
{{ log("NODE_SECRET_PATH: " ~ var('GLOBAL_NODE_SECRET_PATH'), info=True) }}
{{ log("", info=True) }}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_model_quantum_state').upper() ~ ': ' ~ model_quantum_state, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_sql_limit').upper() ~ ': ' ~ sql_limit, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_testing_limit').upper() ~ ': ' ~ testing_limit, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_order_by_clause').upper() ~ ': ' ~ order_by_clause, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_new_build').upper() ~ ': ' ~ new_build, info=True) }}
{{ log('USES_RECEIPTS_BY_HASH' ~ ': ' ~ uses_receipts_by_hash, info=True) }}
{{ log("", info=True) }}
{{ log("=== RPC Details ===", info=True) }}
{{ log(model_name ~ ": {", info=True) }}
{{ log(" method: '" ~ method ~ "',", info=True) }}
{{ log(" method_params: " ~ method_params, info=True) }}
{{ log("}", info=True) }}
{{ log("", info=True) }}
{% set params_str = streamline_params | tojson %}
{% set params_formatted = params_str | replace('{', '{\n ') | replace('}', '\n }') | replace(', ', ',\n ') %}
{# Clean up the method_params formatting #}
{% set params_formatted = params_formatted | replace('"method_params": "', '"method_params": "') | replace('\\n', ' ') | replace('\\u0027', "'") %}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' post_hook = fsc_utils.if_data_call_function_v2(\n' %}
{% set config_log = config_log ~ ' func = "streamline.udf_bulk_rest_api_v2",\n' %}
{% set config_log = config_log ~ ' target = "' ~ this.schema ~ '.' ~ this.identifier ~ '",\n' %}
{% set config_log = config_log ~ ' params = ' ~ params_formatted ~ '\n' %}
{% set config_log = config_log ~ ' ),\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,47 @@
{% macro set_default_variables_streamline(model_name, model_type) %}
{%- set node_url = var('GLOBAL_NODE_URL', '{Service}/{Authentication}') -%}
{%- set node_secret_path = var('GLOBAL_NODE_SECRET_PATH', '') -%}
{%- set model_quantum_state = var((model_name ~ '_' ~ model_type ~ '_quantum_state').upper(), 'streamline') -%}
{%- set testing_limit = var((model_name ~ '_' ~ model_type ~ '_testing_limit').upper(), none) -%}
{%- set new_build = var((model_name ~ '_' ~ model_type ~ '_new_build').upper(), false) -%}
{%- set default_order = 'ORDER BY partition_key DESC, block_number DESC' if model_type.lower() == 'realtime'
else 'ORDER BY partition_key ASC, block_number ASC' -%}
{%- set order_by_clause = var((model_name ~ '_' ~ model_type ~ '_order_by_clause').upper(), default_order) -%}
{%- set uses_receipts_by_hash = var('GLOBAL_USES_RECEIPTS_BY_HASH', false) -%}
{%- set variables = {
'node_url': node_url,
'node_secret_path': node_secret_path,
'model_quantum_state': model_quantum_state,
'testing_limit': testing_limit,
'new_build': new_build,
'order_by_clause': order_by_clause,
'uses_receipts_by_hash': uses_receipts_by_hash
} -%}
{{ return(variables) }}
{% endmacro %}
{% macro set_default_variables_bronze(source_name, model_type) %}
{%- set partition_function = var(source_name ~ model_type ~ '_PARTITION_FUNCTION',
"CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)")
-%}
{%- set partition_join_key = var(source_name ~ model_type ~ '_PARTITION_JOIN_KEY', 'partition_key') -%}
{%- set block_number = var(source_name ~ model_type ~ '_BLOCK_NUMBER', true) -%}
{%- set balances = var(source_name ~ model_type ~ '_BALANCES', false) -%}
{%- set uses_receipts_by_hash = var('GLOBAL_USES_RECEIPTS_BY_HASH', false) -%}
{%- set variables = {
'partition_function': partition_function,
'partition_join_key': partition_join_key,
'block_number': block_number,
'balances': balances,
'uses_receipts_by_hash': uses_receipts_by_hash
} -%}
{{ return(variables) }}
{% endmacro %}

View File

@ -0,0 +1,60 @@
{% macro set_streamline_parameters(model_name, model_type, multiplier=1) %}
{%- set rpc_config_details = {
"blocks_transactions": {
"method": 'eth_getBlockByNumber',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)',
"exploded_key": ['result', 'result.transactions']
},
"receipts_by_hash": {
"method": 'eth_getTransactionReceipt',
"method_params": 'ARRAY_CONSTRUCT(tx_hash)'
},
"receipts": {
"method": 'eth_getBlockReceipts',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))',
"exploded_key": ['result'],
"lambdas": 2
},
"traces": {
"method": 'debug_traceBlockByNumber',
"method_params": "ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s'))",
"exploded_key": ['result'],
"lambdas": 2
},
"confirm_blocks": {
"method": 'eth_getBlockByNumber',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)'
}
} -%}
{%- set rpc_config = rpc_config_details[model_name.lower()] -%}
{%- set params = {
"external_table": var((model_name ~ '_' ~ model_type ~ '_external_table').upper(), model_name.lower()),
"sql_limit": var((model_name ~ '_' ~ model_type ~ '_sql_limit').upper(), 2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier),
"producer_batch_size": var((model_name ~ '_' ~ model_type ~ '_producer_batch_size').upper(), 2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier),
"worker_batch_size": var(
(model_name ~ '_' ~ model_type ~ '_worker_batch_size').upper(),
(2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier) // (rpc_config.get('lambdas', 1))
),
"sql_source": (model_name ~ '_' ~ model_type).lower(),
"method": rpc_config['method'],
"method_params": rpc_config['method_params']
} -%}
{%- if rpc_config.get('exploded_key') is not none -%}
{%- do params.update({"exploded_key": tojson(rpc_config['exploded_key'])}) -%}
{%- endif -%}
{%- if rpc_config.get('lambdas') is not none -%}
{%- do params.update({"lambdas": rpc_config['lambdas']}) -%}
{%- endif -%}
{{ return(params) }}
{% endmacro %}

View File

@ -1,63 +1,3 @@
{% macro decode_logs_history(
start,
stop
) %}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_24_hour_lookback") }}
)
SELECT
l.block_number,
l._log_id,
a.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__complete_event_abis") }}
a
ON a.parent_contract_address = l.contract_address
and a.event_signature = l.topics[0]::string
and l.block_number between a.start_block and a.end_block
WHERE
(
l.block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND l.block_number <= (
SELECT
block_number
FROM
look_back
)
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__complete_decode_logs") }}
WHERE
(
block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND block_number <= (
SELECT
block_number
FROM
look_back
)
)
{% endmacro %}
{% macro decode_traces_history(
start,
stop
@ -123,123 +63,4 @@ WHERE
look_back
))
{% endmacro %}
{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS value
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}
{% macro streamline_external_table_FR_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS value
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}
{% endmacro %}

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
@ -12,47 +12,47 @@
SELECT
block_number,
utils.udf_hex_to_int(
DATA :result :baseFeePerGas :: STRING
DATA :baseFeePerGas :: STRING
) :: INT AS base_fee_per_gas,
utils.udf_hex_to_int(
DATA :result :difficulty :: STRING
DATA :difficulty :: STRING
) :: INT AS difficulty,
DATA :result :extraData :: STRING AS extra_data,
DATA :extraData :: STRING AS extra_data,
utils.udf_hex_to_int(
DATA :result :gasLimit :: STRING
DATA :gasLimit :: STRING
) :: INT AS gas_limit,
utils.udf_hex_to_int(
DATA :result :gasUsed :: STRING
DATA :gasUsed :: STRING
) :: INT AS gas_used,
DATA :result :hash :: STRING AS HASH,
DATA :result :logsBloom :: STRING AS logs_bloom,
DATA :result :miner :: STRING AS miner,
DATA :hash :: STRING AS HASH,
DATA :logsBloom :: STRING AS logs_bloom,
DATA :miner :: STRING AS miner,
utils.udf_hex_to_int(
DATA :result :nonce :: STRING
DATA :nonce :: STRING
) :: INT AS nonce,
utils.udf_hex_to_int(
DATA :result :number :: STRING
DATA :number :: STRING
) :: INT AS NUMBER,
DATA :result :parentHash :: STRING AS parent_hash,
DATA :result :receiptsRoot :: STRING AS receipts_root,
DATA :result :sha3Uncles :: STRING AS sha3_uncles,
DATA :parentHash :: STRING AS parent_hash,
DATA :receiptsRoot :: STRING AS receipts_root,
DATA :sha3Uncles :: STRING AS sha3_uncles,
utils.udf_hex_to_int(
DATA :result :size :: STRING
DATA :size :: STRING
) :: INT AS SIZE,
DATA :result :stateRoot :: STRING AS state_root,
DATA :stateRoot :: STRING AS state_root,
utils.udf_hex_to_int(
DATA :result :timestamp :: STRING
DATA :timestamp :: STRING
) :: TIMESTAMP AS block_timestamp,
utils.udf_hex_to_int(
DATA :result :totalDifficulty :: STRING
DATA :totalDifficulty :: STRING
) :: INT AS total_difficulty,
ARRAY_SIZE(
DATA :result :transactions
DATA :transactions
) AS tx_count,
DATA :result :transactionsRoot :: STRING AS transactions_root,
DATA :result :uncles AS uncles,
DATA :result :withdrawals AS withdrawals,
DATA :result :withdrawalsRoot :: STRING AS withdrawals_root,
DATA :transactionsRoot :: STRING AS transactions_root,
DATA :uncles AS uncles,
DATA :withdrawals AS withdrawals,
DATA :withdrawalsRoot :: STRING AS withdrawals_root,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number']
@ -61,15 +61,15 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id,
utils.udf_hex_to_int(
DATA :result: blobGasUsed :: STRING
DATA: blobGasUsed :: STRING
) :: INT AS blob_gas_used,
utils.udf_hex_to_int(
DATA :result: excessBlobGas :: STRING
DATA: excessBlobGas :: STRING
) :: INT AS excess_blob_gas
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
{{ ref('bronze__blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -78,7 +78,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{{ ref('bronze__blocks_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_confirmed_blocks') }}
-- depends_on: {{ ref('bronze__confirm_blocks') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_confirmed_blocks') }}
{{ ref('bronze__confirm_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -31,7 +31,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_confirmed_blocks') }}
{{ ref('bronze__confirm_blocks_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_decoded_logs') }}
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
@ -32,7 +32,7 @@ WITH base_data AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_logs') }}
{{ ref('bronze__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
@ -42,7 +42,7 @@ WHERE
)
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% else %}
{{ ref('bronze__streamline_fr_decoded_logs') }}
{{ ref('bronze__decoded_logs_fr') }}
WHERE
_partition_by_block_number <= 4000000
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_receipts') }}
-- depends_on: {{ ref('bronze__receipts') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -18,7 +18,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_receipts') }}
{{ ref('bronze__receipts') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -30,7 +30,7 @@ WHERE
DATA
)
{% else %}
{{ ref('bronze__streamline_fr_receipts') }}
{{ ref('bronze__receipts_fr') }}
WHERE
IS_OBJECT(
DATA

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
-- depends_on: {{ ref('bronze__traces') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
@ -26,7 +26,7 @@ WITH bronze_traces AS (
{% if is_incremental() and not full_reload_mode %}
{{ ref(
'bronze__streamline_traces'
'bronze__traces'
) }}
WHERE
_inserted_timestamp >= (
@ -37,7 +37,7 @@ WHERE
)
AND DATA :result IS NOT NULL {% elif is_incremental() and full_reload_mode %}
{{ ref(
'bronze__streamline_fr_traces'
'bronze__traces_fr'
) }}
WHERE
partition_key BETWEEN (
@ -54,7 +54,7 @@ WHERE
)
{% else %}
{{ ref(
'bronze__streamline_fr_traces'
'bronze__traces_fr'
) }}
WHERE
partition_key <= 3000000

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
-- depends_on: {{ ref('bronze__transactions') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
{{ ref('bronze__transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -27,7 +27,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_fr_transactions') }}
{{ ref('bronze__transactions_fr') }}
WHERE
IS_OBJECT(DATA)
{% endif %}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "contract_abis_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "contract_abis",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_join_key = "_partition_by_block_id",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "contract_abis_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "eth_balances_v2",
partition_function = "TO_NUMBER(SPLIT_PART(file_name, '/', 3))",
balances = true

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "eth_balances",
partition_function = "TO_NUMBER(SPLIT_PART(file_name, '/', 3))",
partition_join_key = "_partition_by_block_id",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "eth_balances_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
balances = true

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "token_balances",
partition_function = "TO_NUMBER(SPLIT_PART(file_name, '/', 3))",
partition_join_key = "_partition_by_block_id",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "token_balances_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
balances = true

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "token_balances_v2",
partition_function = "TO_NUMBER(SPLIT_PART(file_name, '/', 3))",
balances = true

View File

@ -2,7 +2,7 @@
materialized = 'view',
tags = ['bronze_beacon_blobs']
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "beacon_blobs_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "beacon_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "beacon_validators_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
block_number = false

View File

@ -2,7 +2,7 @@
materialized = 'view',
tags = ['bronze_beacon_blobs']
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "beacon_blobs_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "beacon_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_slot_id",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "beacon_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "beacon_validators",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "beacon_validators_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
block_number = false

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,31 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
CASE
WHEN DATA :result IS NULL THEN DATA
ELSE DATA :result
END AS DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__blocks_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA :result AS DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__blocks_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,28 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__confirm_blocks_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__confirm_blocks_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,38 @@
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
SELECT
partition_key,
block_number,
array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1',false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
COALESCE(
VALUE :"array_index" :: INT,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
VALUE :"data" :"transactionIndex" :: STRING
)
)
) AS array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1','bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "confirm_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "receipts_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "traces_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "transactions_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'TRACES' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,30 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE :"array_index" :: INT AS array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'DEBUG_TRACEBLOCKBYNUMBER' if var('GLOBAL_USES_SINGLE_FLIGHT_METHOD',false) else 'TRACES' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRACES' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,28 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__transactions_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__transactions_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_union_query(
model = "blocks"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",
block_number = false
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_union_query(
model = "confirmed_blocks"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",
block_number = false
) }}

View File

@ -1,8 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "confirm_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_union_query(
model = "receipts"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "receipts",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",
block_number = false
) }}

View File

@ -1,7 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "receipts_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_union_query(
model = "traces"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "traces",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",
block_number = false
) }}

View File

@ -1,8 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "traces_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_union_query(
model = "transactions"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_join_key = "_partition_by_block_id",
block_number = false
) }}

View File

@ -1,8 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "transactions_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query_decoder(
model = "decoded_logs_v2"
) }}

View File

@ -1,13 +0,0 @@
{{ config (
materialized = 'view'
) }}
SELECT
*
FROM
{{ ref('bronze__streamline_fr_decoded_logs_v2') }}
UNION ALL
SELECT
*
FROM
{{ ref('bronze__streamline_fr_decoded_logs_v1') }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query_decoder(
model = "decoded_logs"
) }}

View File

@ -1,6 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query_decoder(
model = "decoded_logs_v2"
) }}

View File

@ -0,0 +1,23 @@
{# Set variables #}
{% set source_name = 'DECODED_LOGS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{# Log configuration details #}
{{ log_model_details(
vars = default_vars
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_decoded_logs']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_decoder(
source_name = source_name.lower(),
source_version = source_version.lower()
) }}

View File

@ -0,0 +1,20 @@
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_decoded_logs']
) }}
SELECT
*
FROM
{{ ref('bronze__decoded_logs_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
*
FROM
{{ ref('bronze__decoded_logs_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,23 @@
{# Set variables #}
{% set source_name = 'DECODED_LOGS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{# Log configuration details #}
{{ log_model_details(
vars = default_vars
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_decoded_logs_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_decoder_fr(
source_name = source_name.lower(),
source_version = source_version.lower()
) }}

View File

@ -0,0 +1,23 @@
{# Set variables #}
{% set source_name = 'DECODED_LOGS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{# Log configuration details #}
{{ log_model_details(
vars = default_vars
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_decoded_logs']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_decoder_fr(
source_name = source_name.lower(),
source_version = source_version.lower()
) }}

View File

@ -1,6 +1,6 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query_decoder(
{{ v0_streamline_external_table_query_decoder(
model = "decoded_traces_v2"
) }}

View File

@ -1,6 +1,6 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query_decoder(
{{ v0_streamline_external_table_fr_query_decoder(
model = "decoded_traces"
) }}

View File

@ -1,6 +1,6 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query_decoder(
{{ v0_streamline_external_table_fr_query_decoder(
model = "decoded_traces_v2"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "reads",
partition_function = "TO_DATE(concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)))",
partition_join_key = "_partition_by_modified_date",

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
{{ v0_streamline_external_table_fr_query(
model = "reads_v2",
partition_function = "TO_DATE(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1))"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
{{ v0_streamline_external_table_query(
model = "reads_v2",
partition_function = "TO_DATE(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1))"
) }}

View File

@ -1,4 +1,26 @@
{{ config (
materialized = "ephemeral"
) }}
{{ fsc_evm.block_lookback_24_hour() }}
WITH max_time AS (
SELECT
MAX(block_timestamp) AS max_timestamp
FROM
{{ ref("silver__blocks") }}
)
SELECT
MIN(block_number) AS block_number
FROM
{{ ref("silver__blocks") }}
JOIN max_time
ON block_timestamp BETWEEN DATEADD(
'hour',
-25,
max_timestamp
)
AND DATEADD(
'hour',
-24,
max_timestamp
)

View File

@ -1,4 +1,11 @@
{{ config (
materialized = "ephemeral"
) }}
{{ fsc_evm.block_lookback_72_hour() }}
SELECT
MIN(block_number) AS block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp >= DATEADD('hour', -72, TRUNCATE(SYSDATE(), 'HOUR'))
AND block_timestamp < DATEADD('hour', -71, TRUNCATE(SYSDATE(), 'HOUR'))

View File

@ -2,4 +2,54 @@
materialized = "ephemeral",
unique_key = "block_number",
) }}
{{ fsc_evm.block_ranges() }}
SELECT
block_number,
CASE
WHEN RIGHT(
block_number,
1
) = 0 THEN block_number
END AS block_number_10,
CASE
WHEN RIGHT(
block_number,
2
) IN (
00,
25,
50,
75
) THEN block_number
END AS block_number_25,
CASE
WHEN RIGHT(
block_number,
2
) IN (
00,
50
) THEN block_number
END AS block_number_50,
CASE
WHEN RIGHT(
block_number,
2
) IN (00) THEN block_number
END AS block_number_100,
CASE
WHEN RIGHT(
block_number,
3
) IN (000) THEN block_number
END AS block_number_1000,
CASE
WHEN RIGHT(
block_number,
4
) IN (0000) THEN block_number
END AS block_number_10000,
block_timestamp,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref("silver__blocks") }}

View File

@ -2,4 +2,26 @@
materialized = "ephemeral",
unique_key = "block_number",
) }}
{{ fsc_evm.max_block_by_date() }}
WITH base AS (
SELECT
block_timestamp :: DATE AS block_date,
MAX(block_number) block_number
FROM
{{ ref("silver__blocks") }}
GROUP BY
block_timestamp :: DATE
)
SELECT
block_date,
block_number
FROM
base
WHERE
block_date <> (
SELECT
MAX(block_date)
FROM
base
)

View File

@ -1,4 +1,37 @@
{{ config (
materialized = "ephemeral"
) }}
{{ fsc_evm.max_block_by_hour() }}
WITH base AS (
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_hour,
MAX(block_number) block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp > DATEADD(
'day',
-5,
CURRENT_DATE
)
GROUP BY
1
)
SELECT
block_hour,
block_number
FROM
base
WHERE
block_hour <> (
SELECT
MAX(
block_hour
)
FROM
base
)

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'BLOCKS' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" %}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

Some files were not shown because too many files have changed in this diff Show More