add stats and daily schedule

This commit is contained in:
Eric Laurello 2025-02-18 10:17:22 -05:00
parent 2762666a9f
commit fdf5d855f2
8 changed files with 410 additions and 3 deletions

46
.github/workflows/dbt_run_daily.yml vendored Normal file
View File

@ -0,0 +1,46 @@
name: dbt_run_daily
run-name: dbt_run_daily
on:
workflow_dispatch:
schedule:
# once daily at 2:15 AM UTC
- cron: "15 2 * * *"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "stellar_models,tag:scheduled_daily"

View File

@ -0,0 +1,60 @@
{% docs ez_core_metrics_hourly_table_doc %}
A convenience table that aggregates block and transaction related metrics using various aggregate functions such as SUM, COUNT, MIN and MAX from the fact_transactions table, on an hourly basis. Stats for the current hour will be updated as new data arrives.
{% enddocs %}
{% docs block_timestamp_hour %}
The hour of the timestamp of the block.
{% enddocs %}
{% docs block_number_min %}
The minimum block number in the hour.
{% enddocs %}
{% docs block_number_max %}
The maximum block number in the hour.
{% enddocs %}
{% docs block_count %}
The number of blocks in the hour.
{% enddocs %}
{% docs transaction_count_success %}
The number of successful transactions in the hour.
{% enddocs %}
{% docs transaction_count_failed %}
The number of failed transactions in the hour.
{% enddocs %}
{% docs total_fees_native %}
The sum of all fees in the hour, in Lamports.
{% enddocs %}
{% docs total_fees_usd %}
The sum of all fees in the hour, in USD.
{% enddocs %}
{% docs unique_accounts_count %}
The count of unique first signers for transactions in the hour.
{% enddocs %}

View File

@ -4,7 +4,7 @@
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
post_hook = ["ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address)", "DELETE FROM {{ this }} WHERE address in (select address from {{ ref('silver__labels') }} where _is_deleted = TRUE);",],
tags = ['scheduled_core']
tags = ['scheduled_daily']
) }}
SELECT

View File

@ -0,0 +1,50 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'block_timestamp_hour',
cluster_by = ['block_timestamp_hour'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STATS, METRICS, CORE, HOURLY',
}} },
tags = ['scheduled_daily','stats']
) }}
SELECT
block_timestamp_hour,
block_number_min,
block_number_max,
block_count,
transaction_count,
transaction_count_success,
transaction_count_failed,
unique_accounts_count,
total_fees AS total_fees_native,
ROUND(
(
total_fees / pow(
10,
7
)
) * p.price,
2
) AS total_fees_usd,
core_metrics_hourly_id AS ez_core_metrics_hourly_id,
s.inserted_timestamp AS inserted_timestamp,
s.modified_timestamp AS modified_timestamp
FROM
{{ ref('silver__core_metrics_hourly') }}
s
LEFT JOIN {{ ref('silver__complete_native_prices') }}
p
ON s.block_timestamp_hour = p.hour
{% if is_incremental() %}
WHERE
s.modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,32 @@
version: 2
models:
- name: stats__ez_core_metrics_hourly
description: '{{ doc("ez_core_metrics_hourly_table_doc") }}'
columns:
- name: BLOCK_TIMESTAMP_HOUR
description: '{{ doc("block_timestamp_hour") }}'
- name: BLOCK_NUMBER_MIN
description: '{{ doc("block_number_min") }}'
- name: BLOCK_NUMBER_MAX
description: '{{ doc("block_number_max") }}'
- name: BLOCK_COUNT
description: '{{ doc("block_count") }}'
- name: TRANSACTION_COUNT
description: '{{ doc("transaction_count") }}'
- name: TRANSACTION_COUNT_SUCCESS
description: '{{ doc("transaction_count_success") }}'
- name: TRANSACTION_COUNT_FAILED
description: '{{ doc("transaction_count_failed") }}'
- name: UNIQUE_ACCOUNTS_COUNT
description: '{{ doc("unique_accounts_count") }}'
- name: TOTAL_FEES_NATIVE
description: '{{ doc("total_fees_native") }}'
- name: TOTAL_FEES_USD
description: '{{ doc("total_fees_usd") }}'
- name: EZ_CORE_METRICS_HOURLY_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -1,8 +1,7 @@
{{ config(
materialized = 'view',
tags = ['scheduled_core']
tags = ['scheduled_daily']
) }}
{# TODO move to non core when available #}
SELECT
blockchain,

View File

@ -0,0 +1,141 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_timestamp_hour",
cluster_by = ['block_timestamp_hour::DATE'],
tags = ['scheduled_daily','stats']
) }}
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
DATE_TRUNC('hour', MIN(block_timestamp_hour)) AS block_timestamp_hour
FROM
(
SELECT
MIN(block_timestamp) block_timestamp_hour
FROM
{{ ref('core__fact_ledgers') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
UNION ALL
SELECT
MIN(block_timestamp) block_timestamp_hour
FROM
{{ ref('core__fact_transactions') }}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
) {% endset %}
{% set min_block_timestamp_hour = run_query(query).columns [0].values() [0] %}
{% endif %}
{% if not min_block_timestamp_hour or min_block_timestamp_hour == 'None' %}
{% set min_block_timestamp_hour = '2099-01-01' %}
{% endif %}
{% endif %}
WITH blcks AS (
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_timestamp_hour,
MIN(SEQUENCE) AS block_number_min,
MAX(SEQUENCE) AS block_number_max,
COUNT(1) AS block_count,
SUM(transaction_count) AS transaction_count
FROM
{{ ref('core__fact_ledgers') }}
WHERE
block_timestamp_hour < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND DATE_TRUNC(
'hour',
block_timestamp
) >= '{{ min_block_timestamp_hour }}'
{% endif %}
GROUP BY
1
),
txs AS (
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_timestamp_hour,
COUNT(
DISTINCT CASE
WHEN SUCCESSFUL THEN id
END
) AS transaction_count_success,
COUNT(
DISTINCT CASE
WHEN NOT SUCCESSFUL THEN id
END
) AS transaction_count_failed,
COUNT(
DISTINCT account
) AS unique_accounts_count,
SUM(fee_charged) AS total_fees,
{{ dbt_utils.generate_surrogate_key(['block_timestamp_hour']) }} AS core_metrics_hourly_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_timestamp_hour < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
{% if is_incremental() %}
AND DATE_TRUNC(
'hour',
block_timestamp
) >= '{{ min_block_timestamp_hour }}'
{% endif %}
GROUP BY
1
)
SELECT
A.block_timestamp_hour,
A.block_number_min,
A.block_number_max,
A.block_count,
A.transaction_count,
COALESCE(
b.transaction_count_success,
0
) AS transaction_count_success,
COALESCE(
b.transaction_count_failed,
0
) AS transaction_count_failed,
COALESCE(
b.unique_accounts_count,
0
) AS unique_accounts_count,
COALESCE(
b.total_fees,
0
) AS total_fees,
{{ dbt_utils.generate_surrogate_key(['a.block_timestamp_hour']) }} AS core_metrics_hourly_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
blcks A
LEFT JOIN txs b
ON A.block_timestamp_hour = b.block_timestamp_hour

View File

@ -0,0 +1,79 @@
version: 2
models:
- name: silver__core_metrics_hourly
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_TIMESTAMP_HOUR
columns:
- name: BLOCK_TIMESTAMP_HOUR
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- name: BLOCK_NUMBER_MIN
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_NUMBER_MAX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_COUNT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TRANSACTION_COUNT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TRANSACTION_COUNT_SUCCESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TRANSACTION_COUNT_FAILED
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TOTAL_FEES
tests:
- not_null:
where:
block_timestamp_hour::date > '2020-03-16' /* ignore first set of metrics */
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- DECIMAL
- FLOAT
- NUMBER
- name: UNIQUE_ACCOUNTS_COUNT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: INSERTED_TIMESTAMP
tests:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1