Merge pull request #351 from FlipsideCrypto/AN-5237-native-transfers

AN-5237/ native transfers fix
This commit is contained in:
Jack Forgash 2024-09-25 14:43:32 -06:00 committed by GitHub
commit 0192fe07b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 122 additions and 168 deletions

View File

@ -0,0 +1,5 @@
{% docs tokens_burnt %}
The amount of tokens burnt in a transaction or receipt execution.
{% enddocs %}

View File

@ -71,12 +71,12 @@ xfers AS (
action_id,
block_timestamp,
block_id,
deposit,
amount_unadj :: INT AS deposit,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__transfers_s3') }}
{{ ref('silver__token_transfer_native') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}

View File

@ -0,0 +1,74 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'token_transfer_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH action_events AS(
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
action_data :deposit :: STRING AS amount_unadj,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__actions_events_s3') }}
WHERE
action_name = 'Transfer'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
amount_unadj,
amount_unadj :: DOUBLE / pow(
10,
24
) AS amount_adj,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['action_id', 'predecessor_id', 'receiver_id', 'amount_unadj']
) }} AS token_transfer_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
action_events

View File

@ -1,19 +1,25 @@
version: 2
models:
- name: silver__transfers_s3
- name: silver__token_transfer_native
description: |-
This table records all the Transfer actions of the Near blockchain.
columns:
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: ACTION_ID
description: "{{ doc('action_id')}}"
tests:
- not_null
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
@ -21,47 +27,50 @@ models:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: TX_SIGNER
description: "{{ doc('tx_signer')}}"
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj')}}"
tests:
- not_null
- name: TX_RECEIVER
description: "{{ doc('tx_receiver')}}"
- name: AMOUNT_ADJ
description: "{{ doc('amount_adj')}}"
tests:
- not_null
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
tests:
- not_null
- name: SIGNER_ID
description: "{{ doc('signer_id')}}"
tests:
- not_null
- name: RECEIVER_ID
description: "{{ doc('receiver_id')}}"
- name: DEPOSIT
description: "{{ doc('deposit')}}"
- name: TRANSACTION_FEE
description: "{{ doc('transaction_fee')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: GAS_USED
description: "{{ doc('gas_used')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded')}}"
- name: RECEIPT_SUCCEEDED
description: "{{ doc('receipt_succeeded')}}"
- name: STATUS
description: "{{ doc('status')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: GAS_PRICE
description: "{{ doc('gas_price')}}"
tests:
- not_null
- name: GAS_BURNT
description: "{{ doc('gas_burnt')}}"
tests:
- not_null
- name: TOKENS_BURNT
description: "{{ doc('tokens_burnt')}}"
tests:
- not_null
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
@ -72,7 +81,7 @@ models:
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: TRANSFERS_ID
- name: TOKEN_TRANSFER_NATIVE_ID
description: "{{doc('id')}}"
tests:
- not_null

View File

@ -1,134 +0,0 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,predecessor_id,receiver_id);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}
WITH action_events AS(
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
action_data :deposit :: INT AS deposit,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__actions_events_s3') }}
WHERE
action_name = 'Transfer'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
txs AS (
SELECT
tx_hash,
tx :receipt :: ARRAY AS tx_receipt,
block_id,
block_timestamp,
tx_receiver,
tx_signer,
transaction_fee,
gas_used,
tx_succeeded,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
actions AS (
SELECT
A.tx_hash,
A.action_id,
A.block_id,
A.block_timestamp,
t.tx_signer,
t.tx_receiver,
A.predecessor_id,
A.receiver_id,
A.signer_id,
A.deposit,
t.transaction_fee,
A.gas_burnt AS gas_used,
A.receipt_succeeded,
t.tx_succeeded,
A._partition_by_block_number,
A._inserted_timestamp,
A._modified_timestamp
FROM
action_events A
INNER JOIN txs t
ON A.tx_hash = t.tx_hash
),
FINAL AS (
SELECT
block_id,
block_timestamp,
action_id,
deposit,
tx_hash,
tx_signer,
tx_receiver,
predecessor_id,
signer_id,
receiver_id,
transaction_fee,
gas_used,
tx_succeeded,
receipt_succeeded,
ARRAY_MIN([tx_succeeded, receipt_succeeded]) :: BOOLEAN AS status,
_partition_by_block_number,
_inserted_timestamp,
_modified_timestamp
FROM
actions
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -21,15 +21,15 @@ WITH native_transfers AS (
predecessor_id AS from_address,
receiver_id AS to_address,
NULL AS memo,
IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadj,
amount_unadj,
'native' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
{{ ref('silver__token_transfer_native') }}
WHERE
status = TRUE AND deposit != 0
receipt_succeeded
{% if var("MANUAL_FIX") %}
AND
{{ partition_load_manual('no_buffer') }}