Merge pull request #345 from FlipsideCrypto/AN-5098-refactor-token-transfer

An 5098 refactor token transfer
This commit is contained in:
Jack Forgash 2024-09-19 13:26:09 -06:00 committed by GitHub
commit 7b452f363c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 849 additions and 622 deletions

View File

@ -64,6 +64,7 @@ vars:
STREAMLINE_LOAD_LOOKBACK_HOURS: 3
RECEIPT_MAP_LOOKBACK_HOURS: 6
IS_MIGRATION: False
HEAL_MODELS: []
dispatch:
- macro_namespace: dbt

View File

@ -3,3 +3,9 @@
An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted.
{% enddocs %}
{% docs amount_unadj %}
An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted.
{% enddocs %}

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
unique_key = "fact_token_transfers_id",
unique_key = "ez_token_transfers_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
@ -32,8 +32,8 @@ SELECT
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
amount_unadj :: STRING AS amount_raw,
amount_unadj :: FLOAT AS amount_raw_precise,
IFF(
C.decimals IS NOT NULL,
utils.udf_decimal_adjust(
@ -53,14 +53,11 @@ SELECT
C.symbol AS symbol,
price AS token_price,
transfer_type,
{{ dbt_utils.generate_surrogate_key(
['transfers_id']
) }} AS fact_token_transfers_id,
transfers_complete_id AS ez_token_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__token_transfers') }}
{{ ref('silver__token_transfers_complete') }}
t
LEFT JOIN hourly_prices p
ON t.contract_address = p.token_address

View File

@ -3,6 +3,12 @@ version: 2
models:
- name: core__ez_token_transfers
description: This table records all native token transfers and nep-141
tests:
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 2
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
@ -51,7 +57,7 @@ models:
- name: HAS_PRICE
description: "Boolean value indicating if the token has a price"
- name: FACT_TOKEN_TRANSFERS_ID
- name: EZ_TOKEN_TRANSFERS_ID
description: "{{doc('id')}}"
tests:
- unique:

View File

@ -1,432 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}
-- Curation Challenge - 'https://flipsidecrypto.xyz/Hossein/transfer-sector-of-near-curation-challenge-zgM44F'
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND RIGHT(
action_id,
2
) = '-0' {% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
WHERE
transfer_type = 'nep141'
)
{% endif %}
{% endif %}
),
---------------------------- Native Token Transfers ------------------------------
native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
predecessor_id AS from_address,
receiver_id AS to_address,
IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadjusted,
--numeric validation (there are some exceptions that needs to be ignored)
receipt_succeeded,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
WHERE
status = TRUE
AND deposit != 0 {% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
WHERE
transfer_type = 'native'
)
{% endif %}
{% endif %}
),
------------------------------ NEAR Tokens (NEP 141) --------------------------------
orders AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id,
TRY_PARSE_JSON(REPLACE(g.value, 'EVENT_JSON:')) AS DATA,
DATA :event :: STRING AS event,
g.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) g
WHERE
DATA :event :: STRING = 'order_added'
),
orders_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
f.value :sell_token :: STRING AS contract_address,
f.value :owner_id :: STRING AS from_address,
receiver_id :: STRING AS to_address,
(
f.value :original_amount
) :: variant AS amount_unadjusted,
'order' AS memo,
f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
orders
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadjusted > 0
),
add_liquidity AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
REGEXP_SUBSTR(
SPLIT.value,
'"\\d+ ([^"]*)["]',
1,
1,
'e',
1
) :: STRING AS contract_address,
NULL AS from_address,
receiver_id AS to_address,
REGEXP_SUBSTR(
SPLIT.value,
'"(\\d+) ',
1,
1,
'e',
1
) :: variant AS amount_unadjusted,
'add_liquidity' AS memo,
INDEX AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events,
LATERAL FLATTEN (
input => SPLIT(
REGEXP_SUBSTR(
logs [0],
'\\["(.*?)"\\]'
),
','
)
) SPLIT
WHERE
logs [0] LIKE 'Liquidity added [%minted % shares'
),
ft_transfers_method AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id AS contract_address,
REGEXP_SUBSTR(
VALUE,
'from ([^ ]+)',
1,
1,
'',
1
) :: STRING AS from_address,
REGEXP_SUBSTR(
VALUE,
'to ([^ ]+)',
1,
1,
'',
1
) :: STRING AS to_address,
REGEXP_SUBSTR(
VALUE,
'\\d+'
) :: variant AS amount_unadjusted,
'' AS memo,
b.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
method_name = 'ft_transfer'
AND from_address IS NOT NULL
AND to_address IS NOT NULL
AND amount_unadjusted IS NOT NULL
),
ft_transfers_event AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receiver_id AS contract_address,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_transfer'
)
),
ft_transfers_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
NVL(
f.value :old_owner_id,
NULL
) :: STRING AS from_address,
NVL(
f.value :new_owner_id,
f.value :owner_id
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadjusted,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
ft_transfers_event
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadjusted > 0
),
ft_mints AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receiver_id AS contract_address,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_mint'
)
),
ft_mints_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
NVL(
f.value :old_owner_id,
NULL
) :: STRING AS from_address,
NVL(
f.value :new_owner_id,
f.value :owner_id
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadjusted,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
ft_mints
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadjusted > 0
),
nep_transfers AS (
SELECT
*
FROM
ft_transfers_method
UNION ALL
SELECT
*
FROM
ft_transfers_final
UNION ALL
SELECT
*
FROM
ft_mints_final
UNION ALL
SELECT
*
FROM
orders_final
UNION ALL
SELECT
*
FROM
add_liquidity
),
------------------------------ MODELS --------------------------------
native_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'wrap.near' AS contract_address,
from_address :: STRING,
to_address :: STRING,
NULL AS memo,
'0' AS rn,
'native' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
native_transfers
),
nep_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
memo,
rn :: STRING AS rn,
'nep141' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
nep_transfers
),
------------------------------ FINAL --------------------------------
transfer_union AS (
SELECT
*
FROM
nep_final
UNION ALL
SELECT
*
FROM
native_final
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
transfer_union
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_raw','from_address','to_address','memo','rn']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,157 +0,0 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core'],
enabled = False
) }}
{# TODO - test and apply, where applicable.
Model disabled but will keep in repo for ref #}
{# Note - multisource model #}
{% if execute %}
{% if is_incremental() %}
{% set max_modified_query %}
SELECT
MAX(_modified_timestamp) AS _modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_modified_timestamp = run_query(max_modified_query).columns [0].values() [0] %}
{% endif %}
{% set query = """ CREATE OR REPLACE TEMPORARY TABLE silver.transfers_s3_temp_dates as SELECT MIN(COALESCE(b._modified_timestamp,'2050-01-01') ) as _modified_timestamp FROM """ ~ ref('silver__actions_events_s3') ~ """ a join """ ~ ref('silver__streamline_transactions_final') ~ """ b ON A.tx_hash = b.tx_hash WHERE a.action_name = 'Transfer' """ %}
{% set incr = "" %}
{% if is_incremental() %}
{% set incr = """ AND GREATEST(a._modified_timestamp,b._modified_timestamp) >= '""" ~ max_modified_timestamp ~ """' """ %}
{% endif %}
{% do run_query(
query ~ incr
) %}
{% endif %}
WITH action_events AS(
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
action_data :deposit :: INT AS deposit,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__actions_events_s3') }}
WHERE
action_name = 'Transfer' {% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= '{{ max_modified_timestamp }}'
{% endif %}
{% endif %}
),
txs AS (
SELECT
tx_hash,
tx :receipt :: ARRAY AS tx_receipt,
block_id,
block_timestamp,
tx_receiver,
tx_signer,
transaction_fee,
gas_used,
tx_succeeded,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
silver.transfers_s3_temp_dates
)
{% endif %}
{% endif %}
),
actions AS (
SELECT
A.tx_hash,
A.action_id,
A.block_id,
A.block_timestamp,
t.tx_signer,
t.tx_receiver,
A.predecessor_id,
A.receiver_id,
A.signer_id,
A.deposit,
t.transaction_fee,
A.gas_burnt AS gas_used,
A.receipt_succeeded,
t.tx_succeeded,
A._partition_by_block_number,
A._inserted_timestamp,
A._modified_timestamp
FROM
action_events A
INNER JOIN txs t
ON A.tx_hash = t.tx_hash
),
FINAL AS (
SELECT
block_id,
block_timestamp,
action_id,
deposit,
tx_hash,
tx_signer,
tx_receiver,
predecessor_id,
signer_id,
receiver_id,
transaction_fee,
gas_used,
tx_succeeded,
receipt_succeeded,
array_min([tx_succeeded, receipt_succeeded]) :: BOOLEAN AS status,
_partition_by_block_number,
_inserted_timestamp,
_modified_timestamp
FROM
actions
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -5,7 +5,7 @@
cluster_by = ['block_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id, tx_hash,tx_signer,tx_receiver,predecessor_id,signer_id,receiver_id);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,predecessor_id,receiver_id);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}

View File

@ -74,6 +74,9 @@ models:
- name: TRANSFERS_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_base_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH nep141 AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
predecessor_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
AND RIGHT(
action_id,
2
) = '-0'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_base_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
nep141

View File

@ -0,0 +1,102 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_event_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_event AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receiver_id AS contract_address,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_transfer'
)
),
ft_transfers_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
NVL(
f.value :old_owner_id,
NULL
) :: STRING AS from_address,
NVL(
f.value :new_owner_id,
f.value :owner_id
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadj,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
ft_transfers_event
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_event_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_final

View File

@ -0,0 +1,93 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_method AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id AS contract_address,
REGEXP_SUBSTR(
VALUE,
'from ([^ ]+)',
1,
1,
'',
1
) :: STRING AS from_address,
REGEXP_SUBSTR(
VALUE,
'to ([^ ]+)',
1,
1,
'',
1
) :: STRING AS to_address,
REGEXP_SUBSTR(
VALUE,
'\\d+'
) :: variant AS amount_unadj,
'' AS memo,
b.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
method_name = 'ft_transfer'
AND from_address IS NOT NULL
AND to_address IS NOT NULL
AND amount_unadj IS NOT NULL
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_method

View File

@ -0,0 +1,95 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_liquidity_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
add_liquidity AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
REGEXP_SUBSTR(
SPLIT.value,
'"\\d+ ([^"]*)["]',
1,
1,
'e',
1
) :: STRING AS contract_address,
NULL AS from_address,
receiver_id AS to_address,
REGEXP_SUBSTR(
SPLIT.value,
'"(\\d+) ',
1,
1,
'e',
1
) :: variant AS amount_unadj,
'add_liquidity' AS memo,
INDEX AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events,
LATERAL FLATTEN (
input => SPLIT(
REGEXP_SUBSTR(
logs [0],
'\\["(.*?)"\\]'
),
','
)
) SPLIT
WHERE
logs [0] LIKE 'Liquidity added [%minted % shares'
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_liquidity_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
add_liquidity

View File

@ -0,0 +1,104 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'mint_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
,
ft_mints AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA,
b.index AS logs_rn,
receiver_id AS contract_address,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) b
WHERE
DATA :event :: STRING IN (
'ft_mint'
)
),
ft_mints_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
NVL(
f.value :old_owner_id,
NULL
) :: STRING AS from_address,
NVL(
f.value :new_owner_id,
f.value :owner_id
) :: STRING AS to_address,
f.value :amount :: variant AS amount_unadj,
f.value :memo :: STRING AS memo,
logs_rn + f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
ft_mints
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id','rn']
)}} AS mint_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_mints_final
WHERE
mint_id IS NOT NULL

View File

@ -0,0 +1,96 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_orders_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH actions_events AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
signer_id,
receiver_id,
action_name,
method_name,
deposit,
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
orders AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
receiver_id,
TRY_PARSE_JSON(REPLACE(g.value, 'EVENT_JSON:')) AS DATA,
DATA :event :: STRING AS event,
g.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
input => logs
) g
WHERE
DATA :event :: STRING = 'order_added'
),
orders_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
f.value :sell_token :: STRING AS contract_address,
f.value :owner_id :: STRING AS from_address,
receiver_id :: STRING AS to_address,
(
f.value :original_amount
) :: variant AS amount_unadj,
'order' AS memo,
f.index AS rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
orders
JOIN LATERAL FLATTEN(
input => DATA :data
) f
WHERE
amount_unadj > 0
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_orders_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
orders_final

View File

@ -0,0 +1,265 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_complete_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'0' AS rn,
'wrap.near' AS contract_address,
predecessor_id AS from_address,
receiver_id AS to_address,
NULL AS memo,
IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadj,
'native' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
WHERE
status = TRUE AND deposit != 0
{% if var("MANUAL_FIX") %}
AND
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_method AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_ft_transfers_method') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_event AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_ft_transfers_event') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
mints AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_mints') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
orders AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_orders') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
liquidity AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_liquidity') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
*
FROM
native_transfers
UNION ALL
SELECT
*
FROM
ft_transfers_method
UNION ALL
SELECT
*
FROM
ft_transfers_event
UNION ALL
SELECT
*
FROM
mints
UNION ALL
SELECT
*
FROM
orders
UNION ALL
SELECT
*
FROM
liquidity
)
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadj,
transfer_type,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id','contract_address','amount_unadj','from_address','to_address','rn']
) }} AS transfers_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,18 +1,8 @@
version: 2
models:
- name: silver__token_transfers
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_hash
- action_id
- contract_address
- amount_raw
- from_address
- to_address
- memo
- rn
- name: silver__token_transfers_complete
description: |-
This table records all the Native Token + FTs Transfers of the Near blockchain.
columns:
@ -23,7 +13,7 @@ models:
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '10 hour'
where: _inserted_timestamp <= SYSDATE() - interval '2 hours'
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
@ -37,20 +27,17 @@ models:
- name: FROM_ADDRESS
description: "{{ doc('from_address')}}"
- name: RN
description: "Row number"
- name: TO_ADDRESS
description: "{{ doc('to_address')}}"
- name: RN
description: "Row number"
- name: MEMO
description: "{{ doc('memo')}}"
- name: AMOUNT_RAW
description: "{{ doc('amount_raw')}}"
- name: AMOUNT_RAW_PRECISE
description: "{{ doc('amount_adj')}}"
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj')}}"
- name: TRANSFER_TYPE
description: "{{ doc('transfer_type')}}"
@ -61,8 +48,11 @@ models:
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: TRANSFERS_ID
- name: TRANSFERS_COMPLETE_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"