Merge main, keeping streamline files

This commit is contained in:
Mike Stepanovic 2025-03-07 13:23:28 -07:00
commit 7232f76c08
6 changed files with 280 additions and 15 deletions

View File

@ -0,0 +1,41 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
-- depends_on: {{ ref('bronze__blocks_tx') }}
SELECT
DATA :block_height :: INT AS block_number,
DATA :block_timestamp :: bigint AS block_timestamp,
DATA :first_version :: bigint AS first_version,
DATA :last_version :: bigint AS last_version,
last_version - first_version + 1 AS tx_count_from_versions,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS blocks_tx_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks_tx') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__blocks_tx_FR') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,42 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = ['block_number','multiplier_no'],
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
-- depends_on: {{ ref('bronze__transactions') }}
SELECT
A.value :BLOCK_NUMBER :: INT AS block_number,
A.value :MULTIPLIER :: INT AS multiplier_no,
{{ dbt_utils.generate_surrogate_key(
['block_number','multiplier_no']
) }} AS transactions_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{% endif %}
A
{% if is_incremental() %}
WHERE
A.inserted_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE)
FROM
{{ this }})
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
A.inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,49 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks_tx",
"sql_limit" :"10000",
"producer_batch_size" :"5000",
"worker_batch_size" :"5000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "block_number" }
),
tags = ['streamline_core_realtime']
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref('streamline__blocks') }}
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__blocks_tx_complete') }}
)
SELECT
block_number,
ROUND(
block_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=false',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'User-Agent',
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
) AS request
FROM
blocks
ORDER BY
block_number

View File

@ -0,0 +1,111 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"10000",
"producer_batch_size" :"5000",
"worker_batch_size" :"5000",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["data"]),
"order_by_column": "block_number" }
),
tags = ['streamline_core_realtime']
) }}
WITH blocks AS (
SELECT
A.block_number,
tx_count_from_versions AS tx_count,
first_version,
last_version,
block_timestamp
FROM
{{ ref('streamline__blocks_tx_complete') }} A
WHERE
block_number <> 0
),
numbers AS (
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
numbers
WHERE
n < (
SELECT
CEIL(MAX(tx_count) / 100.0)
FROM
blocks)
),
blocks_with_page_numbers AS (
SELECT
tt.block_number,
n.n - 1 AS multiplier,
first_version,
last_version,
tx_count,
block_timestamp
FROM
blocks tt
JOIN numbers n
ON n.n <= CASE
WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100
ELSE FLOOR(
tt.tx_count / 100
) + 1
END
),
WORK AS (
SELECT
A.block_number,
block_timestamp,
first_version,
last_version,
first_version +(
100 * multiplier
) AS tx_version,
multiplier,
LEAST (
tx_count - 100 * multiplier,
100
) AS lim,
tx_count
FROM
blocks_with_page_numbers A
LEFT JOIN {{ ref('streamline__transactions_complete') }}
b
ON A.block_number = b.block_number
AND multiplier = b.multiplier_no
WHERE
b.block_number IS NULL
)
SELECT
block_number,
block_timestamp,
first_version,
last_version,
tx_version,
multiplier,
ROUND(
block_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/v1/transactions?start=' || tx_version || '&limit=' || lim,
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'User-Agent',
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
) AS request
FROM
WORK

View File

@ -0,0 +1,22 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
0 AS block_number
UNION ALL
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id <= (
SELECT
MAX(block_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -1,16 +1,16 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.5
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
- package: Snowflake-Labs/dbt_constraints
version: 0.6.3
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: d3219b9c206b5988189dcdafae0ec22ca9b4056c
- package: calogica/dbt_expectations
version: 0.8.5
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: d3cf679e079f0cf06142de9386f215e55fe26b3b
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
- package: Snowflake-Labs/dbt_constraints
version: 1.0.4
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: f14e55a0ab40f81e4341c5413a5b3d6e566ef058