staking pools

This commit is contained in:
Jack Forgash 2025-04-24 14:07:07 -06:00
parent b0ecf05265
commit 47a4f9a314
2 changed files with 81 additions and 65 deletions

View File

@ -475,4 +475,43 @@ Query Changes:
* Used clean_log field for reliable regex extraction
- Enhanced source chain ID extraction by joining logs table with proper conditions
---
## silver__staking_pools_s3
### Major Changes
- **REQUIRES FULL REFRESH** - Model has been completely refactored to use `core__ez_actions`
- Refactored to use `core__ez_actions` instead of `silver__actions_events_function_call_s3` and `silver__transactions_final`
- Simplified query structure by removing unnecessary joins and CTEs
- Improved data quality by adding explicit receipt success checks
### Architecture Changes
- Removed dependency on deprecated `silver__actions_events_function_call_s3`
- Consolidated data sourcing into a single CTE from `core__ez_actions`
- Improved incremental processing with dynamic range predicate
### Column Changes
#### Removed
- No columns removed
#### Added
- `receipt_id` - Added for better transaction tracking and unique key generation
#### Modified
- Changed surrogate key generation to use `receipt_id` instead of `tx_hash`
- Updated source of `address` to use `receipt_receiver_id` for new pools
- Updated source of `owner` to use `tx_signer` for updated pools
### Configuration Changes
- Added dynamic range predicate for incremental processing
- Updated clustering keys to include both `block_timestamp::DATE` and `modified_timestamp::DATE`
- Changed unique key from `tx_hash` to `staking_pools_id`
- Added search optimization on `EQUALITY(tx_hash,receipt_id,owner,address)`
### Query Changes
- Simplified data sourcing by using `core__ez_actions` directly
- Added explicit success checks with `receipt_succeeded` and `tx_succeeded`
- Improved filtering by using native fields from `core__ez_actions`
- Maintained same business logic for identifying and processing staking pool transactions
---

View File

@ -1,121 +1,88 @@
{{ config(
materialized = 'incremental',
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
unique_key = 'staking_pools_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,owner,address);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}
-- TODO ez_actions refactor
WITH txs AS (
WITH actions AS (
SELECT
tx_hash,
block_timestamp,
block_id,
receipt_id,
receipt_receiver_id,
receipt_signer_id,
tx_signer,
tx_receiver,
transaction_json AS tx,
action_data :method_name :: STRING AS method_name,
action_data :args AS args,
receipt_succeeded,
tx_succeeded,
_partition_by_block_number
FROM
{{ ref('silver__transactions_final') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% else %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
function_calls AS (
SELECT
tx_hash,
block_timestamp,
block_id,
SPLIT(
action_id,
'-'
) [0] :: STRING AS receipt_object_id,
receiver_id,
signer_id,
method_name,
args,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
method_name IN (
FROM
{{ ref('core__ez_actions') }}
WHERE
action_name = 'FunctionCall'
AND receipt_succeeded
AND tx_succeeded
AND method_name IN (
'create_staking_pool',
'update_reward_fee_fraction',
'new'
)
)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
{% endif %}
),
add_addresses_from_tx AS (
SELECT
fc.tx_hash,
fc.block_timestamp,
fc.block_id,
receipt_object_id,
tx_receiver,
tx_signer,
receiver_id,
signer_id,
method_name,
args,
txs._partition_by_block_number
FROM
function_calls fc
LEFT JOIN txs USING (tx_hash)
),
new_pools AS (
SELECT
tx_hash,
block_timestamp,
block_id,
receipt_id,
args :owner_id :: STRING AS owner,
receiver_id AS address,
receipt_receiver_id AS address,
TRY_PARSE_JSON(
args :reward_fee_fraction
) AS reward_fee_fraction,
'Create' AS tx_type,
_partition_by_block_number
FROM
add_addresses_from_tx
actions
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
add_addresses_from_tx
actions
WHERE
method_name = 'create_staking_pool'
)
AND method_name = 'new'
),
updated_pools AS (
SELECT
tx_hash,
block_timestamp,
block_id,
receipt_id,
tx_signer AS owner,
tx_receiver AS address,
TRY_PARSE_JSON(
@ -124,11 +91,12 @@ updated_pools AS (
'Update' AS tx_type,
_partition_by_block_number
FROM
add_addresses_from_tx
actions
WHERE
method_name = 'update_reward_fee_fraction'
AND reward_fee_fraction IS NOT NULL
),
FINAL AS (
SELECT
*
@ -140,10 +108,19 @@ FINAL AS (
FROM
updated_pools
)
SELECT
*,
tx_hash,
block_timestamp,
block_id,
receipt_id,
owner,
address,
reward_fee_fraction,
tx_type,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
['receipt_id']
) }} AS staking_pools_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,