Merge branch 'main' into AN-4236-Swaps_overhaul

This commit is contained in:
Eric Laurello 2024-01-09 13:49:16 -05:00
commit a0767f964b
3 changed files with 271 additions and 316 deletions

View File

@ -21,7 +21,7 @@ SELECT
COALESCE(
staking_id,
{{ dbt_utils.generate_surrogate_key(
['_unique_key']
['tx_id','msg_index']
) }}
) AS fact_staking_id,
COALESCE(

View File

@ -1,46 +1,37 @@
{{ config(
materialized = 'incremental',
unique_key = "_unique_key",
unique_key = ['tx_id', 'msg_index'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH
{% if is_incremental() %}
max_date AS (
WITH base AS (
SELECT
MAX(
_inserted_timestamp
) _inserted_timestamp
FROM
{{ this }}
),
{% endif %}
base AS (
SELECT
A.tx_id,
A.msg_type,
A.msg_index,
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_type,
msg_index,
msg_group,
msg_sub_group
msg_sub_group,
attribute_key,
attribute_value,
_inserted_timestamp
FROM
{{ ref('silver__msg_attributes') }} A
{{ ref('silver__msg_attributes') }}
WHERE
msg_type IN (
'delegate',
'redelegate',
'unbond',
'create_validator'
)
AND attribute_value NOT IN (
'superfluid_delegate',
'superfluid_undelegate',
'superfluid_unbond_underlying_lock'
'create_validator',
'tx',
'coin_spent',
'message'
)
{% if is_incremental() %}
@ -50,57 +41,7 @@ AND _inserted_timestamp >= (
_inserted_timestamp
)
FROM
max_date
)
{% endif %}
),
msg_attr AS (
SELECT
A.tx_id,
A.attribute_key,
A.attribute_value,
A.msg_index,
A.msg_type,
A.msg_group,
A.msg_sub_group,
block_id,
block_timestamp,
tx_succeeded,
_inserted_timestamp
FROM
{{ ref('silver__msg_attributes') }} A
JOIN (
SELECT
DISTINCT tx_id,
msg_index
FROM
base
UNION ALL
SELECT
DISTINCT tx_id,
msg_index + 1 msg_index
FROM
base
) b
ON A.tx_id = b.tx_id
AND A.msg_index = b.msg_index
WHERE
A.msg_type IN (
'delegate',
'message',
'redelegate',
'unbond',
'create_validator'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
max_date
{{ this }}
)
{% endif %}
),
@ -111,35 +52,77 @@ tx_address AS (
attribute_value,
'/',
0
) AS tx_caller_address
) AS tx_caller_address,
_inserted_timestamp
FROM
{{ ref('silver__msg_attributes') }} A
JOIN (
SELECT
DISTINCT tx_id
FROM
base
) b
ON A.tx_id = b.tx_id
base A
WHERE
attribute_key = 'acc_seq'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
max_date
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY A.tx_id
ORDER BY
msg_index)) = 1
attribute_key = 'acc_seq' qualify(ROW_NUMBER() over(PARTITION BY A.tx_id
ORDER BY
msg_index)) = 1
),
valid AS (
SELECT
block_id,
A.block_timestamp,
A.tx_id,
A.tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
COALESCE(
j :validator :: STRING,
j :destination_validator :: STRING
) AS validator_address,
j :source_validator :: STRING AS redelegate_source_validator_address,
j :amount :: STRING AS amount_raw,
j :authz_msg_index :: INT AS authz_msg_index,
j :completion_time :: STRING AS completion_time,
j :new_shares :: STRING AS new_shares,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
amount_raw,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS amount,
RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency,
ROW_NUMBER() over (
PARTITION BY tx_id,
msg_group,
msg_sub_group
ORDER BY
msg_index DESC
) AS del_rank
FROM
base A
WHERE
msg_type IN (
'delegate',
'redelegate',
'unbond',
'create_validator'
)
GROUP BY
block_id,
A.block_timestamp,
A.tx_id,
A.tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type
),
spent AS (
SELECT
tx_id,
msg_group,
@ -149,15 +132,99 @@ valid AS (
attribute_key :: STRING,
attribute_value :: variant
) AS j,
COALESCE(
j :validator :: STRING,
j :destination_validator :: STRING
) AS validator_address,
j :source_validator :: STRING AS redelegate_source_validator_address
j :spender :: STRING AS spender,
j :amount :: STRING AS amount_raw,
j :authz_msg_index :: INT AS authz_msg_index,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
amount_raw,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS amount,
RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency,
ROW_NUMBER() over (
PARTITION BY tx_id,
msg_group,
msg_sub_group
ORDER BY
msg_index DESC
) AS spent_rank
FROM
msg_attr
base
WHERE
attribute_key LIKE '%validator'
msg_type = 'coin_spent'
GROUP BY
tx_id,
msg_group,
msg_sub_group,
msg_index
),
spent_auth AS (
SELECT
tx_id,
msg_group,
msg_sub_group,
msg_index,
spender,
authz_msg_index,
amount,
currency
FROM
spent
WHERE
authz_msg_index IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY tx_id, authz_msg_index
ORDER BY
msg_index DESC) = 1)
),
spent_amount AS (
SELECT
tx_id,
msg_group,
msg_sub_group,
msg_index,
spender,
authz_msg_index,
amount,
currency
FROM
spent qualify(ROW_NUMBER() over(PARTITION BY tx_id, COALESCE(msg_group, -1), COALESCE(msg_sub_group, -1), amount
ORDER BY
msg_index DESC) = 1)
),
wr AS (
SELECT
tx_id,
msg_group,
msg_sub_group,
msg_index,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :validator :: STRING AS validator,
j :amount :: STRING AS amount_raw,
j :delegator :: STRING AS delegator,
SPLIT_PART(
TRIM(
REGEXP_REPLACE(
amount_raw,
'[^[:digit:]]',
' '
)
),
' ',
0
) AS amount,
RIGHT(amount_raw, LENGTH(amount_raw) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(amount_raw, '[^[:digit:]]', ' ')), ' ', 0))) AS currency
FROM
base
WHERE
msg_type = 'withdraw_rewards'
GROUP BY
tx_id,
msg_group,
@ -174,232 +241,125 @@ sendr AS (
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :module :: STRING AS module,
j :sender :: STRING AS sender
FROM
msg_attr A
base
WHERE
attribute_key = 'sender'
msg_type = 'message'
GROUP BY
tx_id,
msg_group,
msg_sub_group,
msg_index
),
amount AS (
SELECT
tx_id,
msg_group,
msg_sub_group,
msg_index,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :amount :: STRING AS amount
FROM
msg_attr
WHERE
attribute_key = 'amount'
GROUP BY
tx_id,
msg_group,
msg_sub_group,
msg_index
),
ctime AS (
SELECT
tx_id,
msg_group,
msg_sub_group,
msg_index,
OBJECT_AGG(
attribute_key :: STRING,
attribute_value :: variant
) AS j,
j :completion_time :: STRING AS completion_time
FROM
msg_attr
WHERE
attribute_key = 'completion_time'
GROUP BY
tx_id,
msg_group,
msg_sub_group,
msg_index
),
prefinal AS (
SELECT
A.tx_id,
A.msg_group,
A.msg_sub_group,
b.sender AS delegator_address,
d.amount,
A.msg_type AS action,
C.validator_address,
C.redelegate_source_validator_address,
e.completion_time
FROM
(
SELECT
DISTINCT tx_id,
msg_group,
msg_sub_group,
msg_index,
REPLACE(
REPLACE(
msg_type,
'unbond',
'undelegate'
),
'create_validator',
'delegate'
) msg_type
FROM
base
) A
JOIN sendr b
ON A.tx_id = b.tx_id
AND A.msg_group = b.msg_group
AND A.msg_index + 1 = b.msg_index
JOIN valid C
ON A.tx_id = C.tx_id
AND A.msg_group = C.msg_group
AND A.msg_index = C.msg_index
JOIN amount d
ON A.tx_id = d.tx_id
AND A.msg_group = d.msg_group
AND A.msg_index = d.msg_index
LEFT JOIN ctime e
ON A.tx_id = e.tx_id
AND A.msg_group = e.msg_group
AND A.msg_index = e.msg_index
),
add_dec AS (
SELECT
b.block_id,
b.block_timestamp,
A.tx_id,
b.tx_succeeded,
C.tx_caller_address,
A.action,
A.msg_group,
A.msg_sub_group,
A.delegator_address,
SUM(
CASE
WHEN A.split_amount LIKE '%uosmo' THEN REPLACE(
A.split_amount,
'uosmo'
)
WHEN A.split_amount LIKE '%uion' THEN REPLACE(
A.split_amount,
'uion'
)
WHEN A.split_amount LIKE '%pool%' THEN LEFT(A.split_amount, CHARINDEX('g', A.split_amount) -1)
WHEN A.split_amount LIKE '%ibc%' THEN LEFT(A.split_amount, CHARINDEX('i', A.split_amount) -1)
ELSE A.split_amount
END :: INT
) AS amount,
CASE
WHEN A.split_amount LIKE '%uosmo' THEN 'uosmo'
WHEN A.split_amount LIKE '%uion' THEN 'uion'
WHEN A.split_amount LIKE '%pool%' THEN SUBSTRING(A.split_amount, CHARINDEX('g', A.split_amount), 99)
WHEN A.split_amount LIKE '%ibc%' THEN SUBSTRING(A.split_amount, CHARINDEX('i', A.split_amount), 99)
ELSE 'uosmo'
END AS currency,
A.validator_address,
A.redelegate_source_validator_address,
A.completion_time :: datetime completion_time,
b._inserted_timestamp
FROM
(
SELECT
p.tx_id,
p.action,
p.msg_group,
p.msg_sub_group,
p.delegator_address,
p.validator_address,
p.redelegate_source_validator_address,
p.completion_time,
am.value AS split_amount
FROM
prefinal p,
LATERAL SPLIT_TO_TABLE(
p.amount,
','
) am
) A
JOIN (
SELECT
DISTINCT tx_id,
block_id,
block_timestamp,
tx_succeeded,
_inserted_timestamp
FROM
msg_attr
) b
ON A.tx_id = b.tx_id
JOIN tx_address C
ON A.tx_id = C.tx_id
GROUP BY
b.block_id,
b.block_timestamp,
A.tx_id,
b.tx_succeeded,
C.tx_caller_address,
A.action,
A.msg_group,
A.msg_sub_group,
A.delegator_address,
currency,
A.validator_address,
A.redelegate_source_validator_address,
completion_time,
b._inserted_timestamp
HAVING
module = 'staking'
)
SELECT
block_id,
A.block_timestamp,
A.tx_id,
A.tx_succeeded,
A.tx_caller_address,
A.action,
b.tx_caller_address,
REPLACE(
REPLACE(
msg_type,
'unbond',
'undelegate'
),
'create_validator',
'delegate'
) AS action,
A.msg_group,
A.msg_sub_group,
A.delegator_address,
A.amount,
A.msg_index,
COALESCE(
s.sender,
C.delegator,
d_auth.spender,
d.spender,
d_amount.spender,
b.tx_caller_address
) AS delegator_address,
A.amount :: INT AS amount,
A.currency,
A.validator_address,
A.redelegate_source_validator_address,
A.completion_time,
A.completion_time :: datetime AS completion_time,
CASE
WHEN A.currency LIKE 'gamm/pool/%' THEN 18
ELSE amd.decimal
END AS DECIMAL,
A._inserted_timestamp,
concat_ws(
'-',
tx_id,
msg_group,
COALESCE(
msg_sub_group,
-1
),
action,
currency,
delegator_address,
validator_address
) AS _unique_key,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['_unique_key']
['a.tx_id', 'a.msg_index']
) }} AS staking_id,
staking_id AS _unique_key,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
add_dec A
valid A
JOIN tx_address b
ON A.tx_id = b.tx_id
LEFT JOIN sendr s
ON A.tx_id = s.tx_id
AND A.msg_group = s.msg_group
AND COALESCE(
A.msg_sub_group,
-1
) = COALESCE(
s.msg_sub_group,
-1
)
LEFT JOIN wr C
ON A.tx_id = C.tx_id
AND A.msg_group = C.msg_group
AND COALESCE(
A.msg_sub_group,
-1
) = COALESCE(
C.msg_sub_group,
-1
)
AND A.validator_address = C.validator
LEFT JOIN spent_auth d_auth
ON A.tx_id = d_auth.tx_id
AND A.msg_group = d_auth.msg_group
AND COALESCE(
A.msg_sub_group,
-1
) = COALESCE(
d_auth.msg_sub_group,
-1
)
AND A.authz_msg_index = d_auth.authz_msg_index
LEFT JOIN spent d
ON A.tx_id = d.tx_id
AND A.msg_group = d.msg_group
AND COALESCE(
A.msg_sub_group,
-1
) = COALESCE(
d.msg_sub_group,
-1
)
AND A.amount = d.amount
AND A.del_rank = d.spent_rank
AND A.authz_msg_index IS NULL
LEFT JOIN spent_amount d_amount
ON A.tx_id = d_amount.tx_id
AND A.msg_group = d_amount.msg_group
AND COALESCE(
A.msg_sub_group,
-1
) = COALESCE(
d_amount.msg_sub_group,
-1
)
AND A.amount = d_amount.amount
AND d_auth.tx_id IS NULL
AND d.tx_id IS NULL
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }}
amd
ON A.currency = amd.address

View File

@ -6,12 +6,7 @@ models:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- MSG_GROUP
- MSG_SUB_GROUP
- ACTION
- CURRENCY
- DELEGATOR_ADDRESS
- VALIDATOR_ADDRESS
- MSG_INDEX
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
@ -25,9 +20,9 @@ models:
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
# - dbt_expectations.expect_row_values_to_have_recent_data:
# datepart: day
# interval: 1
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ