Compare commits


No commits in common. "main" and "v0.0.3" have entirely different histories.
main ... v0.0.3

68 changed files with 681 additions and 1300 deletions

View File

@ -1,9 +1,6 @@
{% macro create_ibc_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% do run_query("CREATE SCHEMA IF NOT EXISTS streamline") %}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_udf_bulk_decode_logs() }}
{{ create_udf_bulk_decode_traces() }}
{% endif %}
{% endmacro %}
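For context, a guard macro like this is typically invoked from an on-run-start hook so the UDFs are only (re)created when the variable is set; a minimal sketch, assuming standard dbt hook wiring (the wiring itself is not shown in this diff):
-- assumed entry in dbt_project.yml: on-run-start: "{{ create_ibc_streamline_udfs() }}"
-- then, to (re)create the UDFs during a run:
--   dbt run --vars '{"UPDATE_UDFS_AND_SPS": true}'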

View File

@ -0,0 +1,70 @@
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
{% set days = var("BRONZE_LOOKBACK_DAYS")%}
WITH meta AS (
SELECT
last_modified AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -ABS({{days}}), CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
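Both macros are drop-in bodies for bronze views; a hypothetical call site (the model name and partition function are illustrative, not taken from this diff):
{{ config(materialized = 'view') }}
{{ streamline_external_table_query_v2(
    model = 'blocks',
    partition_function = "CAST(SPLIT_PART(file_name, '/', 3) AS INTEGER)"
) }}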

View File

@ -6,10 +6,10 @@
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{{ var("API_INTEGRATION_PROD") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{{ var("API_INTEGRATION_DEV") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% else %}
{{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api'

View File

@ -1,11 +0,0 @@
{% macro generate_schema_name(custom_schema_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[0] | trim }}
{%- endmacro %}
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[1] | trim }}
{%- endmacro %}
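These overrides derive both schema and alias from a double-underscore naming convention, so a model file named core__fact_blocks.sql materializes as core.fact_blocks:
{# node.name = 'core__fact_blocks' #}
{# split on '__' -> ['core', 'fact_blocks'] #}
{# generate_schema_name -> 'core'; generate_alias_name -> 'fact_blocks' #}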

View File

@ -1,14 +0,0 @@
{% macro log_model_details() %}
{%- if execute -%}
/*
DBT Model Config:
{{ model.config | tojson(indent=2) }}
*/
/*
Raw Code:
{{ model.raw_code }}
*/
{%- endif -%}
{% endmacro %}
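Placed at the top of a model, this macro injects the config and raw code into the compiled SQL as block comments, so the compiled target contains something like (config values illustrative):
/*
DBT Model Config:
{
  "materialized": "view",
  "tags": ["silver", "admin"]
}
*/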

View File

@ -3,7 +3,7 @@
'GLOBAL_PROJECT_NAME': 'cosmos',
'GLOBAL_NODE_PROVIDER': 'quicknode',
'GLOBAL_NODE_VAULT_PATH': 'vault/prod/cosmos/quicknode/mainnet',
'GLOBAL_NODE_URL': '{Service}/{Authentication}',
'GLOBAL_NODE_URL': '{service}/{Authentication}',
'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '',
'MAIN_SL_BLOCKS_PER_HOUR': 3600
} %}

View File

@ -1,12 +0,0 @@
{% macro neutron_vars() %}
{% set vars = {
'GLOBAL_PROJECT_NAME': 'neutron',
'GLOBAL_NODE_PROVIDER': 'publicnode',
'GLOBAL_NODE_VAULT_PATH': 'vault/prod/neutron/mainnet',
'GLOBAL_NODE_URL': '{Service}/{Authentication}',
'GLOBAL_WRAPPED_NATIVE_ASSET_ADDRESS': '',
'MAIN_SL_BLOCKS_PER_HOUR': 3600
} %}
{{ return(vars) }}
{% endmacro %}

View File

@ -1,64 +1,141 @@
{% macro streamline_external_table_query(
source_name,
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
source_name,
source_version,
partition_function,
balances,
block_id,
uses_receipts_by_hash
) %}
{% set days = var("BRONZE_LOOKBACK_DAYS")%}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
last_modified AS inserted_timestamp,
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -ABS({{days}}), CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name) }}')
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
) A
)
)
SELECT
s.*,
b.file_name,
inserted_timestamp
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_id %},
COALESCE(
s.value :"block_id" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_id
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source("bronze_streamline", source_name) }} s
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_id = COALESCE(
s.value :"block_id" :: INT,
s.value :"block_id" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
{% macro streamline_external_table_query_fr(
source_name,
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
source_version,
partition_function,
partition_join_key,
balances,
block_id,
uses_receipts_by_hash
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS inserted_timestamp,
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name) }}'
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
)
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
b._inserted_timestamp
{% if balances %},
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_id %},
COALESCE(
s.value :"block_id" :: STRING,
s.value :"block_id" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_id
{% endif %}
{% if uses_receipts_by_hash %},
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_id = COALESCE(
s.value :"block_id" :: INT,
s.value :"block_id" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
{% endmacro %}
b.partition_key = s.{{ partition_join_key }}
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
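A non-empty source_version is lowercased and prefixed with an underscore before the source lookup, so a call shaped like the bronze models later in this diff resolves to the versioned table:
{{ streamline_external_table_query(
    source_name = 'blocks',
    source_version = 'v2'
) }}
{# the table resolves via source('bronze_streamline', 'blocks_v2') #}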

View File

@ -0,0 +1,26 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
SELECT
PACKAGE,
category,
variable_key AS key,
default_value,
default_type,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['variable_key']
) }} AS dim_variables_id
FROM
{{ source(
'fsc_evm_admin',
'_master_keys'
) }}
qualify(ROW_NUMBER() over (PARTITION BY variable_key
ORDER BY
_inserted_timestamp DESC)) = 1
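dbt_utils.generate_surrogate_key compiles to an MD5 over the null-coalesced input, so the key above renders roughly as (dbt_utils 1.x form; exact casts vary by adapter):
md5(CAST(COALESCE(CAST(variable_key AS VARCHAR), '_dbt_utils_surrogate_key_null_') AS VARCHAR)) AS dim_variables_id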

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: admin__dim_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- DIM_VARIABLES_ID
columns:
- name: KEY
tests:
- not_null
- name: DEFAULT_VALUE
tests:
- not_null
- name: DEFAULT_TYPE
tests:
- not_null
- name: DIM_VARIABLES_ID
tests:
- not_null

View File

@ -0,0 +1,27 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
SELECT
f.project,
d.PACKAGE,
d.CATEGORY,
f.key,
f.value,
f.parent_key,
d.default_value,
d.default_type,
{{ dbt_utils.generate_surrogate_key(
['f.project', 'f.key', 'f.parent_key']
) }} AS ez_variables_id
FROM
{{ ref('admin__fact_variables') }}
f
LEFT JOIN {{ ref('admin__dim_variables') }}
d
ON f.key = d.key
OR f.parent_key = d.key

View File

@ -0,0 +1,26 @@
version: 2
models:
- name: admin__ez_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- EZ_VARIABLES_ID
columns:
- name: PROJECT
tests:
- not_null
- name: KEY
tests:
- not_null
- name: VALUE
tests:
- not_null
- name: DEFAULT_VALUE
tests:
- not_null
- name: DEFAULT_TYPE
tests:
- not_null
- name: EZ_VARIABLES_ID
tests:
- not_null

View File

@ -0,0 +1,34 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'table',
tags = ['silver','admin','rpc_settings','phase_1']
) }}
SELECT
blockchain,
receipts_by_block,
blocks_per_hour,
blocks_fields,
transactions_fields,
receipts_fields,
traces_fields,
inserted_at AS rpc_sampled_at
FROM
{{ source(
"fsc_evm_admin",
"rpc_node_logs"
) }}
WHERE
RESULT :error :: STRING IS NULL
AND LOWER(blockchain) = LOWER('{{ vars.GLOBAL_PROJECT_NAME }}')
AND LOWER(network) = LOWER('{{ vars.GLOBAL_NETWORK }}') qualify ROW_NUMBER() over (
PARTITION BY blockchain,
network
ORDER BY
rpc_sampled_at DESC
) = 1

View File

@ -0,0 +1,68 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'view',
tags = ['silver','admin','variables','phase_1']
) }}
{%- set vars_data = vars_config(all_projects=true) -%}
{%- set project = target.database.lower() | replace('_dev', '') -%}
WITH flattened_data AS (
{% for project, project_config in vars_data.items() %}
{% for key, value in project_config.items() %}
{% if value is mapping %}
{% for nested_key, nested_value in value.items() %}
SELECT
'{{ project }}' AS project,
'{{ nested_key }}' AS key,
{% if nested_value is string %}
'{{ nested_value }}' AS VALUE,
{% elif nested_value is iterable and nested_value is not string %}
'{{ nested_value | tojson }}' AS VALUE,
{% else %}
'{{ nested_value }}' AS VALUE,
{% endif %}
'{{ key }}' AS parent_key
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
{% if not loop.last %}UNION ALL{% endif %}
{% else %}
SELECT
'{{ project }}' AS project,
'{{ key }}' AS key,
{% if value is string %}
'{{ value }}' AS VALUE,
{% elif value is iterable and value is not string %}
'{{ value | tojson }}' AS VALUE,
{% else %}
'{{ value }}' AS VALUE,
{% endif %}
NULL AS parent_key
{% if not loop.last %}UNION ALL{% endif %}
{% endif %}
{% endfor %}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
)
SELECT
project,
key,
VALUE,
parent_key,
{{ dbt_utils.generate_surrogate_key(
['project', 'key', 'parent_key']
) }} AS fact_variables_id
FROM
flattened_data
{% if project != 'fsc_evm' %}
WHERE
project = '{{ project }}'
{% endif %}
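Given a vars_config shaped like {'cosmos': {'GLOBAL_PROJECT_NAME': 'cosmos', 'GLOBAL': {'NETWORK': 'mainnet'}}} (illustrative input), the loops compile to a UNION ALL of one-row SELECTs:
SELECT 'cosmos' AS project, 'GLOBAL_PROJECT_NAME' AS key, 'cosmos' AS VALUE, NULL AS parent_key
UNION ALL
SELECT 'cosmos' AS project, 'NETWORK' AS key, 'mainnet' AS VALUE, 'GLOBAL' AS parent_key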

View File

@ -0,0 +1,20 @@
version: 2
models:
- name: admin__fact_variables
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- FACT_VARIABLES_ID
columns:
- name: PROJECT
tests:
- not_null
- name: KEY
tests:
- not_null
- name: VALUE
tests:
- not_null
- name: FACT_VARIABLES_ID
tests:
- not_null

View File

@ -0,0 +1,25 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
cluster_by = 'round(_id,-3)',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_id)",
full_refresh = false,
tags = ['silver','admin','phase_1']
) }}
SELECT
ROW_NUMBER() over (
ORDER BY
SEQ4()
) - 1 :: INT AS _id
FROM
TABLE(GENERATOR(rowcount => {{ vars.GLOBAL_MAX_SEQUENCE_NUMBER }}))
WHERE 1=1
{% if is_incremental() %}
AND 1=0
{% endif %}
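Because full_refresh = false blocks rebuilds and the incremental branch appends nothing (1=0), the sequence is generated exactly once; on that first run the body compiles to roughly the following (rowcount assumed, the actual GLOBAL_MAX_SEQUENCE_NUMBER is not in this diff):
SELECT ROW_NUMBER() over (ORDER BY SEQ4()) - 1 :: INT AS _id
FROM TABLE(GENERATOR(rowcount => 100000000))
WHERE 1=1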

View File

@ -0,0 +1,14 @@
version: 2
models:
- name: admin__number_sequence
description: |
This model generates a sequence of numbers for a given range.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _ID
columns:
- name: _ID
tests:
- not_null
description: Primary key for the table

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query(
source_name = 'blocks'
source_name = 'blocks',
source_version = 'v2'
) }}

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query(
source_name = 'transactions'
source_name = 'transactions',
source_version = 'v2'
) }}

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query(
source_name = 'tx_count'
source_name = 'tx_counts',
source_version = 'v2'
) }}

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'blocks'
source_name = 'blocks',
source_version = 'v2'
) }}

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'transactions'
source_name = 'transactions',
source_version = 'v2'
) }}

View File

@ -4,5 +4,6 @@
) }}
{{ streamline_external_table_query_fr(
source_name = 'tx_count'
source_name = 'tx_counts',
source_version = 'v2'
) }}

View File

@ -1,172 +0,0 @@
version: 2
models:
- name: core__fact_blocks
description: '{{ doc("core__fact_blocks") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: blockchain
description: '{{ doc("blockchain") }}'
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: chain_id
description: '{{ doc("chain_id") }}'
- name: tx_count
description: '{{ doc("tx_count") }}'
- name: proposer_address
description: '{{ doc("proposer_address") }}'
- name: validator_hash
description: '{{ doc("validator_hash") }}'
- name: fact_blocks_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_msg_attributes
description: '{{ doc("core__fact_msg_attributes") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: msg_group
description: '{{ doc("msg_group") }}'
- name: msg_index
description: '{{ doc("msg_index") }}'
- name: msg_type
description: '{{ doc("msg_type") }}'
- name: attribute_index
description: '{{ doc("attribute_index") }}'
- name: attribute_key
description: '{{ doc("attribute_key") }}'
- name: attribute_value
description: '{{ doc("attribute_value") }}'
- name: fact_msg_attributes_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_msgs
description: '{{ doc("core__fact_msgs") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: msg_group
description: '{{ doc("msg_group") }}'
- name: msg_type
description: '{{ doc("msg_type") }}'
- name: msg_index
description: '{{ doc("msg_index") }}'
- name: msg
description: '{{ doc("msg") }}'
- name: fact_msgs_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_transactions
description: '{{ doc("core__fact_transactions") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: codespace
description: '{{ doc("codespace") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: tx_code
description: '{{ doc("tx_code") }}'
- name: tx_log
description: '{{ doc("tx_log") }}'
- name: gas_used
description: '{{ doc("gas_used") }}'
- name: gas_wanted
description: '{{ doc("gas_wanted") }}'
- name: fact_transactions_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'
- name: core__fact_transactions_logs
description: '{{ doc("core__fact_transactions_logs") }}'
data_tests:
- dbt_utils.recency:
datepart: hour
field: modified_timestamp
interval: 3
severity: error
tags: ['test_recency']
columns:
- name: block_id
description: '{{ doc("block_id") }}'
- name: block_timestamp
description: '{{ doc("block_timestamp") }}'
- name: tx_id
description: '{{ doc("tx_id") }}'
- name: tx_succeeded
description: '{{ doc("tx_succeeded") }}'
- name: tx_code
description: '{{ doc("tx_code") }}'
- name: codespace
description: '{{ doc("codespace") }}'
- name: tx_log
description: '{{ doc("tx_log") }}'
- name: transactions_logs_id
description: '{{ doc("pk") }}'
- name: inserted_timestamp
description: '{{ doc("inserted_timestamp") }}'
- name: modified_timestamp
description: '{{ doc("modified_timestamp") }}'

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_blocks specific variables #}
{#{% set rpc_vars = set_dynamic_fields('fact_blocks') %}#}
{% set rpc_vars = set_dynamic_fields('fact_blocks') %}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'blocks_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",

View File

@ -1,62 +1,29 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('silver__msgs') }}
{# Set fact_transactions specific variables #}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{{ config (
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'msg_attributes_id',
cluster_by = ['modified_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
tags = ['gold', 'core', 'phase_2']
) }}
WITH silver_msgs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
f.index AS attribute_index,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :key
ELSE TRY_BASE64_DECODE_STRING(f.value :key)
END AS attribute_key,
CASE
WHEN TRY_BASE64_DECODE_STRING(f.value :key) IS NULL
THEN f.value :value
ELSE TRY_BASE64_DECODE_STRING(f.value :value)
END AS attribute_value,
msgs._inserted_timestamp
FROM
{{ ref('silver__msgs') }} AS msgs,
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
CONCAT(
msg_group,
':',
msg_sub_group
) AS msg_group,
msg_index,
msg_type,
attribute_index,
@ -64,10 +31,18 @@ SELECT
attribute_value,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index','attribute_index']
) }} AS msg_attributes_id,
) }} AS fact_msg_attributes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
silver_msgs
{{ ref('silver__msg_attributes') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
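The TRY_BASE64_DECODE_STRING guards in the removed CTE fall back to the raw attribute when it is not valid base64, e.g.:
SELECT TRY_BASE64_DECODE_STRING('cmVjaXBpZW50'); -- 'recipient'
SELECT TRY_BASE64_DECODE_STRING('recipient');    -- NULL, so the CASE keeps the raw value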

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'transactions_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",

View File

@ -2,11 +2,11 @@
{% set vars = return_vars() %}
{# Set fact_transactions specific variables #}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions') %}#}
{% set rpc_vars = set_dynamic_fields('fact_transactions') %}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'transactions_id',
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],

View File

@ -1,40 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Set fact_transactions_logs specific variables #}
{#{% set rpc_vars = set_dynamic_fields('fact_transactions_logs') %}#}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'transactions_logs_id',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id, tx_id)",
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['gold', 'core', 'phase_2'],
enabled = false
) }}
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
transactions_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__transactions_logs') }}
{% if is_incremental() %}
WHERE
modified_timestamp :: DATE >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,74 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STATS, METRICS, CORE, HOURLY' } } },
tags = ['gold','stats','curated','phase_4']
) }}
WITH txs AS (
SELECT
block_timestamp_hour,
block_id_min,
block_id_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
total_gas_used,
-- unique_from_count, -- TODO: Add to silver_stats__core_metrics_hourly when available
-- total_fees, -- TODO: Add to silver_stats__core_metrics_hourly when available
LAST_VALUE(
p.close ignore nulls
) over (
ORDER BY
block_timestamp_hour rows unbounded preceding
) AS imputed_close,
s.inserted_timestamp,
s.modified_timestamp
FROM
{{ ref('silver_stats__core_metrics_hourly') }} s
LEFT JOIN {{ ref('silver__hourly_prices_coingecko') }} p
ON s.block_timestamp_hour = p.recorded_hour
AND p.id = '{{ vars.GLOBAL_PROJECT_NAME }}'
)
SELECT
block_timestamp_hour,
block_id_min AS block_number_min,
block_id_max AS block_number_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
-- unique_from_count, -- TODO: Add to silver_stats__core_metrics_hourly when available
total_gas_used AS total_fees_native, -- TODO: Replace with total_fees when available
ROUND(
total_gas_used * imputed_close,
2
) AS total_fees_usd,
{{ dbt_utils.generate_surrogate_key(['block_timestamp_hour']) }} AS ez_core_metrics_hourly_id,
inserted_timestamp,
modified_timestamp
FROM
txs
WHERE
block_timestamp_hour < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND
block_timestamp_hour >= COALESCE(
DATEADD('hour', -4, (
SELECT DATE_TRUNC('hour', MIN(block_timestamp_hour))
FROM {{ ref('silver_stats__core_metrics_hourly') }}
WHERE modified_timestamp >= (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
)),
'2025-01-01 00:00:00'
)
{% endif %}
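The imputed_close window in this removed model carries the last non-null hourly price forward across gaps in the price feed; a minimal illustration of the pattern:
SELECT ts, CLOSE,
    LAST_VALUE(CLOSE ignore nulls) over (ORDER BY ts rows unbounded preceding) AS imputed_close
FROM (VALUES ('01:00', 10.0), ('02:00', NULL), ('03:00', 12.0)) AS v (ts, CLOSE);
-- imputed_close: 10.0, 10.0, 12.0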

View File

@ -1,200 +0,0 @@
version: 2
models:
- name: silver__blocks
config:
contract:
enforced: false
data_tests:
- dbt_utils.sequential_values:
column_name: block_id
interval: 1
config:
error_if: ">100"
tags: ['test_recency']
columns:
- name: block_id
data_type: NUMBER
data_tests:
- not_null:
tags: ['test_quality']
- name: block_timestamp
data_type: TIMESTAMP_NTZ
data_tests:
- not_null:
tags: ['test_quality']
- name: chain_id
data_type: VARCHAR
- name: tx_count
data_type: NUMBER
data_tests:
- dbt_utils.expression_is_true:
expression: ">=0"
tags: ['test_quality']
- name: proposer_address
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: validator_hash
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: header
data_type: VARIANT
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: blocks_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__msg_attributes
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: msg_group
data_type: NUMBER
- name: msg_sub_group
data_type: NUMBER
- name: msg_index
data_type: NUMBER
- name: msg_type
data_type: VARCHAR
- name: attribute_index
data_type: NUMBER
- name: attribute_key
data_type: VARCHAR
- name: attribute_value
data_type: VARCHAR
- name: msg_attributes_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__msgs
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: msg_group
data_type: NUMBER
- name: msg_sub_group
data_type: NUMBER
- name: msg_index
data_type: NUMBER
- name: msg_type
data_type: VARCHAR
- name: msg
data_type: VARIANT
- name: msgs_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transactions_logs
columns:
- name: block_id
data_type: NUMBER
- name: block_timestamp
data_type: TIMESTAMP_NTZ
- name: tx_id
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: tx_code
data_type: NUMBER
- name: codespace
data_type: VARIANT
- name: tx_log
data_type: VARIANT
- name: transactions_logs_id
data_type: VARCHAR
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transactions
config:
contract:
enforced: false
columns:
- name: block_id
data_type: NUMBER
data_tests:
- not_null:
tags: ['test_quality']
- name: block_timestamp
data_type: TIMESTAMP_NTZ
data_tests:
- not_null:
tags: ['test_quality']
- name: codespace
data_type: VARIANT
- name: tx_id
data_type: VARCHAR
data_tests:
- not_null:
tags: ['test_quality']
- name: tx_index
data_type: NUMBER
- name: tx_log
data_type: VARCHAR
- name: tx_succeeded
data_type: BOOLEAN
- name: gas_used
data_type: NUMBER
- name: gas_wanted
data_type: NUMBER
- name: tx_code
data_type: NUMBER
- name: DATA
data_type: VARIANT
- name: partition_key
data_type: VARCHAR
- name: block_id_requested
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: transactions_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transfers

View File

@ -1,76 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = 'transactions_id',
cluster_by = ['modified_timestamp::DATE','partition_key'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_fr') }}
WITH bronze_transactions AS (
SELECT
VALUE :BLOCK_ID_REQUESTED AS block_id,
TO_TIMESTAMP_NTZ(
VALUE:BLOCK_TIMESTAMP::STRING,
'YYYY_MM_DD_HH_MI_SS_FF3'
) AS block_timestamp,
DATA :hash :: STRING AS tx_id,
DATA :index AS tx_index,
DATA :tx_result :codespace :: STRING AS codespace,
DATA :tx_result :gas_used :: NUMBER AS gas_used,
DATA :tx_result :gas_wanted :: NUMBER AS gas_wanted,
CASE
WHEN DATA :tx_result :code :: NUMBER = 0 THEN TRUE
ELSE FALSE
END AS tx_succeeded,
DATA :tx_result :code :: INT AS tx_code,
COALESCE(
TRY_PARSE_JSON(
DATA :tx_result :log
),
DATA :tx_result :log
) AS tx_log,
DATA :tx_result :events AS msgs,
DATA,
partition_key,
inserted_timestamp AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id', 'tx_id']
) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_fr') }}
{% endif %}
WHERE
DATA <> PARSE_JSON('[]')
{% if is_incremental() %}
AND
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*
FROM
bronze_transactions
QUALIFY ROW_NUMBER() OVER (
PARTITION BY block_id, tx_id
ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -1,129 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_id'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['silver', 'core', 'phase_2']
) }}
WITH event_attributes AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
COALESCE(l.value:msg_index::INTEGER, 0) as msg_index,
e.value:type::STRING as event_type,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'key', a.value:key::STRING,
'value', COALESCE(a.value:value::STRING, '')
)
) as attributes,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t,
LATERAL FLATTEN(input => PARSE_JSON(t.tx_log)) l,
LATERAL FLATTEN(input => l.value:events) e,
LATERAL FLATTEN(input => e.value:attributes) a
WHERE t.tx_log IS NOT NULL
AND t.tx_succeeded = TRUE
AND IS_OBJECT(PARSE_JSON(t.tx_log))
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
GROUP BY
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
l.value:msg_index,
e.value:type,
t._inserted_timestamp
),
parsed_logs AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'msg_index', msg_index,
'event_type', event_type,
'attributes', attributes
)
) WITHIN GROUP (ORDER BY msg_index, event_type) as tx_log,
_inserted_timestamp
FROM event_attributes
GROUP BY
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
_inserted_timestamp
),
failed_txs AS (
SELECT
t.block_id,
t.block_timestamp,
t.tx_id,
t.tx_succeeded,
t.tx_code,
t.codespace,
ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'msg_index', 0,
'event_type', 'error',
'attributes', ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'key', 'error_message',
'value', t.tx_log::STRING
)
)
)
) as tx_log,
t._inserted_timestamp
FROM {{ ref('silver__transactions') }} t
WHERE t.tx_succeeded = FALSE
AND t.tx_log IS NOT NULL
{% if is_incremental() %}
AND t._inserted_timestamp >= (
SELECT MAX(_inserted_timestamp)
FROM {{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
tx_code,
codespace,
tx_log,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }} as transactions_logs_id,
_inserted_timestamp,
CURRENT_TIMESTAMP() as inserted_timestamp,
CURRENT_TIMESTAMP() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id
FROM (
SELECT * FROM parsed_logs
UNION ALL
SELECT * FROM failed_txs
) t
QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_id ORDER BY _inserted_timestamp DESC) = 1
ORDER BY block_timestamp, tx_id

View File

@ -1,91 +0,0 @@
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = vars.GLOBAL_SILVER_FR_ENABLED,
tags = ['silver','observability','phase_3']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT MIN(block_number) FROM (
SELECT MIN(block_number) AS block_number
FROM {{ ref('core__fact_blocks') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT MIN(VALUE) - 1 AS block_number
FROM (
SELECT blocks_impacted_array
FROM {{ this }}
QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
),
LATERAL FLATTEN(input => blocks_impacted_array)
)
)
{% if vars.MAIN_OBSERV_FULL_TEST_ENABLED %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT _id AS block_number
FROM {{ source('crosschain_silver', 'number_sequence') }}
WHERE _id BETWEEN (
SELECT min_block FROM summary_stats
) AND (
SELECT max_block FROM summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(l.block_number, 1) OVER (ORDER BY l.block_number ASC) AS prev_block_number
FROM {{ ref('core__fact_blocks') }} l
INNER JOIN block_range b ON l.block_number = b.block_number
AND l.block_number >= (
SELECT MIN(block_number) FROM block_range
)
),
block_gen AS (
SELECT _id AS block_number
FROM {{ source('crosschain_silver', 'number_sequence') }}
WHERE _id BETWEEN (
SELECT MIN(block_number) FROM blocks
) AND (
SELECT MAX(block_number) FROM blocks
)
)
SELECT
'blocks' AS test_name,
MIN(b.block_number) AS min_block,
MAX(b.block_number) AS max_block,
MIN(b.block_timestamp) AS min_block_timestamp,
MAX(b.block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) AS blocks_impacted_count,
ARRAY_AGG(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) WITHIN GROUP (ORDER BY A.block_number) AS blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp,
CURRENT_TIMESTAMP() AS modified_timestamp
FROM block_gen A
LEFT JOIN blocks b ON A.block_number = b.block_number
LEFT JOIN blocks C ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE COALESCE(b.block_number, C.block_number) IS NOT NULL

View File

@ -1,98 +0,0 @@
{% set vars = return_vars() %}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = vars.GLOBAL_SILVER_FR_ENABLED,
tags = ['silver','observability','phase_3']
) }}
WITH summary_stats AS (
SELECT
MIN(block_id) AS min_block_id,
MAX(block_id) AS max_block_id,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_id >= (
SELECT MIN(block_id) FROM (
SELECT MIN(block_id) AS block_id
FROM {{ ref('core__fact_blocks') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT MIN(VALUE) - 1 AS block_id
FROM (
SELECT blocks_impacted_array
FROM {{ this }}
QUALIFY ROW_NUMBER() OVER (ORDER BY test_timestamp DESC) = 1
),
LATERAL FLATTEN(input => blocks_impacted_array)
)
)
{% if vars.MAIN_OBSERV_FULL_TEST_ENABLED %}
OR block_id >= 0
{% endif %}
)
{% endif %}
),
base_blocks AS (
SELECT
block_id,
block_timestamp,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= (
SELECT max_block_timestamp FROM summary_stats
)
),
actual_tx_counts AS (
SELECT
block_id,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_id IS NOT NULL
GROUP BY block_id
),
potential_missing_txs AS (
SELECT
e.block_id
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts a
ON e.block_id = a.block_id
WHERE
COALESCE(a.transaction_count, 0) <> e.transaction_count
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_id) WITHIN GROUP (ORDER BY block_id) AS blocks_impacted_array
FROM potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_block_id AS min_block,
max_block_id AS max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -1,43 +0,0 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config(
materialized = 'incremental',
unique_key = ['id','recorded_hour'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = 'recorded_hour::DATE',
tags = ['noncore']
) }}
SELECT
id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
_INSERTED_TIMESTAMP,
hourly_prices_coin_gecko_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ source(
'crosschain_silver',
'hourly_prices_coin_gecko'
) }}
WHERE
id = '{{ vars.GLOBAL_PROJECT_NAME }}'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,9 +1,11 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'blocks_id',
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
@ -11,10 +13,9 @@
) }}
WITH bronze_blocks AS (
SELECT
'{{ vars.GLOBAL_PROJECT_NAME }}' AS blockchain,
VALUE :BLOCK_ID :: INT AS block_id,
block_id,
COALESCE(
DATA :result :block :header :time :: TIMESTAMP,
DATA :block :header :time :: TIMESTAMP,
@ -26,12 +27,8 @@ WITH bronze_blocks AS (
DATA :block :header :chain_id :: STRING
) AS chain_id,
COALESCE(
ARRAY_SIZE(
DATA :result :block :data :txs
) :: NUMBER,
ARRAY_SIZE(
DATA :block :data :txs
) :: NUMBER
ARRAY_SIZE(DATA :result :block :data :txs) :: NUMBER,
ARRAY_SIZE(DATA :block :data :txs) :: NUMBER
) AS tx_count,
COALESCE(
DATA :result :block :header :proposer_address :: STRING,
@ -45,29 +42,23 @@ WITH bronze_blocks AS (
DATA :result :block :header,
DATA :block :header
) AS header,
inserted_timestamp
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks') }}
{% else %}
{{ ref('bronze__blocks_fr') }}
{% endif %}
WHERE
VALUE :data :error IS NULL
AND DATA :error IS NULL
{% if is_incremental() %}
AND inserted_timestamp >= (
SELECT
MAX(inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
{{ ref('bronze__blocks') }}
WHERE
VALUE :data :error IS NULL
AND DATA :error IS NULL
AND DATA :result :begin_block_events IS NULL
{% if is_incremental() %}
AND _inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
block_id,
block_timestamp,
chain_id,
@ -75,17 +66,14 @@ SELECT
proposer_address,
validator_hash,
header,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['chain_id', 'block_id']) }} AS blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_blocks
WHERE
block_id is not null
QUALIFY ROW_NUMBER() OVER (
PARTITION BY chain_id,
block_id
ORDER BY
inserted_timestamp DESC
) = 1
bronze_blocks
QUALIFY ROW_NUMBER() over (
PARTITION BY chain_id, block_id
ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -5,7 +5,7 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'msg_attributes_id',
cluster_by = ['modified_timestamp::DATE'],
incremental_predicates = [fsc_ibc.standard_predicate()],
@ -35,11 +35,11 @@ WITH silver_msgs AS (
END AS attribute_value,
msgs._inserted_timestamp
FROM
{{ ref('silver__msgs') }} AS msgs,
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{{ ref('silver__msgs') }} AS msgs
LATERAL FLATTEN(
input => msgs.msg,
path => 'attributes'
) AS f
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (

View File

@ -5,9 +5,9 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
unique_key = 'msgs_id',
cluster_by = ['modified_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE', 'partition_key'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
@ -59,7 +59,7 @@ WITH bronze_msgs AS (
THEN msg :attributes [0] :key
ELSE TRY_BASE64_DECODE_STRING(msg :attributes [0] :value)
END AS attribute_value,
transactions._inserted_timestamp
t._inserted_timestamp
FROM
{{ ref('silver__transactions') }} transactions
JOIN LATERAL FLATTEN(input => transactions.msgs) f
@ -124,7 +124,12 @@ msgs AS (
bronze_msgs.msg_index,
msg_type,
msg,
bronze_msgs._inserted_timestamp
concat_ws(
'-',
bronze_msgs.tx_id,
bronze_msgs.msg_index
) AS unique_key,
_inserted_timestamp
FROM
bronze_msgs
LEFT JOIN GROUPING b
@ -141,6 +146,7 @@ SELECT
msg_index,
msg_type,
msg :: OBJECT AS msg,
unique_key,
{{ dbt_utils.generate_surrogate_key(
['tx_id','msg_index']
) }} AS msgs_id,

View File

@ -0,0 +1,118 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_fr') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = 'transactions_id',
cluster_by = ['modified_timestamp::DATE','partition_key'],
incremental_predicates = [fsc_ibc.standard_predicate()],
tags = ['silver', 'core', 'phase_2']
) }}
WITH bronze_transactions AS (
SELECT
block_id,
COALESCE(
DATA :hash,
f.value :hash
) :: STRING AS tx_id,
COALESCE(
DATA :index,
f.index
) AS tx_index,
COALESCE(
DATA :tx_result :codespace,
f.value :tx_result :codespace
) :: STRING AS codespace,
COALESCE(
DATA :tx_result :gas_used,
f.value :tx_result :gas_used
) :: NUMBER AS gas_used,
COALESCE(
DATA :tx_result :gas_wanted,
f.value :tx_result :gas_wanted
) :: NUMBER AS gas_wanted,
COALESCE(
DATA :tx_result :code,
f.value :tx_result :code
) :: INT AS tx_code,
COALESCE(
TRY_PARSE_JSON(
COALESCE(
DATA :tx_result :log,
f.value :tx_result :log
)
),
COALESCE(
DATA :tx_result :log,
f.value :tx_result :log
)
) AS tx_log,
CASE
WHEN f.value IS NOT NULL THEN f.value
ELSE DATA
END AS DATA,
partition_key,
COALESCE(
transactions.value :block_id_REQUESTED,
REPLACE(
metadata :request :params [0],
'tx.height='
)
) AS block_id_requested,
inserted_timestamp AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_fr') }}
{% endif %}
AS transactions
JOIN LATERAL FLATTEN(
DATA :result :txs,
outer => TRUE
) AS f
WHERE
{% if is_incremental() %}
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
codespace,
tx_id,
tx_index,
tx_log,
tx_succeeded,
{# tx_from, #}
{# fee, #}
{# fee_denom, #}
gas_used,
gas_wanted,
tx_code,
DATA,
partition_key,
block_id_requested,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id_requested', 'tx_id']
) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_transactions
QUALIFY(ROW_NUMBER() over (
PARTITION BY block_id_requested, tx_id
ORDER BY _inserted_timestamp DESC)
) = 1
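The outer => TRUE flatten keeps responses where the DATA :result :txs path is absent (f.value comes back NULL for those rows), which is why every projected column COALESCEs the two shapes; a minimal illustration:
SELECT d.id, f.value
FROM (
    SELECT 1 AS id, PARSE_JSON('{"result": {"txs": ["a", "b"]}}') AS DATA
    UNION ALL
    SELECT 2 AS id, PARSE_JSON('{"hash": "0xabc"}') AS DATA
) d,
LATERAL FLATTEN(input => d.DATA :result :txs, outer => TRUE) f;
-- id 1 yields two rows (f.value = "a", "b"); id 2 yields one row with f.value = NULL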

View File

@ -1,59 +0,0 @@
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
tags = ['silver_stats','curated','stats','daily_test','phase_4']
) }}
{# run incremental timestamp value first then use it as a static value #}
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
MIN(DATE_TRUNC('hour', block_timestamp)) AS block_timestamp_hour
FROM
{{ ref('core__fact_transactions') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endset %}
{% set min_block_timestamp_hour = run_query(query).columns [0].values() [0] %}
{% endif %}
{% endif %}
{# TODO: unique_from_count and total_fees are omitted because tx_from and fee are not present in core__fact_transactions. #}
{# Main query starts here #}
SELECT
DATE_TRUNC('hour', block_timestamp) AS block_timestamp_hour,
MIN(block_id) AS block_id_min,
MAX(block_id) AS block_id_max,
COUNT(DISTINCT block_id) AS block_count,
COUNT(DISTINCT tx_id) AS transaction_count,
COUNT(DISTINCT CASE WHEN tx_succeeded THEN tx_id END) AS transaction_count_success,
COUNT(DISTINCT CASE WHEN NOT tx_succeeded THEN tx_id END) AS transaction_count_failed,
SUM(gas_used) AS total_gas_used,
MAX(modified_timestamp) AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['block_timestamp_hour']) }} AS core_metrics_hourly_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('core__fact_transactions') }}
WHERE
DATE_TRUNC('hour', block_timestamp) < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND DATE_TRUNC('hour', block_timestamp) >= '{{ min_block_timestamp_hour }}'
{% endif %}
GROUP BY 1
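Running run_query at compile time pins the incremental lower bound as a static literal, so on an incremental run the final predicate compiles to something like (timestamp illustrative):
WHERE DATE_TRUNC('hour', block_timestamp) < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
    AND DATE_TRUNC('hour', block_timestamp) >= '2025-03-01 00:00:00.000'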

View File

@ -1,5 +1,6 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__blocks') }}
{{ config (
materialized = "incremental",
@ -7,17 +8,12 @@
unique_key = "block_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
) }}
SELECT
DATA :result :block :header :height :: INT AS block_id,
DATA :result :block :header :time :: TIMESTAMP AS block_timestamp,
ARRAY_SIZE(
DATA :result :block :data :txs
) tx_count,
{{ dbt_utils.generate_surrogate_key(
{{ dbt_utils.generate_surrogate_key(
['block_id']
) }} AS complete_blocks_id,
SYSDATE() AS inserted_timestamp,

View File

@ -1,19 +1,27 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__transactions') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "complete_transactions_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
) }}
SELECT
VALUE :BLOCK_ID_REQUESTED :: INT AS block_id,
VALUE :PAGE_NUMBER :: INT AS page_number,
COALESCE(
VALUE :block_id_REQUESTED,
DATA :height,
VALUE :data :result :txs [0] :height
) :: INT AS block_id,
COALESCE(
VALUE :PAGE_NUMBER,
metadata :request :params [2]
) :: INT AS page_number,
{{ dbt_utils.generate_surrogate_key(
['block_id','page_number']
) }} AS complete_transactions_id,

View File

@ -1,7 +1,7 @@
{# Get variables #}
{% set vars = return_vars() %}
-- depends_on: {{ ref('bronze__tx_count') }}
-- depends_on: {{ ref('bronze__tx_counts') }}
{{ config (
materialized = "incremental",
@ -9,8 +9,7 @@
unique_key = "block_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline','core','complete','phase_1']
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)"
) }}
SELECT
@ -26,7 +25,7 @@ SELECT
FROM
{% if is_incremental() %}
{{ ref('bronze__tx_count') }}
{{ ref('bronze__tx_counts') }}
WHERE
inserted_timestamp >= (
SELECT
@ -34,8 +33,9 @@ WHERE
FROM
{{ this }}
)
AND block_id NOT IN (21208991)
{% else %}
{{ ref('bronze__tx_count_fr') }}
{{ ref('bronze__tx_counts_fr') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_id

View File

@ -1,12 +1,7 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
materialized = 'view',
tags = ['streamline','core','realtime','phase_1']
) }}
@ -24,12 +19,12 @@ WITH blocks AS (
)
SELECT
ROUND(block_id, -4) :: INT AS partition_key,
block_id,
{{ target.database }}.live.udf_api(
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json'
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -40,7 +35,9 @@ SELECT
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request
FROM
blocks
blocks
ORDER BY
block_id
LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }}
@ -52,14 +49,13 @@ LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }}
'producer_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_BLOCKS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source' : this.identifier,
"order_by_column": "block_id"
'sql_source': "{{this.identifier}}"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = this.schema ~ '.' ~ this.identifier,
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}

View File

@ -1,23 +1,10 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
tags = ['streamline','core','realtime','phase_1']
) }}
WITH blocks AS (
SELECT
A.block_id,
A.block_timestamp,
tx_count
FROM
{{ ref('streamline__blocks_complete') }} A
{{ ref('streamline__tx_counts_complete') }} A
WHERE
tx_count > 0
),
@ -40,7 +27,6 @@ numbers AS (
blocks_with_page_numbers AS (
SELECT
tt.block_id :: INT AS block_id,
tt.block_timestamp,
n.n AS page_number
FROM
blocks tt
@ -54,7 +40,6 @@ numbers AS (
EXCEPT
SELECT
block_id,
null as block_timestamp, -- placeholder for now...
page_number
FROM
{{ ref('streamline__transactions_complete') }}
@ -68,7 +53,8 @@ numbers AS (
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json'
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -85,10 +71,11 @@ numbers AS (
'{{ vars.GLOBAL_NODE_VAULT_PATH }}'
) AS request,
page_number,
block_id AS block_id_requested,
to_char(block_timestamp,'YYYY_MM_DD_HH_MI_SS_FF3') AS block_timestamp
block_id AS block_id_requested
FROM
blocks_with_page_numbers
ORDER BY
block_id
LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }}
@ -100,15 +87,14 @@ LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }}
'producer_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TRANSACTIONS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source' : this.identifier,
'exploded_key': '["result.txs"]',
"order_by_column": "block_id_requested"
'sql_source': "{{this.identifier}}",
'exploded_key': '["result.txs"]'
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = this.schema ~ '.' ~ this.identifier,
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}

View File

@ -1,12 +1,10 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
-- depends_on: {{ ref('streamline__tx_counts_complete') }}
{# Set up dbt configuration #}
{{ config (
materialized = "view",
materialized = 'view',
tags = ['streamline','core','realtime','phase_1']
) }}
@ -20,7 +18,7 @@ WITH blocks AS (
SELECT
block_id
FROM
{{ ref('streamline__tx_count_complete') }}
{{ ref('streamline__tx_counts_complete') }}
),
{# retry AS (
SELECT
@ -50,7 +48,8 @@ SELECT
'POST',
'{{ vars.GLOBAL_NODE_URL }}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json'
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_id,
@ -79,13 +78,13 @@ ORDER BY
'producer_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_PRODUCER_BATCH_SIZE,
'worker_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_WORKER_BATCH_SIZE,
'async_concurrent_requests': vars.MAIN_SL_TX_COUNTS_REALTIME_ASYNC_CONCURRENT_REQUESTS,
'sql_source' : this.identifier
'sql_source' :"{{this.identifier}}"
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = this.schema ~ '.' ~ this.identifier,
target = '{{this.schema}}.{{this.identifier}}',
params = params
) }}
{% endset %}

View File

@ -1,9 +0,0 @@
version: 2
models:
- name: streamline__chainhead
description: "This model is used to get the chainhead from the blockchain."
columns:
- name: BLOCK_ID
tests:
- not_null

View File

@ -1,12 +1,6 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config (
materialized = "view",
tags = ['streamline','core','chainhead','phase_1']
tags = ['streamline_view']
) }}
SELECT
@ -19,7 +13,10 @@ FROM
WHERE
_id <= (
SELECT
MAX(block_id)
COALESCE(
block_id,
0
)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -1,12 +1,6 @@
{# Get variables #}
{% set vars = return_vars() %}
{# Log configuration details #}
{{ log_model_details() }}
{{ config (
materialized = 'view',
tags = ['streamline','core','chainhead','phase_1']
materialized = "view",
tags = ['streamline_view']
) }}
SELECT

View File

@ -1,5 +0,0 @@
{% docs attribute_index %}
The position in which attributes occur within a message
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs attribute_key %}
The key from the key-value pair from the message attribute
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs attribute_value %}
The value from the key-value pair from the message attribute
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs msg %}
The flattened underlying json from the messages or events within the transactions
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_blocks %}
Records of all blocks that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_msg_attributes %}
Records of all message attributes associated to messages that have occurred onchain.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_msgs %}
Records of all message attributes associated to messages that have occurred onchain.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_transactions %}
Records of all transactions that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_transactions_logs %}
Records of all transactions logs that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs core__fact_transfers %}
Records of all transfers that have occurred onchain, dating back to the genesis block.
{% enddocs %}

View File

@ -1,5 +0,0 @@
{% docs tx_succeeded %}
A boolean indicator for whether the transaction was successful.
{% enddocs %}

View File

@ -11,17 +11,8 @@ sources:
schema: streamline
tables:
- name: blocks
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: tx_counts
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: transactions
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: txs
- name: crosschain_silver
database: "{{ 'CROSSCHAIN_DEV' if '_DEV' in target.database.upper() else 'CROSSCHAIN' }}"
schema: silver

View File

@ -1,6 +1,6 @@
packages:
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.36.1
revision: v1.32.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]
- package: dbt-labs/dbt_external_tables