AN-4606/decoded-logs (#20)

* decoded logs streamline, silver, core and macros

* realtime job

* vars in workflow

* v2

* wait

* overview docs

* decoder sched

* remove coalesce
Authored by drethereum on 2024-03-13 11:21:32 -06:00; committed by GitHub.
Parent: a079acf22d
Commit: 251ddd2ac1
59 changed files with 1131 additions and 2 deletions


@ -41,4 +41,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "blast_models,tag:non_realtime"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "blast_models,tag:non_realtime" "blast_models,tag:streamline_decoded_logs_complete" "blast_models,tag:streamline_decoded_logs_realtime"


@ -0,0 +1,44 @@
name: dbt_run_streamline_decoder
run-name: dbt_run_streamline_decoder

on:
  workflow_dispatch:
    branches:
      - "main"

env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "blast_models,tag:decoded_logs"


@ -0,0 +1,45 @@
name: dbt_run_streamline_decoder_history
run-name: dbt_run_streamline_decoder_history

on:
  workflow_dispatch:
  schedule:
    # Runs at 01:05 UTC (see https://crontab.guru)
    - cron: '5 1 * * *'

env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "blast_models,tag:streamline_decoded_logs_complete" "blast_models,tag:streamline_decoded_logs_history"


@ -1,4 +1,5 @@
workflow_name,workflow_schedule
dbt_run_scheduled_non_realtime,"17,47 * * * *"
dbt_run_streamline_chainhead,"10,40 * * * *"
dbt_run_streamline_decoder,"25,55 * * * *"
dbt_test_tasks,"10 * * * *"


@ -8,7 +8,8 @@
) }}
{{ create_udf_rest_api() }}
{{ create_aws_blast_api() }}
{{ create_udf_bulk_decode_logs() }}
{% endset %}
{% do run_query(sql) %}
{{- fsc_utils.create_udfs() -}}


@ -8,4 +8,15 @@
{% else %}
aws_blast_api_dev AS 'https://y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_decode_logs() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs(
json OBJECT
) returns ARRAY api_integration = {% if target.name == "prod" %}
aws_blast_api AS 'https://42gzudc5si.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_logs'
{% else %}
aws_blast_api_dev AS 'https://y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/bulk_decode_logs'
{%- endif %};
{% endmacro %}
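
For reference, the streamline models later in this change invoke this external function from a post_hook via `object_construct`. A minimal sketch of an equivalent direct call, with the realtime defaults filled in (the `sql_source` value and the literal batch sizes are assumptions taken from those post_hooks):

```sql
-- Hypothetical manual invocation of the external function defined above;
-- the parameter names mirror the object_construct used in the streamline post_hooks.
SELECT
    streamline.udf_bulk_decode_logs(
        OBJECT_CONSTRUCT(
            'sql_source', 'decode_logs_realtime',   -- assumed model identifier
            'external_table', 'DECODED_LOGS',
            'sql_limit', 2000000,
            'producer_batch_size', 400000,
            'worker_batch_size', 200000
        )
    );
```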


@ -25,6 +25,7 @@ There is more information on how to use dbt docs in the last section of this doc
**Fact Tables:**
- [fact_blocks](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_blocks)
- [fact_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_event_logs)
- [fact_decoded_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_decoded_event_logs)
- [fact_traces](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_traces)
- [fact_transactions](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_transactions)
- [fact_token_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__fact_token_transfers)
@ -32,6 +33,7 @@ There is more information on how to use dbt docs in the last section of this doc
**Convenience Tables:**
- [ez_native_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_native_transfers)
- [ez_token_transfers](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_token_transfers)
- [ez_decoded_event_logs](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.core__ez_decoded_event_logs)
### Price Tables (blast.price)
- [fact_hourly_token_prices](https://flipsidecrypto.github.io/blast-models/#!/model/model.blast_models.price__fact_hourly_token_prices)


@ -0,0 +1,30 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
token_name AS contract_name,
event_name,
decoded_flat AS decoded_log,
decoded_data AS full_decoded_log,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
decoded_logs_id AS ez_decoded_event_logs_id,
GREATEST(COALESCE(l.inserted_timestamp, '2000-01-01'), COALESCE(C.inserted_timestamp, '2000-01-01')) AS inserted_timestamp,
GREATEST(COALESCE(l.modified_timestamp, '2000-01-01'), COALESCE(C.modified_timestamp, '2000-01-01')) AS modified_timestamp
FROM
{{ ref('silver__decoded_logs') }}
l
LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address)


@ -0,0 +1,65 @@
version: 2
models:
- name: core__ez_decoded_event_logs
description: >
'For information on how to submit a contract for decoding, as well as how ABIs are sourced, please visit [here](https://science.flipsidecrypto.xyz/abi-requestor/).
This model contains decoded event logs for contracts that we have an ABI for. Please note, this table does not include all event logs, only those that we have an ABI for.
The `decoded_log` column is the easiest place to query decoded data. It is a JSON object, where the keys are the names of the event parameters, and the values are the values of the event parameters.
You can select from this column using the following sample format, `decoded_log:from::string` or more generally, `decoded_log:<event_param>::datatype`. See below for a full sample query.
The `full_decoded_log` column contains the same information, as well as additional fields such as the datatype of each decoded parameter. You may need to laterally flatten this column to query the data.
Sample query for USDC Transfer events:
```sql
select
tx_hash,
block_number,
contract_address,
decoded_log:from::string as from_address,
decoded_log:to::string as to_address,
decoded_log:value::integer as value
from ethereum.core.fact_decoded_event_logs
where contract_address = lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48')
and block_number between 16400000 and 16405000
and event_name = 'Transfer'
limit 50```'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("blast_logs_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("blast_event_index") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("blast_logs_contract_address") }}'
- name: CONTRACT_NAME
description: 'The name of the contract, if the contract has a name() function.'
- name: EVENT_NAME
description: 'The name of the event, as defined in the contract ABI.'
- name: DECODED_LOG
description: 'The flattened decoded log, where the keys are the names of the event parameters, and the values are the values of the event parameters.'
- name: FULL_DECODED_LOG
description: 'The full decoded log, including the event name, the event parameters, and the data type of the event parameters.'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("blast_tx_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("blast_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("blast_origin_to") }}'
- name: TOPICS
description: '{{ doc("blast_topics") }}'
- name: DATA
description: '{{ doc("blast_logs_data") }}'
- name: EVENT_REMOVED
description: '{{ doc("blast_event_removed") }}'
- name: TX_STATUS
description: '{{ doc("blast_tx_status") }}'
- name: EZ_DECODED_EVENT_LOGS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -0,0 +1,20 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
event_name,
decoded_flat AS decoded_log,
decoded_data AS full_decoded_log,
decoded_logs_id AS fact_decoded_event_logs_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__decoded_logs') }}


@ -0,0 +1,49 @@
version: 2
models:
- name: core__fact_decoded_event_logs
description: >
'For information on how to submit a contract for decoding, as well as how ABIs are sourced, please visit [here](https://science.flipsidecrypto.xyz/abi-requestor/).
This model contains decoded event logs for contracts that we have an ABI for. Please note, this table does not include all event logs, only those that we have an ABI for.
This table will perform better than the `core__ez_decoded_event_logs` table, but does not include as many columns.
The `decoded_log` column is the easiest place to query decoded data. It is a JSON object, where the keys are the names of the event parameters, and the values are the values of the event parameters.
You can select from this column using the following sample format, `decoded_log:from::string` or more generally, `decoded_log:<event_param>::datatype`. See below for a full sample query.
The `full_decoded_log` column contains the same information, as well as additional fields such as the datatype of each decoded parameter. You may need to laterally flatten this column to query the data.
Sample query for USDC Transfer events:
```sql
select
tx_hash,
block_number,
contract_address,
decoded_log:from::string as from_address,
decoded_log:to::string as to_address,
decoded_log:value::integer as value
from ethereum.core.fact_decoded_event_logs
where contract_address = lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48')
and block_number between 16400000 and 16405000
and event_name = 'Transfer'
limit 50```'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("blast_logs_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("blast_event_index") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("blast_logs_contract_address") }}'
- name: EVENT_NAME
description: 'The name of the event, as defined in the contract ABI.'
- name: DECODED_LOG
description: 'The flattened decoded log, where the keys are the names of the event parameters, and the values are the values of the event parameters.'
- name: FULL_DECODED_LOG
description: 'The full decoded log, including the event name, the event parameters, and the data type of the event parameters.'
- name: FACT_DECODED_EVENT_LOGS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -0,0 +1,238 @@
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
cluster_by = "block_timestamp::date",
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
merge_exclude_columns = ["inserted_timestamp"],
full_refresh = false,
tags = ['decoded_logs','reorg']
) }}
WITH base_data AS (
SELECT
block_number :: INTEGER AS block_number,
SPLIT(
id,
'-'
) [0] :: STRING AS tx_hash,
SPLIT(
id,
'-'
) [1] :: INTEGER AS event_index,
DATA :name :: STRING AS event_name,
LOWER(
DATA :address :: STRING
) :: STRING AS contract_address,
DATA AS decoded_data,
id :: STRING AS _log_id,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% else %}
{{ ref('bronze__fr_decoded_logs') }}
WHERE
DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC, _partition_by_created_date DESC)) = 1
),
transformed_logs AS (
SELECT
block_number,
tx_hash,
event_index,
contract_address,
event_name,
decoded_data,
_inserted_timestamp,
_log_id,
utils.udf_transform_logs(decoded_data) AS transformed
FROM
base_data
),
FINAL AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp,
OBJECT_AGG(
DISTINCT CASE
WHEN v.value :name = '' THEN CONCAT(
'anonymous_',
v.index
)
ELSE v.value :name
END,
v.value :value
) AS decoded_flat
FROM
transformed_logs b,
LATERAL FLATTEN(
input => transformed :data
) v
GROUP BY
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp
),
new_records AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
b.transformed,
b._log_id,
b._inserted_timestamp,
b.decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed :: STRING AS event_removed,
tx_status,
CASE
WHEN block_timestamp IS NULL THEN TRUE
ELSE FALSE
END AS is_pending
FROM
FINAL b
LEFT JOIN {{ ref('silver__logs') }} USING (
block_number,
_log_id
)
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.tx_hash,
t.block_number,
t.event_index,
t.event_name,
t.contract_address,
t.decoded_data,
t.transformed,
t._log_id,
GREATEST(
TO_TIMESTAMP_NTZ(
t._inserted_timestamp
),
TO_TIMESTAMP_NTZ(
l._inserted_timestamp
)
) AS _inserted_timestamp,
t.decoded_flat,
l.block_timestamp,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.topics,
l.data,
l.event_removed :: STRING AS event_removed,
l.tx_status,
FALSE AS is_pending
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__logs') }}
l USING (
block_number,
_log_id
)
WHERE
t.is_pending
AND l.block_timestamp IS NOT NULL
)
{% endif %}
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
missing_data
{% endif %}
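
The `decoded_flat` column above is produced by aggregating the flattened `transformed:data` array back into a single object. A self-contained sketch of that OBJECT_AGG / LATERAL FLATTEN step, with a hard-coded payload standing in for the output of `utils.udf_transform_logs` (the payload shape is an assumption inferred from how the model reads it):

```sql
-- Sketch: rebuild a flat {param_name: value} object from a name/value array.
WITH sample AS (
    SELECT PARSE_JSON(
        '{"data": [{"name": "from", "value": "0xabc"},
                   {"name": "to",   "value": "0xdef"},
                   {"name": "",     "value": "42"}]}'
    ) AS transformed
)
SELECT
    OBJECT_AGG(
        DISTINCT CASE
            WHEN v.value :name = '' THEN CONCAT('anonymous_', v.index)
            ELSE v.value :name
        END,
        v.value :value
    ) AS decoded_flat   -- {"anonymous_2": "42", "from": "0xabc", "to": "0xdef"}
FROM
    sample,
    LATERAL FLATTEN(input => transformed :data) v;
```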


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__decoded_logs') }}


@ -0,0 +1,52 @@
version: 2
models:
- name: test_silver__decoded_logs_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__decoded_logs') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,56 @@
version: 2
models:
- name: test_silver__decoded_logs_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
- fsc_utils.recent_decoded_logs_match:
config:
severity: error
error_if: ">0"
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR


@ -0,0 +1,41 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
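
The two `_partition_by_*` columns are derived purely from the external-table file name. A small sketch of how those `SPLIT_PART` expressions resolve, with a made-up key (only the positions — date in path segments 3–5, block number prefixing segment 6 — come from the model; the key layout itself is an assumption):

```sql
-- Sketch: parse partition values out of an assumed external-table file name.
SELECT
    CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,  -- 2150000
    TO_DATE(
        CONCAT_WS('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
    ) AS _partition_by_created_date                                                                    -- 2024-03-13
FROM (
    SELECT 'decoded_logs/records/2024/03/13/2150000_000.json' AS file_name
);
```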


@ -0,0 +1,40 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date


@ -0,0 +1,32 @@
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = "_log_id",
cluster_by = "ROUND(block_number, -3)",
incremental_predicates = ["dynamic_range", "block_number"],
merge_update_columns = ["_log_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)",
tags = ['streamline_decoded_logs_complete']
) }}
SELECT
block_number,
id AS _log_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
COALESCE(MAX(TO_TIMESTAMP_NTZ(_inserted_timestamp)), '1970-01-01 00:00:00') _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__fr_decoded_logs') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1
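
The incremental filter above compares each batch against the newest `_inserted_timestamp` already loaded, falling back to the epoch on an empty table. A quick way to inspect that watermark directly (the `blast.streamline` path is an assumption):

```sql
-- Sketch: current incremental watermark for the complete_decode_logs table.
SELECT
    COALESCE(MAX(TO_TIMESTAMP_NTZ(_inserted_timestamp)), '1970-01-01 00:00:00') AS watermark
FROM
    blast.streamline.complete_decode_logs;
```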


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}
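
Each of these history views derives its block range from its own identifier via the Jinja `split` above. A minimal sketch of what the two `set` statements resolve to, mirrored here with Snowflake's `SPLIT_PART` for illustration and assuming an identifier like `decode_logs_history_0_1000000` (the name is an assumption; only the split positions come from the code):

```sql
-- Sketch: how the start/stop block bounds fall out of the model identifier.
SELECT
    SPLIT_PART('decode_logs_history_0_1000000', '_', -2) AS start_block,  -- '0'
    SPLIT_PART('decode_logs_history_0_1000000', '_', -1) AS stop_block;   -- '1000000'
```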


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,16 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{model.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{model.alias}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','7500000')}}, 'producer_batch_size', {{var('producer_batch_size','20000000')}}, 'worker_batch_size', {{var('worker_batch_size','10000000')}}))",
target = "{{model.schema}}.{{model.alias}}"
),
if_data_call_wait()],
tags = ['streamline_decoded_logs_history']
) }}
{% set start = this.identifier.split("_") [-2] %}
{% set stop = this.identifier.split("_") [-1] %}
{{ fsc_utils.decode_logs_history(
start,
stop
) }}


@ -0,0 +1,66 @@
{{ config (
materialized = "view",
post_hook = [if_data_call_function(
func = "{{this.schema}}.udf_bulk_decode_logs(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'DECODED_LOGS', 'sql_limit', {{var('sql_limit','2000000')}}, 'producer_batch_size', {{var('producer_batch_size','400000')}}, 'worker_batch_size', {{var('worker_batch_size','200000')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
"call system$wait(" ~ var("WAIT", 400) ~ ")" ],
tags = ['streamline_decoded_logs_realtime']
) }}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 1
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number >= (
SELECT
block_number
FROM
look_back
)
)
AND l.block_number IS NOT NULL
AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__complete_decode_logs") }}
WHERE
block_number >= (
SELECT
block_number
FROM
look_back
)
AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE())
)
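
This realtime view selects recent logs that have not yet landed in `streamline__complete_decode_logs`. A rough health check that mirrors that anti-join, useful for seeing how far decoding is lagging (the `blast.silver` and `blast.streamline` paths are assumptions):

```sql
-- Sketch: count recent logs still waiting to be decoded.
SELECT
    COUNT(*) AS pending_logs
FROM
    blast.silver.logs l
WHERE
    l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
    AND NOT EXISTS (
        SELECT
            1
        FROM
            blast.streamline.complete_decode_logs c
        WHERE
            c._log_id = l._log_id
            AND c.block_number = l.block_number
    );
```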