AN-5627/sl-2 (#408)

* groundwork for sl 2 in core models

* sources and references

* stg uri

* refs and comments

* coalesce transaction index

* add integration for prod
This commit is contained in:
drethereum 2025-01-21 13:00:23 -07:00 committed by GitHub
parent 0116785ee3
commit 46e569d88e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
74 changed files with 2762 additions and 806 deletions

View File

@ -43,4 +43,8 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_realtime"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_realtime" "polygon_models,tag:streamline_core_complete_receipts" "polygon_models,tag:streamline_core_realtime_receipts" "polygon_models,tag:streamline_core_complete_confirm_blocks" "polygon_models,tag:streamline_core_realtime_confirm_blocks"
- name: Run Chainhead Tests
run: |
dbt test -m "polygon_models,tag:chainhead"

View File

@ -29,7 +29,7 @@ on:
description: 'DBT Run Command'
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_history"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_history" "polygon_models,tag:streamline_core_complete_receipts" "polygon_models,tag:streamline_core_history_receipts" "polygon_models,tag:streamline_core_complete_confirm_blocks" "polygon_models,tag:streamline_core_history_confirm_blocks"
env:
DBT_PROFILES_DIR: ./

View File

@ -77,21 +77,59 @@ vars:
START_GHA_TASKS: False
STUDIO_TEST_USER_ID: '{{ env_var("STUDIO_TEST_USER_ID", "98d15c30-9fa5-43cd-9c69-3d4c0bb269f5") }}'
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
#### STREAMLINE 2.0 BEGIN ####
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
["INTERNAL_DEV"]
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_POLYGON_API_DEV
EXTERNAL_FUNCTION_URI: rzyjrd54s6.execute-api.us-east-1.amazonaws.com/dev/
API_INTEGRATION: AWS_POLYGON_API_STG_V2
EXTERNAL_FUNCTION_URI: bwl8qfgi2d.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_POLYGON_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_POLYGON_API
EXTERNAL_FUNCTION_URI: p6dhi5vxn4.execute-api.us-east-1.amazonaws.com/prod/
API_INTEGRATION: AWS_POLYGON_API_PROD_V2
EXTERNAL_FUNCTION_URI: sjfxmdtv9j.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_POLYGON_API
- INTERNAL_DEV
- BI_ANALYTICS_READER
- DBT_CLOUD_POLYGON
#### STREAMLINE 2.0 END ####
#### FSC_EVM BEGIN ####
# Visit https://github.com/FlipsideCrypto/fsc-evm/wiki for more information on required and optional variables
### GLOBAL VARIABLES BEGIN ###
## REQUIRED
GLOBAL_PROD_DB_NAME: 'polygon'
GLOBAL_NODE_SECRET_PATH: 'Vault/prod/polygon/quicknode/mainnet'
GLOBAL_BLOCKS_PER_HOUR: 1700
GLOBAL_USES_STREAMLINE_V1: True
GLOBAL_USES_SINGLE_FLIGHT_METHOD: True
### GLOBAL VARIABLES END ###
### MAIN_PACKAGE VARIABLES BEGIN ###
### CORE ###
## REQUIRED
## OPTIONAL
# GOLD_FULL_REFRESH: True
# SILVER_FULL_REFRESH: True
# BLOCKS_COMPLETE_FULL_REFRESH: True
# CONFIRM_BLOCKS_COMPLETE_FULL_REFRESH: True
# TRACES_COMPLETE_FULL_REFRESH: True
# RECEIPTS_COMPLETE_FULL_REFRESH: True
# TRANSACTIONS_COMPLETE_FULL_REFRESH: True
### MAIN_PACKAGE VARIABLES END ###
#### FSC_EVM END ####

View File

@ -0,0 +1,226 @@
{% macro silver_traces_v1(
full_reload_start_block,
full_reload_blocks,
full_reload_mode = false,
TRACES_ARB_MODE = false,
TRACES_SEI_MODE = false,
TRACES_KAIA_MODE = false,
use_partition_key = false,
schema_name = 'bronze'
) %}
WITH bronze_traces AS (
SELECT
block_number,
{% if use_partition_key %}
partition_key,
{% else %}
_partition_by_block_id AS partition_key,
{% endif %}
VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
{% if TRACES_SEI_MODE %}
DATA :txHash :: STRING AS tx_hash,
{% endif %}
_inserted_timestamp
FROM
{% if is_incremental() and not full_reload_mode %}
{{ ref(
schema_name ~ '__traces'
) }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
AND DATA :result IS NOT NULL {% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% elif is_incremental() and full_reload_mode %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key BETWEEN (
SELECT
MAX(partition_key) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(partition_key) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% else %}
_partition_by_block_id BETWEEN (
SELECT
MAX(_partition_by_block_id) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(_partition_by_block_id) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% endif %}
{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% else %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key <= {{ full_reload_start_block }}
{% else %}
_partition_by_block_id <= {{ full_reload_start_block }}
{% endif %}
{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position
ORDER BY
_inserted_timestamp DESC)) = 1
),
flatten_traces AS (
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'result.time',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'time',
'revertReason'
{% if TRACES_ARB_MODE %},
'afterEVMTransfers',
'beforeEVMTransfers',
'result.afterEVMTransfers',
'result.beforeEVMTransfers'
{% endif %}
{% if TRACES_KAIA_MODE %},
'reverted',
'result.reverted'
{% endif %}
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
bronze_traces txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
{% if TRACES_ARB_MODE %}
AND f.path NOT LIKE 'afterEVMTransfers[%'
AND f.path NOT LIKE 'beforeEVMTransfers[%'
{% endif %}
{% if TRACES_KAIA_MODE %}
and f.key not in ('message', 'contract')
{% endif %}
GROUP BY
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number'] +
(['tx_hash'] if TRACES_SEI_MODE else ['tx_position']) +
['trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{% endmacro %}

View File

@ -0,0 +1,141 @@
{% macro streamline_external_table_query(
source_name,
source_version,
partition_function,
balances,
block_number,
uses_receipts_by_hash
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_query_fr(
source_name,
source_version,
partition_function,
partition_join_key,
balances,
block_number,
uses_receipts_by_hash
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %},
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.{{ partition_join_key }}
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}

View File

@ -0,0 +1,36 @@
{% macro log_bronze_details(source_name, source_version, model_type, partition_function, partition_join_key, block_number, uses_receipts_by_hash) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
{% if model_type != '' %}
{% set model_type = '_' ~ model_type %}
{% endif %}
{%- if flags.WHICH == 'compile' and execute -%}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log(source_name ~ model_type ~ '_PARTITION_FUNCTION: ' ~ partition_function, info=True) }}
{{ log(source_name ~ model_type ~ '_PARTITION_JOIN_KEY: ' ~ partition_join_key, info=True) }}
{{ log(source_name ~ model_type ~ '_BLOCK_NUMBER: ' ~ block_number, info=True) }}
{% if uses_receipts_by_hash %}
{{ log("USES_RECEIPTS_BY_HASH: " ~ uses_receipts_by_hash, info=True) }}
{% endif %}
{{ log("", info=True) }}
{{ log("=== Source Details ===", info=True) }}
{{ log("Source: " ~ source('bronze_streamline', source_name.lower() ~ source_version.lower()), info=True) }}
{{ log("", info=True) }}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,29 @@
{% macro log_complete_details(post_hook, full_refresh_type, uses_receipts_by_hash) %}
{%- if flags.WHICH == 'compile' and execute -%}
{% if uses_receipts_by_hash %}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log("USES_RECEIPTS_BY_HASH: " ~ uses_receipts_by_hash, info=True) }}
{% endif %}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' unique_key = "' ~ config.get('unique_key') ~ '",\n' %}
{% set config_log = config_log ~ ' cluster_by = "' ~ config.get('cluster_by') ~ '",\n' %}
{% set config_log = config_log ~ ' merge_update_columns = ' ~ config.get('merge_update_columns') | tojson ~ ',\n' %}
{% set config_log = config_log ~ ' post_hook = "' ~ post_hook ~ '",\n' %}
{% set config_log = config_log ~ ' incremental_predicates = ' ~ config.get('incremental_predicates') | tojson ~ ',\n' %}
{% set config_log = config_log ~ ' full_refresh = ' ~ full_refresh_type ~ ',\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,55 @@
{% macro log_streamline_details(model_name, model_type, node_url, model_quantum_state, sql_limit, testing_limit, order_by_clause, new_build, streamline_params, uses_receipts_by_hash, method, method_params, min_block=0) %}
{%- if flags.WHICH == 'compile' and execute -%}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log("START_UP_BLOCK: " ~ min_block, info=True) }}
{{ log("", info=True) }}
{{ log("=== API Details ===", info=True) }}
{{ log("NODE_URL: " ~ node_url, info=True) }}
{{ log("NODE_SECRET_PATH: " ~ var('GLOBAL_NODE_SECRET_PATH'), info=True) }}
{{ log("", info=True) }}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_model_quantum_state').upper() ~ ': ' ~ model_quantum_state, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_sql_limit').upper() ~ ': ' ~ sql_limit, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_testing_limit').upper() ~ ': ' ~ testing_limit, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_order_by_clause').upper() ~ ': ' ~ order_by_clause, info=True) }}
{{ log((model_name ~ '_' ~ model_type ~ '_new_build').upper() ~ ': ' ~ new_build, info=True) }}
{{ log('USES_RECEIPTS_BY_HASH' ~ ': ' ~ uses_receipts_by_hash, info=True) }}
{{ log("", info=True) }}
{{ log("=== RPC Details ===", info=True) }}
{{ log(model_name ~ ": {", info=True) }}
{{ log(" method: '" ~ method ~ "',", info=True) }}
{{ log(" method_params: " ~ method_params, info=True) }}
{{ log("}", info=True) }}
{{ log("", info=True) }}
{% set params_str = streamline_params | tojson %}
{% set params_formatted = params_str | replace('{', '{\n ') | replace('}', '\n }') | replace(', ', ',\n ') %}
{# Clean up the method_params formatting #}
{% set params_formatted = params_formatted | replace('"method_params": "', '"method_params": "') | replace('\\n', ' ') | replace('\\u0027', "'") %}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' post_hook = fsc_utils.if_data_call_function_v2(\n' %}
{% set config_log = config_log ~ ' func = "streamline.udf_bulk_rest_api_v2",\n' %}
{% set config_log = config_log ~ ' target = "' ~ this.schema ~ '.' ~ this.identifier ~ '",\n' %}
{% set config_log = config_log ~ ' params = ' ~ params_formatted ~ '\n' %}
{% set config_log = config_log ~ ' ),\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{% endmacro %}

View File

@ -0,0 +1,47 @@
{% macro set_default_variables_streamline(model_name, model_type) %}
{%- set node_url = var('GLOBAL_NODE_URL', '{Service}/{Authentication}') -%}
{%- set node_secret_path = var('GLOBAL_NODE_SECRET_PATH', '') -%}
{%- set model_quantum_state = var((model_name ~ '_' ~ model_type ~ '_quantum_state').upper(), 'streamline') -%}
{%- set testing_limit = var((model_name ~ '_' ~ model_type ~ '_testing_limit').upper(), none) -%}
{%- set new_build = var((model_name ~ '_' ~ model_type ~ '_new_build').upper(), false) -%}
{%- set default_order = 'ORDER BY partition_key DESC, block_number DESC' if model_type.lower() == 'realtime'
else 'ORDER BY partition_key ASC, block_number ASC' -%}
{%- set order_by_clause = var((model_name ~ '_' ~ model_type ~ '_order_by_clause').upper(), default_order) -%}
{%- set uses_receipts_by_hash = var('GLOBAL_USES_RECEIPTS_BY_HASH', false) -%}
{%- set variables = {
'node_url': node_url,
'node_secret_path': node_secret_path,
'model_quantum_state': model_quantum_state,
'testing_limit': testing_limit,
'new_build': new_build,
'order_by_clause': order_by_clause,
'uses_receipts_by_hash': uses_receipts_by_hash
} -%}
{{ return(variables) }}
{% endmacro %}
{% macro set_default_variables_bronze(source_name, model_type) %}
{%- set partition_function = var(source_name ~ model_type ~ '_PARTITION_FUNCTION',
"CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)")
-%}
{%- set partition_join_key = var(source_name ~ model_type ~ '_PARTITION_JOIN_KEY', 'partition_key') -%}
{%- set block_number = var(source_name ~ model_type ~ '_BLOCK_NUMBER', true) -%}
{%- set balances = var(source_name ~ model_type ~ '_BALANCES', false) -%}
{%- set uses_receipts_by_hash = var('GLOBAL_USES_RECEIPTS_BY_HASH', false) -%}
{%- set variables = {
'partition_function': partition_function,
'partition_join_key': partition_join_key,
'block_number': block_number,
'balances': balances,
'uses_receipts_by_hash': uses_receipts_by_hash
} -%}
{{ return(variables) }}
{% endmacro %}

View File

@ -0,0 +1,57 @@
{% macro set_streamline_parameters(model_name, model_type, multiplier=1) %}
{%- set rpc_config_details = {
"blocks_transactions": {
"method": 'eth_getBlockByNumber',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)',
"exploded_key": ['result', 'result.transactions']
},
"receipts_by_hash": {
"method": 'eth_getTransactionReceipt',
"method_params": 'ARRAY_CONSTRUCT(tx_hash)'
},
"receipts": {
"method": 'eth_getBlockReceipts',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))',
"exploded_key": ['result'],
"lambdas": 2
},
"traces": {
"method": 'debug_traceBlockByNumber',
"method_params": "ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s'))",
"exploded_key": ['result'],
"lambdas": 2
},
"confirm_blocks": {
"method": 'eth_getBlockByNumber',
"method_params": 'ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)'
}
} -%}
{%- set rpc_config = rpc_config_details[model_name.lower()] -%}
{%- set params = {
"external_table": var((model_name ~ '_' ~ model_type ~ '_external_table').upper(), model_name.lower()),
"sql_limit": var((model_name ~ '_' ~ model_type ~ '_sql_limit').upper(), 2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier),
"producer_batch_size": var((model_name ~ '_' ~ model_type ~ '_producer_batch_size').upper(), 2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier),
"worker_batch_size": var(
(model_name ~ '_' ~ model_type ~ '_worker_batch_size').upper(),
(2 * var('GLOBAL_BLOCKS_PER_HOUR',0) * multiplier) // (rpc_config.get('lambdas', 1))
),
"sql_source": (model_name ~ '_' ~ model_type).lower(),
"method": rpc_config['method'],
"method_params": rpc_config['method_params']
} -%}
{%- if rpc_config.get('exploded_key') is not none -%}
{%- do params.update({"exploded_key": tojson(rpc_config['exploded_key'])}) -%}
{%- endif -%}
{%- if rpc_config.get('lambdas') is not none -%}
{%- do params.update({"lambdas": rpc_config['lambdas']}) -%}
{%- endif -%}
{{ return(params) }}
{% endmacro %}

View File

@ -1,119 +0,0 @@
{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}
{% macro streamline_external_table_fr_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}

View File

@ -43,7 +43,7 @@ missing_txs AS (
block_number,
tx_hash
)
JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
JOIN {{ ref("streamline__traces_complete") }} USING (block_number)
WHERE
tr.tx_hash IS NULL
)

View File

@ -43,7 +43,7 @@ missing_txs AS (
block_number,
tx_position
)
JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
JOIN {{ ref("streamline__traces_complete") }} USING (block_number)
LEFT JOIN {{ source(
'polygon_silver',
'overflowed_traces2'

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
@ -58,7 +58,7 @@ SELECT
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
{{ ref('bronze__blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -67,7 +67,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{{ ref('bronze__blocks_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }}
-- depends_on: {{ ref('bronze__confirm_blocks') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_confirm_blocks') }}
{{ ref('bronze__confirm_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -31,7 +31,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_confirm_blocks') }}
{{ ref('bronze__confirm_blocks_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_receipts') }}
-- depends_on: {{ ref('bronze__receipts') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -18,7 +18,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_receipts') }}
{{ ref('bronze__receipts') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -28,7 +28,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_fr_receipts') }}
{{ ref('bronze__receipts_fr') }}
WHERE
IS_OBJECT(DATA)
{% endif %}

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
-- depends_on: {{ ref('bronze__traces') }}
{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES') else target.warehouse %}
{{ config (
materialized = "incremental",
@ -21,7 +21,7 @@ WITH bronze_traces AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_traces') }}
{{ ref('bronze__traces') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -31,7 +31,7 @@ WHERE
)
AND DATA :result IS NOT NULL
{% else %}
{{ ref('bronze__streamline_fr_traces') }}
{{ ref('bronze__traces_fr') }}
WHERE
_partition_by_block_id <= 2300000
AND DATA :result IS NOT NULL

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
-- depends_on: {{ ref('bronze__traces') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
@ -8,7 +8,8 @@
full_refresh = false,
tags = ['non_realtime']
) }}
{{ fsc_evm.silver_traces_v1(
{{ silver_traces_v1(
full_reload_start_block = 5000000,
full_reload_blocks = 1000000
full_reload_blocks = 1000000,
use_partition_key = true
) }}

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
-- depends_on: {{ ref('bronze__transactions') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
{{ ref('bronze__transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -27,7 +27,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_fr_transactions') }}
{{ ref('bronze__transactions_fr') }}
WHERE
IS_OBJECT(DATA)
{% endif %}

View File

@ -15,16 +15,21 @@ sources:
- name: bronze_streamline
database: streamline
schema: |
{{ "POLYGON_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "POLYGON" }}
{{ var('GLOBAL_PROD_DB_NAME') ~ ('_dev' if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else '') }}
tables:
- name: receipts
- name: blocks
- name: transactions
- name: debug_traceBlockByNumber
- name: debug_traceblockbynumber
- name: decoded_logs
- name: confirm_blocks
- name: matic_balances
- name: token_balances
- name: blocks_v2
- name: transactions_v2
- name: receipts_v2
- name: traces_v2
- name: confirm_blocks_v2
- name: streamline_crosschain
database: streamline
schema: crosschain

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,10 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "debug_traceBlockByNumber",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) or var('GLOBAL_USES_BLOCKS_TRANSACTIONS_PATH', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,41 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__blocks_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__blocks_fr_v1') }}
{% endif %}
{% if var('GLOBAL_USES_BLOCKS_TRANSACTIONS_PATH', false) %}
UNION ALL
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__blocks_fr_v2_1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,28 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__confirm_blocks_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__confirm_blocks_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'CONFIRM_BLOCKS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,37 @@
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
SELECT
partition_key,
block_number,
array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
COALESCE(
VALUE :"array_index" :: INT,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
VALUE :"data" :"transactionIndex" :: STRING
)
)
) AS array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__receipts_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1','bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'RECEIPTS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_receipts']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'TRACES' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,30 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE :"array_index" :: INT AS array_index,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__traces_fr_v1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'DEBUG_TRACEBLOCKBYNUMBER' if var('GLOBAL_USES_SINGLE_FLIGHT_METHOD',false) else 'TRACES' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRACES' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,39 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) or var('GLOBAL_USES_BLOCKS_TRANSACTIONS_PATH', false) else '' %}
{% set model_type = '' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,41 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__transactions_fr_v2') }}
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
UNION ALL
SELECT
_partition_by_block_id AS partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__transactions_fr_v1') }}
{% endif %}
{% if var('GLOBAL_USES_BLOCKS_TRANSACTIONS_PATH', false) %}
UNION ALL
SELECT
partition_key,
block_number,
VALUE,
DATA,
metadata,
file_name,
_inserted_timestamp
FROM
{{ ref('bronze__transactions_fr_v2_1') }}
{% endif %}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" %}
{% set partition_join_key = '_partition_by_block_id' %}
{% set balances = default_vars['balances'] %}
{% set block_number = false %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core_streamline_v1']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -0,0 +1,40 @@
{# Set variables #}
{% set source_name = 'TRANSACTIONS' %}
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
{% set model_type = 'FR' %}
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
{% set partition_function = default_vars['partition_function'] %}
{% set partition_join_key = default_vars['partition_join_key'] %}
{% set balances = default_vars['balances'] %}
{% set block_number = default_vars['block_number'] %}
{% set uses_receipts_by_hash = default_vars['uses_receipts_by_hash'] %}
{# Log configuration details #}
{{ log_bronze_details(
source_name = source_name,
source_version = source_version,
model_type = model_type,
partition_function = partition_function,
partition_join_key = partition_join_key,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}
{# Set up dbt configuration #}
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr(
source_name = source_name.lower(),
source_version = source_version.lower(),
partition_function = partition_function,
partition_join_key = partition_join_key,
balances = balances,
block_number = block_number,
uses_receipts_by_hash = uses_receipts_by_hash
) }}

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_fr_query(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,9 +0,0 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_fr_query(
model = "debug_traceBlockByNumber",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,11 +0,0 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'BLOCKS' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" %}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

View File

@ -1,29 +0,0 @@
-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_confirm_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_fr_confirm_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,33 +0,0 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
file_name,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_traces') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_traces') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,33 +0,0 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
file_name,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'CONFIRM_BLOCKS' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" %}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete_confirm_blocks']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'RECEIPTS' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" %}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete_receipts']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'TRACES' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" %}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,50 @@
{# Set variables #}
{%- set source_name = 'TRANSACTIONS' -%}
{%- set model_type = 'COMPLETE' -%}
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"%}
{# Log configuration details #}
{{ log_complete_details(
post_hook = post_hook,
full_refresh_type = full_refresh_type
) }}
{# Set up dbt configuration #}
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = post_hook,
full_refresh = full_refresh_type,
tags = ['streamline_core_complete']
) }}
{# Main query starts here #}
SELECT
block_number,
file_name,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_{{ source_name.lower() }}_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__' ~ source_name.lower()) }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
{% endif %}
QUALIFY (ROW_NUMBER() OVER (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,112 @@
{# Set variables #}
{%- set model_name = 'BLOCKS_TRANSACTIONS' -%}
{%- set model_type = 'HISTORY' -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_history']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
EXCEPT
SELECT block_number
FROM {{ ref("streamline__blocks_complete") }} b
INNER JOIN {{ ref("streamline__transactions_complete") }} t USING(block_number)
WHERE 1=1
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
),
ready_blocks AS (
SELECT block_number
FROM to_do
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -0,0 +1,133 @@
{# Set variables #}
{%- set model_name = 'CONFIRM_BLOCKS' -%}
{%- set model_type = 'HISTORY' -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_history_confirm_blocks']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Delay blocks #}
look_back AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_hour") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 6
),
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
AND block_number <= (SELECT block_number FROM look_back)
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
EXCEPT
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
AND block_number IS NOT NULL
AND block_number <= (SELECT block_number FROM look_back)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -1,76 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'debug_traceBlockByNumber', 'sql_limit', {{var('sql_limit','60000')}}, 'producer_batch_size', {{var('producer_batch_size','15000')}}, 'worker_batch_size', {{var('worker_batch_size','15000')}}, 'call_type', 'rest', 'exploded_key','[\"result\"]'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_debug_traceBlockByNumber") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "debug_traceBlockByNumber", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'",{"tracer": "callTracer"}',
'],"id":"',
block_number :: STRING,
'"}'
)
) AS request
FROM
blocks
WHERE
block_number IS NOT NULL
AND block_number NOT IN (
SELECT
block_number
FROM
{{ ref("silver_observability__excluded_trace_blocks") }}
)
ORDER BY
block_number ASC

View File

@ -1,67 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', {{var('sql_limit','50000')}}, 'producer_batch_size', {{var('producer_batch_size','25000')}}, 'worker_batch_size', {{var('worker_batch_size','12500')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'call_type', 'batch'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_qn_getBlockWithReceipts") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "qn_getBlockWithReceipts", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"],"id":"',
block_number :: STRING,
'"}'
)
) AS request
FROM
blocks
ORDER BY
block_number ASC

View File

@ -0,0 +1,113 @@
{# Set variables #}
{%- set model_name = 'RECEIPTS' -%}
{%- set model_type = 'HISTORY' -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_history_receipts']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
EXCEPT
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -0,0 +1,113 @@
{# Set variables #}
{%- set model_name = 'TRACES' -%}
{%- set model_type = 'HISTORY' -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_history']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
EXCEPT
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
{% if not new_build %}
AND block_number <= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -0,0 +1,126 @@
{# Set variables #}
{%- set model_name = 'BLOCKS_TRANSACTIONS' -%}
{%- set model_type = 'REALTIME' -%}
{%- set min_block = var('GLOBAL_START_UP_BLOCK', none) -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method,
min_block=min_block
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_realtime']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
{% if min_block is not none %}
AND block_number >= {{ min_block }}
{% endif %}
EXCEPT
SELECT block_number
FROM {{ ref("streamline__blocks_complete") }} b
INNER JOIN {{ ref("streamline__transactions_complete") }} t USING(block_number)
WHERE 1=1
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
),
ready_blocks AS (
SELECT block_number
FROM to_do
{% if not new_build %}
UNION
SELECT block_number
FROM {{ ref("_unconfirmed_blocks") }}
UNION
SELECT block_number
FROM {{ ref("_missing_txs") }}
{% endif %}
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -1,19 +1,63 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks', 'sql_limit', {{var('sql_limit','40000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
{# Set variables #}
{%- set model_name = 'CONFIRM_BLOCKS' -%}
{%- set model_type = 'REALTIME' -%}
{%- set min_block = var('GLOBAL_START_UP_BLOCK', none) -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method,
min_block=min_block
) }}
WITH last_3_days AS (
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_realtime_confirm_blocks']
) }}
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Delay blocks #}
look_back AS (
SELECT
block_number
@ -23,76 +67,72 @@ look_back AS (
ORDER BY
block_number DESC
) = 6
),
tbl AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
),
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
AND block_number <= (
SELECT
block_number
FROM
look_back
)
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND block_number <= (SELECT block_number FROM look_back)
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
{% if min_block is not none %}
AND block_number >= {{ min_block }}
{% endif %}
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_confirmed_blocks") }}
WHERE
block_number IS NOT NULL
AND block_number <= (
SELECT
block_number
FROM
look_back
)
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
AND block_number IS NOT NULL
AND block_number <= (SELECT block_number FROM look_back)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "eth_getBlockByNumber", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'", false],"id":"',
block_number :: INTEGER,
'"}'
)
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
tbl
ORDER BY
block_number ASC
LIMIT
34000
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -1,100 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'debug_traceBlockByNumber', 'sql_limit', {{var('sql_limit','60000')}}, 'producer_batch_size', {{var('producer_batch_size','5000')}}, 'worker_batch_size', {{var('worker_batch_size','500')}}, 'call_type', 'rest', 'exploded_key','[\"result\"]'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_debug_traceBlockByNumber") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
),
all_blocks AS (
SELECT
block_number
FROM
blocks
UNION
SELECT
block_number
FROM
(
SELECT
block_number
FROM
{{ ref("_missing_traces") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "debug_traceBlockByNumber", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'",{"tracer": "callTracer","timeout": "30s"}',
'],"id":"',
block_number :: INTEGER,
'"}'
)
) AS request
FROM
all_blocks
ORDER BY
block_number ASC
LIMIT
17000

View File

@ -1,104 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', {{var('sql_limit','50000')}}, 'producer_batch_size', {{var('producer_batch_size','25000')}}, 'worker_batch_size', {{var('worker_batch_size','2500')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'call_type', 'batch'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number :: STRING AS block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
EXCEPT
SELECT
block_number :: STRING
FROM
{{ ref("streamline__complete_qn_getBlockWithReceipts") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
),
all_blocks AS (
SELECT
block_number
FROM
blocks
UNION
SELECT
block_number
FROM
(
SELECT
block_number
FROM
{{ ref("_missing_receipts") }}
UNION
SELECT
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "qn_getBlockWithReceipts", "params":["',
REPLACE(
concat_ws(
'',
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"],"id":"',
block_number :: INTEGER,
'"}'
)
) AS request
FROM
all_blocks
ORDER BY
block_number ASC
LIMIT
17000

View File

@ -0,0 +1,130 @@
{# Set variables #}
{%- set model_name = 'RECEIPTS' -%}
{%- set model_type = 'REALTIME' -%}
{%- set min_block = var('GLOBAL_START_UP_BLOCK', none) -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method,
min_block=min_block
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_realtime_receipts']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
{% if min_block is not none %}
AND block_number >= {{ min_block }}
{% endif %}
EXCEPT
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if not new_build %}
UNION
SELECT block_number
FROM {{ ref("_unconfirmed_blocks") }}
UNION
SELECT block_number
FROM {{ ref("_missing_txs") }}
UNION
SELECT block_number
FROM {{ ref("_missing_receipts") }}
{% endif %}
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -0,0 +1,127 @@
{# Set variables #}
{%- set model_name = 'TRACES' -%}
{%- set model_type = 'REALTIME' -%}
{%- set min_block = var('GLOBAL_START_UP_BLOCK', none) -%}
{%- set default_vars = set_default_variables_streamline(model_name, model_type) -%}
{# Set up parameters for the streamline process. These will come from the vars set in dbt_project.yml #}
{%- set streamline_params = set_streamline_parameters(
model_name=model_name,
model_type=model_type
) -%}
{%- set node_url = default_vars['node_url'] -%}
{%- set node_secret_path = default_vars['node_secret_path'] -%}
{%- set model_quantum_state = default_vars['model_quantum_state'] -%}
{%- set sql_limit = streamline_params['sql_limit'] -%}
{%- set testing_limit = default_vars['testing_limit'] -%}
{%- set order_by_clause = default_vars['order_by_clause'] -%}
{%- set new_build = default_vars['new_build'] -%}
{%- set method_params = streamline_params['method_params'] -%}
{%- set method = streamline_params['method'] -%}
{# Log configuration details #}
{{ log_streamline_details(
model_name=model_name,
model_type=model_type,
node_url=node_url,
model_quantum_state=model_quantum_state,
sql_limit=sql_limit,
testing_limit=testing_limit,
order_by_clause=order_by_clause,
new_build=new_build,
streamline_params=streamline_params,
method_params=method_params,
method=method,
min_block=min_block
) }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = streamline_params
),
tags = ['streamline_core_realtime']
) }}
{# Main query starts here #}
WITH
{% if not new_build %}
last_3_days AS (
SELECT block_number
FROM {{ ref("_block_lookback") }}
),
{% endif %}
{# Identify blocks that need processing #}
to_do AS (
SELECT block_number
FROM {{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
{% if min_block is not none %}
AND block_number >= {{ min_block }}
{% endif %}
EXCEPT
{# Exclude blocks that have already been processed #}
SELECT block_number
FROM {{ ref('streamline__' ~ model_name.lower() ~ '_complete') }}
WHERE 1=1
{% if not new_build %}
AND block_number >= (SELECT block_number FROM last_3_days)
{% endif %}
)
{# Prepare the final list of blocks to process #}
,ready_blocks AS (
SELECT block_number
FROM to_do
{% if not new_build %}
UNION
SELECT block_number
FROM {{ ref("_unconfirmed_blocks") }}
UNION
SELECT block_number
FROM {{ ref("_missing_traces") }}
{% endif %}
{% if testing_limit is not none %}
LIMIT {{ testing_limit }}
{% endif %}
)
{# Generate API requests for each block #}
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', '{{ method }}',
'params', {{ method_params }}
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
{{ order_by_clause }}
LIMIT {{ sql_limit }}

View File

@ -2,34 +2,20 @@
materialized = "ephemeral"
) }}
WITH lookback AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
DISTINCT tx.block_number block_number
DISTINCT tx.block_number
FROM
{{ ref("silver__transactions") }}
{{ ref("test_silver__transactions_recent") }}
tx
LEFT JOIN {{ ref("silver__traces") }}
tr
ON tx.block_number = tr.block_number
AND tx.tx_hash = tr.tx_hash
LEFT JOIN {{ ref("test_silver__traces_recent") }}
tr USING (
block_number,
tx_hash
)
WHERE
tx.block_timestamp >= DATEADD('hour', -84, SYSDATE())
AND tr.tx_hash IS NULL
tr.tx_hash IS NULL
AND (
tx.from_address <> '0x0000000000000000000000000000000000000000'
AND tx.to_address <> '0x0000000000000000000000000000000000000000'
)
AND tx.block_number >= (
SELECT
block_number
FROM
lookback
)
AND tr.block_timestamp >= DATEADD('hour', -84, SYSDATE())
AND tr.block_timestamp IS NOT NULL
AND tx.block_timestamp > DATEADD('day', -5, SYSDATE())

View File

@ -1,20 +1,36 @@
{%- if flags.WHICH == 'compile' and execute -%}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{{ config (
materialized = "view",
tags = ['streamline_core_complete']
) }}
{% if execute %}
{% set height = run_query('SELECT streamline.udf_get_chainhead()') %}
{% set block_height = height.columns [0].values() [0] %}
{% else %}
{% set block_height = 0 %}
{% endif %}
SELECT
_id AS block_number
_id,
(
({{ var('GLOBAL_BLOCKS_PER_HOUR',0) }} / 60) * {{ var('GLOBAL_CHAINHEAD_DELAY',3) }}
) :: INT AS block_number_delay, --minute-based block delay
(_id - block_number_delay) :: INT AS block_number,
utils.udf_int_to_hex(block_number) AS block_number_hex
FROM
{{ ref("silver__number_sequence") }}
{{ ref('silver__number_sequence') }}
WHERE
_id <= {{ block_height }}
ORDER BY
_id ASC
_id <= (
SELECT
COALESCE(
block_number,
0
)
FROM
{{ ref("streamline__get_chainhead") }}
)

View File

@ -0,0 +1,54 @@
{%- set model_quantum_state = var('CHAINHEAD_QUANTUM_STATE', 'livequery') -%}
{%- set node_url = var('GLOBAL_NODE_URL', '{Service}/{Authentication}') -%}
{%- if flags.WHICH == 'compile' and execute -%}
{{ log("=== Current Variable Settings ===", info=True) }}
{{ log("CHAINHEAD_QUANTUM_STATE: " ~ model_quantum_state, info=True) }}
{{ log("", info=True) }}
{{ log("=== API Details ===", info=True) }}
{{ log("NODE_URL: " ~ node_url, info=True) }}
{{ log("NODE_SECRET_PATH: " ~ var('GLOBAL_NODE_SECRET_PATH'), info=True) }}
{{ log("", info=True) }}
{% set config_log = '\n' %}
{% set config_log = config_log ~ '\n=== DBT Model Config ===\n'%}
{% set config_log = config_log ~ '\n{{ config (\n' %}
{% set config_log = config_log ~ ' materialized = "' ~ config.get('materialized') ~ '",\n' %}
{% set config_log = config_log ~ ' tags = ' ~ config.get('tags') | tojson ~ '\n' %}
{% set config_log = config_log ~ ') }}\n' %}
{{ log(config_log, info=True) }}
{{ log("", info=True) }}
{%- endif -%}
{{ config (
materialized = 'table',
tags = ['streamline_core_complete','chainhead']
) }}
SELECT
live.udf_api(
'POST',
'{{ node_url }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', '{{ model_quantum_state }}'
),
OBJECT_CONSTRUCT(
'id',
0,
'jsonrpc',
'2.0',
'method',
'eth_blockNumber',
'params',
[]
),
'{{ var('GLOBAL_NODE_SECRET_PATH') }}'
) AS resp,
utils.udf_hex_to_int(
resp :data :result :: STRING
) AS block_number

View File

@ -0,0 +1,9 @@
version: 2
models:
- name: streamline__get_chainhead
description: "This model is used to get the chainhead from the blockchain."
columns:
- name: BLOCK_NUMBER
tests:
- not_null