AN-4520/streamline-udfs-testnet (#4)

* streamline udfs

* udfs

* streamline models

* streamline models

* exploded key

* create udfs edit

* txn history

* testnet sources

* edits

* utils

* livequery for chainhead

* CTE

* secret name path

* udfs

* coalesce

* chainhead table

* tbl config

* request syntax

* core views

* mint precise

* request structure

* Temp changes

* new request structure

* id

* traces structure

* traces history

* partition key

* confirm blocks

* workflows

* tests and observ

* ephit

---------

Co-authored-by: Ryan-Loofy <63126328+Ryan-Loofy@users.noreply.github.com>
drethereum 2024-02-22 12:11:05 -07:00 committed by GitHub
parent f0cd30aba8
commit 5d9186eb4b
116 changed files with 4065 additions and 61 deletions


@ -0,0 +1,46 @@
name: dbt_alter_gha_task
run-name: dbt_alter_gha_task
on:
workflow_dispatch:
branches:
- "main"
inputs:
workflow_name:
type: string
description: Name of the workflow to perform the action on, no .yml extension
required: true
task_action:
type: choice
description: Action to perform
required: true
options:
- SUSPEND
- RESUME
default: SUSPEND
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_alter_gha_tasks.yml@main
with:
workflow_name: |
${{ inputs.workflow_name }}
task_action: |
${{ inputs.task_action }}
environment: workflow_prod
secrets: inherit
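Note: the suspend/resume itself happens inside the shared template; as a rough sketch, and assuming each Snowflake task is named after its workflow, the statement it would ultimately issue looks like:

ALTER TASK IF EXISTS dbt_run_streamline_chainhead SUSPEND; -- or RESUME, per task_action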

.github/workflows/dbt_docs_update.yml

@ -0,0 +1,27 @@
name: docs_update
on:
push:
branches:
- "main"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_docs_updates.yml@main
secrets: inherit


@ -1,5 +1,5 @@
name: dbt_run_adhoc
-run-name: dbt_run_adhoc
+run-name: ${{ inputs.dbt_command }}
on:
workflow_dispatch:


@ -1,5 +1,5 @@
name: dbt_run_deployment
-run-name: dbt_run_deployment
+run-name: ${{ inputs.dbt_command }}
on:
workflow_dispatch:


@ -0,0 +1,45 @@
name: dbt_run_dev_refresh
run-name: dbt_run_dev_refresh
on:
workflow_dispatch:
schedule:
# Runs "at 9:00 UTC" (see https://crontab.guru)
- cron: '0 9 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run-operation run_sp_create_prod_clone


@ -0,0 +1,48 @@
name: dbt_run_full_observability
run-name: dbt_run_full_observability
on:
workflow_dispatch:
schedule:
# Runs "At 16:00 on day-of-month 1." (see https://crontab.guru)
- cron: '0 16 1 * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_2xl
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 2 --vars '{"OBSERV_FULL_TEST":True}' -m "blast_models,tag:observability"


@ -0,0 +1,44 @@
name: dbt_run_scheduled_non_realtime
run-name: dbt_run_scheduled_non_realtime
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "blast_models,tag:non_realtime"


@ -0,0 +1,44 @@
name: dbt_run_streamline_chainhead
run-name: dbt_run_streamline_chainhead
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "blast_models,tag:streamline_core_complete" "blast_models,tag:streamline_core_realtime"


@ -0,0 +1,69 @@
name: dbt_run_streamline_history_adhoc
run-name: dbt_run_streamline_history_adhoc
on:
workflow_dispatch:
branches:
- "main"
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
- prod_backfill
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: choice
description: 'DBT Run Command'
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "blast_models,tag:streamline_core_history" "blast_models,tag:streamline_core_complete"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ inputs.warehouse }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.dbt_command }}

.github/workflows/dbt_test_daily.yml

@ -0,0 +1,48 @@
name: dbt_test_daily
run-name: dbt_test_daily
on:
workflow_dispatch:
schedule:
# Runs "at 9:00 UTC" (see https://crontab.guru)
- cron: '0 9 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_test
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt test --exclude "blast_models,tag:full_test" "blast_models,tag:recent_test" "blast_models,tag:gha_tasks" livequery_models

.github/workflows/dbt_test_intraday.yml

@ -0,0 +1,49 @@
name: dbt_test_intraday
run-name: dbt_test_intraday
on:
workflow_dispatch:
schedule:
# Runs "At minute 5 past every 4th hour." (see https://crontab.guru)
- cron: '5 */4 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_test
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "blast_models,tag:observability"
dbt test -m "blast_models,tag:recent_test"

.github/workflows/dbt_test_monthly.yml

@ -0,0 +1,48 @@
name: dbt_test_monthly
run-name: dbt_test_monthly
on:
workflow_dispatch:
schedule:
# Runs "at 19:00 on the 28th of the month" (see https://crontab.guru)
- cron: '0 19 28 * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_test
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt test -m "blast_models,tag:full_test"

.github/workflows/dbt_test_tasks.yml

@ -0,0 +1,27 @@
name: dbt_test_tasks
run-name: dbt_test_tasks
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_test_tasks.yml@main
secrets: inherit


@ -1,6 +1,4 @@
workflow_name,workflow_schedule
dbt_run_scheduled_non_realtime,"22,52 * * * *"
dbt_run_streamline_chainhead,"15,45 * * * *"
dbt_run_streamline_decoder,"0,30 * * * *"
dbt_run_scheduled_curated,"40 * * * *"
dbt_test_tasks,"15 * * * *"
dbt_run_scheduled_non_realtime,"17,47 * * * *"
dbt_run_streamline_chainhead,"10,40 * * * *"
dbt_test_tasks,"10 * * * *"


@ -28,7 +28,7 @@ tests:
+store_failures: true # all tests
on-run-start:
# - "{{ create_sps() }}"
- "{{ create_sps() }}"
- "{{ create_udfs() }}"
on-run-end:


@ -1,8 +1,8 @@
-{# {% macro create_sps() %}
+{% macro create_sps() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% if target.database == 'BLAST' %}
CREATE schema IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endif %}
-{% endmacro %} #}
+{% endmacro %}


@ -1,5 +1,15 @@
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
+{% set sql %}
+CREATE schema if NOT EXISTS silver;
+{{ create_udtf_get_base_table(
+schema = "streamline"
+) }}
+{{ create_udf_rest_api() }}
+{% endset %}
+{% do run_query(sql) %}
{{- fsc_utils.create_udfs() -}}
{% endif %}
{% endmacro %}


@ -0,0 +1,22 @@
{% macro create_aws_base_api() %}
{{ log(
"Creating integration for target:" ~ target
) }}
{# {% if target.name == "prod" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_blast_api api_provider = aws_api_gateway api_aws_role_arn = 'insert-prod-arn-here' api_allowed_prefixes = (
'insert-prod-url-here'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
#}
{% if target.name == "dev" %}
-- replace if with elif after prod is deployed
{% set sql %}
CREATE api integration IF NOT EXISTS aws_blast_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/blast-api-dev-rolesnowflakeudfsAF733095-Wtkj0DGJ7lOQ' api_allowed_prefixes = (
'https://05340o05al.execute-api.us-east-1.amazonaws.com/dev/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}


@ -0,0 +1,24 @@
{% macro create_udtf_get_base_table(schema) %}
create or replace function {{ schema }}.udtf_get_base_table(max_height integer)
returns table (height number)
as
$$
with base as (
select
row_number() over (
order by
seq4()
) as id
from
table(generator(rowcount => 100000000))
)
select
id as height
from
base
where
id <= max_height
$$
;
{% endmacro %}
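For reference, a minimal call of this UDTF (a sketch, assuming it lands in the streamline schema as above) returns a contiguous height sequence:

SELECT height
FROM TABLE(streamline.udtf_get_base_table(1000)); -- heights 1 through 1000; the generator caps the range at 100,000,000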


@ -0,0 +1,117 @@
{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }} AS block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}
{% macro streamline_external_table_FR_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ unique_key }} AS block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010',
'-32608'
)
)
{% endmacro %}
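The two macros differ only in the file set they scan: streamline_external_table_query reads registrations from the last 3 days (incremental runs), while the _FR_ variant reads every registered file (full refreshes). An invocation sketch with illustrative arguments only — the partition_function shown here is hypothetical:

{{ streamline_external_table_query(
    model = "blocks_testnet",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
    partition_name = "_partition_by_block_id",
    unique_key = "block_number"
) }}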


@ -0,0 +1,11 @@
{% macro create_udf_rest_api() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_rest_api(
json OBJECT
) returns ARRAY api_integration =
{% if target.name == "prod" %}
aws_blast_api AS 'insert-prod-url-here'
{% else %}
aws_blast_api_dev AS 'https://05340o05al.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_rest_api'
{%- endif %};
{% endmacro %}
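A sketch of calling this external function; the payload shape below is an assumption, since the real contract is defined by the Lambda behind the API gateway:

SELECT streamline.udf_rest_api(
    OBJECT_CONSTRUCT('method', 'POST', 'url', 'https://example.com/endpoint') -- hypothetical payload
);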


@ -1,4 +1,4 @@
-{# {% macro if_data_call_function(
+{% macro if_data_call_function(
func,
target
) %}
@ -75,4 +75,4 @@
{% endif %}
{% endif %}
{% endif %}
-{% endmacro %} #}
+{% endmacro %}


@ -0,0 +1,78 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
A.block_number AS block_number,
block_timestamp,
'testnet' AS network,
'blast' AS blockchain,
d.tx_count,
difficulty,
total_difficulty,
extra_data,
gas_limit,
gas_used,
HASH,
parent_hash,
receipts_root,
sha3_uncles,
SIZE,
uncles AS uncle_blocks,
withdrawals_root,
OBJECT_CONSTRUCT(
'baseFeePerGas',
base_fee_per_gas,
'difficulty',
difficulty,
'extraData',
extra_data,
'gasLimit',
gas_limit,
'gasUsed',
gas_used,
'hash',
HASH,
'logsBloom',
logs_bloom,
'miner',
miner,
'nonce',
nonce,
'number',
NUMBER,
'parentHash',
parent_hash,
'receiptsRoot',
receipts_root,
'sha3Uncles',
sha3_uncles,
'size',
SIZE,
'stateRoot',
state_root,
'timestamp',
block_timestamp,
'totalDifficulty',
total_difficulty,
'transactionsRoot',
transactions_root,
'uncles',
uncles
) AS block_header_json,
blocks_id AS fact_blocks_id,
GREATEST(
A.inserted_timestamp,
d.inserted_timestamp
) AS inserted_timestamp,
GREATEST(
A.modified_timestamp,
d.modified_timestamp
) AS modified_timestamp
FROM
{{ ref('silver__blocks') }} A
LEFT JOIN {{ ref('silver__tx_count') }}
d
ON A.block_number = d.block_number


@ -0,0 +1,48 @@
version: 2
models:
- name: core__fact_blocks
description: '{{ doc("blast_blocks_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: NETWORK
description: '{{ doc("blast_network") }}'
- name: BLOCKCHAIN
description: '{{ doc("blast_blockchain") }}'
- name: TX_COUNT
description: '{{ doc("blast_tx_count") }}'
- name: DIFFICULTY
description: '{{ doc("blast_difficulty") }}'
- name: TOTAL_DIFFICULTY
description: '{{ doc("blast_total_difficulty") }}'
- name: EXTRA_DATA
description: '{{ doc("blast_extra_data") }}'
- name: GAS_LIMIT
description: '{{ doc("blast_gas_limit") }}'
- name: GAS_USED
description: '{{ doc("blast_gas_used") }}'
- name: HASH
description: '{{ doc("blast_blocks_hash") }}'
- name: PARENT_HASH
description: '{{ doc("blast_parent_hash") }}'
- name: RECEIPTS_ROOT
description: '{{ doc("blast_receipts_root") }}'
- name: SHA3_UNCLES
description: '{{ doc("blast_sha3_uncles") }}'
- name: SIZE
description: '{{ doc("blast_size") }}'
- name: UNCLE_BLOCKS
description: '{{ doc("blast_uncle_blocks") }}'
- name: BLOCK_HEADER_JSON
description: '{{ doc("blast_block_header_json") }}'
- name: FACT_BLOCKS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: WITHDRAWALS_ROOT
description: A hash of the withdrawals that occurred in this block.


@ -0,0 +1,25 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
event_index,
contract_address,
topics,
DATA,
event_removed,
tx_status,
_log_id,
logs_id AS fact_event_logs_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__logs') }}


@ -0,0 +1,38 @@
version: 2
models:
- name: core__fact_event_logs
description: '{{ doc("blast_logs_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("blast_logs_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("blast_event_index") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("blast_logs_contract_address") }}'
- name: TOPICS
description: '{{ doc("blast_topics") }}'
- name: DATA
description: '{{ doc("blast_logs_data") }}'
- name: EVENT_REMOVED
description: '{{ doc("blast_event_removed") }}'
- name: _LOG_ID
description: '{{ doc("internal_column") }}'
- name: TX_STATUS
description: '{{ doc("blast_tx_status") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("blast_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("blast_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("blast_origin_to") }}'
- name: FACT_EVENT_LOGS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -0,0 +1,32 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
tx_hash,
block_number,
block_timestamp,
from_address,
to_address,
eth_value AS VALUE,
eth_value_precise_raw AS value_precise_raw,
eth_value_precise AS value_precise,
gas,
gas_used,
input,
output,
TYPE,
identifier,
DATA,
tx_status,
sub_traces,
trace_status,
error_reason,
trace_index,
traces_id AS fact_traces_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__traces') }}


@ -0,0 +1,56 @@
version: 2
models:
- name: core__fact_traces
description: '{{ doc("blast_traces_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_traces_block_no") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_traces_blocktime") }}'
- name: TX_HASH
description: '{{ doc("blast_traces_tx_hash") }}'
- name: FROM_ADDRESS
description: '{{ doc("blast_traces_from") }}'
- name: TO_ADDRESS
description: '{{ doc("blast_traces_to") }}'
- name: VALUE
description: '{{ doc("blast_traces_value") }}'
- name: VALUE_PRECISE_RAW
description: '{{ doc("precise_amount_unadjusted") }}'
- name: VALUE_PRECISE
description: '{{ doc("precise_amount_adjusted") }}'
- name: GAS
description: '{{ doc("blast_traces_gas") }}'
- name: GAS_USED
description: '{{ doc("blast_traces_gas_used") }}'
- name: INPUT
description: '{{ doc("blast_traces_input") }}'
- name: OUTPUT
description: '{{ doc("blast_traces_output") }}'
- name: TYPE
description: '{{ doc("blast_traces_type") }}'
- name: IDENTIFIER
description: '{{ doc("blast_traces_identifier") }}'
- name: DATA
description: '{{ doc("blast_traces_call_data") }}'
- name: TX_STATUS
description: '{{ doc("blast_tx_status") }}'
- name: SUB_TRACES
description: '{{ doc("blast_traces_sub") }}'
- name: TRACE_STATUS
description: The status of the trace, either `SUCCESS` or `FAIL`.
- name: ERROR_REASON
description: The reason for the trace failure, if any.
- name: TRACE_INDEX
description: The index of the trace within the transaction.
- name: FACT_TRACES_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -0,0 +1,45 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
block_number,
block_timestamp,
block_hash,
tx_hash,
nonce,
POSITION,
origin_function_signature,
from_address,
to_address,
VALUE,
value_precise_raw,
value_precise,
tx_fee,
tx_fee_precise,
gas_price,
effective_gas_price,
gas AS gas_limit,
gas_used,
l1_gas_price,
l1_gas_used,
l1_fee_scalar,
l1_fee,
l1_fee_precise,
cumulative_gas_used,
max_fee_per_gas,
max_priority_fee_per_gas,
input_data,
tx_status AS status,
r,
s,
v,
deposit_nonce,
deposit_receipt_version,
transactions_id AS fact_transactions_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__transactions') }}


@ -0,0 +1,78 @@
version: 2
models:
- name: core__fact_transactions
description: '{{ doc("blast_tx_table_doc") }}'
columns:
- name: BLOCK_NUMBER
description: '{{ doc("blast_block_number") }}'
- name: BLOCK_TIMESTAMP
description: '{{ doc("blast_block_timestamp") }}'
- name: BLOCK_HASH
description: '{{ doc("blast_tx_block_hash") }}'
- name: TX_HASH
description: '{{ doc("blast_tx_hash") }}'
- name: NONCE
description: '{{ doc("blast_tx_nonce") }}'
- name: POSITION
description: '{{ doc("blast_tx_position") }}'
- name: FROM_ADDRESS
description: '{{ doc("blast_from_address") }}'
- name: TO_ADDRESS
description: '{{ doc("blast_to_address") }}'
- name: VALUE
description: '{{ doc("blast_value") }}'
- name: VALUE_PRECISE_RAW
description: '{{ doc("precise_amount_unadjusted") }}'
- name: VALUE_PRECISE
description: '{{ doc("precise_amount_adjusted") }}'
- name: TX_FEE
description: '{{ doc("blast_tx_fee") }}'
- name: TX_FEE_PRECISE
description: '{{ doc("tx_fee_precise") }}'
- name: GAS_PRICE
description: '{{ doc("blast_tx_gas_price") }}'
- name: EFFECTIVE_GAS_PRICE
description: The total base charge plus tip paid for each unit of gas, in Gwei.
- name: GAS_LIMIT
description: '{{ doc("blast_tx_gas_limit") }}'
- name: GAS_USED
description: '{{ doc("blast_tx_gas_used") }}'
- name: L1_GAS_PRICE
description: '{{ doc("blast_l1_gas_price") }}'
- name: L1_GAS_USED
description: '{{ doc("blast_l1_gas_used") }}'
- name: L1_FEE_SCALAR
description: '{{ doc("blast_l1_fee_scalar") }}'
- name: L1_FEE
description: The L1 portion of fees paid.
- name: L1_FEE_PRECISE
description: '{{ doc("tx_fee_precise") }}'
- name: CUMULATIVE_GAS_USED
description: '{{ doc("blast_cumulative_gas_used") }}'
- name: MAX_FEE_PER_GAS
description: The maximum fee per gas of the transaction, in Gwei.
- name: MAX_PRIORITY_FEE_PER_GAS
description: The maximum priority fee per gas of the transaction, in Gwei.
- name: STATUS
description: '{{ doc("blast_tx_status") }}'
- name: INPUT_DATA
description: '{{ doc("blast_tx_input_data") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("blast_tx_origin_sig") }}'
- name: R
description: The r value of the transaction signature.
- name: S
description: The s value of the transaction signature.
- name: V
description: The v value of the transaction signature.
- name: FACT_TRANSACTIONS_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: DEPOSIT_NONCE
description: The nonce of the deposit transaction.
- name: DEPOSIT_RECEIPT_VERSION
description: The version of the deposit receipt.


@ -0,0 +1,161 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("silver__blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
CURRENT_TIMESTAMP AS test_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL
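The gap detection above boils down to generating the expected contiguous block range and checking which generated numbers have no matching block; a simplified sketch of the idea, with illustrative table names rather than the model itself:

WITH expected AS (
    SELECT _id AS block_number
    FROM silver.number_sequence
    WHERE _id BETWEEN 100 AND 200
)
SELECT e.block_number AS missing_block
FROM expected e
LEFT JOIN silver.blocks b
    ON e.block_number = b.block_number
WHERE b.block_number IS NULL;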


@ -0,0 +1,119 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__receipts") }}
r
LEFT JOIN {{ ref("silver__logs") }}
l USING (
block_number,
tx_hash
)
JOIN block_range USING (block_number)
WHERE
l.tx_hash IS NULL
AND ARRAY_SIZE(
r.logs
) > 0
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'event_logs' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1


@ -0,0 +1,117 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__transactions") }}
t
LEFT JOIN {{ ref("silver__receipts") }}
r USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
r.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'receipts' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1


@ -0,0 +1,116 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__transactions") }}
tx
LEFT JOIN {{ ref("silver__traces") }}
tr USING (
block_number,
tx_hash
)
JOIN block_range USING (block_number)
WHERE
tr.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'traces' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1


@ -0,0 +1,117 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
broken_blocks AS (
SELECT
DISTINCT block_number
FROM
{{ ref("silver__confirmed_blocks") }}
b
LEFT JOIN {{ ref("silver__transactions") }}
t USING (
block_number,
tx_hash,
block_hash
)
JOIN block_range USING (block_number)
WHERE
t.tx_hash IS NULL
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
broken_blocks
)
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1


@ -1,4 +1,4 @@
- -- depends_on: {{ ref('bronze__streamline_blocks') }}
+ -- depends_on: {{ ref('bronze__streamline_blocks_testnet') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
@ -12,48 +12,48 @@ SELECT
DATA,
block_number,
utils.udf_hex_to_int(
-DATA :baseFeePerGas :: STRING
+DATA :result :baseFeePerGas :: STRING
) :: INT AS base_fee_per_gas,
utils.udf_hex_to_int(
-DATA :difficulty :: STRING
+DATA :result :difficulty :: STRING
) :: INT AS difficulty,
-DATA :extraData :: STRING AS extra_data,
+DATA :result :extraData :: STRING AS extra_data,
utils.udf_hex_to_int(
-DATA :gasLimit :: STRING
+DATA :result :gasLimit :: STRING
) :: INT AS gas_limit,
utils.udf_hex_to_int(
-DATA :gasUsed :: STRING
+DATA :result :gasUsed :: STRING
) :: INT AS gas_used,
-DATA :hash :: STRING AS HASH,
-DATA :logsBloom :: STRING AS logs_bloom,
-DATA :miner :: STRING AS miner,
-DATA :mixHash :: STRING AS mixHash,
+DATA :result :hash :: STRING AS HASH,
+DATA :result :logsBloom :: STRING AS logs_bloom,
+DATA :result :miner :: STRING AS miner,
+DATA :result :mixHash :: STRING AS mixHash,
utils.udf_hex_to_int(
-DATA :nonce :: STRING
+DATA :result :nonce :: STRING
) :: INT AS nonce,
utils.udf_hex_to_int(
-DATA :number :: STRING
+DATA :result :number :: STRING
) :: INT AS NUMBER,
-DATA :parentHash :: STRING AS parent_hash,
-DATA :receiptsRoot :: STRING AS receipts_root,
-DATA :sha3Uncles :: STRING AS sha3_uncles,
+DATA :result :parentHash :: STRING AS parent_hash,
+DATA :result :receiptsRoot :: STRING AS receipts_root,
+DATA :result :sha3Uncles :: STRING AS sha3_uncles,
utils.udf_hex_to_int(
-DATA :size :: STRING
+DATA :result :size :: STRING
) :: INT AS SIZE,
-DATA :stateRoot :: STRING AS state_root,
+DATA :result :stateRoot :: STRING AS state_root,
utils.udf_hex_to_int(
-DATA :timestamp :: STRING
+DATA :result :timestamp :: STRING
) :: TIMESTAMP AS block_timestamp,
utils.udf_hex_to_int(
-DATA :totalDifficulty :: STRING
+DATA :result :totalDifficulty :: STRING
) :: INT AS total_difficulty,
ARRAY_SIZE(
-DATA :transactions
+DATA :result :transactions
) AS tx_count,
-DATA :transactionsRoot :: STRING AS transactions_root,
-DATA :uncles AS uncles,
-DATA :withdrawals AS withdrawals,
-DATA :withdrawalsRoot :: STRING AS withdrawals_root,
+DATA :result :transactionsRoot :: STRING AS transactions_root,
+DATA :result :uncles AS uncles,
+DATA :result :withdrawals AS withdrawals,
+DATA :result :withdrawalsRoot :: STRING AS withdrawals_root,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number']
@ -64,7 +64,7 @@ SELECT
FROM
{% if is_incremental() %}
-{{ ref('bronze__streamline_blocks') }}
+{{ ref('bronze__streamline_blocks_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -73,7 +73,7 @@ WHERE
{{ this }}
)
{% else %}
-{{ ref('bronze__streamline_FR_blocks') }}
+{{ ref('bronze__streamline_FR_blocks_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
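The DATA :result re-pathing above reflects landing the whole JSON-RPC envelope rather than just its result field; assuming the standard envelope, e.g. {"jsonrpc":"2.0","id":1,"result":{"number":"0x12"}}:

SELECT
    DATA :number :: STRING,         -- NULL under the new shape
    DATA :result :number :: STRING; -- '0x12'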


@ -0,0 +1,50 @@
-- depends_on: {{ ref('bronze__streamline_confirm_blocks_testnet') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = "round(block_number,-3)",
tags = ['realtime']
) }}
WITH base AS (
SELECT
block_number,
DATA :result :hash :: STRING AS block_hash,
DATA :result :transactions txs,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_confirm_blocks_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
IFNULL(
MAX(
_inserted_timestamp
),
'1970-01-01' :: TIMESTAMP
) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_confirm_blocks_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1
)
SELECT
block_number,
block_hash,
VALUE :: STRING AS tx_hash,
_inserted_timestamp
FROM
base,
LATERAL FLATTEN (
input => txs
)
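LATERAL FLATTEN here explodes each block's transaction-hash array into one row per hash; for example, a block 5 whose txs array is ['0xaa', '0xbb'] yields the rows (5, <block_hash>, '0xaa') and (5, <block_hash>, '0xbb').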


@ -1,4 +1,4 @@
- -- depends_on: {{ ref('bronze__streamline_receipts') }}
+ -- depends_on: {{ ref('bronze__streamline_receipts_testnet') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
-{{ ref('bronze__streamline_receipts') }}
+{{ ref('bronze__streamline_receipts_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -27,7 +27,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
-{{ ref('bronze__streamline_FR_receipts') }}
+{{ ref('bronze__streamline_FR_receipts_testnet') }}
WHERE
IS_OBJECT(DATA)
{% endif %}


@ -1,4 +1,4 @@
- -- depends_on: {{ ref('bronze__streamline_traces') }}
+ -- depends_on: {{ ref('bronze__streamline_traces_testnet') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
@ -12,15 +12,13 @@ WITH bronze_traces AS (
SELECT
block_number,
-tx_position,
-full_traces,
-{# VALUE :array_index :: INT AS tx_position,
-DATA :result AS full_traces, #}
+VALUE :array_index :: INT AS tx_position,
+DATA :result AS full_traces,
_inserted_timestamp
FROM
{% if is_incremental() %}
-{{ ref('bronze__streamline_traces') }}
+{{ ref('bronze__streamline_traces_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -28,13 +26,13 @@ WHERE
FROM
{{ this }}
)
-{# AND DATA :result IS NOT NULL #}
+AND DATA :result IS NOT NULL
{% else %}
-{{ ref('bronze__streamline_FR_traces') }}
-{# WHERE
-_partition_by_block_id <= 2300000
-AND
-DATA :result IS NOT NULL #}
+{{ ref('bronze__streamline_FR_traces_testnet') }}
+WHERE
+{# _partition_by_block_id <= 2300000
+AND #}
+DATA :result IS NOT NULL
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position


@ -1,4 +1,4 @@
- -- depends_on: {{ ref('bronze__streamline_transactions') }}
+ -- depends_on: {{ ref('bronze__streamline_transactions_testnet') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
@ -17,7 +17,7 @@ WITH base AS (
FROM
{% if is_incremental() %}
-{{ ref('bronze__streamline_transactions') }}
+{{ ref('bronze__streamline_transactions_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -27,7 +27,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
-{{ ref('bronze__streamline_FR_transactions') }}
+{{ ref('bronze__streamline_FR_transactions_testnet') }}
WHERE
IS_OBJECT(DATA)
{% endif %}
@ -75,11 +75,14 @@ base_tx AS (
1,
10
) AS origin_function_signature,
-TRY_TO_NUMBER(
-utils.udf_hex_to_int(
+utils.udf_hex_to_int(
DATA :mint :: STRING
-)
-) AS mint,
+) AS mint_precise_raw,
+utils.udf_decimal_adjust(
+mint_precise_raw,
+18
+) AS mint_precise,
+mint_precise :: FLOAT AS mint,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
DATA :maxFeePerGas :: STRING
@ -141,6 +144,8 @@ new_records AS (
t.tx_hash,
t.input_data,
t.origin_function_signature,
t.mint_precise_raw,
t.mint_precise,
t.mint,
t.max_fee_per_gas,
t.max_priority_fee_per_gas,
@ -222,6 +227,8 @@ missing_data AS (
t.tx_hash,
t.input_data,
t.origin_function_signature,
t.mint_precise_raw,
t.mint_precise,
t.mint,
t.max_fee_per_gas,
t.max_priority_fee_per_gas,
@ -296,6 +303,8 @@ FINAL AS (
tx_hash,
input_data,
origin_function_signature,
mint_precise_raw,
mint_precise,
mint,
max_fee_per_gas,
max_priority_fee_per_gas,
@ -344,6 +353,8 @@ SELECT
tx_hash,
input_data,
origin_function_signature,
mint_precise_raw,
mint_precise,
mint,
max_fee_per_gas,
max_priority_fee_per_gas,
@ -391,6 +402,8 @@ SELECT
tx_hash,
input_data,
origin_function_signature,
mint_precise_raw,
mint_precise,
mint,
max_fee_per_gas,
max_priority_fee_per_gas,
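Worked example of the new mint columns (assuming 18-decimal, wei-style units): udf_hex_to_int('0x0de0b6b3a7640000') yields 1000000000000000000 (mint_precise_raw); udf_decimal_adjust of that value by 18 yields 1.000000000000000000 (mint_precise); and the FLOAT cast yields 1.0 (mint).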


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__blocks') }}


@ -0,0 +1,95 @@
version: 2
models:
- name: test_silver__blocks_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_NUMBER
- fsc_utils.sequence_gaps:
column_name: BLOCK_NUMBER
where: BLOCK_TIMESTAMP < CURRENT_DATE - 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: DIFFICULTY
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOTAL_DIFFICULTY
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: EXTRA_DATA
tests:
- not_null
- name: GAS_LIMIT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: GAS_USED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: PARENT_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: MINER
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: NONCE
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: RECEIPTS_ROOT
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SHA3_UNCLES
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: SIZE
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,27 @@
version: 2
models:
- name: test_silver__blocks_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_NUMBER
- fsc_utils.sequence_gaps:
column_name: BLOCK_NUMBER
config:
severity: error
error_if: ">10"
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__confirmed_blocks') }}


@ -0,0 +1,34 @@
version: 2
models:
- name: test_silver__confirmed_blocks_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__confirmed_blocks') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,34 @@
version: 2
models:
- name: test_silver__confirmed_blocks_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- TIMESTAMP_LTZ


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__logs') }}


@ -0,0 +1,77 @@
version: 2
models:
- name: test_silver__logs_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
- TX_HASH
column_name: EVENT_INDEX
where: BLOCK_TIMESTAMP < CURRENT_DATE - 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_TIMESTAMP
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- fsc_utils.tx_block_count:
config:
severity: error
error_if: "!=0"
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TOPICS
tests:
- not_null
- name: DATA
tests:
- not_null
- name: EVENT_REMOVED
tests:
- not_null
- name: _LOG_ID
tests:
- not_null
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null:
where: NOT IS_PENDING
- name: ORIGIN_FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: ORIGIN_TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__logs') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,33 @@
version: 2
models:
- name: test_silver__logs_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
- TX_HASH
column_name: EVENT_INDEX
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: BLOCK_TIMESTAMP
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__receipts') }}


@ -0,0 +1,82 @@
version: 2
models:
- name: test_silver__receipts_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
column_name: POSITION
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: POSITION
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
where: TO_ADDRESS IS NOT NULL
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: CUMULATIVE_GAS_USED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: EFFECTIVE_GAS_PRICE
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: GAS_USED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_STATUS
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['SUCCESS', 'FAIL']
- name: TYPE
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__receipts') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,28 @@
version: 2
models:
- name: test_silver__receipts_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
column_name: POSITION
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__traces') }}


@ -0,0 +1,59 @@
version: 2
models:
- name: test_silver__traces_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_NUMBER
- TX_POSITION
- TRACE_INDEX
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: FROM_ADDRESS
tests:
- not_null:
where: TYPE <> 'SELFDESTRUCT'
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
where: TO_ADDRESS IS NOT NULL
- name: IDENTIFIER
tests:
- not_null
- name: ETH_VALUE
tests:
- not_null
- name: GAS
tests:
- not_null
- name: GAS_USED
tests:
- not_null


@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__traces') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)


@ -0,0 +1,35 @@
version: 2
models:
- name: test_silver__traces_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_NUMBER
- TX_POSITION
- TRACE_INDEX
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+


@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__transactions') }}


@ -0,0 +1,119 @@
version: 2
models:
- name: test_silver__transactions_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
column_name: POSITION
where: BLOCK_TIMESTAMP < CURRENT_DATE - 1
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: NONCE
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: POSITION
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: FROM_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TO_ADDRESS
tests:
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
where: TO_ADDRESS IS NOT NULL
- name: VALUE
tests:
- not_null
- name: BLOCK_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: GAS_PRICE
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: GAS
tests:
- not_null
- name: INPUT_DATA
tests:
- not_null
- name: TX_STATUS
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['SUCCESS', 'FAIL']
where: NOT IS_PENDING
- name: GAS_USED
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CUMULATIVE_GAS_USED
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_FEE
tests:
- not_null:
where: NOT IS_PENDING
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: EFFECTIVE_GAS_PRICE
tests:
- not_null:
where: NOT IS_PENDING
- name: ORIGIN_FUNCTION_SIGNATURE
tests:
- not_null

View File

@ -0,0 +1,23 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
)
SELECT
*
FROM
{{ ref('silver__transactions') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)

View File

@ -0,0 +1,22 @@
version: 2
models:
- name: test_silver__transactions_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_HASH
- fsc_utils.sequence_gaps:
partition_by:
- BLOCK_NUMBER
column_name: POSITION
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- name: BLOCK_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: hour
interval: 3

View File

@ -18,6 +18,11 @@ sources:
- name: debug_traceBlockByNumber
- name: decoded_logs
- name: confirm_blocks
- name: receipts_testnet
- name: blocks_testnet
- name: transactions_testnet
- name: traces_testnet
- name: confirm_blocks_testnet
- name: udfs_streamline
database: udfs
schema: streamline

View File

@ -0,0 +1,29 @@
{{ config (
materialized = 'view'
) }}
WITH num_seq AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id > 1300000
AND _id <= 1300010
)
SELECT
    block_number,
    utils.udf_int_to_hex(block_number) AS block_hex,
    live.udf_api(
        'POST',
        '{blast_testnet_url}',
        {},
        {
            'method': 'eth_getBlockByNumber',
            'params': [ block_hex, FALSE ],
            'id': 1,
            'jsonrpc': '2.0'
        },
        'quicknode_blast_testnet'
    ) AS resp,
resp :data :result AS DATA,
SYSDATE() AS _inserted_timestamp
FROM
num_seq
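
The `{}` and `{ ... }` arguments above are Snowflake object and array constants. The same call can be written with explicit constructors, which reads more like the request models later in this diff; a minimal equivalent sketch:

-- Same request expressed with OBJECT_CONSTRUCT / ARRAY_CONSTRUCT instead of
-- object and array constants (behavior should be identical).
live.udf_api(
    'POST',
    '{blast_testnet_url}',
    OBJECT_CONSTRUCT(),  -- no extra headers
    OBJECT_CONSTRUCT(
        'method', 'eth_getBlockByNumber',
        'params', ARRAY_CONSTRUCT(block_hex, FALSE),
        'id', 1,
        'jsonrpc', '2.0'
    ),
    'quicknode_blast_testnet'  -- registered secret resolving the endpoint
) AS resp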

View File

@ -0,0 +1,29 @@
{{ config (
materialized = 'view'
) }}
WITH num_seq AS (
SELECT
_id AS block_number
FROM
{{ ref('silver__number_sequence') }}
WHERE
_id > 1300000
AND _id <= 1300010
)
SELECT
    block_number,
    utils.udf_int_to_hex(block_number) AS block_hex,
    live.udf_api(
        'POST',
        '{blast_testnet_url}',
        {},
        {
            'method': 'eth_getBlockByNumber',
            'params': [ block_hex, FALSE ],
            'id': 1,
            'jsonrpc': '2.0'
        },
        'quicknode_blast_testnet'
    ) AS resp,
resp :data :result AS DATA,
SYSDATE() AS _inserted_timestamp
FROM
num_seq

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_FR_query(
model = "blocks_testnet",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}
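
`streamline_external_table_FR_query` is defined in a shared package, not in this diff, so the sketch below is an assumption about its general shape: a full-refresh read of the external table that derives the partition column from the staged file path and attaches file-registration metadata. The non-FR `streamline_external_table_query` variant used further down typically adds a recency filter so incremental runs only scan fresh partitions.

-- Plausible (simplified) expansion for blocks_testnet; not the macro's
-- actual source. Table and column names are assumptions.
WITH meta AS (
    SELECT
        file_name,
        registered_on AS _inserted_timestamp,
        CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)
            AS _partition_by_block_id
    FROM TABLE(
        information_schema.external_table_files(
            table_name => 'streamline.blocks_testnet'
        )
    )
)
SELECT
    src.partition_key,
    src.value,
    meta.file_name,
    meta._inserted_timestamp
FROM streamline.blocks_testnet AS src
JOIN meta
    ON meta._partition_by_block_id = src._partition_by_block_id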

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
model = "confirm_blocks_testnet",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_FR_query(
model = 'receipts_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_FR_query(
model = 'traces_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_FR_query(
model = 'transactions_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_query(
model = "blocks_testnet",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "confirm_blocks_testnet",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_query(
model = 'receipts_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_query(
model = 'traces_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{# {% set model = this.identifier.split("_") [-1] %} #}
{{ streamline_external_table_query(
model = 'transactions_testnet',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = "ephemeral"
) }}
SELECT
COALESCE(MIN(block_number), 0) AS block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp >= DATEADD('hour', -72, TRUNCATE(SYSDATE(), 'HOUR'))
AND block_timestamp < DATEADD('hour', -71, TRUNCATE(SYSDATE(), 'HOUR'))
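
Because `_block_lookback` is ephemeral, dbt inlines it as a CTE wherever it is ref'd instead of building a relation. The one-hour window pins the minimum block in the hour starting 72 hours ago, i.e. the chain height roughly three days back. A consumer such as the recent-test view compiles to roughly the following (illustrative rendering; relation names depend on the target):

-- Approximate compiled form of test_silver__traces_recent.
WITH __dbt__cte___block_lookback AS (
    SELECT
        COALESCE(MIN(block_number), 0) AS block_number
    FROM silver.blocks
    WHERE block_timestamp >= DATEADD('hour', -72, TRUNCATE(SYSDATE(), 'HOUR'))
        AND block_timestamp < DATEADD('hour', -71, TRUNCATE(SYSDATE(), 'HOUR'))
)
SELECT *
FROM silver.traces
WHERE block_number >= (
    SELECT block_number
    FROM __dbt__cte___block_lookback
)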

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "ephemeral",
unique_key = "block_number",
) }}
WITH base AS (
SELECT
block_timestamp :: DATE AS block_date,
MAX(block_number) block_number
FROM
{{ ref("silver__blocks") }}
GROUP BY
block_timestamp :: DATE
)
SELECT
block_date,
block_number
FROM
base
WHERE
block_date <> (
SELECT
MAX(block_date)
FROM
base
)

View File

@ -0,0 +1,37 @@
{{ config (
materialized = "ephemeral"
) }}
WITH base AS (
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_hour,
MAX(block_number) block_number
FROM
{{ ref("silver__blocks") }}
WHERE
block_timestamp > DATEADD(
'day',
-5,
CURRENT_DATE
)
GROUP BY
1
)
SELECT
block_hour,
block_number
FROM
base
WHERE
block_hour <> (
SELECT
MAX(
block_hour
)
FROM
base
)

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze__streamline_blocks_testnet') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1
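
On incremental runs the `is_incremental()` branch reads only rows at or past the table's high-water mark, and the QUALIFY keeps the newest record per `id` before dbt merges on the unique key. Conceptually an incremental run executes something like this (simplified; actual relation names depend on the target, and dbt wraps the result in its merge statement):

-- Sketch of the rendered incremental SELECT for complete_blocks_testnet.
SELECT
    id,
    block_number,
    _inserted_timestamp
FROM bronze.streamline_blocks_testnet
WHERE _inserted_timestamp >= (
    SELECT MAX(_inserted_timestamp)
    FROM silver.complete_blocks_testnet  -- {{ this }}
)
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY _inserted_timestamp DESC
) = 1

Using `>=` rather than `>` re-reads rows sharing the boundary timestamp; that is safe here because the merge on `id` overwrites rather than duplicates.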

View File

@ -0,0 +1,29 @@
-- depends_on: {{ ref('bronze__streamline_confirm_blocks_testnet') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_confirm_blocks_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_FR_confirm_blocks_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze__streamline_receipts_testnet') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_receipts_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_receipts_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze__streamline_traces_testnet') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_traces_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_traces_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze__streamline_transactions_testnet') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions_testnet') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_transactions_testnet') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,76 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks_testnet', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
) AS request
FROM
blocks
ORDER BY
block_number ASC
LIMIT
1000 --remove for prod
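
A quick way to sanity-check the request shape is to render one envelope by hand. The block number below is arbitrary, and `{service}/{Authentication}` stays a literal placeholder that the streamline service resolves from the named secret at call time.

-- Spot-check the rendering for a single block (illustrative values).
SELECT
    OBJECT_CONSTRUCT(
        'method', 'POST',
        'url', '{service}/{Authentication}',
        'headers', OBJECT_CONSTRUCT('Content-Type', 'application/json'),
        'params', PARSE_JSON('{}'),
        'data', OBJECT_CONSTRUCT(
            'id', '1300001',
            'jsonrpc', '2.0',
            'method', 'eth_getBlockByNumber',
            'params', ARRAY_CONSTRUCT('0x13d621', FALSE)
        ) :: STRING
    ) AS request;
-- Snowflake serializes object keys alphabetically, so the stored value reads:
-- {"data": "{\"id\":\"1300001\",\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"0x13d621\",false]}",
--  "headers": {"Content-Type": "application/json"}, "method": "POST",
--  "params": {}, "url": "{service}/{Authentication}"}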

View File

@ -0,0 +1,72 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts_testnet', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_receipts") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockReceipts',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING
) AS request
FROM
blocks
ORDER BY
block_number ASC
LIMIT
1000 --remove for prod

View File

@ -0,0 +1,73 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces_testnet', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_traces") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'debug_traceBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '30s'))
) :: STRING
) AS request
FROM
blocks
ORDER BY
block_number ASC
LIMIT
1000 --remove for prod

View File

@ -0,0 +1,76 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions_testnet', 'exploded_key','[\"result\", \"transactions\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
blocks AS (
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_transactions") }}
WHERE
block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)) :: STRING
) AS request
FROM
blocks
ORDER BY
block_number ASC
LIMIT
1000 --remove for prod

View File

@ -0,0 +1,86 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks_testnet', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
to_do AS (
SELECT
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND block_number IS NOT NULL
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
) AS request
FROM
to_do
ORDER BY
partition_key ASC
LIMIT
1200 --remove for prod
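
The history model derives `id` via `dbt_utils.generate_surrogate_key(['block_number'])` while this realtime model spells out the MD5 expression. For non-null block numbers the two should hash identically; they differ only in the NULL fallback ('' here versus the '_dbt_utils_surrogate_key_null_' placeholder in current dbt_utils), and the model filters `block_number IS NOT NULL` anyway. A quick spot-check:

-- Both derivations agree for any non-null block_number (illustrative value).
SELECT
    MD5(CAST(COALESCE(CAST(1300001 AS TEXT), '' :: STRING) AS TEXT)) AS realtime_id,
    MD5(CAST(COALESCE(CAST(1300001 AS TEXT), '_dbt_utils_surrogate_key_null_') AS TEXT)) AS history_style_id;
-- realtime_id = history_style_id, since COALESCE never reaches the fallback.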

View File

@ -0,0 +1,101 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks_testnet', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
look_back AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_hour") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 6
),
tbl AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number IS NOT NULL
AND block_number <= (
SELECT
block_number
FROM
look_back
)
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_confirmed_blocks") }}
WHERE
block_number IS NOT NULL
AND block_number <= (
SELECT
block_number
FROM
look_back
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
) AS request
FROM
tbl
ORDER BY
block_number ASC
LIMIT
1200 --remove for prod
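
The `look_back` CTE ranks the hourly max blocks newest-first and takes rank 6, i.e. the chain height from roughly five hours ago, so confirmations are only requested for blocks old enough to be stable. Isolated, the pick looks like this (the table name stands in for the `_max_block_by_hour` ephemeral):

-- Cutoff selection: the 6th-most-recent hourly high-water block.
SELECT
    block_hour,
    block_number
FROM hourly_max_blocks
QUALIFY ROW_NUMBER() OVER (
    ORDER BY block_number DESC
) = 6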

View File

@ -0,0 +1,106 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts_testnet', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','dev/blast/node/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_core_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_block_lookback") }}
),
to_do AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_receipts") }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
),
ready_blocks AS (
SELECT
block_number
FROM
to_do
UNION
SELECT
block_number
FROM
(
SELECT
block_number
FROM
{{ ref("_missing_receipts") }}
UNION
SELECT
block_number
FROM
{{ ref("_missing_txs") }}
UNION
SELECT
block_number
FROM
{{ ref("_unconfirmed_blocks") }}
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockReceipts',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING
) AS request
FROM
ready_blocks
ORDER BY
block_number ASC
LIMIT
1200 --remove for prod

Some files were not shown because too many files have changed in this diff.