Add search optimization (SO), update clustering, and fix silver accounts

This commit is contained in:
Eric Laurello 2025-02-07 10:46:07 -05:00
parent c73897dab3
commit fcaae0b77e
13 changed files with 132 additions and 92 deletions

View File

@ -1,18 +1,19 @@
-- depends_on: {{ ref('silver__assets') }}
{{ config(
materialized = 'incremental',
unique_key = ['dim_assets_id'],
unique_key = ['asset_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
tags = ['core']
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(asset_code,asset_issuer,asset_id);",
tags = ['scheduled_core']
) }}
SELECT
id,
asset_id,
asset_type,
asset_code,
asset_issuer,
asset_id,
id,
{{ dbt_utils.generate_surrogate_key(
['asset_id']
) }} AS dim_assets_id,

View File

@ -8,7 +8,6 @@ models:
description: '{{ doc("id") }}'
tests:
- dbt_expectations.expect_column_to_exist
- not_null
- name: ASSET_TYPE
description: '{{ doc("asset_type") }}'

View File

@ -2,10 +2,11 @@
{{ config(
materialized = 'incremental',
unique_key = ["account_id","closed_at"],
incremental_predicates = ["dynamic_range_predicate", "closed_at::date"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE'],
tags = ['core']
cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(account_id);",
tags = ['scheduled_core']
) }}
SELECT
@ -20,7 +21,7 @@ SELECT
home_domain,
master_weight,
threshold_low,
threshold_medium,
threshold_medium,
threshold_high,
last_modified_ledger,
ledger_entry_change,
@ -31,13 +32,13 @@ SELECT
sequence_ledger,
sequence_time,
closed_at,
closed_at as block_timestamp,
closed_at AS block_timestamp,
ledger_sequence,
{{ dbt_utils.generate_surrogate_key(['account_id', 'closed_at']) }} AS fact_accounts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FROM
{{ ref('silver__accounts') }}
{% if is_incremental() %}
@ -48,4 +49,4 @@ WHERE
FROM
{{ this }}
)
{% endif %}
{% endif %}

View File

@ -1,21 +1,21 @@
-- depends_on: {{ ref('silver__ledgers') }}
{{ config(
materialized = 'incremental',
unique_key = ['fact_ledgers_id'],
incremental_predicates = ['DBT_INTERNAL_DEST.closed_at::DATE >= (select min(closed_at::DATE) from ' ~ generate_tmp_view_name(this) ~ ')'],
cluster_by = ['closed_at::DATE'],
unique_key = ['sequence'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
merge_exclude_columns = ['inserted_timestamp'],
tags = ['core']
tags = ['scheduled_core']
) }}
SELECT
sequence,
SEQUENCE,
ledger_hash,
previous_ledger_hash,
transaction_count,
operation_count,
closed_at,
closed_at as block_timestamp,
closed_at AS block_timestamp,
id,
total_coins,
fee_pool,

View File

@ -77,10 +77,10 @@ models:
tests:
- dbt_expectations.expect_column_to_exist
- name: LEDGER_HEADER
description: '{{ doc("ledger_header") }}'
tests:
- dbt_expectations.expect_column_to_exist
# - name: LEDGER_HEADER
# description: '{{ doc("ledger_header") }}'
# tests:
# - dbt_expectations.expect_column_to_exist
- name: SUCCESSFUL_TRANSACTION_COUNT
description: '{{ doc("successful_transaction_count") }}'

View File

@ -2,15 +2,18 @@
{{ config(
materialized = 'incremental',
unique_key = ["liquidity_pool_id", "closed_at"],
incremental_predicates = ["dynamic_range_predicate", "closed_at::date"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE'],
tags = ['core'],
cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(liquidity_pool_id,asset_a_issuer,asset_b_issuer,asset_a_code,asset_b_code);",
tags = ['scheduled_core']
) }}
SELECT
liquidity_pool_id,
type,
closed_at,
closed_at AS block_timestamp,
TYPE,
fee,
trustline_count,
pool_share_count,
@ -30,8 +33,6 @@ SELECT
batch_id,
batch_run_date,
batch_insert_ts,
closed_at,
closed_at as block_timestamp,
ledger_sequence,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
@ -40,15 +41,22 @@ SELECT
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FROM
{{ ref('silver__liquidity_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp > (SELECT MAX(_inserted_timestamp) FROM {{ this }})
_inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (
PARTITION BY liquidity_pool_id, closed_at
ORDER BY _inserted_timestamp DESC
qualify ROW_NUMBER() over (
PARTITION BY liquidity_pool_id,
closed_at
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,17 +1,21 @@
-- depends_on: {{ ref('silver__operations') }}
{{ config(
materialized = 'incremental',
unique_key = ["operations_id"],
incremental_predicates = ["dynamic_range_predicate", "closed_at::date"],
unique_key = ["id"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE'],
tags = ['core']
cluster_by = ['block_timestamp::DATE','closed_at::DATE','type_string'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(source_account,asset,asset_code,asset_issuer,buying_asset_code,buying_asset_issuer,selling_asset_code,selling_asset_issuer);",
tags = ['scheduled_core']
) }}
SELECT
SELECT
id,
closed_at,
closed_at AS block_timestamp,
account,
amount,
asset,
asset_code,
asset_issuer,
asset_type,
@ -22,17 +26,17 @@ SELECT
buying_asset_issuer,
buying_asset_type,
buying_asset_id,
"from",
"from" AS from_account,
funder,
high_threshold,
home_domain,
inflation_dest,
"into",
"limit",
-- Quoted reserved-word columns are exposed under explicit AS aliases, matching from_account / to_account.
"into" AS into_account,
"limit" AS limit_amount,
low_threshold,
master_key_weight,
med_threshold,
name,
NAME,
offer_id,
path,
price,
@ -53,11 +57,11 @@ SELECT
source_asset_id,
source_max,
starting_balance,
"to",
"to" AS to_account,
trustee,
trustor,
trustline_asset,
value,
VALUE,
clear_flags,
clear_flags_s,
destination_min,
@ -93,20 +97,19 @@ SELECT
source_account,
op_source_account_muxed,
transaction_id,
type,
TYPE,
type_string,
ledger_sequence,
op_account_muxed,
op_account_muxed_id,
ledger_key_hash,
closed_at,
closed_at as block_timestamp,
batch_id,
batch_run_date,
batch_insert_ts,
asset_balance_changes,
parameters,
PARAMETERS,
parameters_decoded,
function,
FUNCTION,
address,
soroban_operation_type,
extend_to,
@ -114,12 +117,38 @@ SELECT
contract_code_hash,
operation_result_code,
operation_trace_code,
begin_sponsor_muxed,
begin_sponsor_muxed_id,
claimable_balance_id,
claimant,
claimants,
claimant_muxed,
claimant_muxed_id,
data_account_id,
data_name,
details_json,
from_muxed,
from_muxed_id,
funder_muxed,
funder_muxed_id,
into_muxed,
into_muxed_id,
ledgers_to_expire,
op_account_id,
to_muxed,
to_muxed_id,
trustee_muxed,
trustee_muxed_id,
trustline_account_id,
trustor_muxed,
trustor_muxed_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['id']) }} AS fact_operations_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM {{ ref('silver__operations') }}
FROM
{{ ref('silver__operations') }}
{% if is_incremental() %}
WHERE
@ -129,4 +158,4 @@ WHERE
FROM
{{ this }}
)
{% endif %}
{% endif %}

View File

@ -4,7 +4,7 @@ models:
- name: core__fact_operations
description: A comprehensive view of all operations executed on the Stellar network, including transaction details, operation types, and their outcomes.
columns:
- name: OP_ID
- name: ID
description: '{{ doc("op_id") }}'
tests:
- unique
@ -46,7 +46,7 @@ models:
- name: BUYING_ASSET_ID
description: '{{ doc("buying_asset_id") }}'
- name: FROM
- name: FROM_ACCOUNT
description: '{{ doc("from") }}'
- name: FUNDER
@ -61,10 +61,10 @@ models:
- name: INFLATION_DEST
description: '{{ doc("inflation_destination") }}'
- name: INTO
- name: INTO_ACCOUNT
description: '{{ doc("into") }}'
- name: LIMIT
- name: LIMIT_AMOUNT
description: '{{ doc("limit") }}'
- name: LOW_THRESHOLD
@ -139,7 +139,7 @@ models:
- name: STARTING_BALANCE
description: '{{ doc("starting_balance") }}'
- name: TO
- name: TO_ACCOUNT
description: '{{ doc("to") }}'
- name: TRUSTEE

View File

@ -1,17 +1,19 @@
--depends_on: {{ ref('silver__trades') }}
{{ config(
materialized = 'incremental',
unique_key = ['fact_trades_id'],
incremental_predicates = ["dynamic_range_predicate"],
unique_key = ["history_operation_id", "trade_order"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['ledger_closed_at::DATE'],
tags = ['core'],
cluster_by = ['block_timestamp::DATE','ledger_closed_at::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(selling_account_address,selling_asset_code,selling_asset_issuer,buying_account_address,buying_asset_code,buying_asset_issuer);",
tags = ['scheduled_core']
) }}
SELECT
history_operation_id,
"order",
"order" AS trade_order,
ledger_closed_at,
ledger_closed_at AS block_timestamp,
selling_account_address,
selling_asset_code,
selling_asset_issuer,
@ -38,15 +40,20 @@ SELECT
seller_is_exact,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['history_operation_id','"order"']
['history_operation_id','trade_order']
) }} AS fact_trades_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FROM
{{ ref('silver__trades') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp > (SELECT MAX(_inserted_timestamp) FROM {{ this }})
_inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -4,18 +4,12 @@ models:
- name: core__fact_trades
description: A fact table containing trade execution details from both the decentralized exchange and liquidity pools.
columns:
- name: trades_id
description: '{{ doc("trades_id") }}'
tests:
- unique
- not_null
- name: history_operation_id
description: '{{ doc("history_operation_id") }}'
tests:
- not_null
- name: order
- name: trade_order
description: '{{ doc("order") }}'
tests:
- not_null

View File

@ -1,17 +1,20 @@
-- depends_on: {{ ref('silver__transactions') }}
{{ config(
materialized = 'incremental',
unique_key = ["fact_transactions_id"],
incremental_predicates = ["dynamic_range_predicate"],
unique_key = ["id"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE'],
tags = ['core']
cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id,transaction_hash,account);",
tags = ['scheduled_core']
) }}
SELECT
id,
transaction_hash,
ledger_sequence,
closed_at,
closed_at AS block_timestamp,
account,
account_sequence,
max_fee,
@ -20,7 +23,7 @@ SELECT
memo_type,
memo,
time_bounds,
successful,
SUCCESSFUL,
fee_charged,
inner_transaction_hash,
fee_account,
@ -43,7 +46,6 @@ SELECT
soroban_resources_instructions,
soroban_resources_read_bytes,
soroban_resources_write_bytes,
closed_at,
transaction_result_code,
inclusion_fee_bid,
inclusion_fee_charged,
@ -71,4 +73,4 @@ WHERE
FROM
{{ this }}
)
{% endif %}
{% endif %}

View File

@ -4,11 +4,11 @@ models:
- name: core__fact_transactions
description: Fact table containing transaction details from the Stellar network.
columns:
- name: transactions_id
description: '{{ doc("pk") }}'
tests:
- unique
- not_null
# - name: id
# description: '{{ doc("pk") }}'
# tests:
# - unique
# - not_null
- name: id
description: Unique identifier for the transaction
@ -62,8 +62,8 @@ models:
- name: successful
description: '{{ doc("successful") }}'
tests:
- not_null
# tests:
# - not_null
- name: fee_charged
description: '{{ doc("fee_charged") }}'

View File

@ -73,10 +73,8 @@ WITH pre_final AS (
VALUE :closed_at :: INT,
6
) AS closed_at,
TO_TIMESTAMP(
VALUE :ledger_sequence :: INT,
6
) AS ledger_sequence,
VALUE :ledger_sequence :: INT,
ledger_sequence,
_inserted_timestamp
FROM
@ -85,11 +83,12 @@ WITH pre_final AS (
{% else %}
{{ ref('bronze__accounts_FR') }}
{% endif %}
WHERE
account_id IS NOT NULL
{% if is_incremental() %}
WHERE
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
AND partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (