mirror of
https://github.com/FlipsideCrypto/polygon-models.git
synced 2026-02-06 13:41:53 +00:00
parent
46e569d88e
commit
155ae34132
@ -132,4 +132,12 @@ vars:
|
||||
|
||||
### MAIN_PACKAGE VARIABLES END ###
|
||||
|
||||
### DECODER_PACKAGE VARIABLES BEGIN ###
|
||||
|
||||
## REQUIRED
|
||||
|
||||
## OPTIONAL
|
||||
|
||||
### DECODER_PACKAGE VARIABLES END ###
|
||||
|
||||
#### FSC_EVM END ####
|
||||
|
||||
@ -1,29 +1,26 @@
|
||||
{% macro decoded_logs_history(backfill_mode=false) %}
|
||||
|
||||
{%- set params = {
|
||||
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 7500000),
|
||||
"sql_limit": var("DECODED_LOGS_HISTORY_SQL_LIMIT", 8000000),
|
||||
"producer_batch_size": var("DECODED_LOGS_HISTORY_PRODUCER_BATCH_SIZE", 400000),
|
||||
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000),
|
||||
"producer_limit_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
|
||||
"worker_batch_size": var("DECODED_LOGS_HISTORY_WORKER_BATCH_SIZE", 100000)
|
||||
} -%}
|
||||
|
||||
{% set wait_time = var("DECODED_LOGS_HISTORY_WAIT_TIME", 60) %}
|
||||
|
||||
{% set find_months_query %}
|
||||
SELECT
|
||||
DISTINCT date_trunc('month', block_timestamp)::date as month
|
||||
FROM {{ ref('core__fact_blocks') }}
|
||||
ORDER BY month ASC
|
||||
{% endset %}
|
||||
|
||||
{% set results = run_query(find_months_query) %}
|
||||
|
||||
{% if execute %}
|
||||
{% set months = results.columns[0].values() %}
|
||||
|
||||
|
||||
{% for month in months %}
|
||||
{% set view_name = 'decoded_logs_history_' ~ month.strftime('%Y_%m') %}
|
||||
|
||||
|
||||
{% set create_view_query %}
|
||||
create or replace view streamline.{{view_name}} as (
|
||||
WITH target_blocks AS (
|
||||
@ -46,7 +43,7 @@
|
||||
),
|
||||
existing_logs_to_exclude AS (
|
||||
SELECT _log_id
|
||||
FROM {{ ref('streamline__complete_decode_logs') }} l
|
||||
FROM {{ ref('streamline__decoded_logs_complete') }} l
|
||||
INNER JOIN target_blocks b using (block_number)
|
||||
),
|
||||
candidate_logs AS (
|
||||
@ -84,11 +81,9 @@
|
||||
LIMIT {{ params.sql_limit }}
|
||||
)
|
||||
{% endset %}
|
||||
|
||||
{# Create the view #}
|
||||
{% do run_query(create_view_query) %}
|
||||
{{ log("Created view for month " ~ month.strftime('%Y-%m'), info=True) }}
|
||||
|
||||
{% if var("STREAMLINE_INVOKE_STREAMS", false) %}
|
||||
{# Check if rows exist first #}
|
||||
{% set check_rows_query %}
|
||||
@ -99,19 +94,22 @@
|
||||
{% set has_rows = results.columns[0].values()[0] %}
|
||||
|
||||
{% if has_rows %}
|
||||
{# Invoke streamline since rows exist to decode #}
|
||||
{# Invoke streamline, if rows exist to decode #}
|
||||
{% set decode_query %}
|
||||
SELECT streamline.udf_bulk_decode_logs(
|
||||
object_construct(
|
||||
'sql_source', '{{view_name}}',
|
||||
'producer_batch_size', {{ params.producer_batch_size }},
|
||||
'producer_limit_size', {{ params.producer_limit_size }})
|
||||
SELECT
|
||||
streamline.udf_bulk_decode_logs_v2(
|
||||
PARSE_JSON(
|
||||
$${ "external_table": "decoded_logs",
|
||||
"producer_batch_size": {{ params.producer_batch_size }},
|
||||
"sql_limit": {{ params.sql_limit }},
|
||||
"sql_source": "{{view_name}}",
|
||||
"worker_batch_size": {{ params.worker_batch_size }} }$$
|
||||
)
|
||||
);
|
||||
{% endset %}
|
||||
|
||||
{% do run_query(decode_query) %}
|
||||
{{ log("Triggered decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
|
||||
|
||||
{# Call wait since we actually did some decoding #}
|
||||
{% do run_query("call system$wait(" ~ wait_time ~ ")") %}
|
||||
{{ log("Completed wait after decoding for month " ~ month.strftime('%Y-%m'), info=True) }}
|
||||
@ -119,7 +117,7 @@
|
||||
{{ log("No rows to decode for month " ~ month.strftime('%Y-%m'), info=True) }}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
|
||||
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
|
||||
@ -1,14 +1,14 @@
|
||||
{% macro run_decoded_logs_history() %}
|
||||
|
||||
{% set blockchain = var('GLOBAL_PROD_DB_NAME','').lower() %}
|
||||
|
||||
{% set check_for_new_user_abis_query %}
|
||||
select 1
|
||||
from {{ ref('silver__user_verified_abis') }}
|
||||
where _inserted_timestamp::date = sysdate()::date
|
||||
and dayname(sysdate()) <> 'Sat'
|
||||
{% endset %}
|
||||
|
||||
{% set results = run_query(check_for_new_user_abis_query) %}
|
||||
|
||||
{% if execute %}
|
||||
{% set new_user_abis = results.columns[0].values()[0] %}
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
SELECT
|
||||
github_actions.workflow_dispatches(
|
||||
'FlipsideCrypto',
|
||||
'polygon-models',
|
||||
'{{ blockchain }}' ~ '-models',
|
||||
'dbt_run_streamline_decoded_logs_history.yml',
|
||||
NULL
|
||||
)
|
||||
|
||||
@ -0,0 +1,101 @@
|
||||
{% macro streamline_external_table_query_decoder(
|
||||
source_name,
|
||||
source_version
|
||||
) %}
|
||||
|
||||
{% if source_version != '' %}
|
||||
{% set source_version = '_' ~ source_version.lower() %}
|
||||
{% endif %}
|
||||
|
||||
WITH meta AS (
|
||||
SELECT
|
||||
job_created_time AS _inserted_timestamp,
|
||||
file_name,
|
||||
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
|
||||
TO_DATE(
|
||||
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
|
||||
) AS _partition_by_created_date
|
||||
FROM
|
||||
TABLE(
|
||||
information_schema.external_table_file_registration_history(
|
||||
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
|
||||
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
|
||||
) A
|
||||
)
|
||||
SELECT
|
||||
block_number,
|
||||
id :: STRING AS id,
|
||||
DATA,
|
||||
metadata,
|
||||
b.file_name,
|
||||
_inserted_timestamp,
|
||||
s._partition_by_block_number AS _partition_by_block_number,
|
||||
s._partition_by_created_date AS _partition_by_created_date
|
||||
FROM
|
||||
{{ source(
|
||||
"bronze_streamline",
|
||||
source_name ~ source_version
|
||||
) }}
|
||||
s
|
||||
JOIN meta b
|
||||
ON b.file_name = metadata$filename
|
||||
AND b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
WHERE
|
||||
b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
|
||||
AND DATA :error IS NULL
|
||||
AND DATA IS NOT NULL
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro streamline_external_table_query_decoder_fr(
|
||||
source_name,
|
||||
source_version
|
||||
) %}
|
||||
|
||||
{% if source_version != '' %}
|
||||
{% set source_version = '_' ~ source_version.lower() %}
|
||||
{% endif %}
|
||||
|
||||
WITH meta AS (
|
||||
SELECT
|
||||
registered_on AS _inserted_timestamp,
|
||||
file_name,
|
||||
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
|
||||
TO_DATE(
|
||||
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
|
||||
) AS _partition_by_created_date
|
||||
FROM
|
||||
TABLE(
|
||||
information_schema.external_table_files(
|
||||
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
|
||||
)
|
||||
) A
|
||||
)
|
||||
SELECT
|
||||
block_number,
|
||||
id :: STRING AS id,
|
||||
DATA,
|
||||
metadata,
|
||||
b.file_name,
|
||||
_inserted_timestamp,
|
||||
s._partition_by_block_number AS _partition_by_block_number,
|
||||
s._partition_by_created_date AS _partition_by_created_date
|
||||
FROM
|
||||
{{ source(
|
||||
"bronze_streamline",
|
||||
source_name ~ source_version
|
||||
) }}
|
||||
s
|
||||
JOIN meta b
|
||||
ON b.file_name = metadata$filename
|
||||
AND b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
WHERE
|
||||
b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
AND DATA :error IS NULL
|
||||
AND DATA IS NOT NULL
|
||||
{% endmacro %}
|
||||
36
macros/fsc_evm_temp/main_package/logging/logging.sql
Normal file
36
macros/fsc_evm_temp/main_package/logging/logging.sql
Normal file
@ -0,0 +1,36 @@
|
||||
{% macro log_model_details(vars=false, params=false) %}
|
||||
|
||||
{%- if execute -%}
|
||||
/*
|
||||
DBT Model Config:
|
||||
{{ model.config | tojson(indent=2) }}
|
||||
*/
|
||||
|
||||
{% if vars is not false %}
|
||||
|
||||
{% if var('LOG_MODEL_DETAILS', false) %}
|
||||
{{ log( vars | tojson(indent=2), info=True) }}
|
||||
{% endif %}
|
||||
/*
|
||||
Variables:
|
||||
{{ vars | tojson(indent=2) }}
|
||||
*/
|
||||
{% endif %}
|
||||
|
||||
{% if params is not false %}
|
||||
|
||||
{% if var('LOG_MODEL_DETAILS', false) %}
|
||||
{{ log( params | tojson(indent=2), info=True) }}
|
||||
{% endif %}
|
||||
/*
|
||||
Parameters:
|
||||
{{ params | tojson(indent=2) }}
|
||||
*/
|
||||
{% endif %}
|
||||
|
||||
/*
|
||||
Raw Code:
|
||||
{{ model.raw_code }}
|
||||
*/
|
||||
{%- endif -%}
|
||||
{% endmacro %}
|
||||
@ -42,7 +42,7 @@ WHERE
|
||||
)
|
||||
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
|
||||
{% else %}
|
||||
{{ ref('bronze__fr_decoded_logs') }}
|
||||
{{ ref('bronze__decoded_logs_fr') }}
|
||||
WHERE
|
||||
_partition_by_block_number <= 2500000
|
||||
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
|
||||
|
||||
@ -30,6 +30,7 @@ sources:
|
||||
- name: receipts_v2
|
||||
- name: traces_v2
|
||||
- name: confirm_blocks_v2
|
||||
- name: decoded_logs_v2
|
||||
- name: streamline_crosschain
|
||||
database: streamline
|
||||
schema: crosschain
|
||||
|
||||
@ -1,41 +1,23 @@
|
||||
{{ config (
|
||||
materialized = 'view'
|
||||
{# Set variables #}
|
||||
{% set source_name = 'DECODED_LOGS' %}
|
||||
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
|
||||
{% set model_type = '' %}
|
||||
|
||||
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
|
||||
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details(
|
||||
vars = default_vars
|
||||
) }}
|
||||
|
||||
WITH meta AS (
|
||||
{# Set up dbt configuration #}
|
||||
{{ config (
|
||||
materialized = 'view',
|
||||
tags = ['bronze_decoded_logs']
|
||||
) }}
|
||||
|
||||
SELECT
|
||||
last_modified AS _inserted_timestamp,
|
||||
file_name,
|
||||
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
|
||||
TO_DATE(
|
||||
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
|
||||
) AS _partition_by_created_date
|
||||
FROM
|
||||
TABLE(
|
||||
information_schema.external_table_file_registration_history(
|
||||
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
|
||||
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}')
|
||||
) A
|
||||
)
|
||||
SELECT
|
||||
block_number,
|
||||
id :: STRING AS id,
|
||||
DATA,
|
||||
_inserted_timestamp,
|
||||
s._partition_by_block_number AS _partition_by_block_number,
|
||||
s._partition_by_created_date AS _partition_by_created_date
|
||||
FROM
|
||||
{{ source(
|
||||
"bronze_streamline",
|
||||
"decoded_logs"
|
||||
) }}
|
||||
s
|
||||
JOIN meta b
|
||||
ON b.file_name = metadata$filename
|
||||
AND b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
WHERE
|
||||
b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
|
||||
{# Main query starts here #}
|
||||
{{ streamline_external_table_query_decoder(
|
||||
source_name = source_name.lower(),
|
||||
source_version = source_version.lower()
|
||||
) }}
|
||||
20
models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql
Normal file
20
models/streamline/bronze/decoder/bronze__decoded_logs_fr.sql
Normal file
@ -0,0 +1,20 @@
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details() }}
|
||||
|
||||
{# Set up dbt configuration #}
|
||||
{{ config (
|
||||
materialized = 'view',
|
||||
tags = ['bronze_decoded_logs']
|
||||
) }}
|
||||
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
{{ ref('bronze__decoded_logs_fr_v2') }}
|
||||
{% if var('GLOBAL_USES_STREAMLINE_V1', false) %}
|
||||
UNION ALL
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
{{ ref('bronze__decoded_logs_fr_v1') }}
|
||||
{% endif %}
|
||||
@ -0,0 +1,23 @@
|
||||
{# Set variables #}
|
||||
{% set source_name = 'DECODED_LOGS' %}
|
||||
{% set source_version = '' %}
|
||||
{% set model_type = 'FR' %}
|
||||
|
||||
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
|
||||
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details(
|
||||
vars = default_vars
|
||||
) }}
|
||||
|
||||
{# Set up dbt configuration #}
|
||||
{{ config (
|
||||
materialized = 'view',
|
||||
tags = ['bronze_decoded_logs_streamline_v1']
|
||||
) }}
|
||||
|
||||
{# Main query starts here #}
|
||||
{{ streamline_external_table_query_decoder_fr(
|
||||
source_name = source_name.lower(),
|
||||
source_version = source_version.lower()
|
||||
) }}
|
||||
@ -0,0 +1,23 @@
|
||||
{# Set variables #}
|
||||
{% set source_name = 'DECODED_LOGS' %}
|
||||
{% set source_version = 'V2' if var('GLOBAL_USES_STREAMLINE_V1', false) else '' %}
|
||||
{% set model_type = 'FR' %}
|
||||
|
||||
{%- set default_vars = set_default_variables_bronze(source_name, model_type) -%}
|
||||
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details(
|
||||
vars = default_vars
|
||||
) }}
|
||||
|
||||
{# Set up dbt configuration #}
|
||||
{{ config (
|
||||
materialized = 'view',
|
||||
tags = ['bronze_decoded_logs']
|
||||
) }}
|
||||
|
||||
{# Main query starts here #}
|
||||
{{ streamline_external_table_query_decoder_fr(
|
||||
source_name = source_name.lower(),
|
||||
source_version = source_version.lower()
|
||||
) }}
|
||||
@ -1,40 +0,0 @@
|
||||
{{ config (
|
||||
materialized = 'view'
|
||||
) }}
|
||||
|
||||
WITH meta AS (
|
||||
|
||||
SELECT
|
||||
registered_on AS _inserted_timestamp,
|
||||
file_name,
|
||||
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
|
||||
TO_DATE(
|
||||
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
|
||||
) AS _partition_by_created_date
|
||||
FROM
|
||||
TABLE(
|
||||
information_schema.external_table_files(
|
||||
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}'
|
||||
)
|
||||
) A
|
||||
)
|
||||
SELECT
|
||||
block_number,
|
||||
id :: STRING AS id,
|
||||
DATA,
|
||||
_inserted_timestamp,
|
||||
s._partition_by_block_number AS _partition_by_block_number,
|
||||
s._partition_by_created_date AS _partition_by_created_date
|
||||
FROM
|
||||
{{ source(
|
||||
"bronze_streamline",
|
||||
"decoded_logs"
|
||||
) }}
|
||||
s
|
||||
JOIN meta b
|
||||
ON b.file_name = metadata$filename
|
||||
AND b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
WHERE
|
||||
b._partition_by_block_number = s._partition_by_block_number
|
||||
AND b._partition_by_created_date = s._partition_by_created_date
|
||||
@ -0,0 +1,50 @@
|
||||
{# Set variables #}
|
||||
{%- set source_name = 'DECODED_LOGS' -%}
|
||||
{%- set model_type = 'COMPLETE' -%}
|
||||
|
||||
{%- set full_refresh_type = var((source_name ~ '_complete_full_refresh').upper(), false) -%}
|
||||
|
||||
{% set post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)" %}
|
||||
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details() }}
|
||||
|
||||
{# Set up dbt configuration #}
|
||||
-- depends_on: {{ ref('bronze__' ~ source_name.lower()) }}
|
||||
|
||||
{{ config (
|
||||
materialized = "incremental",
|
||||
unique_key = "_log_id",
|
||||
cluster_by = "ROUND(block_number, -3)",
|
||||
incremental_predicates = ["dynamic_range", "block_number"],
|
||||
merge_update_columns = ["_log_id"],
|
||||
post_hook = post_hook,
|
||||
full_refresh = full_refresh_type,
|
||||
tags = ['streamline_decoded_logs_complete']
|
||||
) }}
|
||||
|
||||
{# Main query starts here #}
|
||||
SELECT
|
||||
block_number,
|
||||
file_name,
|
||||
id AS _log_id,
|
||||
{{ dbt_utils.generate_surrogate_key(['id']) }} AS complete_{{ source_name.lower() }}_id,
|
||||
SYSDATE() AS inserted_timestamp,
|
||||
SYSDATE() AS modified_timestamp,
|
||||
_inserted_timestamp,
|
||||
'{{ invocation_id }}' AS _invocation_id
|
||||
FROM
|
||||
{% if is_incremental() %}
|
||||
{{ ref('bronze__' ~ source_name.lower()) }}
|
||||
WHERE
|
||||
_inserted_timestamp >= (
|
||||
SELECT
|
||||
COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP) AS _inserted_timestamp
|
||||
FROM
|
||||
{{ this }}
|
||||
)
|
||||
{% else %}
|
||||
{{ ref('bronze__' ~ source_name.lower() ~ '_fr') }}
|
||||
{% endif %}
|
||||
|
||||
QUALIFY (ROW_NUMBER() OVER (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1
|
||||
@ -0,0 +1,110 @@
|
||||
{%- set testing_limit = var('DECODED_LOGS_REALTIME_TESTING_LIMIT', none) -%}
|
||||
|
||||
{%- set streamline_params = {
|
||||
"external_table": var("DECODED_LOGS_REALTIME_EXTERNAL_TABLE", "decoded_logs"),
|
||||
"sql_limit": var("DECODED_LOGS_REALTIME_SQL_LIMIT", 10000000),
|
||||
"producer_batch_size": var("DECODED_LOGS_REALTIME_PRODUCER_BATCH_SIZE", 400000),
|
||||
"worker_batch_size": var("DECODED_LOGS_REALTIME_WORKER_BATCH_SIZE", 200000),
|
||||
"sql_source": "decoded_logs_realtime"
|
||||
} -%}
|
||||
|
||||
{# Log configuration details #}
|
||||
{{ log_model_details(
|
||||
params = streamline_params
|
||||
) }}
|
||||
|
||||
{# Set up dbt configuration #}
|
||||
{{ config (
|
||||
materialized = "view",
|
||||
post_hook = [fsc_utils.if_data_call_function_v2(
|
||||
func = 'streamline.udf_bulk_decode_logs_v2',
|
||||
target = "{{this.schema}}.{{this.identifier}}",
|
||||
params = {
|
||||
"external_table": streamline_params['external_table'],
|
||||
"sql_limit": streamline_params['sql_limit'],
|
||||
"producer_batch_size": streamline_params['producer_batch_size'],
|
||||
"worker_batch_size": streamline_params['worker_batch_size'],
|
||||
"sql_source": streamline_params['sql_source']
|
||||
}
|
||||
),
|
||||
fsc_utils.if_data_call_wait()],
|
||||
tags = ['streamline_decoded_logs_realtime']
|
||||
) }}
|
||||
|
||||
WITH target_blocks AS (
|
||||
SELECT
|
||||
block_number
|
||||
FROM
|
||||
{{ ref('core__fact_blocks') }}
|
||||
WHERE
|
||||
block_number >= (
|
||||
SELECT
|
||||
block_number
|
||||
FROM
|
||||
{{ ref('_24_hour_lookback') }}
|
||||
)
|
||||
),
|
||||
existing_logs_to_exclude AS (
|
||||
SELECT
|
||||
_log_id
|
||||
FROM
|
||||
{{ ref('streamline__decoded_logs_complete') }}
|
||||
l
|
||||
INNER JOIN target_blocks b USING (block_number)
|
||||
WHERE
|
||||
l.inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE())
|
||||
),
|
||||
candidate_logs AS (
|
||||
SELECT
|
||||
l.block_number,
|
||||
l.tx_hash,
|
||||
l.event_index,
|
||||
l.contract_address,
|
||||
l.topics,
|
||||
l.data,
|
||||
CONCAT(
|
||||
l.tx_hash :: STRING,
|
||||
'-',
|
||||
l.event_index :: STRING
|
||||
) AS _log_id
|
||||
FROM
|
||||
target_blocks b
|
||||
INNER JOIN {{ ref('core__fact_event_logs') }}
|
||||
l USING (block_number)
|
||||
WHERE
|
||||
l.tx_status = 'SUCCESS'
|
||||
AND l.inserted_timestamp :: DATE >= DATEADD('day', -2, SYSDATE())
|
||||
)
|
||||
SELECT
|
||||
l.block_number,
|
||||
l._log_id,
|
||||
A.abi,
|
||||
OBJECT_CONSTRUCT(
|
||||
'topics',
|
||||
l.topics,
|
||||
'data',
|
||||
l.data,
|
||||
'address',
|
||||
l.contract_address
|
||||
) AS DATA
|
||||
FROM
|
||||
candidate_logs l
|
||||
INNER JOIN {{ ref('silver__complete_event_abis') }} A
|
||||
ON A.parent_contract_address = l.contract_address
|
||||
AND A.event_signature = l.topics [0] :: STRING
|
||||
AND l.block_number BETWEEN A.start_block
|
||||
AND A.end_block
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
existing_logs_to_exclude e
|
||||
WHERE
|
||||
e._log_id = l._log_id
|
||||
)
|
||||
|
||||
{% if testing_limit is not none %}
|
||||
LIMIT
|
||||
{{ testing_limit }}
|
||||
{% endif %}
|
||||
@ -1,34 +0,0 @@
|
||||
-- depends_on: {{ ref('bronze__decoded_logs') }}
|
||||
{{ config (
|
||||
materialized = "incremental",
|
||||
unique_key = "_log_id",
|
||||
cluster_by = "ROUND(block_number, -3)",
|
||||
incremental_predicates = ["dynamic_range", "block_number"],
|
||||
merge_update_columns = ["_log_id"],
|
||||
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)",
|
||||
full_refresh = false,
|
||||
tags = ['streamline_decoded_logs_complete']
|
||||
) }}
|
||||
|
||||
SELECT
|
||||
block_number,
|
||||
id AS _log_id,
|
||||
_inserted_timestamp
|
||||
FROM
|
||||
|
||||
{% if is_incremental() %}
|
||||
{{ ref('bronze__decoded_logs') }}
|
||||
WHERE
|
||||
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
|
||||
SELECT
|
||||
MAX(_inserted_timestamp)
|
||||
FROM
|
||||
{{ this }}
|
||||
)
|
||||
{% else %}
|
||||
{{ ref('bronze__fr_decoded_logs') }}
|
||||
{% endif %}
|
||||
|
||||
qualify(ROW_NUMBER() over (PARTITION BY id
|
||||
ORDER BY
|
||||
_inserted_timestamp DESC)) = 1
|
||||
@ -1,80 +0,0 @@
|
||||
{{ config (
|
||||
materialized = "view",
|
||||
post_hook = [if_data_call_function( func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}','producer_batch_size', 20000000,'producer_limit_size', 20000000))", target = "{{this.schema}}.{{this.identifier}}" ),"call system$wait(" ~ var("WAIT", 400) ~ ")" ],
|
||||
tags = ['streamline_decoded_logs_realtime']
|
||||
) }}
|
||||
|
||||
WITH target_blocks AS (
|
||||
|
||||
SELECT
|
||||
block_number
|
||||
FROM
|
||||
{{ ref('core__fact_blocks') }}
|
||||
WHERE
|
||||
block_number >= (
|
||||
SELECT
|
||||
block_number
|
||||
FROM
|
||||
{{ ref("_block_lookback") }}
|
||||
)
|
||||
),
|
||||
existing_logs_to_exclude AS (
|
||||
SELECT
|
||||
_log_id
|
||||
FROM
|
||||
{{ ref('streamline__complete_decode_logs') }}
|
||||
l
|
||||
INNER JOIN target_blocks b USING (block_number)
|
||||
WHERE
|
||||
l._inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
|
||||
),
|
||||
candidate_logs AS (
|
||||
SELECT
|
||||
l.block_number,
|
||||
l.tx_hash,
|
||||
l.event_index,
|
||||
l.contract_address,
|
||||
l.topics,
|
||||
l.data,
|
||||
CONCAT(
|
||||
l.tx_hash :: STRING,
|
||||
'-',
|
||||
l.event_index :: STRING
|
||||
) AS _log_id
|
||||
FROM
|
||||
target_blocks b
|
||||
INNER JOIN {{ ref('core__fact_event_logs') }}
|
||||
l USING (block_number)
|
||||
WHERE
|
||||
l.tx_status = 'SUCCESS'
|
||||
AND l.inserted_timestamp :: DATE >= DATEADD('day', -5, SYSDATE())
|
||||
)
|
||||
SELECT
|
||||
l.block_number,
|
||||
l._log_id,
|
||||
A.abi AS abi,
|
||||
OBJECT_CONSTRUCT(
|
||||
'topics',
|
||||
l.topics,
|
||||
'data',
|
||||
l.data,
|
||||
'address',
|
||||
l.contract_address
|
||||
) AS DATA
|
||||
FROM
|
||||
candidate_logs l
|
||||
INNER JOIN {{ ref('silver__complete_event_abis') }} A
|
||||
ON A.parent_contract_address = l.contract_address
|
||||
AND A.event_signature = l.topics [0] :: STRING
|
||||
AND l.block_number BETWEEN A.start_block
|
||||
AND A.end_block
|
||||
WHERE
|
||||
NOT EXISTS (
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
existing_logs_to_exclude e
|
||||
WHERE
|
||||
e._log_id = l._log_id
|
||||
)
|
||||
limit 7500000
|
||||
@ -6,7 +6,7 @@ packages:
|
||||
- package: dbt-labs/dbt_utils
|
||||
version: 1.0.0
|
||||
- git: https://github.com/FlipsideCrypto/fsc-utils.git
|
||||
revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107
|
||||
revision: c3ab97e8e06d31e8c6f63819714e0a2d45c45e82
|
||||
- package: get-select/dbt_snowflake_query_tags
|
||||
version: 2.5.0
|
||||
- git: https://github.com/FlipsideCrypto/fsc-evm.git
|
||||
@ -15,4 +15,4 @@ packages:
|
||||
version: 0.7.2
|
||||
- git: https://github.com/FlipsideCrypto/livequery-models.git
|
||||
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
|
||||
sha1_hash: 622a679ecf98e6ebf3c904241902ce5328c77e52
|
||||
sha1_hash: b53a8dd1a1f99a375e8f024920d5eb7630a0765f
|
||||
|
||||
@ -6,7 +6,7 @@ packages:
|
||||
- package: dbt-labs/dbt_utils
|
||||
version: 1.0.0
|
||||
- git: https://github.com/FlipsideCrypto/fsc-utils.git
|
||||
revision: v1.29.0
|
||||
revision: v1.31.0
|
||||
- package: get-select/dbt_snowflake_query_tags
|
||||
version: [">=2.0.0", "<3.0.0"]
|
||||
- git: https://github.com/FlipsideCrypto/fsc-evm.git
|
||||
|
||||
Loading…
Reference in New Issue
Block a user