diff --git a/dbt_project.yml b/dbt_project.yml index 89054c5..8df2e13 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -64,6 +64,7 @@ vars: STREAMLINE_LOAD_LOOKBACK_HOURS: 3 RECEIPT_MAP_LOOKBACK_HOURS: 6 IS_MIGRATION: False + HEAL_MODELS: [] dispatch: - macro_namespace: dbt diff --git a/models/descriptions/amount_raw.md b/models/descriptions/amount_raw.md index 57ef3ba..b218764 100644 --- a/models/descriptions/amount_raw.md +++ b/models/descriptions/amount_raw.md @@ -3,3 +3,9 @@ An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted. {% enddocs %} + +{% docs amount_unadj %} + +An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted. + +{% enddocs %} diff --git a/models/gold/core/core__ez_token_transfers.sql b/models/gold/core/core__ez_token_transfers.sql index 87b975c..cd4dd54 100644 --- a/models/gold/core/core__ez_token_transfers.sql +++ b/models/gold/core/core__ez_token_transfers.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'incremental', incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], - unique_key = "fact_token_transfers_id", + unique_key = "ez_token_transfers_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['block_timestamp::DATE'], @@ -32,8 +32,8 @@ SELECT from_address, to_address, memo, - amount_raw, - amount_raw_precise, + amount_unadj :: STRING AS amount_raw, + amount_unadj :: FLOAT AS amount_raw_precise, IFF( C.decimals IS NOT NULL, utils.udf_decimal_adjust( @@ -53,14 +53,11 @@ SELECT C.symbol AS symbol, price AS token_price, transfer_type, - {{ dbt_utils.generate_surrogate_key( - ['transfers_id'] - ) }} AS fact_token_transfers_id, + transfers_complete_id AS ez_token_transfers_id, SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id + SYSDATE() AS modified_timestamp FROM - {{ ref('silver__token_transfers') }} + {{ ref('silver__token_transfers_complete') }} t LEFT JOIN hourly_prices p ON t.contract_address = p.token_address diff --git a/models/gold/core/core__ez_token_transfers.yml b/models/gold/core/core__ez_token_transfers.yml index 7738a73..bb4c7b3 100644 --- a/models/gold/core/core__ez_token_transfers.yml +++ b/models/gold/core/core__ez_token_transfers.yml @@ -3,6 +3,12 @@ version: 2 models: - name: core__ez_token_transfers description: This table records all native token transfers and nep-141 + tests: + - dbt_utils.recency: + datepart: hour + field: block_timestamp + interval: 2 + columns: - name: BLOCK_ID description: "{{ doc('block_id')}}" @@ -51,7 +57,7 @@ models: - name: HAS_PRICE description: "Boolean value indicating if the token has a price" - - name: FACT_TOKEN_TRANSFERS_ID + - name: EZ_TOKEN_TRANSFERS_ID description: "{{doc('id')}}" tests: - unique: diff --git a/models/silver/curated/silver__token_transfers.sql b/models/silver/curated/silver__token_transfers.sql deleted file mode 100644 index e65c2ec..0000000 --- a/models/silver/curated/silver__token_transfers.sql +++ /dev/null @@ -1,432 +0,0 @@ -{{ config( - materialized = 'incremental', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], - merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'], - unique_key = 'transfers_id', - incremental_strategy = 'merge', - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address);", - tags = ['curated','scheduled_non_core'] -) }} -{# Note - multisource model #} --- Curation Challenge - 'https://flipsidecrypto.xyz/Hossein/transfer-sector-of-near-curation-challenge-zgM44F' -WITH actions_events AS ( - - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - signer_id, - receiver_id, - action_name, - method_name, - deposit, - logs, - receipt_succeeded, - _inserted_timestamp, - modified_timestamp AS _modified_timestamp, - _partition_by_block_number - FROM - {{ ref('silver__actions_events_function_call_s3') }} - WHERE - receipt_succeeded = TRUE - AND logs [0] IS NOT NULL - AND RIGHT( - action_id, - 2 - ) = '-0' {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} - {% else %} - -{% if is_incremental() %} -AND _modified_timestamp >= ( - SELECT - MAX(_modified_timestamp) - FROM - {{ this }} - WHERE - transfer_type = 'nep141' -) -{% endif %} -{% endif %} -), ----------------------------- Native Token Transfers ------------------------------ -native_transfers AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - predecessor_id AS from_address, - receiver_id AS to_address, - IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadjusted, - --numeric validation (there are some exceptions that needs to be ignored) - receipt_succeeded, - _inserted_timestamp, - modified_timestamp AS _modified_timestamp, - _partition_by_block_number - FROM - {{ ref('silver__transfers_s3') }} - WHERE - status = TRUE - AND deposit != 0 {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} - {% else %} - -{% if is_incremental() %} -AND _modified_timestamp >= ( - SELECT - MAX(_modified_timestamp) - FROM - {{ this }} - WHERE - transfer_type = 'native' -) -{% endif %} -{% endif %} -), ------------------------------- NEAR Tokens (NEP 141) -------------------------------- -orders AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - receiver_id, - TRY_PARSE_JSON(REPLACE(g.value, 'EVENT_JSON:')) AS DATA, - DATA :event :: STRING AS event, - g.index AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - actions_events - JOIN LATERAL FLATTEN( - input => logs - ) g - WHERE - DATA :event :: STRING = 'order_added' -), -orders_final AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - f.value :sell_token :: STRING AS contract_address, - f.value :owner_id :: STRING AS from_address, - receiver_id :: STRING AS to_address, - ( - f.value :original_amount - ) :: variant AS amount_unadjusted, - 'order' AS memo, - f.index AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - orders - JOIN LATERAL FLATTEN( - input => DATA :data - ) f - WHERE - amount_unadjusted > 0 -), -add_liquidity AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - REGEXP_SUBSTR( - SPLIT.value, - '"\\d+ ([^"]*)["]', - 1, - 1, - 'e', - 1 - ) :: STRING AS contract_address, - NULL AS from_address, - receiver_id AS to_address, - REGEXP_SUBSTR( - SPLIT.value, - '"(\\d+) ', - 1, - 1, - 'e', - 1 - ) :: variant AS amount_unadjusted, - 'add_liquidity' AS memo, - INDEX AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - actions_events, - LATERAL FLATTEN ( - input => SPLIT( - REGEXP_SUBSTR( - logs [0], - '\\["(.*?)"\\]' - ), - ',' - ) - ) SPLIT - WHERE - logs [0] LIKE 'Liquidity added [%minted % shares' -), -ft_transfers_method AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - receiver_id AS contract_address, - REGEXP_SUBSTR( - VALUE, - 'from ([^ ]+)', - 1, - 1, - '', - 1 - ) :: STRING AS from_address, - REGEXP_SUBSTR( - VALUE, - 'to ([^ ]+)', - 1, - 1, - '', - 1 - ) :: STRING AS to_address, - REGEXP_SUBSTR( - VALUE, - '\\d+' - ) :: variant AS amount_unadjusted, - '' AS memo, - b.index AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - actions_events - JOIN LATERAL FLATTEN( - input => logs - ) b - WHERE - method_name = 'ft_transfer' - AND from_address IS NOT NULL - AND to_address IS NOT NULL - AND amount_unadjusted IS NOT NULL -), -ft_transfers_event AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA, - b.index AS logs_rn, - receiver_id AS contract_address, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - actions_events - JOIN LATERAL FLATTEN( - input => logs - ) b - WHERE - DATA :event :: STRING IN ( - 'ft_transfer' - ) -), -ft_transfers_final AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - contract_address, - NVL( - f.value :old_owner_id, - NULL - ) :: STRING AS from_address, - NVL( - f.value :new_owner_id, - f.value :owner_id - ) :: STRING AS to_address, - f.value :amount :: variant AS amount_unadjusted, - f.value :memo :: STRING AS memo, - logs_rn + f.index AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - ft_transfers_event - JOIN LATERAL FLATTEN( - input => DATA :data - ) f - WHERE - amount_unadjusted > 0 -), -ft_mints AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA, - b.index AS logs_rn, - receiver_id AS contract_address, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - actions_events - JOIN LATERAL FLATTEN( - input => logs - ) b - WHERE - DATA :event :: STRING IN ( - 'ft_mint' - ) -), -ft_mints_final AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - contract_address, - NVL( - f.value :old_owner_id, - NULL - ) :: STRING AS from_address, - NVL( - f.value :new_owner_id, - f.value :owner_id - ) :: STRING AS to_address, - f.value :amount :: variant AS amount_unadjusted, - f.value :memo :: STRING AS memo, - logs_rn + f.index AS rn, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - ft_mints - JOIN LATERAL FLATTEN( - input => DATA :data - ) f - WHERE - amount_unadjusted > 0 -), -nep_transfers AS ( - SELECT - * - FROM - ft_transfers_method - UNION ALL - SELECT - * - FROM - ft_transfers_final - UNION ALL - SELECT - * - FROM - ft_mints_final - UNION ALL - SELECT - * - FROM - orders_final - UNION ALL - SELECT - * - FROM - add_liquidity -), ------------------------------- MODELS -------------------------------- -native_final AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - 'wrap.near' AS contract_address, - from_address :: STRING, - to_address :: STRING, - NULL AS memo, - '0' AS rn, - 'native' AS transfer_type, - amount_unadjusted :: STRING AS amount_raw, - amount_unadjusted :: FLOAT AS amount_raw_precise, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - native_transfers -), -nep_final AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - contract_address, - from_address, - to_address, - memo, - rn :: STRING AS rn, - 'nep141' AS transfer_type, - amount_unadjusted :: STRING AS amount_raw, - amount_unadjusted :: FLOAT AS amount_raw_precise, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - nep_transfers -), ------------------------------- FINAL -------------------------------- -transfer_union AS ( - SELECT - * - FROM - nep_final - UNION ALL - SELECT - * - FROM - native_final -), -FINAL AS ( - SELECT - block_id, - block_timestamp, - tx_hash, - action_id, - rn, - contract_address, - from_address, - to_address, - memo, - amount_raw, - amount_raw_precise, - transfer_type, - _inserted_timestamp, - _modified_timestamp, - _partition_by_block_number - FROM - transfer_union -) -SELECT - *, - {{ dbt_utils.generate_surrogate_key( - ['tx_hash', 'action_id','contract_address','amount_raw','from_address','to_address','memo','rn'] - ) }} AS transfers_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - FINAL diff --git a/models/silver/curated/silver__transfers_s3_inc.sql b/models/silver/curated/silver__transfers_s3_inc.sql deleted file mode 100644 index 3289061..0000000 --- a/models/silver/curated/silver__transfers_s3_inc.sql +++ /dev/null @@ -1,157 +0,0 @@ -{{ config( - materialized = 'incremental', - merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE'], - unique_key = 'action_id', - incremental_strategy = 'merge', - tags = ['curated','scheduled_non_core'], - enabled = False -) }} -{# TODO - test and apply, where applicable. -Model disabled but will keep in repo for ref #} -{# Note - multisource model #} -{% if execute %} - -{% if is_incremental() %} -{% set max_modified_query %} - -SELECT - MAX(_modified_timestamp) AS _modified_timestamp -FROM - {{ this }} - - {% endset %} - {% set max_modified_timestamp = run_query(max_modified_query).columns [0].values() [0] %} -{% endif %} - -{% set query = """ CREATE OR REPLACE TEMPORARY TABLE silver.transfers_s3_temp_dates as SELECT MIN(COALESCE(b._modified_timestamp,'2050-01-01') ) as _modified_timestamp FROM """ ~ ref('silver__actions_events_s3') ~ """ a join """ ~ ref('silver__streamline_transactions_final') ~ """ b ON A.tx_hash = b.tx_hash WHERE a.action_name = 'Transfer' """ %} -{% set incr = "" %} - -{% if is_incremental() %} -{% set incr = """ AND GREATEST(a._modified_timestamp,b._modified_timestamp) >= '""" ~ max_modified_timestamp ~ """' """ %} -{% endif %} - -{% do run_query( - query ~ incr -) %} -{% endif %} - -WITH action_events AS( - SELECT - tx_hash, - block_id, - block_timestamp, - action_id, - action_data :deposit :: INT AS deposit, - predecessor_id, - receiver_id, - signer_id, - receipt_succeeded, - gas_price, - gas_burnt, - tokens_burnt, - _partition_by_block_number, - _inserted_timestamp, - modified_timestamp AS _modified_timestamp - FROM - {{ ref('silver__actions_events_s3') }} - WHERE - action_name = 'Transfer' {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} - {% else %} - -{% if is_incremental() %} -AND _modified_timestamp >= '{{ max_modified_timestamp }}' -{% endif %} -{% endif %} -), -txs AS ( - SELECT - tx_hash, - tx :receipt :: ARRAY AS tx_receipt, - block_id, - block_timestamp, - tx_receiver, - tx_signer, - transaction_fee, - gas_used, - tx_succeeded, - _partition_by_block_number, - _inserted_timestamp, - modified_timestamp AS _modified_timestamp - FROM - {{ ref('silver__streamline_transactions_final') }} - - {% if var("MANUAL_FIX") %} - WHERE - {{ partition_load_manual('no_buffer') }} - {% else %} - -{% if is_incremental() %} -WHERE - _modified_timestamp >= ( - SELECT - MAX(_modified_timestamp) - FROM - silver.transfers_s3_temp_dates - ) -{% endif %} -{% endif %} -), -actions AS ( - SELECT - A.tx_hash, - A.action_id, - A.block_id, - A.block_timestamp, - t.tx_signer, - t.tx_receiver, - A.predecessor_id, - A.receiver_id, - A.signer_id, - A.deposit, - t.transaction_fee, - A.gas_burnt AS gas_used, - A.receipt_succeeded, - t.tx_succeeded, - A._partition_by_block_number, - A._inserted_timestamp, - A._modified_timestamp - FROM - action_events A - INNER JOIN txs t - ON A.tx_hash = t.tx_hash -), -FINAL AS ( - SELECT - block_id, - block_timestamp, - action_id, - deposit, - tx_hash, - tx_signer, - tx_receiver, - predecessor_id, - signer_id, - receiver_id, - transaction_fee, - gas_used, - tx_succeeded, - receipt_succeeded, - array_min([tx_succeeded, receipt_succeeded]) :: BOOLEAN AS status, - _partition_by_block_number, - _inserted_timestamp, - _modified_timestamp - FROM - actions -) -SELECT - *, - {{ dbt_utils.generate_surrogate_key( - ['action_id'] - ) }} AS transfers_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - FINAL diff --git a/models/silver/curated/silver__transfers_s3.sql b/models/silver/curated/token_transfers/native/silver__transfers_s3.sql similarity index 96% rename from models/silver/curated/silver__transfers_s3.sql rename to models/silver/curated/token_transfers/native/silver__transfers_s3.sql index 1366d51..3f06796 100644 --- a/models/silver/curated/silver__transfers_s3.sql +++ b/models/silver/curated/token_transfers/native/silver__transfers_s3.sql @@ -5,7 +5,7 @@ cluster_by = ['block_timestamp::DATE'], unique_key = 'action_id', incremental_strategy = 'merge', - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id, tx_hash,tx_signer,tx_receiver,predecessor_id,signer_id,receiver_id);", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,predecessor_id,receiver_id);", tags = ['curated','scheduled_non_core'] ) }} {# Note - multisource model #} diff --git a/models/silver/curated/silver__transfers_s3.yml b/models/silver/curated/token_transfers/native/silver__transfers_s3.yml similarity index 97% rename from models/silver/curated/silver__transfers_s3.yml rename to models/silver/curated/token_transfers/native/silver__transfers_s3.yml index 82afead..ef3a15c 100644 --- a/models/silver/curated/silver__transfers_s3.yml +++ b/models/silver/curated/token_transfers/native/silver__transfers_s3.yml @@ -74,6 +74,9 @@ models: - name: TRANSFERS_ID description: "{{doc('id')}}" + tests: + - not_null + - unique - name: INSERTED_TIMESTAMP description: "{{doc('inserted_timestamp')}}" diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_base.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_base.sql new file mode 100644 index 0000000..51c417d --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_base.sql @@ -0,0 +1,58 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'transfers_base_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + +WITH nep141 AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + predecessor_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__actions_events_function_call_s3') }} + WHERE + receipt_succeeded = TRUE + AND logs [0] IS NOT NULL + AND RIGHT( + action_id, + 2 + ) = '-0' + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + AND _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id'] + ) }} AS transfers_base_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + nep141 diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_event.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_event.sql new file mode 100644 index 0000000..a0e9b2f --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_event.sql @@ -0,0 +1,102 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'transfers_event_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + + +WITH actions_events AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp as _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_base') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + WHERE _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +ft_transfers_event AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA, + b.index AS logs_rn, + receiver_id AS contract_address, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + actions_events + JOIN LATERAL FLATTEN( + input => logs + ) b + WHERE + DATA :event :: STRING IN ( + 'ft_transfer' + ) +), +ft_transfers_final AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + contract_address, + NVL( + f.value :old_owner_id, + NULL + ) :: STRING AS from_address, + NVL( + f.value :new_owner_id, + f.value :owner_id + ) :: STRING AS to_address, + f.value :amount :: variant AS amount_unadj, + f.value :memo :: STRING AS memo, + logs_rn + f.index AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + ft_transfers_event + JOIN LATERAL FLATTEN( + input => DATA :data + ) f + WHERE + amount_unadj > 0 +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id'] + ) }} AS transfers_event_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + ft_transfers_final diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_method.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_method.sql new file mode 100644 index 0000000..78e5ac7 --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_ft_transfers_method.sql @@ -0,0 +1,93 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'transfers_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + +WITH actions_events AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp as _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_base') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + WHERE _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +ft_transfers_method AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + receiver_id AS contract_address, + REGEXP_SUBSTR( + VALUE, + 'from ([^ ]+)', + 1, + 1, + '', + 1 + ) :: STRING AS from_address, + REGEXP_SUBSTR( + VALUE, + 'to ([^ ]+)', + 1, + 1, + '', + 1 + ) :: STRING AS to_address, + REGEXP_SUBSTR( + VALUE, + '\\d+' + ) :: variant AS amount_unadj, + '' AS memo, + b.index AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + actions_events + JOIN LATERAL FLATTEN( + input => logs + ) b + WHERE + method_name = 'ft_transfer' + AND from_address IS NOT NULL + AND to_address IS NOT NULL + AND amount_unadj IS NOT NULL +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id'] + ) }} AS transfers_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + ft_transfers_method diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_liquidity.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_liquidity.sql new file mode 100644 index 0000000..3b1408c --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_liquidity.sql @@ -0,0 +1,95 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'transfers_liquidity_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + + + +WITH actions_events AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp as _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_base') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + WHERE _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +add_liquidity AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + REGEXP_SUBSTR( + SPLIT.value, + '"\\d+ ([^"]*)["]', + 1, + 1, + 'e', + 1 + ) :: STRING AS contract_address, + NULL AS from_address, + receiver_id AS to_address, + REGEXP_SUBSTR( + SPLIT.value, + '"(\\d+) ', + 1, + 1, + 'e', + 1 + ) :: variant AS amount_unadj, + 'add_liquidity' AS memo, + INDEX AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + actions_events, + LATERAL FLATTEN ( + input => SPLIT( + REGEXP_SUBSTR( + logs [0], + '\\["(.*?)"\\]' + ), + ',' + ) + ) SPLIT + WHERE + logs [0] LIKE 'Liquidity added [%minted % shares' +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id'] + ) }} AS transfers_liquidity_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + add_liquidity diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_mints.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_mints.sql new file mode 100644 index 0000000..71b69ef --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_mints.sql @@ -0,0 +1,104 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'mint_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + +WITH actions_events AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp as _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_base') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + WHERE _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +) +, +ft_mints AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + TRY_PARSE_JSON(REPLACE(VALUE, 'EVENT_JSON:')) AS DATA, + b.index AS logs_rn, + receiver_id AS contract_address, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + actions_events + JOIN LATERAL FLATTEN( + input => logs + ) b + WHERE + DATA :event :: STRING IN ( + 'ft_mint' + ) +), +ft_mints_final AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + contract_address, + NVL( + f.value :old_owner_id, + NULL + ) :: STRING AS from_address, + NVL( + f.value :new_owner_id, + f.value :owner_id + ) :: STRING AS to_address, + f.value :amount :: variant AS amount_unadj, + f.value :memo :: STRING AS memo, + logs_rn + f.index AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + ft_mints + JOIN LATERAL FLATTEN( + input => DATA :data + ) f + WHERE + amount_unadj > 0 +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id','rn'] + )}} AS mint_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + ft_mints_final +WHERE + mint_id IS NOT NULL diff --git a/models/silver/curated/token_transfers/non_native/silver__token_transfer_orders.sql b/models/silver/curated/token_transfers/non_native/silver__token_transfer_orders.sql new file mode 100644 index 0000000..16e17cd --- /dev/null +++ b/models/silver/curated/token_transfers/non_native/silver__token_transfer_orders.sql @@ -0,0 +1,96 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'], + unique_key = 'transfers_orders_id', + incremental_strategy = 'merge', + tags = ['curated','scheduled_non_core'] +) }} + +WITH actions_events AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + signer_id, + receiver_id, + action_name, + method_name, + deposit, + logs, + receipt_succeeded, + _inserted_timestamp, + modified_timestamp as _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_base') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% elif is_incremental() %} + WHERE _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +orders AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + receiver_id, + TRY_PARSE_JSON(REPLACE(g.value, 'EVENT_JSON:')) AS DATA, + DATA :event :: STRING AS event, + g.index AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + actions_events + JOIN LATERAL FLATTEN( + input => logs + ) g + WHERE + DATA :event :: STRING = 'order_added' +), +orders_final AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + f.value :sell_token :: STRING AS contract_address, + f.value :owner_id :: STRING AS from_address, + receiver_id :: STRING AS to_address, + ( + f.value :original_amount + ) :: variant AS amount_unadj, + 'order' AS memo, + f.index AS rn, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number + FROM + orders + JOIN LATERAL FLATTEN( + input => DATA :data + ) f + WHERE + amount_unadj > 0 +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['action_id'] + ) }} AS transfers_orders_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + orders_final diff --git a/models/silver/curated/token_transfers/silver__token_transfers_complete.sql b/models/silver/curated/token_transfers/silver__token_transfers_complete.sql new file mode 100644 index 0000000..f840be2 --- /dev/null +++ b/models/silver/curated/token_transfers/silver__token_transfers_complete.sql @@ -0,0 +1,265 @@ +{{ config( + materialized = 'incremental', + incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'], + unique_key = 'transfers_complete_id', + incremental_strategy = 'merge', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,contract_address,from_address,to_address);", + tags = ['curated','scheduled_non_core'] +) }} + +WITH native_transfers AS ( + + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + '0' AS rn, + 'wrap.near' AS contract_address, + predecessor_id AS from_address, + receiver_id AS to_address, + NULL AS memo, + IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadj, + 'native' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__transfers_s3') }} + WHERE + status = TRUE AND deposit != 0 + {% if var("MANUAL_FIX") %} + AND + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + AND + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +ft_transfers_method AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + 'nep141' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_ft_transfers_method') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +ft_transfers_event AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + 'nep141' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_ft_transfers_event') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +mints AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + 'nep141' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_mints') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +orders AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + 'nep141' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_orders') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +liquidity AS ( + SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + 'nep141' AS transfer_type, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp, + _partition_by_block_number + FROM + {{ ref('silver__token_transfer_liquidity') }} + + {% if var("MANUAL_FIX") %} + WHERE + {{ partition_load_manual('no_buffer') }} + + {% elif is_incremental() %} + WHERE + _modified_timestamp >= ( + SELECT + MAX(_modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +FINAL AS ( + SELECT + * + FROM + native_transfers + UNION ALL + SELECT + * + FROM + ft_transfers_method + UNION ALL + SELECT + * + FROM + ft_transfers_event + UNION ALL + SELECT + * + FROM + mints + UNION ALL + SELECT + * + FROM + orders + UNION ALL + SELECT + * + FROM + liquidity +) +SELECT + block_id, + block_timestamp, + tx_hash, + action_id, + rn, + contract_address, + from_address, + to_address, + memo, + amount_unadj, + transfer_type, + _inserted_timestamp, + _modified_timestamp, + _partition_by_block_number, + {{ dbt_utils.generate_surrogate_key( + ['action_id','contract_address','amount_unadj','from_address','to_address','rn'] + ) }} AS transfers_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL diff --git a/models/silver/curated/silver__token_transfers.yml b/models/silver/curated/token_transfers/silver__token_transfers_complete.yml similarity index 70% rename from models/silver/curated/silver__token_transfers.yml rename to models/silver/curated/token_transfers/silver__token_transfers_complete.yml index 35635d7..224d6f3 100644 --- a/models/silver/curated/silver__token_transfers.yml +++ b/models/silver/curated/token_transfers/silver__token_transfers_complete.yml @@ -1,18 +1,8 @@ version: 2 models: - - name: silver__token_transfers - tests: - - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - tx_hash - - action_id - - contract_address - - amount_raw - - from_address - - to_address - - memo - - rn + - name: silver__token_transfers_complete + description: |- This table records all the Native Token + FTs Transfers of the Near blockchain. columns: @@ -23,7 +13,7 @@ models: description: "{{ doc('block_timestamp')}}" tests: - not_null: - where: _inserted_timestamp <= current_timestamp - interval '10 hour' + where: _inserted_timestamp <= SYSDATE() - interval '2 hours' - name: TX_HASH description: "{{ doc('tx_hash')}}" @@ -37,20 +27,17 @@ models: - name: FROM_ADDRESS description: "{{ doc('from_address')}}" - - name: RN - description: "Row number" - - name: TO_ADDRESS description: "{{ doc('to_address')}}" + - name: RN + description: "Row number" + - name: MEMO description: "{{ doc('memo')}}" - - name: AMOUNT_RAW - description: "{{ doc('amount_raw')}}" - - - name: AMOUNT_RAW_PRECISE - description: "{{ doc('amount_adj')}}" + - name: AMOUNT_UNADJ + description: "{{ doc('amount_unadj')}}" - name: TRANSFER_TYPE description: "{{ doc('transfer_type')}}" @@ -61,8 +48,11 @@ models: - name: _MODIFIED_TIMESTAMP description: "{{ doc('_modified_timestamp')}}" - - name: TRANSFERS_ID + - name: TRANSFERS_COMPLETE_ID description: "{{doc('id')}}" + tests: + - not_null + - unique - name: INSERTED_TIMESTAMP description: "{{doc('inserted_timestamp')}}"