diff --git a/models/descriptions/amount_sent.md b/models/descriptions/amount_sent.md new file mode 100644 index 0000000..b6f363d --- /dev/null +++ b/models/descriptions/amount_sent.md @@ -0,0 +1,5 @@ +{% docs amount_sent %} + +The amount that the user sent in a transfer. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/currency_sent.md b/models/descriptions/currency_sent.md new file mode 100644 index 0000000..5a34fb5 --- /dev/null +++ b/models/descriptions/currency_sent.md @@ -0,0 +1,5 @@ +{% docs currency_sent %} + +The currency that the user transferred between wallets. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/currency_sent_decimal.md b/models/descriptions/currency_sent_decimal.md new file mode 100644 index 0000000..173c2f5 --- /dev/null +++ b/models/descriptions/currency_sent_decimal.md @@ -0,0 +1,5 @@ +{% docs currency_sent_decimal %} + +Divide the swap_from_amount by POW(10, swap_from_decimal) to get the amount the user transferred. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/sender.md b/models/descriptions/sender.md new file mode 100644 index 0000000..1b55aab --- /dev/null +++ b/models/descriptions/sender.md @@ -0,0 +1,5 @@ +{% docs sender %} + +The wallet address of the individual sent tokens in the transfer. + +{% enddocs %} \ No newline at end of file diff --git a/models/silver/silver__transfers.sql b/models/silver/silver__transfers.sql new file mode 100644 index 0000000..dc148e4 --- /dev/null +++ b/models/silver/silver__transfers.sql @@ -0,0 +1,318 @@ +{{ config( + materialized = 'incremental', + unique_key = "CONCAT_WS('-', tx_id, msg_index, currency_sent)", + incremental_strategy = 'delete+insert', + cluster_by = ['_ingested_at::DATE'], +) }} + +WITH sender AS ( + SELECT + tx_id, + msg_index, + SPLIT_PART( + attribute_value, + '/', + 0 + ) AS sender + FROM + {{ ref('silver__msg_attributes') }} + WHERE + attribute_key = 'acc_seq' + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} +), + +message_index_ibc AS ( + SELECT + att.tx_id, + MAX(att.msg_index) as max_index + FROM + {{ ref('silver__msg_attributes') }} att + + INNER JOIN sender s + ON att.tx_id = s.tx_id + + WHERE + msg_type = 'coin_spent' OR msg_type = 'transfer' + AND + attribute_key = 'amount' + AND + att.msg_index > s.msg_index + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} + + GROUP BY + att.tx_id +), + +coin_sent_ibc AS ( + SELECT + a.tx_id, + COALESCE( + SPLIT_PART( + TRIM( + REGEXP_REPLACE( + attribute_value, + '[^[:digit:]]', + ' ' + ) + ), + ' ', + 0 + ), + TRY_PARSE_JSON(attribute_value):amount + ) AS amount_sent, + COALESCE( + RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))), + TRY_PARSE_JSON(attribute_value)[1]:denom ) + AS currency_sent, + l.raw_metadata [1] :exponent AS currency_sent_decimal + + FROM {{ ref('silver__msg_attributes') }} a + + LEFT OUTER JOIN message_index_ibc m + ON a.tx_id = m.tx_id + + LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} l + ON RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))) = l.address + + WHERE a.msg_index = m.max_index + AND a.attribute_key = 'amount' + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} + +), + +receiver_ibc AS ( + SELECT + tx_id, + COALESCE( + attribute_value, + TRY_PARSE_JSON(attribute_value):receiver + ) AS receiver, + MAX(msg_index) AS msg_index + FROM + {{ ref('silver__msg_attributes') }} + WHERE + msg_type = 'ibc_transfer' + AND + attribute_key = 'receiver' + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} + + GROUP BY tx_id, receiver +), + +osmo_tx_ids AS ( + SELECT + DISTINCT tx_id + FROM {{ ref('silver__msg_attributes') }} + WHERE + msg_type = 'message' + AND + attribute_key = 'module' + AND + attribute_value = 'bank' + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} +), + +message_indexes_osmo AS ( + SELECT + v.tx_id, + attribute_key, + m.msg_index + FROM + osmo_tx_ids v + + LEFT OUTER JOIN {{ ref('silver__msg_attributes') }} m + ON v.tx_id = m.tx_id + + INNER JOIN sender s + ON v.tx_id = s.tx_id + + WHERE + msg_type = 'transfer' + AND + attribute_key = 'amount' + AND + m.msg_index > s.msg_index + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} + +), + +osmo_receiver AS ( + SELECT + o.tx_id, + m.msg_index, + attribute_value as receiver + FROM osmo_tx_ids o + + LEFT OUTER JOIN {{ ref('silver__msg_attributes') }} m + ON o.tx_id = m.tx_id + + LEFT OUTER JOIN message_indexes_osmo idx + ON idx.tx_id = m.tx_id + + + WHERE + m.msg_type = 'transfer' + AND + m.attribute_key = 'recipient' + AND + idx.msg_index = m.msg_index + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} +), + +osmo_amount AS ( + SELECT + o.tx_id, + m.msg_index, + SPLIT_PART( + TRIM( + REGEXP_REPLACE( + attribute_value, + '[^[:digit:]]', + ' ' + ) + ), + ' ', + 0 + ) AS amount_sent, + RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))) AS currency_sent, + l.raw_metadata [1] :exponent AS currency_sent_decimal + FROM osmo_tx_ids o + + LEFT OUTER JOIN {{ ref('silver__msg_attributes') }} m + ON o.tx_id = m.tx_id + + LEFT OUTER JOIN message_indexes_osmo idx + ON idx.tx_id = m.tx_id + + LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} l + ON RIGHT(attribute_value, LENGTH(attribute_value) - LENGTH(SPLIT_PART(TRIM(REGEXP_REPLACE(attribute_value, '[^[:digit:]]', ' ')), ' ', 0))) = l.address + + WHERE + m.msg_type = 'transfer' + AND + m.attribute_key = 'amount' + AND + idx.msg_index = m.msg_index + + {% if is_incremental() %} + AND _ingested_at :: DATE >= CURRENT_DATE - 2 + {% endif %} +) + +SELECT + block_id, + block_timestamp, + blockchain, + chain_id, + r.tx_id, + tx_status, + r.msg_index, + sender, + amount_sent, + currency_sent, + currency_sent_decimal, + receiver, + _ingested_at +FROM receiver_ibc r + +LEFT OUTER JOIN coin_sent_ibc c +ON r.tx_id = c.tx_id + +LEFT OUTER JOIN sender s +ON r.tx_id = s.tx_id + +LEFT OUTER JOIN {{ ref('silver__transactions') }} t +ON r.tx_id = t.tx_id + +{% if is_incremental() %} +AND _ingested_at :: DATE >= CURRENT_DATE - 2 +{% endif %} + +UNION ALL + +SELECT + block_id, + block_timestamp, + blockchain, + chain_id, + r.tx_id, + tx_status, + r.msg_index, + sender, + amount_sent, + currency_sent, + currency_sent_decimal, + receiver, + _ingested_at +FROM osmo_receiver r + +LEFT OUTER JOIN osmo_amount c +ON r.tx_id = c.tx_id +AND r.msg_index = c.msg_index + +LEFT OUTER JOIN sender s +ON r.tx_id = s.tx_id + +LEFT OUTER JOIN {{ ref('silver__transactions') }} t +ON r.tx_id = t.tx_id + +{% if is_incremental() %} +AND _ingested_at :: DATE >= CURRENT_DATE - 2 +{% endif %} + +UNION ALL + +SELECT + m.block_id, + m.block_timestamp, + m.blockchain, + m.chain_id, + s.tx_id, + tx_status, + m.msg_index, + TRY_PARSE_JSON(attribute_value):sender :: STRING AS sender, + TRY_PARSE_JSON(attribute_value):amount :: INTEGER AS amount_sent, + TRY_PARSE_JSON(attribute_value):denom :: STRING currency_sent, + raw_metadata[1]:exponent :: INTEGER AS currency_sent_decimal, + TRY_PARSE_JSON(attribute_value):receiver :: STRING AS receiver, + m._ingested_at + +FROM sender s + +LEFT OUTER JOIN {{ ref('silver__msg_attributes') }} m +ON s.tx_id = m.tx_id + +LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} a +ON TRY_PARSE_JSON(attribute_value):denom :: STRING = COALESCE(raw_metadata[0]:aliases[0] :: STRING, raw_metadata[0]:denom :: STRING) + +LEFT OUTER JOIN {{ ref('silver__transactions') }} t +ON s.tx_id = t.tx_id + +WHERE m.msg_type = 'write_acknowledgement' +AND m.attribute_key = 'packet_data' + +{% if is_incremental() %} +AND m._ingested_at :: DATE >= CURRENT_DATE - 2 +AND t._ingested_at :: DATE >= CURRENT_DATE - 2 +{% endif %} \ No newline at end of file diff --git a/models/silver/silver__transfers.yml b/models/silver/silver__transfers.yml new file mode 100644 index 0000000..9d7c124 --- /dev/null +++ b/models/silver/silver__transfers.yml @@ -0,0 +1,66 @@ +version: 2 +models: + - name: silver__transfers + description: Records of all transfers on Osmosis, including IBC transfers as on- and off-ramps to Osmosis and wallet to wallet transfers + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_ID + - MSG_INDEX + - CURRENCY_SENT + columns: + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + - name: BLOCKCHAIN + description: "{{ doc('blockchain') }}" + tests: + - not_null + - name: CHAIN_ID + description: "{{ doc('chain_id') }}" + tests: + - not_null + - name: TX_ID + description: "{{ doc('tx_id') }}" + tests: + - not_null + - name: TX_STATUS + description: "{{ doc('tx_status') }}" + tests: + - not_null + - name: SENDER + description: "{{ doc('sender') }}" + tests: + - not_null + - name: AMOUNT_SENT + description: "{{ doc('amount_sent') }}" + tests: + - not_null + - name: CURRENCY_SENT + description: "{{ doc('currency_sent') }}" + tests: + - not_null + - name: CURRENCY_SENT_DECIMAL + description: "{{ doc('swap_from_decimal') }}" + - name: _INGESTED_AT + description: "{{ doc('ingested_at') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ