add streamline views

This commit is contained in:
Mike Stepanovic 2025-03-05 12:56:21 -07:00
parent 418e4cfca0
commit dc1aafa116
12 changed files with 358 additions and 0 deletions

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "blocks_tx",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "blocks_tx",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

35
models/sources.yml Normal file
View File

@ -0,0 +1,35 @@
version: 2
sources:
- name: crosschain
database: "{{ 'crosschain' if target.database == 'MOVEMENT' else 'crosschain_dev' }}"
schema: core
tables:
- name: dim_date_hours
- name: address_tags
- name: dim_dates
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'MOVEMENT' else 'crosschain_dev' }}"
schema: silver
tables:
- name: number_sequence
- name: bronze_streamline
database: streamline
schema: |
{{ "MOVEMENT_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "MOVEMENT" }}
tables:
- name: blocks_tx
loaded_at_field: INSERTED_TIMESTAMP
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: transactions
loaded_at_field: INSERTED_TIMESTAMP
freshness:
warn_after: {count: 2, period: hour}
error_after: {count: 4, period: hour}
- name: github_actions
database: movement
schema: github_actions
tables:
- name: workflows

View File

@ -0,0 +1,41 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
-- depends_on: {{ ref('bronze__blocks_tx') }}
SELECT
DATA :block_height :: INT AS block_number,
ARRAY_SIZE(
DATA :transactions
) AS tx_count_from_transactions_array,
DATA :last_version :: bigint - DATA :first_version :: bigint + 1 AS tx_count_from_versions,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS blocks_tx_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks_tx') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__blocks_tx_FR') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,46 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "transactions_complete_id",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
-- depends_on: {{ ref('bronze__transactions') }}
SELECT
A.value :BLOCK_NUMBER :: INT AS block_number,
A.value :MULTIPLIER :: INT AS multiplier_no,
{{ dbt_utils.generate_surrogate_key(
['block_number','multiplier_no']
) }} AS transactions_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{% endif %}
A
JOIN {{ ref('silver__blocks') }}
b
ON DATA [0] :version :: INT BETWEEN b.first_version
AND b.last_version
{% if is_incremental() %}
WHERE
A.inserted_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE)
FROM
{{ this }})
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
A.inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "ephemeral",
unique_key = "block_number",
) }}
WITH base AS (
SELECT
block_timestamp :: DATE AS block_date,
MAX(block_number) block_number
FROM
{{ ref("silver__blocks") }}
GROUP BY
block_timestamp :: DATE
)
SELECT
block_date,
block_number
FROM
base
WHERE
block_date <> (
SELECT
MAX(block_date)
FROM
base
)

View File

@ -0,0 +1,49 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks_tx",
"sql_limit" :"8000",
"producer_batch_size" :"50",
"worker_batch_size" :"50",
"sql_source" :"{{this.identifier}}" }
)
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref('streamline__blocks') }}
WHERE
block_number != 0
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__blocks_tx_complete') }}
)
SELECT
block_number,
ROUND(
block_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=true',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'User-Agent',
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
) AS request
FROM
blocks
ORDER BY
block_number

View File

@ -0,0 +1,93 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"1000",
"producer_batch_size" :"50",
"worker_batch_size" :"50",
"sql_source" :"{{this.identifier}}" }
)
) }}
WITH blocks AS (
SELECT
A.block_number,
tx_count_from_versions AS tx_count,
first_version AS version_start
FROM
{{ ref('silver__blocks') }} A
),
numbers AS (
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
numbers
WHERE
n < (
SELECT
CEIL(MAX(tx_count) / 100.0)
FROM
blocks)
),
blocks_with_page_numbers AS (
SELECT
tt.block_number :: INT AS block_number,
n.n - 1 AS multiplier,
version_start,
tx_count
FROM
blocks tt
JOIN numbers n
ON n.n <= CASE
WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100
ELSE FLOOR(
tt.tx_count / 100
) + 1
END
),
WORK AS (
SELECT
A.block_number,
version_start +(
100 * multiplier
) AS tx_version,
multiplier
FROM
blocks_with_page_numbers A
LEFT JOIN {{ ref('streamline__transactions_complete') }}
b
ON A.block_number = b.block_number
AND multiplier = b.multiplier_no
WHERE
b.block_number IS NULL
)
SELECT
tx_version,
ROUND(
tx_version,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/v1/transactions?start=' || tx_version || '&limit=100',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'User-Agent',
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
) AS request,
block_number,
multiplier
FROM
WORK
ORDER BY
block_number

View File

@ -0,0 +1,19 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id <= (
SELECT
MAX(block_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -0,0 +1,20 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
{{ target.database }}.live.udf_api(
'GET',
'{Service}/v1',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery',
'User-Agent',
'Flipside_Crypto/0.1'
),
OBJECT_CONSTRUCT(),
'Vault/prod/movement/mainnet'
) :data :block_height :: INT AS block_number