From b0ecf052654d70287c3f7d63fd972c1025559913 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Wed, 23 Apr 2025 15:41:37 -0600 Subject: [PATCH] bridge models --- models/MIGRATION_CHANGELOG.md | 194 ++++++++++++++++++ .../defi/bridge/silver__bridge_allbridge.sql | 48 ++--- .../defi/bridge/silver__bridge_allbridge.yml | 58 ++++-- .../defi/bridge/silver__bridge_multichain.sql | 50 ++--- .../defi/bridge/silver__bridge_multichain.yml | 87 +++++++- .../defi/bridge/silver__bridge_rainbow.sql | 175 +++++++++++----- .../defi/bridge/silver__bridge_rainbow.yml | 50 ++++- .../defi/bridge/silver__bridge_wormhole.sql | 91 +++++--- .../defi/bridge/silver__bridge_wormhole.yml | 54 +++-- 9 files changed, 627 insertions(+), 180 deletions(-) diff --git a/models/MIGRATION_CHANGELOG.md b/models/MIGRATION_CHANGELOG.md index ca009b4..aeb24a9 100644 --- a/models/MIGRATION_CHANGELOG.md +++ b/models/MIGRATION_CHANGELOG.md @@ -281,4 +281,198 @@ The gold model `nft__fact_nft_transfers.sql` needs to be updated: - Implemented LEFT JOIN to logs table with proper matching conditions - Simplified log parsing by removing SUBSTRING operation and using TRY_PARSE_JSON directly +--- + +## silver__bridge_rainbow + +### Major Changes +- Refactored to use core__ez_actions and silver__logs_s3 instead of silver__actions_events_function_call_s3 +- Maintained complex multi-directional bridge logic (Near↔Aurora, Near↔Ethereum) +- Enhanced log parsing with direction-specific strategies: + * Aurora inbound: Using log_index=0 for source address extraction + * ETH inbound: Complex log pattern matching for amount and address extraction + * Near outbound: Direct args parsing from function calls +- Preserved all existing bridge direction handling with improved data sourcing + +### Architecture Changes +- Split into specialized CTEs for each bridge direction: + * outbound_near_to_aurora: ft_transfer_call to Aurora + * inbound_aurora_to_near: ft_transfer from relay.aurora + * 
outbound_near_to_eth: finish_withdraw on factory.bridge.near + * inbound_eth_to_near: finish_deposit with complex log parsing +- Added dedicated logs CTE with proper log aggregation +- Improved transaction identification using receipt_id matching + +### Column Changes +- Added: + * receipt_id (for more precise transaction tracking) + * action_index (from core__ez_actions) +- Modified: + * Improved source/destination address extraction using proper string manipulation + * Enhanced amount parsing from logs with multiple fallback patterns + * Simplified bridge_rainbow_id generation using receipt_id +- Removed: + * Deprecated _inserted_timestamp field + +### Configuration Changes +- Updated incremental predicates to use dynamic_range_predicate_custom +- Added modified_timestamp::DATE to clustering keys +- Enhanced search optimization for transaction tracking + +### Query Changes +- Improved ETH bridge log parsing with multiple regex patterns for different scenarios +- Enhanced Aurora bridge transaction matching using receipt_id +- Added proper type casting and NULL handling for all extracted fields +- Maintained all existing bridge direction logic while improving data extraction +- Added sophisticated log aggregation for multi-log transactions + +--- + +## silver__bridge_allbridge + +### Major Changes +- Model has been refactored to use core__ez_actions directly +- Maintains same business logic and data structure, allowing for incremental processing +- Preserved both bridge directions (outbound NEAR burns and inbound NEAR mints) + +### Architecture Changes +- Simplified to use core__ez_actions for function call data +- Improved args parsing with proper JSON handling for both lock and unlock actions +- Maintained metadata join for token decimals handling + +### Column Changes + +#### Columns Removed +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- 
`action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Maintained amount_adj calculation using metadata decimals + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Kept `bridge_allbridge_id` as the unique key, but it is now a surrogate key generated from `receipt_id` and `action_index` (previously `tx_hash`) +- Maintained search optimization on tx_hash, destination_address, and source_address + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and specific receiver_id upfront +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Maintained separate CTEs for outbound and inbound transfers +- Preserved token metadata join for decimal handling + +--- + +## silver__bridge_multichain + +### Major Changes +- Model has been refactored to use core__ez_actions directly +- Maintains same business logic and data structure, allowing for incremental processing +- Preserved both bridge directions (inbound and outbound transfers) + +### Architecture Changes +- Simplified to use core__ez_actions for function call data +- Maintained separate CTEs for inbound and outbound transfers +- Preserved memo parsing logic for chain ID and address extraction + +### Column Changes + +#### Columns Removed +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- 
`signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Maintained memo parsing for chain IDs and addresses + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Kept `bridge_multichain_id` as the unique key, but it is now a surrogate key generated from `receipt_id` and `action_index` (previously `tx_hash`) +- Maintained search optimization on tx_hash, destination_address, and source_address + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and specific method_name upfront +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Maintained separate CTEs for inbound and outbound transfers with their specific filtering logic: + * Inbound: signer_id = 'mpc-multichain.near' + * Outbound: args:receiver_id = 'mpc-multichain.near' +- Preserved memo parsing logic for extracting chain IDs and addresses + +--- + +## silver__bridge_wormhole + +### Major Changes +- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly +- Maintains same business logic and data structure, allowing for incremental processing +- Preserved both bridge directions (outbound withdraws and inbound transfers) + +### Architecture Changes +- Split data sourcing into two CTEs: + 1. `functioncall` CTE from `core__ez_actions` for function call data + 2. 
`logs` CTE from `silver__logs_s3` with optimized filtering to only fetch relevant transaction logs +- Improved log parsing by using clean_log field directly +- Maintained separate CTEs for inbound and outbound transfers +- Enhanced source chain ID extraction from logs for inbound transfers with precise log targeting + +### Column Changes + +#### Columns Removed +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Improved source chain ID extraction using clean_log from logs table + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Maintained unique key using tx_hash, destination_address, and source_address +- Maintained search optimization on tx_hash, destination_address, and source_address + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and portalbridge patterns +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Maintained separate CTEs for different bridge directions: + * Outbound: vaa_withdraw with direct args parsing + * Inbound: vaa_transfer with source chain ID from logs +- Improved log parsing efficiency: + * Added upfront filtering in logs CTE to only fetch relevant transaction logs + * Added log_index = 1 condition for precise source chain ID extraction + * Used clean_log field for 
reliable regex extraction +- Enhanced source chain ID extraction by joining logs table with proper conditions + --- \ No newline at end of file diff --git a/models/silver/curated/defi/bridge/silver__bridge_allbridge.sql b/models/silver/curated/defi/bridge/silver__bridge_allbridge.sql index 1d0a048..58b77cc 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_allbridge.sql +++ b/models/silver/curated/defi/bridge/silver__bridge_allbridge.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = 'bridge_allbridge_id', cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], @@ -10,39 +10,38 @@ ) }} WITH functioncall AS ( - SELECT block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - signer_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + receipt_signer_id AS signer_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receiver_id = 'bridge.a11bd.near' - + action_name = 'FunctionCall' + AND receiver_id = 'bridge.a11bd.near' {% if var("MANUAL_FIX") %} - AND - {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} - ) + SELECT + MAX(modified_timestamp) + FROM + {{ this 
}} + ) {% endif %} {% endif %} ), -metadata AS ( +metadata AS ( SELECT contract_address, NAME, @@ -57,6 +56,8 @@ outbound_near AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, args :create_lock_args :token_id :: STRING AS token_address, args :create_lock_args :amount :: INT AS amount_raw, args :fee :: INT AS amount_fee_raw, @@ -71,7 +72,6 @@ outbound_near AS ( receipt_succeeded, method_name, 'outbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -84,6 +84,8 @@ inbound_to_near AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, args :token_id :: STRING AS token_address, args :unlock_args :amount :: INT AS amount_raw, args :fee :: INT AS amount_fee_raw, @@ -98,7 +100,6 @@ inbound_to_near AS ( receipt_succeeded, method_name, 'inbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -131,12 +132,11 @@ FINAL AS ( FINAL_UNION JOIN metadata m ON FINAL_UNION.token_address = m.contract_address - ) SELECT *, {{ dbt_utils.generate_surrogate_key( - ['tx_hash'] + ['receipt_id', 'action_index'] ) }} AS bridge_allbridge_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/curated/defi/bridge/silver__bridge_allbridge.yml b/models/silver/curated/defi/bridge/silver__bridge_allbridge.yml index ff11497..6b24a7d 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_allbridge.yml +++ b/models/silver/curated/defi/bridge/silver__bridge_allbridge.yml @@ -3,7 +3,7 @@ version: 2 models: - name: silver__bridge_allbridge description: |- - Extracts data from actions table to build a view of bridge activity through the Allbridge. + Extracts data from core__ez_actions to build a view of bridge activity through the Allbridge. 
tests: - dbt_utils.recency: datepart: week @@ -16,22 +16,36 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' + - not_null - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING - VARCHAR + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + tests: + - not_null + - name: TOKEN_ADDRESS description: "{{ doc('token_contract')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -41,7 +55,7 @@ models: description: "{{ doc('amount_raw')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -51,7 +65,7 @@ models: description: "{{ doc('amount_adj')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -61,7 +75,7 @@ models: description: "{{ doc('amount_raw')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -74,7 +88,7 @@ models: description: "{{ doc('destination_address')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval 
'1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -83,17 +97,29 @@ models: - name: SOURCE_ADDRESS description: "{{ doc('source_address')}}" - - name: PLATFORM - description: "{{ doc('platform')}}" - - name: BRIDGE_ADDRESS description: "{{ doc('contract_address')}}" + tests: + - not_null + + - name: PLATFORM + description: "{{ doc('platform')}}" + tests: + - not_null + - accepted_values: + values: ['allbridge'] - name: DESTINATION_CHAIN_ID description: "{{ doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded - name: SOURCE_CHAIN_ID description: "{{ doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded - name: ARGS description: "{{ doc('args')}}" @@ -105,12 +131,15 @@ models: - name: METHOD_NAME description: "{{ doc('method_name')}}" + tests: + - not_null - name: DIRECTION description: "{{ doc('direction')}}" - - - name: _INSERTED_TIMESTAMP - description: "{{ doc('_inserted_timestamp')}}" + tests: + - not_null + - accepted_values: + values: ['inbound', 'outbound'] - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" @@ -119,6 +148,7 @@ models: description: "{{ doc('id')}}" tests: - unique + - not_null - name: INSERTED_TIMESTAMP description: "{{ doc('inserted_timestamp')}}" diff --git a/models/silver/curated/defi/bridge/silver__bridge_multichain.sql b/models/silver/curated/defi/bridge/silver__bridge_multichain.sql index 349f31b..ee8e95b 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_multichain.sql +++ b/models/silver/curated/defi/bridge/silver__bridge_multichain.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = 
["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = 'bridge_multichain_id', cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], @@ -10,44 +10,44 @@ ) }} WITH functioncall AS ( - SELECT block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - signer_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + receipt_signer_id AS signer_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - method_name = 'ft_transfer' -- Both directions utilize ft_transfer - + action_name = 'FunctionCall' + AND action_data :method_name :: STRING = 'ft_transfer' -- Both directions utilize ft_transfer {% if var("MANUAL_FIX") %} - AND - {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} - ) + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) {% endif %} {% endif %} - ), inbound AS ( SELECT block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, @@ -61,8 +61,7 @@ inbound AS ( receipt_succeeded, method_name, 'inbound' AS direction, - _partition_by_block_number, - _inserted_timestamp + _partition_by_block_number FROM functioncall WHERE @@ -73,6 +72,8 @@ outbound AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, @@ -89,8 +90,7 @@ outbound AS ( receipt_succeeded, 
method_name, 'outbound' AS direction, - _partition_by_block_number, - _inserted_timestamp + _partition_by_block_number FROM functioncall WHERE @@ -113,7 +113,7 @@ SELECT 'mpc-multichain.near' AS bridge_address, 'multichain' AS platform, {{ dbt_utils.generate_surrogate_key( - ['tx_hash'] + ['receipt_id', 'action_index'] ) }} AS bridge_multichain_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/curated/defi/bridge/silver__bridge_multichain.yml b/models/silver/curated/defi/bridge/silver__bridge_multichain.yml index d1a1850..f86c533 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_multichain.yml +++ b/models/silver/curated/defi/bridge/silver__bridge_multichain.yml @@ -3,7 +3,12 @@ version: 2 models: - name: silver__bridge_multichain description: |- - Extracts data from actions table to build a view of historical bridge activity through the Multichain Bridge. + Extracts data from core__ez_actions to build a view of bridge activity through the Multichain bridge. 
+ tests: + - dbt_utils.recency: + datepart: week + field: block_timestamp + interval: 1 columns: - name: BLOCK_ID @@ -11,39 +16,103 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ - name: TX_HASH description: "{{ doc('tx_hash')}}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + tests: + - not_null - name: TOKEN_ADDRESS description: "{{ doc('token_contract')}}" + tests: + - not_null: + where: receipt_succeeded + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR - name: AMOUNT_RAW description: "{{ doc('amount_raw')}}" + tests: + - not_null: + where: receipt_succeeded + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - INTEGER + - NUMBER - name: AMOUNT_ADJ description: "{{ doc('amount_adj')}}" + tests: + - not_null: + where: receipt_succeeded + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - INTEGER + - NUMBER - name: MEMO description: "{{ doc('memo')}}" - name: DESTINATION_ADDRESS description: "{{ doc('destination_address')}}" + tests: + - not_null: + where: receipt_succeeded + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR - name: SOURCE_ADDRESS description: "{{ doc('source_address')}}" - - name: PLATFORM - description: "{{ doc('platform')}}" - - name: BRIDGE_ADDRESS description: "{{ doc('contract_address')}}" + tests: + - not_null + + - name: PLATFORM + description: "{{ doc('platform')}}" + tests: + - not_null + - accepted_values: + values: ['multichain'] - name: DESTINATION_CHAIN_ID description: "{{ 
doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded - name: SOURCE_CHAIN_ID description: "{{ doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded + + - name: ARGS + description: "{{ doc('args')}}" - name: RECEIPT_SUCCEEDED description: "{{ doc('receipt_succeeded')}}" @@ -52,12 +121,15 @@ models: - name: METHOD_NAME description: "{{ doc('method_name')}}" + tests: + - not_null - name: DIRECTION description: "{{ doc('direction')}}" - - - name: _INSERTED_TIMESTAMP - description: "{{ doc('_inserted_timestamp')}}" + tests: + - not_null + - accepted_values: + values: ['inbound', 'outbound'] - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" @@ -66,6 +138,7 @@ models: description: "{{ doc('id')}}" tests: - unique + - not_null - name: INSERTED_TIMESTAMP description: "{{ doc('inserted_timestamp')}}" diff --git a/models/silver/curated/defi/bridge/silver__bridge_rainbow.sql b/models/silver/curated/defi/bridge/silver__bridge_rainbow.sql index 276701d..4aa8292 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_rainbow.sql +++ b/models/silver/curated/defi/bridge/silver__bridge_rainbow.sql @@ -10,28 +10,51 @@ ) }} WITH functioncall AS ( - SELECT block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - signer_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + receipt_signer_id AS signer_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} - + {{ ref('core__ez_actions') }} + WHERE + action_name = 'FunctionCall' {% if var("MANUAL_FIX") %} - WHERE - {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if 
is_incremental() %} - WHERE modified_timestamp >= ( + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} +), + +logs AS ( + SELECT + tx_hash, + receipt_id, + receiver_id, + predecessor_id, + clean_log, + log_index + FROM {{ ref('silver__logs_s3') }} + {% if var("MANUAL_FIX") %} + WHERE {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + WHERE modified_timestamp >= ( SELECT MAX(modified_timestamp) FROM @@ -40,6 +63,7 @@ WITH functioncall AS ( {% endif %} {% endif %} ), + outbound_near_to_aurora AS ( -- ft_transfer_call sends token to aurora -- EVM address logged in method action under msg @@ -47,6 +71,8 @@ outbound_near_to_aurora AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, @@ -62,7 +88,6 @@ outbound_near_to_aurora AS ( method_name, 'aurora' AS bridge_address, 'outbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -74,6 +99,7 @@ outbound_near_to_aurora AS ( OR receiver_id LIKE '%.factory.bridge.near' ) ), + inbound_aurora_to_near AS ( -- ft_transfer called on token contract, signed by relay.aurora -- recipient in actions JSON of ft_transfer, signer evm address in log of "submit" method @@ -82,6 +108,8 @@ inbound_aurora_to_near AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, @@ -92,7 +120,6 @@ inbound_aurora_to_near AS ( method_name, 'aurora' AS bridge_address, 'inbound' AS direction, - _inserted_timestamp, _partition_by_block_number, args FROM @@ -111,29 +138,38 @@ inbound_aurora_to_near AS ( ) ) ), + inbound_a2n_src_address AS ( SELECT tx_hash, + receipt_id, REGEXP_SUBSTR( - logs [0] :: STRING, + clean_log, '0x[0-9a-fA-F]{40}' ) AS source_address FROM - functioncall + logs WHERE - tx_hash IN ( - 
SELECT - tx_hash - FROM - inbound_aurora_to_near - ) - AND method_name = 'submit' + receiver_id = 'aurora' + AND predecessor_id = 'relay.aurora' + AND log_index = 0 + AND clean_log like 'signer_address%' + AND + tx_hash IN ( + SELECT + tx_hash + FROM + inbound_aurora_to_near + ) ), + inbound_a2n_final AS ( SELECT A.block_id, A.block_timestamp, A.tx_hash, + A.receipt_id, + A.action_index, A.token_address, A.amount_raw, A.memo, @@ -145,13 +181,13 @@ inbound_a2n_final AS ( A.method_name, A.bridge_address, A.direction, - A._inserted_timestamp, A._partition_by_block_number FROM inbound_aurora_to_near A LEFT JOIN inbound_a2n_src_address b ON A.tx_hash = b.tx_hash ), + outbound_near_to_eth AS ( -- determined by finish_withdraw method call on factory.bridge.near -- if signed by aurora relayer, likely aurora<->eth bridge @@ -159,6 +195,8 @@ outbound_near_to_eth AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, signer_id = 'relay.aurora' AS is_aurora, receiver_id AS token_address, args :amount :: INT AS amount_raw, @@ -183,7 +221,6 @@ outbound_near_to_eth AS ( method_name, 'factory.bridge.near' AS bridge_address, 'outbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -199,8 +236,64 @@ outbound_near_to_eth AS ( ) AND method_name = 'withdraw' ), +inbound_eth_to_near_txs AS ( + SELECT + DISTINCT tx_hash + FROM + functioncall + WHERE + receiver_id in ('factory.bridge.near', 'aurora') + AND method_name = 'finish_deposit' +), +inbound_logs AS ( + SELECT + receipt_id, + ARRAY_AGG(clean_log) WITHIN GROUP (ORDER BY log_index) AS logs + FROM + logs + WHERE + tx_hash IN ( + SELECT + tx_hash + FROM + inbound_eth_to_near_txs + ) + GROUP BY + 1 +), inbound_eth_to_near AS ( -- determined by finish_deposit method call on factory.bridge.near + SELECT + fc.tx_hash, + fc.receipt_id, + block_id, + block_timestamp, + method_name, + args, + receiver_id, + signer_id, + logs, + receipt_succeeded, + _partition_by_block_number + FROM + 
functioncall fc + LEFT JOIN inbound_logs l + ON fc.receipt_id = l.receipt_id + WHERE + tx_hash IN ( + SELECT + DISTINCT tx_hash + FROM + inbound_eth_to_near_txs + ) + AND method_name IN ( + 'mint', + 'ft_transfer_call', + 'deposit', + 'finish_deposit' + ) +), +aggregate_inbound_eth_to_near_txs AS ( SELECT tx_hash, MIN(block_id) AS block_id, @@ -219,26 +312,9 @@ inbound_eth_to_near AS ( ) ) AS actions, booland_agg(receipt_succeeded) AS receipt_succeeded, - MIN(_inserted_timestamp) AS _inserted_timestamp, MIN(_partition_by_block_number) AS _partition_by_block_number FROM - functioncall - WHERE - tx_hash IN ( - SELECT - DISTINCT tx_hash - FROM - functioncall - WHERE - receiver_id in ('factory.bridge.near', 'aurora') - AND method_name = 'finish_deposit' - ) - AND method_name IN ( - 'mint', - 'ft_transfer_call', - 'deposit', - 'finish_deposit' - ) + inbound_eth_to_near GROUP BY 1 ), @@ -281,10 +357,9 @@ inbound_e2n_final_ft AS ( ) AS method_name, 'factory.bridge.near' AS bridge_address, 'inbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM - inbound_eth_to_near + aggregate_inbound_eth_to_near_txs WHERE actions :finish_deposit :receiver_id :: STRING != 'aurora' ), @@ -314,10 +389,9 @@ inbound_e2n_final_eth AS ( 'mint' AS method_name, 'prover.bridge.near' AS bridge_address, 'inbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM - inbound_eth_to_near + aggregate_inbound_eth_to_near_txs WHERE actions :finish_deposit :receiver_id :: STRING = 'aurora' @@ -338,7 +412,6 @@ FINAL AS ( method_name, bridge_address, direction, - _inserted_timestamp, _partition_by_block_number FROM outbound_near_to_aurora @@ -358,7 +431,6 @@ FINAL AS ( method_name, bridge_address, direction, - _inserted_timestamp, _partition_by_block_number FROM inbound_a2n_final @@ -378,7 +450,6 @@ FINAL AS ( method_name, bridge_address, direction, - _inserted_timestamp, _partition_by_block_number FROM outbound_near_to_eth @@ -398,7 +469,6 @@ FINAL AS ( method_name, 
bridge_address, direction, - _inserted_timestamp, _partition_by_block_number FROM inbound_e2n_final_ft @@ -418,7 +488,6 @@ FINAL AS ( method_name, bridge_address, direction, - _inserted_timestamp, _partition_by_block_number FROM inbound_e2n_final_eth diff --git a/models/silver/curated/defi/bridge/silver__bridge_rainbow.yml b/models/silver/curated/defi/bridge/silver__bridge_rainbow.yml index fd56f86..b7e75fc 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_rainbow.yml +++ b/models/silver/curated/defi/bridge/silver__bridge_rainbow.yml @@ -3,7 +3,7 @@ version: 2 models: - name: silver__bridge_rainbow description: |- - Extracts data from actions table to build a view of bridge activity through the Rainbow Bridge. Methods defined and explained [here](https://github.com/aurora-is-near/rainbow-token-connector/tree) + Extracts data from core__ez_actions and silver__logs_s3 to build a view of bridge activity through the Rainbow Bridge. Methods defined and explained [here](https://github.com/aurora-is-near/rainbow-token-connector/tree) tests: - dbt_utils.recency: datepart: week @@ -16,22 +16,36 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' + - not_null - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING - VARCHAR + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + tests: + - not_null + - name: TOKEN_ADDRESS description: "{{ doc('token_contract')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - 
dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -41,7 +55,7 @@ models: description: "{{ doc('amount_raw')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -51,7 +65,7 @@ models: description: "{{ doc('amount_adj')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -64,7 +78,7 @@ models: description: "{{ doc('destination_address')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -75,15 +89,27 @@ models: - name: BRIDGE_ADDRESS description: "{{ doc('contract_address')}}" + tests: + - not_null - name: PLATFORM description: "{{ doc('platform')}}" + tests: + - not_null + - accepted_values: + values: ['rainbow'] - name: DESTINATION_CHAIN_ID description: "{{ doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded - name: SOURCE_CHAIN_ID description: "{{ doc('chain_id')}}" + tests: + - not_null: + where: receipt_succeeded - name: RECEIPT_SUCCEEDED description: "{{ doc('receipt_succeeded')}}" @@ -92,12 +118,15 @@ models: - name: METHOD_NAME description: "{{ doc('method_name')}}" + tests: + - not_null - name: DIRECTION description: "{{ doc('direction')}}" - - - name: _INSERTED_TIMESTAMP - description: "{{ doc('_inserted_timestamp')}}" + tests: + - not_null + - accepted_values: + values: ['inbound', 'outbound'] - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" @@ -106,6 +135,7 @@ models: description: "{{ doc('id')}}" tests: - unique + - not_null - 
name: INSERTED_TIMESTAMP description: "{{ doc('inserted_timestamp')}}" diff --git a/models/silver/curated/defi/bridge/silver__bridge_wormhole.sql b/models/silver/curated/defi/bridge/silver__bridge_wormhole.sql index 9d397a2..608dd68 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_wormhole.sql +++ b/models/silver/curated/defi/bridge/silver__bridge_wormhole.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = 'bridge_wormhole_id', cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], @@ -10,30 +10,61 @@ ) }} WITH functioncall AS ( - SELECT block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - signer_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + receipt_signer_id AS signer_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - (signer_id LIKE '%.portalbridge.near' - OR receiver_id LIKE '%.portalbridge.near') - + action_name = 'FunctionCall' + AND ( + receipt_signer_id LIKE '%.portalbridge.near' + OR receipt_receiver_id LIKE '%.portalbridge.near' + ) {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif 
%} + {% endif %} +), +logs AS ( + SELECT + tx_hash, + receipt_id, + receiver_id, + predecessor_id, + clean_log, + log_index + FROM {{ ref('silver__logs_s3') }} + WHERE + tx_hash in ( + SELECT + DISTINCT tx_hash + FROM + functioncall + ) + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( SELECT MAX(modified_timestamp) FROM @@ -48,9 +79,9 @@ outbound_near AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, - logs, - args, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, args :receiver :: STRING AS destination_address, @@ -60,7 +91,6 @@ outbound_near AS ( receipt_succeeded, method_name, 'outbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -76,19 +106,17 @@ inbound_to_near AS ( block_id, block_timestamp, tx_hash, + receipt_id, + action_index, receiver_id AS token_address, - logs, - args, args :amount :: INT AS amount_raw, args :memo :: STRING AS memo, args :account_id :: STRING AS destination_address, NULL AS source_address, - -- "In eth is Weth contract -- jum" - args: recipient_chain :: INT AS destination_chain_id, + args :recipient_chain :: INT AS destination_chain_id, receipt_succeeded, method_name, 'inbound' AS direction, - _inserted_timestamp, _partition_by_block_number FROM functioncall @@ -98,30 +126,32 @@ inbound_to_near AS ( inbound_src_id AS ( SELECT tx_hash, + receipt_id, REGEXP_SUBSTR( - logs [1], + clean_log, '\\d+' ) :: INT AS wormhole_chain_id FROM - functioncall + logs WHERE + receiver_id = 'contract.portalbridge.near' AND tx_hash IN ( SELECT DISTINCT tx_hash FROM inbound_to_near ) - AND method_name = 'submit_vaa' - AND receiver_id = 'contract.portalbridge.near' + AND log_index = 1 + ), inbound_final AS ( SELECT block_id, block_timestamp, i.tx_hash, + i.receipt_id, + i.action_index, token_address, - logs, - args, amount_raw, memo, destination_address, @@ 
-131,7 +161,6 @@ inbound_final AS ( receipt_succeeded, method_name, direction, - _inserted_timestamp, _partition_by_block_number FROM inbound_to_near i @@ -153,7 +182,6 @@ FINAL AS ( receipt_succeeded, method_name, direction, - _inserted_timestamp, _partition_by_block_number FROM outbound_near @@ -172,7 +200,6 @@ FINAL AS ( receipt_succeeded, method_name, direction, - _inserted_timestamp, _partition_by_block_number FROM inbound_final @@ -183,7 +210,7 @@ SELECT 'portalbridge.near' AS bridge_address, 'wormhole' AS platform, {{ dbt_utils.generate_surrogate_key( - ['tx_hash'] + ['tx_hash', 'destination_address', 'source_address'] ) }} AS bridge_wormhole_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/curated/defi/bridge/silver__bridge_wormhole.yml b/models/silver/curated/defi/bridge/silver__bridge_wormhole.yml index a665f2b..f09ff87 100644 --- a/models/silver/curated/defi/bridge/silver__bridge_wormhole.yml +++ b/models/silver/curated/defi/bridge/silver__bridge_wormhole.yml @@ -3,7 +3,7 @@ version: 2 models: - name: silver__bridge_wormhole description: |- - Extracts data from actions table to build a view of bridge activity through the Wormhople Portal Bridge. + Extracts data from core__ez_actions and silver__logs_s3 to build a view of bridge activity through the Wormhole Portal Bridge. 
tests: - dbt_utils.recency: datepart: month @@ -16,22 +16,36 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' + - not_null - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING - VARCHAR + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + tests: + - not_null + - name: TOKEN_ADDRESS description: "{{ doc('token_contract')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -41,7 +55,7 @@ models: description: "{{ doc('amount_raw')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -51,7 +65,7 @@ models: description: "{{ doc('amount_adj')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - INTEGER @@ -64,7 +78,7 @@ models: description: "{{ doc('destination_address')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING @@ -73,17 +87,23 @@ models: - name: SOURCE_ADDRESS description: "{{ doc('source_address')}}" - - name: PLATFORM - description: "{{ 
doc('platform')}}" - - name: BRIDGE_ADDRESS description: "{{ doc('contract_address')}}" + tests: + - not_null + + - name: PLATFORM + description: "{{ doc('platform')}}" + tests: + - not_null + - accepted_values: + values: ['wormhole'] - name: DESTINATION_CHAIN_ID description: "{{ doc('chain_id')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - NUMBER @@ -93,7 +113,7 @@ models: description: "{{ doc('chain_id')}}" tests: - not_null: - where: _inserted_timestamp <= CURRENT_TIMESTAMP - interval '1 hour' AND receipt_succeeded + where: receipt_succeeded - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - NUMBER @@ -106,12 +126,15 @@ models: - name: METHOD_NAME description: "{{ doc('method_name')}}" + tests: + - not_null - name: DIRECTION description: "{{ doc('direction')}}" - - - name: _INSERTED_TIMESTAMP - description: "{{ doc('_inserted_timestamp')}}" + tests: + - not_null + - accepted_values: + values: ['inbound', 'outbound'] - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" @@ -120,6 +143,7 @@ models: description: "{{ doc('id')}}" tests: - unique + - not_null - name: INSERTED_TIMESTAMP description: "{{ doc('inserted_timestamp')}}"