From 141cae7f7e9954c39fb911c55e38261b734e2db1 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Tue, 22 Apr 2025 13:40:54 -0600 Subject: [PATCH] burrow models refactor --- models/MIGRATION_CHANGELOG.md | 235 ++++++++++++++++++ .../lending/burrow/silver__burrow_borrows.sql | 37 +-- .../lending/burrow/silver__burrow_borrows.yml | 24 +- .../burrow/silver__burrow_collaterals.sql | 90 ++++--- .../burrow/silver__burrow_collaterals.yml | 24 +- .../burrow/silver__burrow_deposits.sql | 79 ++++-- .../burrow/silver__burrow_deposits.yml | 24 +- .../lending/burrow/silver__burrow_repays.sql | 88 ++++--- .../lending/burrow/silver__burrow_repays.yml | 24 +- .../burrow/silver__burrow_withdraws.sql | 77 ++++-- .../burrow/silver__burrow_withdraws.yml | 24 +- 11 files changed, 549 insertions(+), 177 deletions(-) diff --git a/models/MIGRATION_CHANGELOG.md b/models/MIGRATION_CHANGELOG.md index 9d9521b..c6b0ce1 100644 --- a/models/MIGRATION_CHANGELOG.md +++ b/models/MIGRATION_CHANGELOG.md @@ -37,4 +37,239 @@ The gold model `nft__fact_nft_transfers.sql` needs to be updated: 1. Replace `action_id` with `receipt_id` in SELECT statement 2. Remove `_inserted_timestamp` from COALESCE logic in inserted_timestamp/modified_timestamp +--- + +## silver__burrow_borrows + +### Major Changes +- Model has been refactored to use `core__ez_actions` directly +- Maintains same business logic and data structure, allowing for incremental processing + +### Column Changes + +#### Columns Removed +- `action_id` (replaced by receipt_id + action_index combination) +- `_inserted_timestamp` (deprecated) + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Added `receipt_id` to search optimization +- Updated unique key to use combination of `receipt_id` and `action_index` + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` first, then check `action_data :method_name` +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data + +--- + +## silver__burrow_collaterals + +### Major Changes +- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly +- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column +- Maintains same business logic and data structure, allowing for incremental processing + +### Architecture Changes +- Split data sourcing into two CTEs: + 1. `actions` CTE from `core__ez_actions` for function call data + 2. `logs` CTE from `silver__logs_s3` for event logs +- Added smart log targeting using `target_log_index` calculation based on method_name +- Implemented proper join logic between actions and logs using tx_hash, receipt_id, and calculated log_index + +### Column Changes + +#### Columns Removed +- `action_id` (replaced by receipt_id + action_index combination) +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) +- `target_log_index` (calculated from method_name) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Changed log parsing to use clean_log directly from logs table + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Added `receipt_id` to search optimization +- Updated unique key to use combination of `receipt_id` and `action_index` + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and specific method_names upfront +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Implemented LEFT JOIN to logs table with proper matching conditions +- Maintained filtering logic for increase_collateral and decrease_collateral actions + +--- + +## silver__burrow_deposits + +### Major Changes +- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly +- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column +- Maintains same business logic and data structure, allowing for incremental processing + +### Architecture Changes +- Split data sourcing into two CTEs: + 1. `actions` CTE from `core__ez_actions` for function call data + 2. `logs` CTE from `silver__logs_s3` for event logs +- Simplified log handling by directly targeting log_index = 0 (deposits always use first log) +- Implemented proper join logic between actions and logs using tx_hash and receipt_id + +### Column Changes + +#### Columns Removed +- `action_id` (replaced by receipt_id + action_index combination) +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Changed log parsing to use clean_log directly from logs table + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Added `receipt_id` to search optimization +- Updated unique key to use combination of `receipt_id` and `action_index` + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_name upfront +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Implemented LEFT JOIN to logs table with proper matching conditions +- Simplified log parsing by removing SUBSTRING operation and using TRY_PARSE_JSON directly + +--- + +## silver__burrow_repays + +### Major Changes +- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly +- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column +- Maintains same business logic and data structure, allowing for incremental processing + +### Architecture Changes +- Split data sourcing into two CTEs: + 1. `actions` CTE from `core__ez_actions` for function call data + 2. `logs` CTE from `silver__logs_s3` for event logs +- Simplified log handling by directly targeting log_index = 1 (repays always use second log) +- Implemented proper join logic between actions and logs using tx_hash and receipt_id + +### Column Changes + +#### Columns Removed +- `action_id` (replaced by receipt_id + action_index combination) +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Changed log parsing to use clean_log directly from logs table + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Added `receipt_id` to search optimization +- Updated unique key to use combination of `receipt_id` and `action_index` + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_names upfront +- Added method_name IN clause to filter both ft_on_transfer and oracle_on_call +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Implemented LEFT JOIN to logs table with proper matching conditions +- Maintained complex filtering logic for repay actions and args:msg conditions + +--- + +## silver__burrow_withdraws + +### Major Changes +- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly +- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column +- Maintains same business logic and data structure, allowing for incremental processing + +### Architecture Changes +- Split data sourcing into two CTEs: + 1. `actions` CTE from `core__ez_actions` for function call data + 2. `logs` CTE from `silver__logs_s3` for event logs +- Simplified log handling by directly targeting log_index = 0 (withdraws always use first log) +- Implemented proper join logic between actions and logs using tx_hash and receipt_id + +### Column Changes + +#### Columns Removed +- `action_id` (replaced by receipt_id + action_index combination) +- `_inserted_timestamp` (deprecated) +- Removed dependency on deprecated `logs` column from actions table + +#### Columns Added +- `receipt_id` (from core__ez_actions) +- `action_index` (from core__ez_actions) +- `predecessor_id` (from receipt_predecessor_id) +- `signer_id` (from receipt_signer_id) + +#### Column Modifications +- Changed `receiver_id` to come from `receipt_receiver_id` +- Changed `method_name` to parse from `action_data :method_name` +- Changed `args` to parse from `action_data :args` +- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)` +- Changed log parsing to use clean_log directly from logs table + +### Configuration Changes +- Updated incremental predicates to use `dynamic_range_predicate_custom` +- Added `modified_timestamp::DATE` to clustering keys +- Added `receipt_id` to search optimization +- Updated unique key to use combination of `receipt_id` and `action_index` + +### Query Changes +- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_name upfront +- Updated partition_load_manual to include partition key calculation +- Added proper type casting for method_name and args from action_data +- Implemented LEFT JOIN to logs table with proper matching conditions +- Simplified log parsing by removing SUBSTRING operation and using TRY_PARSE_JSON directly + --- \ No newline at end of file diff --git a/models/silver/defi/lending/burrow/silver__burrow_borrows.sql b/models/silver/defi/lending/burrow/silver__burrow_borrows.sql index fdda4d0..382c66a 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_borrows.sql +++ b/models/silver/defi/lending/burrow/silver__burrow_borrows.sql @@ -1,11 +1,11 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = "burrow_borrows_id", - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);", + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);", tags = ['curated','scheduled_non_core'] ) }} @@ -13,25 +13,27 @@ WITH --borrows from Burrow LendingPool contracts borrows AS ( SELECT - action_id as action_id, block_id, block_timestamp, tx_hash, - method_name, - args, - receiver_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + action_data :args :: VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receiver_id = 'contract.main.burrow.near' - AND method_name = 'oracle_on_call' - AND receipt_succeeded = TRUE + receipt_receiver_id = 'contract.main.burrow.near' + AND action_name = 'FunctionCall' + AND receipt_succeeded + AND action_data :method_name :: STRING = 'oracle_on_call' {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( @@ -61,19 +63,20 @@ FINAL AS ( segmented_data IS NOT NULL ) SELECT - action_id, tx_hash, block_id, block_timestamp, + receipt_id, + action_index, + predecessor_id, sender_id, actions, contract_address, amount_raw, token_contract_address, - _inserted_timestamp, _partition_by_block_number, {{ dbt_utils.generate_surrogate_key( - ['action_id'] + ['receipt_id', 'action_index'] ) }} AS burrow_borrows_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/defi/lending/burrow/silver__burrow_borrows.yml b/models/silver/defi/lending/burrow/silver__burrow_borrows.yml index 4bf205f..b40ca02 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_borrows.yml +++ b/models/silver/defi/lending/burrow/silver__burrow_borrows.yml @@ -2,18 +2,12 @@ version: 2 models: - name: silver__burrow_borrows columns: - - name: action_id - description: "{{ doc('action_id')}}" - tests: - - not_null - - unique - - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - not_null - - name: block_id + - name: BLOCK_ID description: "{{ doc('block_id')}}" tests: - not_null @@ -21,8 +15,18 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" tests: - - not_null: - where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - not_null + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + + - name: PREDECESSOR_ID + description: "{{ doc('predecessor_id')}}" - name: SENDER_ID description: "{{ doc('sender_id')}}" @@ -44,7 +48,7 @@ models: - name: BURROW_BORROWS_ID description: "{{doc('id')}}" - test: + tests: - not_null - unique diff --git a/models/silver/defi/lending/burrow/silver__burrow_collaterals.sql b/models/silver/defi/lending/burrow/silver__burrow_collaterals.sql index b5b234f..d65eb18 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_collaterals.sql +++ b/models/silver/defi/lending/burrow/silver__burrow_collaterals.sql @@ -1,11 +1,11 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = "burrow_collaterals_id", - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);", + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);", tags = ['curated','scheduled_non_core'] ) }} @@ -13,29 +13,53 @@ WITH actions AS ( SELECT - action_id AS action_id, block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + (action_data :method_name :: STRING = 'ft_on_transfer') :: INT AS target_log_index, + action_data :args ::VARIANT AS args, receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} + WHERE + receipt_receiver_id = 'contract.main.burrow.near' + AND action_name = 'FunctionCall' + AND receipt_succeeded + AND method_name in ('ft_on_transfer', 'oracle_on_call') + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} +), +logs AS ( + SELECT + tx_hash, + receipt_id, + clean_log, + log_index + FROM + {{ ref('silver__logs_s3') }} WHERE receiver_id = 'contract.main.burrow.near' - AND receipt_succeeded = TRUE - + AND receipt_succeeded {% if var("MANUAL_FIX") %} - AND - {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer') }} {% else %} - {% if is_incremental() %} - + {% if is_incremental() %} AND modified_timestamp >= ( SELECT MAX(modified_timestamp) @@ -47,19 +71,28 @@ actions AS ( ), FINAL AS ( SELECT - *, - args :sender_id:: STRING AS sender_id, + block_id, + block_timestamp, + a.tx_hash, + a.receipt_id, + action_index, + predecessor_id, receiver_id AS contract_address, - CASE - WHEN method_name = 'ft_on_transfer' THEN PARSE_JSON(SUBSTRING(logs [1], 12)) - WHEN method_name = 'oracle_on_call' THEN PARSE_JSON(SUBSTRING(logs [0], 12)) - END :: OBJECT AS segmented_data, + args :sender_id :: STRING AS sender_id, + method_name, + args, + TRY_PARSE_JSON(l.clean_log) :: OBJECT AS segmented_data, segmented_data :data [0] :account_id AS account_id, segmented_data :data [0] :token_id AS token_contract_address, segmented_data :data [0] :amount :: NUMBER AS amount_raw, - segmented_data :event :: STRING AS actions + segmented_data :event :: STRING AS actions, + _partition_by_block_number FROM - actions + actions a + LEFT JOIN logs l + ON a.tx_hash = l.tx_hash + AND a.receipt_id = l.receipt_id + AND a.target_log_index = l.log_index WHERE ( (method_name = 'ft_on_transfer' @@ -71,19 +104,20 @@ FINAL AS ( ) ) SELECT - action_id, tx_hash, block_id, block_timestamp, + receipt_id, + action_index, + predecessor_id, sender_id, actions, contract_address, amount_raw, token_contract_address, - _inserted_timestamp, _partition_by_block_number, {{ dbt_utils.generate_surrogate_key( - ['action_id'] + ['receipt_id', 'action_index'] ) }} AS burrow_collaterals_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/defi/lending/burrow/silver__burrow_collaterals.yml b/models/silver/defi/lending/burrow/silver__burrow_collaterals.yml index 34796a5..53c0bb0 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_collaterals.yml +++ b/models/silver/defi/lending/burrow/silver__burrow_collaterals.yml @@ -2,18 +2,12 @@ version: 2 models: - name: silver__burrow_collaterals columns: - - name: action_id - description: "{{ doc('action_id')}}" - tests: - - not_null - - unique - - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - not_null - - name: block_id + - name: BLOCK_ID description: "{{ doc('block_id')}}" tests: - not_null @@ -21,8 +15,18 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" tests: - - not_null: - where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - not_null + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + + - name: PREDECESSOR_ID + description: "{{ doc('predecessor_id')}}" - name: SENDER_ID description: "{{ doc('sender_id')}}" @@ -44,7 +48,7 @@ models: - name: BURROW_COLLATERALS_ID description: "{{doc('id')}}" - test: + tests: - not_null - unique diff --git a/models/silver/defi/lending/burrow/silver__burrow_deposits.sql b/models/silver/defi/lending/burrow/silver__burrow_deposits.sql index 603dde1..cbb88a1 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_deposits.sql +++ b/models/silver/defi/lending/burrow/silver__burrow_deposits.sql @@ -1,42 +1,38 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = "burrow_deposits_id", - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);", + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);", tags = ['curated','scheduled_non_core'] ) }} -WITH -deposits AS ( - +WITH actions AS ( SELECT - action_id AS action_id, block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + action_data :args AS args, + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receiver_id = 'contract.main.burrow.near' - AND method_name = 'ft_on_transfer' + receipt_receiver_id = 'contract.main.burrow.near' + AND action_name = 'FunctionCall' + AND action_data :method_name :: STRING = 'ft_on_transfer' AND receipt_succeeded = TRUE {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} - {% if is_incremental() %} - AND modified_timestamp >= ( SELECT MAX(modified_timestamp) @@ -46,33 +42,62 @@ deposits AS ( {% endif %} {% endif %} ), + +logs AS ( + SELECT + tx_hash, + receipt_id, + TRY_PARSE_JSON(clean_log) AS parsed_log + FROM {{ ref('silver__logs_s3') }} + WHERE + receiver_id = 'contract.main.burrow.near' + AND log_index = 0 -- deposits always uses logs[0] + AND receipt_id IN (SELECT receipt_id FROM actions) + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} +), + FINAL AS ( SELECT - *, + a.*, args :sender_id:: STRING AS sender_id, receiver_id AS contract_address, - PARSE_JSON(SUBSTRING(logs [0], 12)) :: OBJECT AS segmented_data, + l.parsed_log AS segmented_data, segmented_data :data [0] :account_id AS account_id, segmented_data :data [0] :token_id AS token_contract_address, segmented_data :data [0] :amount :: NUMBER AS amount_raw, segmented_data :event :: STRING AS actions FROM - deposits - ) + actions a + LEFT JOIN logs l + ON a.tx_hash = l.tx_hash + AND a.receipt_id = l.receipt_id +) SELECT - action_id, tx_hash, block_id, block_timestamp, + receipt_id, + action_index, + predecessor_id, sender_id, actions, contract_address, amount_raw, token_contract_address, - _inserted_timestamp, _partition_by_block_number, {{ dbt_utils.generate_surrogate_key( - ['action_id'] + ['receipt_id', 'action_index'] ) }} AS burrow_deposits_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/defi/lending/burrow/silver__burrow_deposits.yml b/models/silver/defi/lending/burrow/silver__burrow_deposits.yml index 73660f2..793ec91 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_deposits.yml +++ b/models/silver/defi/lending/burrow/silver__burrow_deposits.yml @@ -2,18 +2,12 @@ version: 2 models: - name: silver__burrow_deposits columns: - - name: action_id - description: "{{ doc('action_id')}}" - tests: - - not_null - - unique - - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - not_null - - name: block_id + - name: BLOCK_ID description: "{{ doc('block_id')}}" tests: - not_null @@ -21,8 +15,18 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" tests: - - not_null: - where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - not_null + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + + - name: PREDECESSOR_ID + description: "{{ doc('predecessor_id')}}" - name: SENDER_ID description: "{{ doc('sender_id')}}" @@ -44,7 +48,7 @@ models: - name: BURROW_DEPOSITS_ID description: "{{doc('id')}}" - test: + tests: - not_null - unique diff --git a/models/silver/defi/lending/burrow/silver__burrow_repays.sql b/models/silver/defi/lending/burrow/silver__burrow_repays.sql index e0ce573..d13c10f 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_repays.sql +++ b/models/silver/defi/lending/burrow/silver__burrow_repays.sql @@ -1,37 +1,36 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = "burrow_repays_id", - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);", + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);", tags = ['curated','scheduled_non_core'] ) }} -WITH -actions AS ( - +WITH actions AS ( SELECT - action_id AS action_id, block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + action_data :args AS args, + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receiver_id = 'contract.main.burrow.near' - AND receipt_succeeded = TRUE + receipt_receiver_id = 'contract.main.burrow.near' + AND action_name = 'FunctionCall' + AND action_data :method_name :: STRING IN ('ft_on_transfer', 'oracle_on_call') + AND receipt_succeeded {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( @@ -43,43 +42,72 @@ actions AS ( {% endif %} {% endif %} ), + +logs AS ( + SELECT + tx_hash, + receipt_id, + TRY_PARSE_JSON(clean_log) AS parsed_log + FROM {{ ref('silver__logs_s3') }} + WHERE + receiver_id = 'contract.main.burrow.near' + AND log_index = 1 -- repays always uses logs[1] + AND receipt_id IN (SELECT receipt_id FROM actions) + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} +), + FINAL AS ( SELECT - *, + a.*, args :sender_id:: STRING AS sender_id, receiver_id AS contract_address, - PARSE_JSON(SUBSTRING(logs [1], 12)) :: OBJECT AS segmented_data, + l.parsed_log AS segmented_data, segmented_data :data [0] :account_id AS account_id, segmented_data :data [0] :token_id AS token_contract_address, segmented_data :data [0] :amount :: NUMBER AS amount_raw, segmented_data :event :: STRING AS actions FROM - actions + actions a + LEFT JOIN logs l + ON a.tx_hash = l.tx_hash + AND a.receipt_id = l.receipt_id WHERE - ( ( - method_name = 'ft_on_transfer' -- repay_from_deposit - AND args:msg != '' - ) OR ( - method_name = 'oracle_on_call' -- repay_from_decrease_collateral + ( + method_name = 'ft_on_transfer' -- repay_from_deposit + AND args:msg != '' + ) OR ( + method_name = 'oracle_on_call' -- repay_from_decrease_collateral ) ) AND actions = 'repay' - ) +) SELECT - action_id, tx_hash, block_id, block_timestamp, + receipt_id, + action_index, + predecessor_id, sender_id, actions, contract_address, amount_raw, token_contract_address, - _inserted_timestamp, _partition_by_block_number, {{ dbt_utils.generate_surrogate_key( - ['action_id'] + ['receipt_id', 'action_index'] ) }} AS burrow_repays_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/defi/lending/burrow/silver__burrow_repays.yml b/models/silver/defi/lending/burrow/silver__burrow_repays.yml index 4bb318e..c6e5593 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_repays.yml +++ b/models/silver/defi/lending/burrow/silver__burrow_repays.yml @@ -2,18 +2,12 @@ version: 2 models: - name: silver__burrow_repays columns: - - name: action_id - description: "{{ doc('action_id')}}" - tests: - - not_null - - unique - - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - not_null - - name: block_id + - name: BLOCK_ID description: "{{ doc('block_id')}}" tests: - not_null @@ -21,8 +15,18 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" tests: - - not_null: - where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - not_null + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + + - name: PREDECESSOR_ID + description: "{{ doc('predecessor_id')}}" - name: SENDER_ID description: "{{ doc('sender_id')}}" @@ -44,7 +48,7 @@ models: - name: BURROW_REPAYS_ID description: "{{doc('id')}}" - test: + tests: - not_null - unique diff --git a/models/silver/defi/lending/burrow/silver__burrow_withdraws.sql b/models/silver/defi/lending/burrow/silver__burrow_withdraws.sql index d91656c..ce802ff 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_withdraws.sql +++ b/models/silver/defi/lending/burrow/silver__burrow_withdraws.sql @@ -1,38 +1,36 @@ {{ config( materialized = 'incremental', incremental_strategy = 'merge', - incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"], + incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"], merge_exclude_columns = ["inserted_timestamp"], unique_key = "burrow_withdraws_id", - cluster_by = ['block_timestamp::DATE'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);", + cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);", tags = ['curated','scheduled_non_core'] ) }} -WITH -- successfull withdraws -withdraws AS ( - +WITH actions AS ( SELECT - action_id AS action_id, block_id, block_timestamp, tx_hash, - method_name, - args, - logs, - receiver_id, - receipt_succeeded, - _inserted_timestamp, - _partition_by_block_number + receipt_id, + action_index, + receipt_predecessor_id AS predecessor_id, + receipt_receiver_id AS receiver_id, + action_data :method_name :: STRING AS method_name, + action_data :args AS args, + FLOOR(block_id, -3) AS _partition_by_block_number FROM - {{ ref('silver__actions_events_function_call_s3') }} + {{ ref('core__ez_actions') }} WHERE - receiver_id = 'contract.main.burrow.near' - AND method_name = 'after_ft_transfer' - AND receipt_succeeded = TRUE + receipt_receiver_id = 'contract.main.burrow.near' + AND action_name = 'FunctionCall' + AND action_data :method_name :: STRING = 'after_ft_transfer' + AND receipt_succeeded {% if var("MANUAL_FIX") %} - AND {{ partition_load_manual('no_buffer') }} + AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }} {% else %} {% if is_incremental() %} AND modified_timestamp >= ( @@ -44,32 +42,61 @@ withdraws AS ( {% endif %} {% endif %} ), + +logs AS ( + SELECT + tx_hash, + receipt_id, + TRY_PARSE_JSON(clean_log) AS parsed_log + FROM {{ ref('silver__logs_s3') }} + WHERE + receiver_id = 'contract.main.burrow.near' + AND log_index = 0 -- withdraws always use logs[0] + AND receipt_id IN (SELECT receipt_id FROM actions) + {% if var("MANUAL_FIX") %} + AND {{ partition_load_manual('no_buffer') }} + {% else %} + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + {% endif %} +), + FINAL AS ( SELECT - *, + a.*, receiver_id AS contract_address, - PARSE_JSON(SUBSTRING(logs [0], 12)) :: OBJECT AS segmented_data, + l.parsed_log AS segmented_data, segmented_data :data [0] :account_id AS sender_id, segmented_data :data [0] :token_id AS token_contract_address, segmented_data :data [0] :amount :: NUMBER AS amount_raw, segmented_data :event :: STRING AS actions FROM - withdraws + actions a + LEFT JOIN logs l + ON a.tx_hash = l.tx_hash + AND a.receipt_id = l.receipt_id ) SELECT - action_id, tx_hash, block_id, block_timestamp, + receipt_id, + action_index, + predecessor_id, sender_id, actions, contract_address, amount_raw, token_contract_address, - _inserted_timestamp, _partition_by_block_number, {{ dbt_utils.generate_surrogate_key( - ['action_id'] + ['receipt_id', 'action_index'] ) }} AS burrow_withdraws_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/defi/lending/burrow/silver__burrow_withdraws.yml b/models/silver/defi/lending/burrow/silver__burrow_withdraws.yml index ae2c924..31a2665 100644 --- a/models/silver/defi/lending/burrow/silver__burrow_withdraws.yml +++ b/models/silver/defi/lending/burrow/silver__burrow_withdraws.yml @@ -2,18 +2,12 @@ version: 2 models: - name: silver__burrow_withdraws columns: - - name: action_id - description: "{{ doc('action_id')}}" - tests: - - not_null - - unique - - name: TX_HASH description: "{{ doc('tx_hash')}}" tests: - not_null - - name: block_id + - name: BLOCK_ID description: "{{ doc('block_id')}}" tests: - not_null @@ -21,8 +15,18 @@ models: - name: BLOCK_TIMESTAMP description: "{{ doc('block_timestamp')}}" tests: - - not_null: - where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - not_null + + - name: RECEIPT_ID + description: "{{ doc('receipt_id')}}" + tests: + - not_null + + - name: ACTION_INDEX + description: "{{ doc('action_index')}}" + + - name: PREDECESSOR_ID + description: "{{ doc('predecessor_id')}}" - name: SENDER_ID description: "{{ doc('sender_id')}}" @@ -44,7 +48,7 @@ models: - name: BURROW_WITHDRAWS_ID description: "{{doc('id')}}" - test: + tests: - not_null - unique