From 7fa382ee39d4e5435db646de086d83204969b7cc Mon Sep 17 00:00:00 2001 From: eric-laurello <102970824+eric-laurello@users.noreply.github.com> Date: Wed, 3 Jan 2024 16:16:56 -0500 Subject: [PATCH] AN-4347 upgrade staking model (#147) * upgrade staking model * edge cases * revert --- models/gold/gov/gov__fact_staking.sql | 2 +- models/silver/gov/silver__staking.sql | 572 ++++++++++++-------------- models/silver/gov/silver__staking.yml | 13 +- 3 files changed, 271 insertions(+), 316 deletions(-) diff --git a/models/gold/gov/gov__fact_staking.sql b/models/gold/gov/gov__fact_staking.sql index a1e4318..53c265a 100644 --- a/models/gold/gov/gov__fact_staking.sql +++ b/models/gold/gov/gov__fact_staking.sql @@ -21,7 +21,7 @@ SELECT COALESCE( staking_id, {{ dbt_utils.generate_surrogate_key( - ['_unique_key'] + ['tx_id','msg_index'] ) }} ) AS fact_staking_id, COALESCE( diff --git a/models/silver/gov/silver__staking.sql b/models/silver/gov/silver__staking.sql index bba783e..aeabe11 100644 --- a/models/silver/gov/silver__staking.sql +++ b/models/silver/gov/silver__staking.sql @@ -1,46 +1,37 @@ {{ config( materialized = 'incremental', - unique_key = "_unique_key", + unique_key = ['tx_id', 'msg_index'], incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['block_timestamp::DATE'], tags = ['noncore'] ) }} -WITH - -{% if is_incremental() %} -max_date AS ( +WITH base AS ( SELECT - MAX( - _inserted_timestamp - ) _inserted_timestamp - FROM - {{ this }} -), -{% endif %} - -base AS ( - SELECT - A.tx_id, - A.msg_type, - A.msg_index, + block_id, + block_timestamp, + tx_id, + tx_succeeded, + msg_type, + msg_index, msg_group, - msg_sub_group + msg_sub_group, + attribute_key, + attribute_value, + _inserted_timestamp FROM - {{ ref('silver__msg_attributes') }} A + {{ ref('silver__msg_attributes') }} WHERE msg_type IN ( 'delegate', 'redelegate', 'unbond', - 'create_validator' - ) - AND attribute_value NOT IN ( - 'superfluid_delegate', - 'superfluid_undelegate', - 'superfluid_unbond_underlying_lock' + 'create_validator', + 'tx', + 'coin_spent', + 'message' ) {% if is_incremental() %} @@ -50,57 +41,7 @@ AND _inserted_timestamp >= ( _inserted_timestamp ) FROM - max_date -) -{% endif %} -), -msg_attr AS ( - SELECT - A.tx_id, - A.attribute_key, - A.attribute_value, - A.msg_index, - A.msg_type, - A.msg_group, - A.msg_sub_group, - block_id, - block_timestamp, - tx_succeeded, - _inserted_timestamp - FROM - {{ ref('silver__msg_attributes') }} A - JOIN ( - SELECT - DISTINCT tx_id, - msg_index - FROM - base - UNION ALL - SELECT - DISTINCT tx_id, - msg_index + 1 msg_index - FROM - base - ) b - ON A.tx_id = b.tx_id - AND A.msg_index = b.msg_index - WHERE - A.msg_type IN ( - 'delegate', - 'message', - 'redelegate', - 'unbond', - 'create_validator' - ) - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - FROM - max_date + {{ this }} ) {% endif %} ), @@ -111,35 +52,77 @@ tx_address AS ( attribute_value, '/', 0 - ) AS tx_caller_address + ) AS tx_caller_address, + _inserted_timestamp FROM - {{ ref('silver__msg_attributes') }} A - JOIN ( - SELECT - DISTINCT tx_id - FROM - base - ) b - ON A.tx_id = b.tx_id + base A WHERE - attribute_key = 'acc_seq' - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - FROM - max_date -) -{% endif %} - -qualify(ROW_NUMBER() over(PARTITION BY A.tx_id -ORDER BY - msg_index)) = 1 + attribute_key = 'acc_seq' qualify(ROW_NUMBER() over(PARTITION BY A.tx_id + ORDER BY + msg_index)) = 1 ), valid AS ( + SELECT + block_id, + A.block_timestamp, + A.tx_id, + A.tx_succeeded, + msg_group, + msg_sub_group, + msg_index, + msg_type, + OBJECT_AGG( + attribute_key :: STRING, + attribute_value :: variant + ) AS j, + COALESCE( + j :validator :: STRING, + j :destination_validator :: STRING + ) AS validator_address, + j :source_validator :: STRING AS redelegate_source_validator_address, + j :amount :: STRING AS amount_raw, + j :authz_msg_index :: INT AS authz_msg_index, + j :completion_time :: STRING AS completion_time, + j :new_shares :: STRING AS new_shares, + SPLIT_PART( + TRIM( + REGEXP_REPLACE( + amount_raw, + '[^[:digit:]]', + ' ' + ) + ), + ' ', + 0 + ) AS amount, + RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency, + ROW_NUMBER() over ( + PARTITION BY tx_id, + msg_group, + msg_sub_group + ORDER BY + msg_index DESC + ) AS del_rank + FROM + base A + WHERE + msg_type IN ( + 'delegate', + 'redelegate', + 'unbond', + 'create_validator' + ) + GROUP BY + block_id, + A.block_timestamp, + A.tx_id, + A.tx_succeeded, + msg_group, + msg_sub_group, + msg_index, + msg_type +), +spent AS ( SELECT tx_id, msg_group, @@ -149,15 +132,99 @@ valid AS ( attribute_key :: STRING, attribute_value :: variant ) AS j, - COALESCE( - j :validator :: STRING, - j :destination_validator :: STRING - ) AS validator_address, - j :source_validator :: STRING AS redelegate_source_validator_address + j :spender :: STRING AS spender, + j :amount :: STRING AS amount_raw, + j :authz_msg_index :: INT AS authz_msg_index, + SPLIT_PART( + TRIM( + REGEXP_REPLACE( + amount_raw, + '[^[:digit:]]', + ' ' + ) + ), + ' ', + 0 + ) AS amount, + RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency, + ROW_NUMBER() over ( + PARTITION BY tx_id, + msg_group, + msg_sub_group + ORDER BY + msg_index DESC + ) AS spent_rank FROM - msg_attr + base WHERE - attribute_key LIKE '%validator' + msg_type = 'coin_spent' + GROUP BY + tx_id, + msg_group, + msg_sub_group, + msg_index +), +spent_auth AS ( + SELECT + tx_id, + msg_group, + msg_sub_group, + msg_index, + spender, + authz_msg_index, + amount, + currency + FROM + spent + WHERE + authz_msg_index IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY tx_id, authz_msg_index + ORDER BY + msg_index DESC) = 1) +), +spent_amount AS ( + SELECT + tx_id, + msg_group, + msg_sub_group, + msg_index, + spender, + authz_msg_index, + amount, + currency + FROM + spent qualify(ROW_NUMBER() over(PARTITION BY tx_id, COALESCE(msg_group, -1), COALESCE(msg_sub_group, -1), amount + ORDER BY + msg_index DESC) = 1) +), +wr AS ( + SELECT + tx_id, + msg_group, + msg_sub_group, + msg_index, + OBJECT_AGG( + attribute_key :: STRING, + attribute_value :: variant + ) AS j, + j :validator :: STRING AS validator, + j :amount :: STRING AS amount_raw, + j :delegator :: STRING AS delegator, + SPLIT_PART( + TRIM( + REGEXP_REPLACE( + amount_raw, + '[^[:digit:]]', + ' ' + ) + ), + ' ', + 0 + ) AS amount, + RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency + FROM + base + WHERE + msg_type = 'withdraw_rewards' GROUP BY tx_id, msg_group, @@ -174,232 +241,125 @@ sendr AS ( attribute_key :: STRING, attribute_value :: variant ) AS j, + j :module :: STRING AS module, j :sender :: STRING AS sender FROM - msg_attr A + base WHERE - attribute_key = 'sender' + msg_type = 'message' GROUP BY tx_id, msg_group, msg_sub_group, msg_index -), -amount AS ( - SELECT - tx_id, - msg_group, - msg_sub_group, - msg_index, - OBJECT_AGG( - attribute_key :: STRING, - attribute_value :: variant - ) AS j, - j :amount :: STRING AS amount - FROM - msg_attr - WHERE - attribute_key = 'amount' - GROUP BY - tx_id, - msg_group, - msg_sub_group, - msg_index -), -ctime AS ( - SELECT - tx_id, - msg_group, - msg_sub_group, - msg_index, - OBJECT_AGG( - attribute_key :: STRING, - attribute_value :: variant - ) AS j, - j :completion_time :: STRING AS completion_time - FROM - msg_attr - WHERE - attribute_key = 'completion_time' - GROUP BY - tx_id, - msg_group, - msg_sub_group, - msg_index -), -prefinal AS ( - SELECT - A.tx_id, - A.msg_group, - A.msg_sub_group, - b.sender AS delegator_address, - d.amount, - A.msg_type AS action, - C.validator_address, - C.redelegate_source_validator_address, - e.completion_time - FROM - ( - SELECT - DISTINCT tx_id, - msg_group, - msg_sub_group, - msg_index, - REPLACE( - REPLACE( - msg_type, - 'unbond', - 'undelegate' - ), - 'create_validator', - 'delegate' - ) msg_type - FROM - base - ) A - JOIN sendr b - ON A.tx_id = b.tx_id - AND A.msg_group = b.msg_group - AND A.msg_index + 1 = b.msg_index - JOIN valid C - ON A.tx_id = C.tx_id - AND A.msg_group = C.msg_group - AND A.msg_index = C.msg_index - JOIN amount d - ON A.tx_id = d.tx_id - AND A.msg_group = d.msg_group - AND A.msg_index = d.msg_index - LEFT JOIN ctime e - ON A.tx_id = e.tx_id - AND A.msg_group = e.msg_group - AND A.msg_index = e.msg_index -), -add_dec AS ( - SELECT - b.block_id, - b.block_timestamp, - A.tx_id, - b.tx_succeeded, - C.tx_caller_address, - A.action, - A.msg_group, - A.msg_sub_group, - A.delegator_address, - SUM( - CASE - WHEN A.split_amount LIKE '%uosmo' THEN REPLACE( - A.split_amount, - 'uosmo' - ) - WHEN A.split_amount LIKE '%uion' THEN REPLACE( - A.split_amount, - 'uion' - ) - WHEN A.split_amount LIKE '%pool%' THEN LEFT(A.split_amount, CHARINDEX('g', A.split_amount) -1) - WHEN A.split_amount LIKE '%ibc%' THEN LEFT(A.split_amount, CHARINDEX('i', A.split_amount) -1) - ELSE A.split_amount - END :: INT - ) AS amount, - CASE - WHEN A.split_amount LIKE '%uosmo' THEN 'uosmo' - WHEN A.split_amount LIKE '%uion' THEN 'uion' - WHEN A.split_amount LIKE '%pool%' THEN SUBSTRING(A.split_amount, CHARINDEX('g', A.split_amount), 99) - WHEN A.split_amount LIKE '%ibc%' THEN SUBSTRING(A.split_amount, CHARINDEX('i', A.split_amount), 99) - ELSE 'uosmo' - END AS currency, - A.validator_address, - A.redelegate_source_validator_address, - A.completion_time :: datetime completion_time, - b._inserted_timestamp - FROM - ( - SELECT - p.tx_id, - p.action, - p.msg_group, - p.msg_sub_group, - p.delegator_address, - p.validator_address, - p.redelegate_source_validator_address, - p.completion_time, - am.value AS split_amount - FROM - prefinal p, - LATERAL SPLIT_TO_TABLE( - p.amount, - ',' - ) am - ) A - JOIN ( - SELECT - DISTINCT tx_id, - block_id, - block_timestamp, - tx_succeeded, - _inserted_timestamp - FROM - msg_attr - ) b - ON A.tx_id = b.tx_id - JOIN tx_address C - ON A.tx_id = C.tx_id - GROUP BY - b.block_id, - b.block_timestamp, - A.tx_id, - b.tx_succeeded, - C.tx_caller_address, - A.action, - A.msg_group, - A.msg_sub_group, - A.delegator_address, - currency, - A.validator_address, - A.redelegate_source_validator_address, - completion_time, - b._inserted_timestamp + HAVING + module = 'staking' ) SELECT block_id, A.block_timestamp, A.tx_id, A.tx_succeeded, - A.tx_caller_address, - A.action, + b.tx_caller_address, + REPLACE( + REPLACE( + msg_type, + 'unbond', + 'undelegate' + ), + 'create_validator', + 'delegate' + ) AS action, A.msg_group, A.msg_sub_group, - A.delegator_address, - A.amount, + A.msg_index, + COALESCE( + s.sender, + C.delegator, + d_auth.spender, + d.spender, + d_amount.spender, + b.tx_caller_address + ) AS delegator_address, + A.amount :: INT AS amount, A.currency, A.validator_address, A.redelegate_source_validator_address, - A.completion_time, + A.completion_time :: datetime AS completion_time, CASE WHEN A.currency LIKE 'gamm/pool/%' THEN 18 ELSE amd.decimal END AS DECIMAL, - A._inserted_timestamp, - concat_ws( - '-', - tx_id, - msg_group, - COALESCE( - msg_sub_group, - -1 - ), - action, - currency, - delegator_address, - validator_address - ) AS _unique_key, + b._inserted_timestamp, {{ dbt_utils.generate_surrogate_key( - ['_unique_key'] + ['a.tx_id', 'a.msg_index'] ) }} AS staking_id, + staking_id AS _unique_key, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - add_dec A + valid A + JOIN tx_address b + ON A.tx_id = b.tx_id + LEFT JOIN sendr s + ON A.tx_id = s.tx_id + AND A.msg_group = s.msg_group + AND COALESCE( + A.msg_sub_group, + -1 + ) = COALESCE( + s.msg_sub_group, + -1 + ) + LEFT JOIN wr C + ON A.tx_id = C.tx_id + AND A.msg_group = C.msg_group + AND COALESCE( + A.msg_sub_group, + -1 + ) = COALESCE( + C.msg_sub_group, + -1 + ) + AND A.validator_address = C.validator + LEFT JOIN spent_auth d_auth + ON A.tx_id = d_auth.tx_id + AND A.msg_group = d_auth.msg_group + AND COALESCE( + A.msg_sub_group, + -1 + ) = COALESCE( + d_auth.msg_sub_group, + -1 + ) + AND A.authz_msg_index = d_auth.authz_msg_index + LEFT JOIN spent d + ON A.tx_id = d.tx_id + AND A.msg_group = d.msg_group + AND COALESCE( + A.msg_sub_group, + -1 + ) = COALESCE( + d.msg_sub_group, + -1 + ) + AND A.amount = d.amount + AND A.del_rank = d.spent_rank + AND A.authz_msg_index IS NULL + LEFT JOIN spent_amount d_amount + ON A.tx_id = d_amount.tx_id + AND A.msg_group = d_amount.msg_group + AND COALESCE( + A.msg_sub_group, + -1 + ) = COALESCE( + d_amount.msg_sub_group, + -1 + ) + AND A.amount = d_amount.amount + AND d_auth.tx_id IS NULL + AND d.tx_id IS NULL LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} amd ON A.currency = amd.address diff --git a/models/silver/gov/silver__staking.yml b/models/silver/gov/silver__staking.yml index f07f60e..97f9f7e 100644 --- a/models/silver/gov/silver__staking.yml +++ b/models/silver/gov/silver__staking.yml @@ -6,12 +6,7 @@ models: - dbt_utils.unique_combination_of_columns: combination_of_columns: - TX_ID - - MSG_GROUP - - MSG_SUB_GROUP - - ACTION - - CURRENCY - - DELEGATOR_ADDRESS - - VALIDATOR_ADDRESS + - MSG_INDEX columns: - name: BLOCK_ID description: "{{ doc('block_id') }}" @@ -25,9 +20,9 @@ models: description: "{{ doc('block_timestamp') }}" tests: - not_null - # - dbt_expectations.expect_row_values_to_have_recent_data: - # datepart: day - # interval: 1 + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - TIMESTAMP_NTZ