Remove the join to blocks; remove the stats, observability, and GitHub Actions models for now.

This commit is contained in:
Eric Laurello 2025-03-07 10:23:42 -05:00
parent 240ea2a579
commit 21bf5a43de
19 changed files with 112 additions and 609 deletions

View File

@ -1,6 +0,0 @@
-- dbt model (deleted by this commit): a view exposing the current status of
-- GitHub Actions tasks. The entire SELECT is generated by the fsc_utils macro.
{{ config(
materialized = 'view',
tags = ['gha_tasks']
) }}
{{ fsc_utils.gha_task_current_status_view() }}

View File

@ -1,17 +0,0 @@
# dbt schema tests (deleted by this commit) for the
# github_actions__current_task_status model.
version: 2
models:
- name: github_actions__current_task_status
columns:
# The pipeline must always report itself as active.
- name: PIPELINE_ACTIVE
tests:
- dbt_expectations.expect_column_values_to_be_in_set:
value_set:
- TRUE
# SUCCESSES must equal 204; anything else only warns (severity: warn below).
- name: SUCCESSES
tests:
- dbt_expectations.expect_column_values_to_be_in_set:
value_set:
- 204
config:
severity: warn
warn_if: ">0"

View File

@ -1,5 +0,0 @@
-- dbt model (deleted by this commit): view over GitHub Actions task run
-- history; the SELECT is generated entirely by the fsc_utils macro.
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_history_view() }}

View File

@ -1,5 +0,0 @@
-- dbt model (deleted by this commit): view over GitHub Actions task
-- performance metrics; generated entirely by the fsc_utils macro.
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_performance_view() }}

View File

@ -1,5 +0,0 @@
-- dbt model (deleted by this commit): view over GitHub Actions task
-- schedules; generated entirely by the fsc_utils macro.
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_schedule_view() }}

View File

@ -1,5 +0,0 @@
-- dbt model (deleted by this commit): view listing GitHub Actions tasks;
-- generated entirely by the fsc_utils macro.
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_tasks_view() }}

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = ['fact_blocks_id'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
@ -17,9 +17,7 @@ SELECT
tx_count_from_versions AS tx_count,
{{ dbt_utils.generate_surrogate_key(['block_number']) }} AS fact_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
SYSDATE() AS modified_timestamp
FROM
{{ ref(
'silver__blocks'

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = ['tx_hash','change_index'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash,change_type,inner_change_type,change_address,change_module,change_resource,payload_function);",
@ -10,33 +10,41 @@
) }}
SELECT
block_number,
block_timestamp,
A.block_number,
A.block_timestamp,
A.tx_hash,
version,
tx_hash,
success,
tx_type,
payload_function,
change_index,
change_data,
change_type,
address,
handle,
inner_change_type,
change_address,
change_module,
change_resource,
key,
VALUE,
state_key_hash,
A.tx_type,
A.payload_function,
b.index AS change_index,
b.value :data :data AS change_data,
b.value :type :: STRING AS change_type,
b.value :address :: STRING AS address,
b.value :handle :: STRING AS handle,
b.value :data: "type" :: STRING AS inner_change_type,
SPLIT_PART(
inner_change_type,
'::',
1
) :: STRING AS change_address,
SPLIT_PART(
inner_change_type,
'::',
2
) :: STRING AS change_module,
SUBSTRING(inner_change_type, len(change_address) + len(change_module) + 5) AS change_resource,
b.value :key :: STRING AS key,
b.value :value :: STRING AS VALUE,
b.value :state_key_hash :: STRING AS state_key_hash,
{{ dbt_utils.generate_surrogate_key(['tx_hash','change_index']) }} AS fact_changes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
SYSDATE() AS modified_timestamp
FROM
{{ ref(
'silver__changes'
) }}
'core__fact_transactions'
) }} A,
LATERAL FLATTEN (changes) b
{% if is_incremental() %}
WHERE

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = ['tx_hash','version','event_index'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash,event_type,event_address,event_module,event_resource,payload_function);",
@ -10,34 +10,47 @@
) }}
SELECT
block_number,
block_timestamp,
A.block_number,
A.block_timestamp,
A.tx_hash,
version,
tx_hash,
success,
tx_type,
payload_function,
event_index,
event_type,
event_address,
event_module,
event_resource,
event_data,
account_address,
creation_number,
sequence_number,
A.tx_type,
A.payload_function,
b.index AS event_index,
b.value :type :: STRING AS event_type,
SPLIT_PART(
event_type,
'::',
1
) :: STRING AS event_address,
SPLIT_PART(
event_type,
'::',
2
) :: STRING AS event_module,
SPLIT_PART(
event_type,
'::',
3
) :: STRING AS event_resource,
b.value :data AS event_data,
-- b.value :guid :: STRING AS event_guid, -- extract into account_address + creation_number
b.value :guid :account_address :: STRING AS account_address,
b.value :guid :creation_number :: bigint AS creation_number,
b.value :sequence_number :: bigint AS sequence_number,
{{ dbt_utils.generate_surrogate_key(['tx_hash','version','event_index']) }} AS fact_events_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
SYSDATE() AS modified_timestamp
FROM
{{ ref(
'silver__events'
) }}
'core__fact_transactions'
) }} A,
LATERAL FLATTEN (events) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
A.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM

View File

@ -10,7 +10,7 @@
) }}
SELECT
b.block_number,
A.block_number,
A.block_timestamp,
A.version,
A.tx_hash,
@ -40,19 +40,10 @@ FROM
{{ ref(
'silver__transactions'
) }} A
JOIN {{ ref(
'silver__blocks'
) }}
b
ON A.version BETWEEN b.first_version
AND b.last_version
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
A.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = ['tx_hash','block_timestamp::DATE'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash);",
@ -10,7 +10,7 @@
) }}
SELECT
b.block_number,
A.block_number,
A.block_timestamp,
A.version,
A.tx_hash,
@ -26,7 +26,7 @@ SELECT
A.id,
A.previous_block_votes_bitvec,
A.proposer,
A.ROUND,
A.round,
A.vm_status,
A.state_change_hash,
A.accumulator_root_hash,
@ -39,11 +39,7 @@ SELECT
FROM
{{ ref(
'silver__transactions'
) }} A
JOIN {{ ref('silver__blocks') }}
b
ON A.version BETWEEN b.first_version
AND b.last_version
) }}
WHERE
LEFT(
tx_type,
@ -51,10 +47,7 @@ WHERE
) = 'block'
{% if is_incremental() %}
AND GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
AND A.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM

View File

@ -1,26 +0,0 @@
-- dbt model (deleted by this commit): thin presentation view over
-- silver__core_metrics_hourly, renaming fee/id columns for the EZ layer.
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STATS, METRICS, CORE, HOURLY' } } },
tags = ['noncore']
) }}
SELECT
block_timestamp_hour,
block_number_min,
block_number_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
unique_sender_count,
unique_payload_function_count,
-- fees are passed through in the native unit; see the silver model
total_fees AS total_fees_native,
-- USD conversion was never wired up here: deliberately NULL placeholder
NULL AS total_fees_usd,
core_metrics_hourly_id AS ez_core_metrics_hourly_id,
s.inserted_timestamp AS inserted_timestamp,
s.modified_timestamp AS modified_timestamp
FROM
{{ ref('silver__core_metrics_hourly') }}
s

View File

@ -1,169 +0,0 @@
-- dbt observability model (deleted by this commit): detects missing blocks
-- and gaps in core__fact_blocks by comparing it against a generated number
-- sequence. Emits one summary row per run, keyed by test_timestamp.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability'],
enabled = true
) }}
-- summary_stats: the block range under test. Only blocks older than 12h are
-- tested; on incremental runs the window starts ~96h back, widened to also
-- re-test blocks flagged as impacted by the previous run ({{ this }}).
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- re-test from just before the earliest previously-impacted block
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
-- OBSERV_FULL_TEST var forces a full-history scan
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- block_range: every block number that SHOULD exist in the tested range,
-- taken from the crosschain number_sequence utility table.
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- blocks: blocks actually present, with each row's predecessor (LAG) so
-- gaps between consecutive present blocks can be detected below.
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_block_number
FROM
{{ ref("core__fact_blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
-- block_gen: the full expected sequence between the min and max blocks
-- actually present (the candidate set to probe for holes).
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
-- Final summary row: a generated block number with no match in `blocks` (b)
-- that falls strictly inside a non-contiguous prev/next pair (C) is a
-- missing block; such numbers are counted and collected into an array.
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -1,138 +0,0 @@
-- dbt observability model (deleted by this commit): finds blocks whose
-- actual transaction count in core__fact_transactions differs from the
-- tx_count recorded on core__fact_blocks. One summary row per run.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
-- summary_stats: block range under test — blocks older than 12h; on
-- incremental runs the window starts ~96h back, widened to re-test blocks
-- flagged as impacted by the previous run ({{ this }}).
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
-- re-test from just before the earliest previously-impacted block
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
-- OBSERV_FULL_TEST var forces a full-history scan
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
-- base_blocks: the expected per-block transaction count, as recorded on
-- the blocks fact table itself.
base_blocks AS (
SELECT
block_number,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
-- actual_tx_counts: the observed per-block transaction count in the
-- transactions fact table over the same range.
actual_tx_counts AS (
SELECT
block_number,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
GROUP BY
block_number
),
-- potential_missing_txs: blocks where expected <> actual (a block missing
-- entirely from transactions counts as 0 via COALESCE).
potential_missing_txs AS (
SELECT
e.block_number
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts A
ON e.block_number = A.block_number
WHERE
COALESCE(
A.transaction_count,
0
) <> e.transaction_count
),
-- impacted_blocks: roll the mismatching block numbers up into a count and
-- an ordered array for the summary row.
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
potential_missing_txs
)
-- One summary row: test-range stats cross-joined with the impact rollup.
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -19,9 +19,9 @@ WITH base AS (
) AS block_timestamp,
DATA :first_version :: bigint AS first_version,
DATA :last_version :: bigint AS last_version,
ARRAY_SIZE(
{# ARRAY_SIZE(
DATA :transactions
) AS tx_count_from_transactions_array,
) AS tx_count_from_transactions_array, #}
last_version - first_version + 1 AS tx_count_from_versions
FROM
{% if is_incremental() %}
@ -29,7 +29,7 @@ WITH base AS (
WHERE
inserted_timestamp >= (
SELECT
DATEADD('minute', -15, MAX(inserted_timestamp))
DATEADD('minute', -5, MAX(inserted_timestamp))
FROM
{{ this }})
AND
@ -49,7 +49,7 @@ SELECT
block_timestamp,
first_version,
last_version,
tx_count_from_transactions_array,
{# tx_count_from_transactions_array, #}
tx_count_from_versions,
{{ dbt_utils.generate_surrogate_key(
['block_number']

View File

@ -5,7 +5,8 @@
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['core']
tags = ['core'],
enabled = false
) }}
SELECT

View File

@ -5,7 +5,8 @@
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['core']
tags = ['core'],
enabled = false
) }}
SELECT

View File

@ -1,94 +1,50 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_hash', 'block_timestamp::DATE'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['core']
materialized = 'incremental',
unique_key = ['tx_hash', 'block_timestamp::DATE'],
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__blocks_tx') }}
-- depends_on: {{ ref('bronze__transactions') }}
WITH from_transactions AS (
WITH from_blocks AS (
SELECT
A.value :BLOCK_NUMBER :: bigint AS block_number,
TO_TIMESTAMP(
b.value :timestamp :: STRING
A.value :BLOCK_TIMESTAMP :: STRING
) AS block_timestamp,
b.value :hash :: STRING AS tx_hash,
b.value :version :: INT AS version,
b.value :type :: STRING AS tx_type,
b.value AS DATA,
b.data :hash :: STRING AS tx_hash,
b.data :version :: INT AS version,
b.data :type :: STRING AS tx_type,
A.data,
inserted_timestamp AS file_last_updated
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks_tx') }}
{% else %}
{{ ref('bronze__blocks_tx_FR') }}
{% endif %}
A,
LATERAL FLATTEN (DATA :transactions) b
{% if is_incremental() %}
WHERE
A.inserted_timestamp >= (
SELECT
DATEADD('minute', -15, MAX(modified_timestamp))
FROM
{{ this }})
{% endif %}
),
from_transactions AS (
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{% endif %}
A
WHERE
version BETWEEN A.value :FIRST_VERSION :: bigint
AND A.value :LAST_VERSION :: bigint
{% if is_incremental() %}
AND A.inserted_timestamp >= (
SELECT
TO_TIMESTAMP(
b.value :timestamp :: STRING
) AS block_timestamp,
b.value :hash :: STRING AS tx_hash,
b.value :version :: INT AS version,
b.value :type :: STRING AS tx_type,
b.value AS DATA,
inserted_timestamp AS file_last_updated
DATEADD('minute', -5, MAX(modified_timestamp))
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{{ this }})
{% endif %}
A,
LATERAL FLATTEN(A.data) b
{% if is_incremental() %}
WHERE
A.inserted_timestamp >= (
SELECT
DATEADD('minute', -15, MAX(modified_timestamp))
FROM
{{ this }})
{% endif %}
),
combo AS (
SELECT
block_timestamp,
tx_hash,
version,
tx_type,
DATA,
file_last_updated
FROM
from_blocks
UNION ALL
SELECT
block_timestamp,
tx_hash,
version,
tx_type,
DATA,
file_last_updated
FROM
from_transactions A
),
transformed AS (
SELECT
block_number,
COALESCE(
block_timestamp,
'1970-01-01 00:00:00.000'
@ -120,9 +76,10 @@ transformed AS (
DATA,
file_last_updated
FROM
combo
from_transactions
)
SELECT
block_number,
block_timestamp,
tx_hash,
version,

View File

@ -1,83 +0,0 @@
-- dbt model (deleted by this commit): hourly core metrics aggregated from
-- core__fact_transactions (block/tx/sender counts and total fees per hour).
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
tags = ['noncore']
) }}
/* run incremental timestamp value first then use it as a static value */
-- On incremental runs, compute (at compile time via run_query) the earliest
-- hour touched since the last load, so only affected hours are rebuilt.
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
MIN(DATE_TRUNC('hour', block_timestamp)) block_timestamp_hour
FROM
{{ ref('core__fact_transactions') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
) {% endset %}
{% set min_block_timestamp_hour = run_query(query).columns [0].values() [0] %}
{% endif %}
{% endif %}
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_timestamp_hour,
MIN(block_number) :: FLOAT AS block_number_min,
MAX(block_number) :: FLOAT AS block_number_max,
COUNT(
DISTINCT block_number
) AS block_count,
COUNT(
DISTINCT tx_hash
) AS transaction_count,
-- success flag splits the tx count into succeeded / failed
COUNT(
DISTINCT CASE
WHEN success THEN tx_hash
END
) AS transaction_count_success,
COUNT(
DISTINCT CASE
WHEN NOT success THEN tx_hash
END
) AS transaction_count_failed,
COUNT(
DISTINCT sender
) AS unique_sender_count,
COUNT(
DISTINCT payload_function
) AS unique_payload_function_count,
SUM(COALESCE(gas_unit_price,0) * gas_used) AS total_fees,
-- in Octa = 10^-8 Aptos
{{ dbt_utils.generate_surrogate_key(
['block_timestamp_hour']
) }} AS core_metrics_hourly_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('core__fact_transactions') }}
WHERE
-- exclude the current, still-filling hour
block_timestamp_hour < DATE_TRUNC(
'hour',
CURRENT_TIMESTAMP
)
{% if is_incremental() %}
-- NULLIF guards the case where run_query returned no row ('None' string)
AND DATE_TRUNC(
'hour',
block_timestamp
) >= NULLIF(
'{{ min_block_timestamp_hour }}',
'None'
)
{% endif %}
GROUP BY
1