added native transfers models and its downstream core metrics models

This commit is contained in:
Mike Stepanovic 2024-10-31 15:24:26 -06:00
parent 4625c197f2
commit 938c776fdc
10 changed files with 660 additions and 0 deletions

46
.github/workflows/dbt_run_non_core.yml vendored Normal file
View File

@ -0,0 +1,46 @@
# GitHub Actions workflow: runs every dbt model in the aleo_models package
# that carries the "noncore" tag. Trigger is manual-only for now; the hourly
# schedule below is commented out.
name: dbt_run_non_core
run-name: dbt_run_non_core

on:
  workflow_dispatch:
  # schedule:
  #   # Runs "at minute 10, every hour" (see https://crontab.guru)
  #   - cron: '10 * * * *'

# Snowflake / dbt connection settings, supplied via repository or environment
# variables and secrets (PASSWORD is the only secret).
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Serialize runs of this workflow: a new run queues behind an in-flight one
# (no cancel-in-progress), preventing concurrent dbt runs against the target.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      # "-m aleo_models,tag:noncore": comma = intersection, i.e. models that
      # are in the aleo_models package AND tagged noncore.
      - name: Run DBT Jobs
        run: |
          dbt run -m "aleo_models,tag:noncore"

View File

@ -0,0 +1,5 @@
{% docs transfer_type %}
The credits.aleo transfer function that produced the record: transfer_public, transfer_private, transfer_public_as_signer, transfer_private_to_public, or transfer_public_to_private.
{% enddocs %}

View File

@ -0,0 +1,45 @@
-- core__fact_transfers: gold-layer fact of native (credits.aleo) token
-- transfers, one row per transition, passed through from
-- silver__native_transfers. Incremental merge keyed on transition_id; the
-- incremental predicate limits the merge scan to destination date partitions
-- at least as new as the oldest date in the incoming batch. The post-hook
-- enables Snowflake search optimization for point lookups on
-- tx_id / sender / receiver.
{{ config(
    materialized = 'incremental',
    incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
    unique_key = ['transition_id'],
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['block_timestamp::DATE'],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,sender,receiver);",
    tags = ['core','full_test']
) }}

SELECT
    block_id,
    block_timestamp,
    tx_id,
    tx_succeeded,
    transition_id,
    transfer_type,
    sender,
    receiver,
    amount,
    currency,
    -- deterministic surrogate primary key for the fact row
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','transition_id','transfer_type']
    ) }} AS fact_transfers_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__native_transfers') }}

{% if is_incremental() %}
-- Only pick up source rows modified since the latest row already loaded;
-- the 5-minute lookback guards against rows committed slightly out of order.
WHERE
    modified_timestamp >= DATEADD(
        'minute',
        -5,(
            SELECT
                MAX(
                    modified_timestamp
                )
            FROM
                {{ this }}
        )
    )
{% endif %}

View File

@ -0,0 +1,47 @@
# Schema and tests for core__fact_transfers (gold-layer native transfers fact).
# Shared column descriptions come from project doc blocks via doc();
# transfer-specific columns are described inline.
version: 2

models:
  - name: core__fact_transfers
    description: Records of all wallet to wallet transfers on Aleo.
    columns:
      - name: BLOCK_ID
        description: "{{ doc('block_id') }}"
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp') }}"
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: TX_ID
        description: "{{ doc('tx_id') }}"
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: TX_SUCCEEDED
        description: "{{ doc('tx_succeeded') }}"
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: TRANSFER_TYPE
        description: "{{ doc('transfer_type') }}"
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: SENDER
        description: "Address that tokens are transferred from."
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: RECEIVER
        description: "Address that tokens are transferred to."
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: AMOUNT
        description: "Number of tokens transferred."
        tests:
          - dbt_expectations.expect_column_to_exist
      - name: CURRENCY
        description: "Currency of the transfer."
        tests:
          - dbt_expectations.expect_column_to_exist
      # standard surrogate-key / audit columns
      - name: FACT_TRANSFERS_ID
        description: '{{ doc("pk") }}'
      - name: INSERTED_TIMESTAMP
        description: '{{ doc("inserted_timestamp") }}'
      - name: MODIFIED_TIMESTAMP
        description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,26 @@
-- stats__ez_core_metrics_hourly: gold-layer rename/pass-through of
-- silver_stats__core_metrics_hourly for the consumer-facing "ez_" schema.
-- NOTE(review): configured as incremental delete+insert keyed on
-- block_timestamp_hour, but there is no is_incremental() filter below, so
-- every run re-selects the entire upstream relation — confirm whether an
-- incremental filter was intended.
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = "block_timestamp_hour",
    cluster_by = ['block_timestamp_hour::DATE'],
    tags = ['noncore','recent_test'],
    meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STATS, METRICS, CORE, HOURLY',
    }} }
) }}

SELECT
    block_timestamp_hour,
    block_id_min,
    block_id_max,
    block_count,
    transaction_count,
    transaction_count_success,
    transaction_count_failed,
    unique_from_count,
    unique_to_count,
    -- renamed to make the fee denomination explicit for consumers
    total_fees AS total_fees_native,
    core_metrics_hourly_id AS ez_core_metrics_hourly_id,
    inserted_timestamp,
    modified_timestamp
FROM
    {{ ref('silver_stats__core_metrics_hourly') }}

View File

@ -0,0 +1,102 @@
# Schema and tests for stats__ez_core_metrics_hourly.
# One row per complete hour; BLOCK_TIMESTAMP_HOUR is the grain.
version: 2

models:
  - name: stats__ez_core_metrics_hourly
    description: 'Hourly core metrics for the Aleo blockchain.'
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - BLOCK_TIMESTAMP_HOUR
    columns:
      - name: BLOCK_TIMESTAMP_HOUR
        description: "Truncated timestamp of a block."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_LTZ
                - TIMESTAMP_NTZ
      - name: BLOCK_ID_MIN
        description: "Minimum block ID in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: BLOCK_ID_MAX
        description: "Maximum block ID in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: BLOCK_COUNT
        description: "Number of blocks in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT
        description: "Number of transactions in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT_SUCCESS
        description: "Number of successful transactions in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT_FAILED
        description: "Number of failed transactions in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: UNIQUE_FROM_COUNT
        description: "Number of unique senders in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: UNIQUE_TO_COUNT
        description: "Number of unique receivers in an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TOTAL_FEES_NATIVE
        description: "Total fees in native currency for an hour."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - DECIMAL
                - FLOAT
                - NUMBER
      # NOTE(review): TOTAL_FEES_USD is documented here but is not selected by
      # the stats__ez_core_metrics_hourly model — confirm the column should be
      # added to the model, or drop this entry.
      - name: TOTAL_FEES_USD
        description: "Total fees in USD for an hour."
      - name: EZ_CORE_METRICS_HOURLY_ID
        description: '{{ doc("pk") }}'
      - name: INSERTED_TIMESTAMP
        description: '{{ doc("inserted_timestamp") }}'
        tests:
          # freshness guard: the table must have rows inserted within the last day
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
      - name: MODIFIED_TIMESTAMP
        description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,127 @@
-- silver__native_transfers: one row per credits.aleo transfer transition.
-- Decodes sender / receiver / amount out of the transition's last output
-- payload, whose positional "arguments: [...]" list differs per transfer
-- function. For transfer legs that are private on-chain, the corresponding
-- field is NULL (e.g. transfer_private exposes none of the three).
{{ config(
    materialized = 'incremental',
    incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
    unique_key = ['transition_id'],
    incremental_strategy = 'merge',
    merge_exclude_columns = ['inserted_timestamp'],
    cluster_by = ['block_timestamp::DATE'],
    tags = ['core', 'recent_test']
) }}

-- Restrict to the five credits.aleo transfer functions.
WITH base AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        transition_id,
        outputs,
        function,
        succeeded,
        inserted_timestamp,
        modified_timestamp,
        invocation_id
    FROM
        {{ ref('silver__transitions') }}
    WHERE
        program_id = 'credits.aleo'
        AND function IN (
            'transfer_public',
            'transfer_private',
            'transfer_public_as_signer',
            'transfer_private_to_public',
            'transfer_public_to_private'
        )
),
-- Extract the raw "arguments: [...]" list from the last output's value.
-- REGEXP_SUBSTR flags 'sie': dot matches newlines, case-insensitive, and
-- return sub-expression 1 (the bracketed list contents).
output_args AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        transition_id,
        function,
        succeeded,
        REGEXP_SUBSTR(
            outputs[array_size(outputs)-1] :value :: STRING,
            'arguments:\\s*\\[(.*?)\\]',
            1,
            1,
            'sie'
        ) as args_string,
        inserted_timestamp,
        modified_timestamp,
        invocation_id
    FROM
        base
),
-- Normalize the arguments string into an array: strip whitespace and square
-- brackets, drop a trailing 'u64' suffix, then split on commas.
output_args_cleaned AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        transition_id,
        function,
        succeeded,
        SPLIT(
            REGEXP_REPLACE(
                REGEXP_REPLACE(
                    REGEXP_REPLACE(args_string, '\\s+', ''),
                    '\\[|\\]',
                    ''
                ),
                'u64$',
                ''
            ),
            ','
        ) as args_array,
        inserted_timestamp,
        modified_timestamp,
        invocation_id
    FROM output_args
),
-- Map positional arguments to sender / receiver / amount. Argument layout by
-- function (as consumed below):
--   transfer_public / transfer_public_as_signer: [from, to, amount]
--   transfer_private_to_public:                  [to, amount]   (from is private)
--   transfer_public_to_private:                  [from, amount] (to is private)
--   transfer_private:                            fully private; all NULL
mapped_transfers AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        transition_id,
        succeeded,
        function,
        CASE
            WHEN function IN ('transfer_public', 'transfer_public_as_signer') THEN args_array[0]
            WHEN function = 'transfer_private_to_public' THEN null
            WHEN function = 'transfer_public_to_private' THEN args_array[0]
            WHEN function = 'transfer_private' THEN null
        END :: STRING as transfer_from,
        CASE
            WHEN function IN ('transfer_public', 'transfer_public_as_signer') THEN args_array[1]
            WHEN function = 'transfer_private_to_public' THEN args_array[0]
            WHEN function = 'transfer_public_to_private' THEN null
            WHEN function = 'transfer_private' THEN null
        END :: STRING as transfer_to,
        CASE
            WHEN function IN ('transfer_public', 'transfer_public_as_signer') THEN args_array[2]
            WHEN function = 'transfer_private_to_public' THEN args_array[1]
            WHEN function = 'transfer_public_to_private' THEN args_array[1]
            WHEN function = 'transfer_private' THEN null
        END :: STRING as amount,
        inserted_timestamp,
        modified_timestamp,
        invocation_id
    FROM output_args_cleaned
)
-- Final projection. amount strips any remaining 'u64' literal suffix before
-- the integer cast (the earlier regex only removes a trailing 'u64' from the
-- whole arguments string).
-- NOTE(review): audit columns (inserted/modified/invocation_id) are passed
-- through from silver__transitions rather than stamped at this model's run
-- time like sibling models — confirm this is intended.
select
    block_id,
    block_timestamp,
    tx_id,
    transition_id,
    succeeded as tx_succeeded,
    function as transfer_type,
    transfer_from as sender,
    transfer_to as receiver,
    REPLACE(amount, 'u64', '') :: INT as amount,
    'aleo_credits' as currency,
    inserted_timestamp,
    modified_timestamp,
    invocation_id
from
    mapped_transfers

View File

@ -0,0 +1,96 @@
# Schema and tests for silver__native_transfers.
# Conditional not_null tests mirror the model's decoding rules: private legs
# of a transfer are NULL, so SENDER/RECEIVER/AMOUNT are only enforced for the
# transfer types that expose them.
version: 2

models:
  - name: silver__native_transfers
    description: Records of native token transfers on Aleo between wallets
    tests:
      # grain check: one row per transition (and currency)
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - TX_ID
            - TRANSITION_ID
            - CURRENCY
    columns:
      - name: BLOCK_ID
        description: "{{ doc('block_id') }}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp') }}"
        tests:
          # grace period: rows may land before their timestamp backfills
          - not_null:
              where: inserted_timestamp < dateadd('hour', -1, SYSDATE())
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_NTZ
      - name: TX_ID
        description: "{{ doc('tx_id') }}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR
      - name: TRANSITION_ID
        description: "{{ doc('transition_id') }}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR
      - name: TX_SUCCEEDED
        description: "{{ doc('tx_succeeded') }}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - BOOLEAN
      - name: TRANSFER_TYPE
        description: "{{ doc('transfer_type') }}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR
      - name: SENDER
        description: "Address that tokens are transferred from."
        tests:
          # sender is only visible when the sending leg is public
          - not_null:
              where: transfer_type IN ('transfer_public', 'transfer_public_as_signer', 'transfer_public_to_private')
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR
      - name: RECEIVER
        description: "Address that tokens are transferred to."
        tests:
          # receiver is only visible when the receiving leg is public
          - not_null:
              where: transfer_type IN ('transfer_public', 'transfer_public_as_signer', 'transfer_private_to_public')
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR
      - name: AMOUNT
        description: "Number of tokens transferred."
        tests:
          # fully-private transfers do not expose an amount
          - not_null:
              where: transfer_type != 'transfer_private'
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: CURRENCY
        description: "Currency of the transfer."
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - STRING
                - VARCHAR

View File

@ -0,0 +1,82 @@
-- silver_stats__core_metrics_hourly: per-hour rollup of block, transaction,
-- transfer-participant, and fee metrics over silver__transitions, restricted
-- to fully-elapsed hours.
-- NOTE(review): materialized = 'view', yet the model contains is_incremental()
-- branches that read MAX(inserted_timestamp) from the target. is_incremental()
-- is always false for a view materialization, so both branches below are dead
-- code — confirm whether this model was meant to be incremental.
-- NOTE(review): the two USING(tx_id) joins fan out to one row per transfer
-- transition per fee row. COUNT(DISTINCT ...) aggregates are unaffected, but
-- SUM(f.fee) can be inflated for transactions with multiple transfers, and the
-- inner joins drop hours/transactions with no native transfer — verify intent.
{{ config(
    materialized = 'view',
    tags = ['noncore','recent_test']
) }}

/* run incremental timestamp value first then use it as a static value */
{% if execute %}
    {% if is_incremental() %}
        {% set query %}
    SELECT
        MIN(DATE_TRUNC('hour', block_timestamp)) block_timestamp_hour
    FROM
        {{ ref('silver__transitions') }}
    WHERE
        inserted_timestamp >= (
            SELECT
                MAX(inserted_timestamp)
            FROM
                {{ this }}
        ) {% endset %}
        {% set min_block_timestamp_hour = run_query(query).columns [0].values() [0] %}
    {% endif %}
{% endif %}

SELECT
    DATE_TRUNC('hour', ts.block_timestamp) AS block_timestamp_hour,
    MIN(block_id) AS block_id_min,
    MAX(block_id) AS block_id_max,
    COUNT(
        DISTINCT block_id
    ) AS block_count,
    COUNT(
        DISTINCT tx_id
    ) AS transaction_count,
    COUNT(
        DISTINCT CASE
            WHEN succeeded THEN tx_id
        END
    ) AS transaction_count_success,
    COUNT(
        DISTINCT CASE
            WHEN NOT succeeded THEN tx_id
        END
    ) AS transaction_count_failed,
    -- participant counts come from the decoded transfers, not transitions
    COUNT(
        DISTINCT tf.sender
    ) AS unique_from_count,
    COUNT(
        DISTINCT tf.receiver
    ) AS unique_to_count,
    SUM(f.fee) AS total_fees,
    MAX(ts.inserted_timestamp) AS _inserted_timestamp,
    -- surrogate key on the hourly grain
    {{ dbt_utils.generate_surrogate_key(
        ['block_timestamp_hour']
    ) }} AS core_metrics_hourly_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__transitions') }} ts
JOIN
    {{ref ('silver__native_transfers')}} tf
USING(tx_id)
JOIN
    {{ ref('silver__transitions_fee') }} f
USING(tx_id)
WHERE
    -- exclude the current, still-accumulating hour
    DATE_TRUNC('hour', ts.block_timestamp) < DATE_TRUNC(
        'hour',
        CURRENT_TIMESTAMP
    )
{% if is_incremental() %}
    AND DATE_TRUNC(
        'hour',
        ts.block_timestamp
    ) >= '{{ min_block_timestamp_hour }}'
{% endif %}
GROUP BY
    1

View File

@ -0,0 +1,84 @@
# Schema and tests for silver_stats__core_metrics_hourly.
# One row per complete hour; BLOCK_TIMESTAMP_HOUR is the grain.
version: 2

models:
  - name: silver_stats__core_metrics_hourly
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - BLOCK_TIMESTAMP_HOUR
    columns:
      - name: BLOCK_TIMESTAMP_HOUR
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_LTZ
                - TIMESTAMP_NTZ
      - name: BLOCK_ID_MIN
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: BLOCK_ID_MAX
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: BLOCK_COUNT
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT_SUCCESS
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TRANSACTION_COUNT_FAILED
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: UNIQUE_FROM_COUNT
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: UNIQUE_TO_COUNT
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
      - name: TOTAL_FEES
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - DECIMAL
                - FLOAT
                - NUMBER
      # freshness guard: upstream rows must have been ingested within a day
      - name: _INSERTED_TIMESTAMP
        tests:
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 1