split models

This commit is contained in:
drethereum 2025-04-16 17:36:16 -06:00
parent c7946c5a43
commit 85af1ec99a
15 changed files with 311 additions and 83 deletions

View File

@ -17,6 +17,8 @@ sources:
- name: testnet_traces
- name: testnet_confirm_blocks
- name: testnet_decoded_logs
- name: testnet_blocks_v2
- name: testnet_transactions_v2
- name: crosschain_silver
database: "{{ 'crosschain' if target.database.upper() == var('GLOBAL_PROD_DB_NAME').upper() else 'crosschain_dev' }}"
schema: silver

View File

@ -29,7 +29,7 @@ SELECT
FROM
{{ source(
"bronze_streamline",
"testnet_blocks"
"testnet_blocks_v2"
) }}
s
JOIN meta b

View File

@ -3,40 +3,12 @@
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
*
FROM
{{ source(
"bronze_streamline",
"testnet_blocks"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{{ ref('bronze_testnet__blocks_fr_v2') }}
UNION ALL
SELECT
*
FROM
{{ ref('bronze_testnet__blocks_fr_v1') }}

View File

@ -0,0 +1,42 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
FROM
{{ source(
"bronze_streamline",
"testnet_blocks"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@ -0,0 +1,42 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_blocks_v2") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
FROM
{{ source(
"bronze_streamline",
"testnet_blocks_v2"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@ -29,7 +29,7 @@ SELECT
FROM
{{ source(
"bronze_streamline",
"testnet_transactions"
"testnet_transactions_v2"
) }}
s
JOIN meta b

View File

@ -3,40 +3,12 @@
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_transactions") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
*
FROM
{{ source(
"bronze_streamline",
"testnet_transactions"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL
{{ ref('bronze_testnet__transactions_fr_v2') }}
UNION ALL
SELECT
*
FROM
{{ ref('bronze_testnet__transactions_fr_v1') }}

View File

@ -0,0 +1,42 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_transactions") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
FROM
{{ source(
"bronze_streamline",
"testnet_transactions"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@ -0,0 +1,42 @@
{{ config (
materialized = 'view',
tags = ['bronze_core']
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_transactions_v2") }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp,
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
FROM
{{ source(
"bronze_streamline",
"testnet_transactions_v2"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA IS NOT NULL

View File

@ -0,0 +1,55 @@
{% set node_secret_path = var("GLOBAL_NODE_SECRET_PATH") %}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"testnet_blocks",
"sql_limit" :"2000000",
"producer_batch_size" :"7200",
"worker_batch_size" :"1800",
"sql_source" :"{{this.identifier}}",
"async_concurrent_requests" :"1",
"exploded_key": tojson(["result"]) }
),
tags = ['streamline_testnet_history']
) }}
WITH to_do AS (
SELECT block_number
FROM {{ ref("streamline__testnet_blocks") }}
EXCEPT
SELECT block_number
FROM {{ ref("streamline__testnet_blocks_complete") }}
),
ready_blocks AS (
SELECT block_number
FROM to_do
where block_number < (select block_number from {{ ref("_testnet_block_lookback") }})
)
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'eth_getBlockByNumber',
'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
ORDER BY block_number desc
LIMIT
2000000

View File

@ -5,13 +5,13 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"testnet_blocks_transactions",
params ={ "external_table" :"testnet_transactions",
"sql_limit" :"2000000",
"producer_batch_size" :"7200",
"worker_batch_size" :"1800",
"sql_source" :"{{this.identifier}}",
"async_concurrent_requests" :"1",
"exploded_key": tojson(["result", "result.transactions"]) }
"exploded_key": tojson(["result.transactions"]) }
),
tags = ['streamline_testnet_history']
) }}
@ -21,8 +21,7 @@ WITH to_do AS (
FROM {{ ref("streamline__testnet_blocks") }}
EXCEPT
SELECT block_number
FROM {{ ref("streamline__testnet_blocks_complete") }} b
INNER JOIN {{ ref("streamline__testnet_transactions_complete") }} t USING(block_number)
FROM {{ ref("streamline__testnet_transactions_complete") }}
),
ready_blocks AS (
SELECT block_number

View File

@ -0,0 +1,61 @@
{% set node_secret_path = var("GLOBAL_NODE_SECRET_PATH") %}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"testnet_blocks",
"sql_limit" :"14400",
"producer_batch_size" :"3600",
"worker_batch_size" :"1800",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result"]) }
),
tags = ['streamline_testnet_realtime']
) }}
WITH last_3_days AS (
SELECT block_number
FROM {{ ref("_testnet_block_lookback") }}
),
to_do AS (
SELECT block_number
FROM {{ ref("streamline__testnet_blocks") }}
WHERE block_number IS NOT NULL
AND block_number >= (SELECT block_number FROM last_3_days)
EXCEPT
SELECT block_number
FROM {{ ref("streamline__testnet_blocks_complete") }}
WHERE 1=1
AND block_number >= (SELECT block_number FROM last_3_days)
),
ready_blocks AS (
SELECT block_number
FROM to_do
)
SELECT
block_number,
ROUND(block_number, -3) AS partition_key,
live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
OBJECT_CONSTRUCT(
'id', block_number,
'jsonrpc', '2.0',
'method', 'eth_getBlockByNumber',
'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)
),
'{{ node_secret_path }}'
) AS request
FROM
ready_blocks
ORDER BY block_number desc
LIMIT
14400

View File

@ -58,4 +58,4 @@ FROM
ORDER BY block_number desc
LIMIT
7200
14400

View File

@ -58,4 +58,4 @@ FROM
ORDER BY block_number desc
LIMIT
7200
14400

View File

@ -5,12 +5,12 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"testnet_blocks_transactions",
params ={ "external_table" :"testnet_transactions",
"sql_limit" :"14400",
"producer_batch_size" :"3600",
"worker_batch_size" :"1800",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result", "result.transactions"]) }
"exploded_key": tojson(["result.transactions"]) }
),
tags = ['streamline_testnet_realtime']
) }}
@ -26,8 +26,7 @@ to_do AS (
AND block_number >= (SELECT block_number FROM last_3_days)
EXCEPT
SELECT block_number
FROM {{ ref("streamline__testnet_blocks_complete") }} b
INNER JOIN {{ ref("streamline__testnet_transactions_complete") }} t USING(block_number)
FROM {{ ref("streamline__testnet_transactions_complete") }}
WHERE 1=1
AND block_number >= (SELECT block_number FROM last_3_days)
),
@ -59,4 +58,4 @@ FROM
ORDER BY block_number desc
LIMIT
7200
14400