burrow models refactor

This commit is contained in:
Jack Forgash 2025-04-22 13:40:54 -06:00
parent 5b67eda0a0
commit 141cae7f7e
11 changed files with 549 additions and 177 deletions

View File

@ -37,4 +37,239 @@ The gold model `nft__fact_nft_transfers.sql` needs to be updated:
1. Replace `action_id` with `receipt_id` in SELECT statement
2. Remove `_inserted_timestamp` from COALESCE logic in inserted_timestamp/modified_timestamp
---
## silver__burrow_borrows
### Major Changes
- Model has been refactored to use `core__ez_actions` directly
- Maintains same business logic and data structure, allowing for incremental processing
### Column Changes
#### Columns Removed
- `action_id` (replaced by receipt_id + action_index combination)
- `_inserted_timestamp` (deprecated)
#### Columns Added
- `receipt_id` (from core__ez_actions)
- `action_index` (from core__ez_actions)
- `predecessor_id` (from receipt_predecessor_id)
- `signer_id` (from receipt_signer_id)
#### Column Modifications
- Changed `receiver_id` to come from `receipt_receiver_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `args` to parse from `action_data :args`
- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)`
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Added `receipt_id` to search optimization
- Updated unique key to use combination of `receipt_id` and `action_index`
### Query Changes
- Updated WHERE clause to filter on `action_name = 'FunctionCall'` first, then check `action_data :method_name`
- Updated partition_load_manual to include partition key calculation
- Added proper type casting for method_name and args from action_data
---
## silver__burrow_collaterals
### Major Changes
- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly
- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column
- Maintains same business logic and data structure, allowing for incremental processing
### Architecture Changes
- Split data sourcing into two CTEs:
1. `actions` CTE from `core__ez_actions` for function call data
2. `logs` CTE from `silver__logs_s3` for event logs
- Added smart log targeting using `target_log_index` calculation based on method_name
- Implemented proper join logic between actions and logs using tx_hash, receipt_id, and calculated log_index
### Column Changes
#### Columns Removed
- `action_id` (replaced by receipt_id + action_index combination)
- `_inserted_timestamp` (deprecated)
- Removed dependency on deprecated `logs` column from actions table
#### Columns Added
- `receipt_id` (from core__ez_actions)
- `action_index` (from core__ez_actions)
- `predecessor_id` (from receipt_predecessor_id)
- `signer_id` (from receipt_signer_id)
- `target_log_index` (calculated from method_name)
#### Column Modifications
- Changed `receiver_id` to come from `receipt_receiver_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `args` to parse from `action_data :args`
- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)`
- Changed log parsing to use clean_log directly from logs table
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Added `receipt_id` to search optimization
- Updated unique key to use combination of `receipt_id` and `action_index`
### Query Changes
- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and specific method_names upfront
- Updated partition_load_manual to include partition key calculation
- Added proper type casting for method_name and args from action_data
- Implemented LEFT JOIN to logs table with proper matching conditions
- Maintained filtering logic for increase_collateral and decrease_collateral actions
---
## silver__burrow_deposits
### Major Changes
- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly
- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column
- Maintains same business logic and data structure, allowing for incremental processing
### Architecture Changes
- Split data sourcing into two CTEs:
1. `actions` CTE from `core__ez_actions` for function call data
2. `logs` CTE from `silver__logs_s3` for event logs
- Simplified log handling by directly targeting log_index = 0 (deposits always use first log)
- Implemented proper join logic between actions and logs using tx_hash and receipt_id
### Column Changes
#### Columns Removed
- `action_id` (replaced by receipt_id + action_index combination)
- `_inserted_timestamp` (deprecated)
- Removed dependency on deprecated `logs` column from actions table
#### Columns Added
- `receipt_id` (from core__ez_actions)
- `action_index` (from core__ez_actions)
- `predecessor_id` (from receipt_predecessor_id)
- `signer_id` (from receipt_signer_id)
#### Column Modifications
- Changed `receiver_id` to come from `receipt_receiver_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `args` to parse from `action_data :args`
- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)`
- Changed log parsing to use clean_log directly from logs table
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Added `receipt_id` to search optimization
- Updated unique key to use combination of `receipt_id` and `action_index`
### Query Changes
- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_name upfront
- Updated partition_load_manual to include partition key calculation
- Added proper type casting for method_name and args from action_data
- Implemented LEFT JOIN to logs table with proper matching conditions
- Simplified log parsing by removing SUBSTRING operation and using TRY_PARSE_JSON directly
---
## silver__burrow_repays
### Major Changes
- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly
- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column
- Maintains same business logic and data structure, allowing for incremental processing
### Architecture Changes
- Split data sourcing into two CTEs:
1. `actions` CTE from `core__ez_actions` for function call data
2. `logs` CTE from `silver__logs_s3` for event logs
- Simplified log handling by directly targeting log_index = 1 (repays always use second log)
- Implemented proper join logic between actions and logs using tx_hash and receipt_id
### Column Changes
#### Columns Removed
- `action_id` (replaced by receipt_id + action_index combination)
- `_inserted_timestamp` (deprecated)
- Removed dependency on deprecated `logs` column from actions table
#### Columns Added
- `receipt_id` (from core__ez_actions)
- `action_index` (from core__ez_actions)
- `predecessor_id` (from receipt_predecessor_id)
- `signer_id` (from receipt_signer_id)
#### Column Modifications
- Changed `receiver_id` to come from `receipt_receiver_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `args` to parse from `action_data :args`
- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)`
- Changed log parsing to use clean_log directly from logs table
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Added `receipt_id` to search optimization
- Updated unique key to use combination of `receipt_id` and `action_index`
### Query Changes
- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_names upfront
- Added method_name IN clause to filter both ft_on_transfer and oracle_on_call
- Updated partition_load_manual to include partition key calculation
- Added proper type casting for method_name and args from action_data
- Implemented LEFT JOIN to logs table with proper matching conditions
- Maintained complex filtering logic for repay actions and args:msg conditions
---
## silver__burrow_withdraws
### Major Changes
- Model has been refactored to use both `core__ez_actions` and `silver__logs_s3` directly
- Added proper handling of logs by joining directly to `silver__logs_s3` instead of using deprecated logs column
- Maintains same business logic and data structure, allowing for incremental processing
### Architecture Changes
- Split data sourcing into two CTEs:
1. `actions` CTE from `core__ez_actions` for function call data
2. `logs` CTE from `silver__logs_s3` for event logs
- Simplified log handling by directly targeting log_index = 0 (withdraws always use first log)
- Implemented proper join logic between actions and logs using tx_hash and receipt_id
### Column Changes
#### Columns Removed
- `action_id` (replaced by receipt_id + action_index combination)
- `_inserted_timestamp` (deprecated)
- Removed dependency on deprecated `logs` column from actions table
#### Columns Added
- `receipt_id` (from core__ez_actions)
- `action_index` (from core__ez_actions)
- `predecessor_id` (from receipt_predecessor_id)
- `signer_id` (from receipt_signer_id)
#### Column Modifications
- Changed `receiver_id` to come from `receipt_receiver_id`
- Changed `method_name` to parse from `action_data :method_name`
- Changed `args` to parse from `action_data :args`
- Changed `_partition_by_block_number` to be calculated as `FLOOR(block_id, -3)`
- Changed log parsing to use clean_log directly from logs table
### Configuration Changes
- Updated incremental predicates to use `dynamic_range_predicate_custom`
- Added `modified_timestamp::DATE` to clustering keys
- Added `receipt_id` to search optimization
- Updated unique key to use combination of `receipt_id` and `action_index`
### Query Changes
- Updated WHERE clause to filter on `action_name = 'FunctionCall'` and method_name upfront
- Updated partition_load_manual to include partition key calculation
- Added proper type casting for method_name and args from action_data
- Implemented LEFT JOIN to logs table with proper matching conditions
- Simplified log parsing by removing SUBSTRING operation and using TRY_PARSE_JSON directly
---

View File

@ -1,11 +1,11 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = "burrow_borrows_id",
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);",
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);",
tags = ['curated','scheduled_non_core']
) }}
@ -13,25 +13,27 @@ WITH --borrows from Burrow LendingPool contracts
borrows AS (
SELECT
action_id as action_id,
block_id,
block_timestamp,
tx_hash,
method_name,
args,
receiver_id,
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
action_data :args :: VARIANT AS args,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND method_name = 'oracle_on_call'
AND receipt_succeeded = TRUE
receipt_receiver_id = 'contract.main.burrow.near'
AND action_name = 'FunctionCall'
AND receipt_succeeded
AND action_data :method_name :: STRING = 'oracle_on_call'
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
@ -61,19 +63,20 @@ FINAL AS (
segmented_data IS NOT NULL
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
sender_id,
actions,
contract_address,
amount_raw,
token_contract_address,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id']
['receipt_id', 'action_index']
) }} AS burrow_borrows_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -2,18 +2,12 @@ version: 2
models:
- name: silver__burrow_borrows
columns:
- name: action_id
description: "{{ doc('action_id')}}"
tests:
- not_null
- unique
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_id
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
@ -21,8 +15,18 @@ models:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: ACTION_INDEX
description: "{{ doc('action_index')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: SENDER_ID
description: "{{ doc('sender_id')}}"
@ -44,7 +48,7 @@ models:
- name: BURROW_BORROWS_ID
description: "{{doc('id')}}"
test:
tests:
- not_null
- unique

View File

@ -1,11 +1,11 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = "burrow_collaterals_id",
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);",
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);",
tags = ['curated','scheduled_non_core']
) }}
@ -13,29 +13,53 @@ WITH
actions AS (
SELECT
action_id AS action_id,
block_id,
block_timestamp,
tx_hash,
method_name,
args,
logs,
receiver_id,
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
(action_data :method_name :: STRING = 'ft_on_transfer') :: INT AS target_log_index,
action_data :args ::VARIANT AS args,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receipt_receiver_id = 'contract.main.burrow.near'
AND action_name = 'FunctionCall'
AND receipt_succeeded
AND method_name in ('ft_on_transfer', 'oracle_on_call')
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
logs AS (
SELECT
tx_hash,
receipt_id,
clean_log,
log_index
FROM
{{ ref('silver__logs_s3') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND receipt_succeeded = TRUE
AND receipt_succeeded
{% if var("MANUAL_FIX") %}
AND
{{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
@ -47,19 +71,28 @@ actions AS (
),
FINAL AS (
SELECT
*,
args :sender_id:: STRING AS sender_id,
block_id,
block_timestamp,
a.tx_hash,
a.receipt_id,
action_index,
predecessor_id,
receiver_id AS contract_address,
CASE
WHEN method_name = 'ft_on_transfer' THEN PARSE_JSON(SUBSTRING(logs [1], 12))
WHEN method_name = 'oracle_on_call' THEN PARSE_JSON(SUBSTRING(logs [0], 12))
END :: OBJECT AS segmented_data,
args :sender_id :: STRING AS sender_id,
method_name,
args,
TRY_PARSE_JSON(l.clean_log) :: OBJECT AS segmented_data,
segmented_data :data [0] :account_id AS account_id,
segmented_data :data [0] :token_id AS token_contract_address,
segmented_data :data [0] :amount :: NUMBER AS amount_raw,
segmented_data :event :: STRING AS actions
segmented_data :event :: STRING AS actions,
_partition_by_block_number
FROM
actions
actions a
LEFT JOIN logs l
ON a.tx_hash = l.tx_hash
AND a.receipt_id = l.receipt_id
AND a.target_log_index = l.log_index
WHERE
(
(method_name = 'ft_on_transfer'
@ -71,19 +104,20 @@ FINAL AS (
)
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
sender_id,
actions,
contract_address,
amount_raw,
token_contract_address,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id']
['receipt_id', 'action_index']
) }} AS burrow_collaterals_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -2,18 +2,12 @@ version: 2
models:
- name: silver__burrow_collaterals
columns:
- name: action_id
description: "{{ doc('action_id')}}"
tests:
- not_null
- unique
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_id
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
@ -21,8 +15,18 @@ models:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: ACTION_INDEX
description: "{{ doc('action_index')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: SENDER_ID
description: "{{ doc('sender_id')}}"
@ -44,7 +48,7 @@ models:
- name: BURROW_COLLATERALS_ID
description: "{{doc('id')}}"
test:
tests:
- not_null
- unique

View File

@ -1,42 +1,38 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = "burrow_deposits_id",
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);",
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH
deposits AS (
WITH actions AS (
SELECT
action_id AS action_id,
block_id,
block_timestamp,
tx_hash,
method_name,
args,
logs,
receiver_id,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
action_data :args AS args,
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND method_name = 'ft_on_transfer'
receipt_receiver_id = 'contract.main.burrow.near'
AND action_name = 'FunctionCall'
AND action_data :method_name :: STRING = 'ft_on_transfer'
AND receipt_succeeded = TRUE
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
@ -46,33 +42,62 @@ deposits AS (
{% endif %}
{% endif %}
),
logs AS (
SELECT
tx_hash,
receipt_id,
TRY_PARSE_JSON(clean_log) AS parsed_log
FROM {{ ref('silver__logs_s3') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND log_index = 0 -- deposits always uses logs[0]
AND receipt_id IN (SELECT receipt_id FROM actions)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
FINAL AS (
SELECT
*,
a.*,
args :sender_id:: STRING AS sender_id,
receiver_id AS contract_address,
PARSE_JSON(SUBSTRING(logs [0], 12)) :: OBJECT AS segmented_data,
l.parsed_log AS segmented_data,
segmented_data :data [0] :account_id AS account_id,
segmented_data :data [0] :token_id AS token_contract_address,
segmented_data :data [0] :amount :: NUMBER AS amount_raw,
segmented_data :event :: STRING AS actions
FROM
deposits
)
actions a
LEFT JOIN logs l
ON a.tx_hash = l.tx_hash
AND a.receipt_id = l.receipt_id
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
sender_id,
actions,
contract_address,
amount_raw,
token_contract_address,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id']
['receipt_id', 'action_index']
) }} AS burrow_deposits_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -2,18 +2,12 @@ version: 2
models:
- name: silver__burrow_deposits
columns:
- name: action_id
description: "{{ doc('action_id')}}"
tests:
- not_null
- unique
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_id
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
@ -21,8 +15,18 @@ models:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: ACTION_INDEX
description: "{{ doc('action_index')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: SENDER_ID
description: "{{ doc('sender_id')}}"
@ -44,7 +48,7 @@ models:
- name: BURROW_DEPOSITS_ID
description: "{{doc('id')}}"
test:
tests:
- not_null
- unique

View File

@ -1,37 +1,36 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = "burrow_repays_id",
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);",
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH
actions AS (
WITH actions AS (
SELECT
action_id AS action_id,
block_id,
block_timestamp,
tx_hash,
method_name,
args,
logs,
receiver_id,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
action_data :args AS args,
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND receipt_succeeded = TRUE
receipt_receiver_id = 'contract.main.burrow.near'
AND action_name = 'FunctionCall'
AND action_data :method_name :: STRING IN ('ft_on_transfer', 'oracle_on_call')
AND receipt_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
@ -43,43 +42,72 @@ actions AS (
{% endif %}
{% endif %}
),
logs AS (
SELECT
tx_hash,
receipt_id,
TRY_PARSE_JSON(clean_log) AS parsed_log
FROM {{ ref('silver__logs_s3') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND log_index = 1 -- repays always uses logs[1]
AND receipt_id IN (SELECT receipt_id FROM actions)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
FINAL AS (
SELECT
*,
a.*,
args :sender_id:: STRING AS sender_id,
receiver_id AS contract_address,
PARSE_JSON(SUBSTRING(logs [1], 12)) :: OBJECT AS segmented_data,
l.parsed_log AS segmented_data,
segmented_data :data [0] :account_id AS account_id,
segmented_data :data [0] :token_id AS token_contract_address,
segmented_data :data [0] :amount :: NUMBER AS amount_raw,
segmented_data :event :: STRING AS actions
FROM
actions
actions a
LEFT JOIN logs l
ON a.tx_hash = l.tx_hash
AND a.receipt_id = l.receipt_id
WHERE
(
(
method_name = 'ft_on_transfer' -- repay_from_deposit
AND args:msg != ''
) OR (
method_name = 'oracle_on_call' -- repay_from_decrease_collateral
(
method_name = 'ft_on_transfer' -- repay_from_deposit
AND args:msg != ''
) OR (
method_name = 'oracle_on_call' -- repay_from_decrease_collateral
)
)
AND actions = 'repay'
)
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
sender_id,
actions,
contract_address,
amount_raw,
token_contract_address,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id']
['receipt_id', 'action_index']
) }} AS burrow_repays_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -2,18 +2,12 @@ version: 2
models:
- name: silver__burrow_repays
columns:
- name: action_id
description: "{{ doc('action_id')}}"
tests:
- not_null
- unique
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_id
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
@ -21,8 +15,18 @@ models:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: ACTION_INDEX
description: "{{ doc('action_index')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: SENDER_ID
description: "{{ doc('sender_id')}}"
@ -44,7 +48,7 @@ models:
- name: BURROW_REPAYS_ID
description: "{{doc('id')}}"
test:
tests:
- not_null
- unique

View File

@ -1,38 +1,36 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = "burrow_withdraws_id",
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,sender_id);",
cluster_by = ['block_timestamp::DATE', 'modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,sender_id);",
tags = ['curated','scheduled_non_core']
) }}
WITH -- successfull withdraws
withdraws AS (
WITH actions AS (
SELECT
action_id AS action_id,
block_id,
block_timestamp,
tx_hash,
method_name,
args,
logs,
receiver_id,
receipt_succeeded,
_inserted_timestamp,
_partition_by_block_number
receipt_id,
action_index,
receipt_predecessor_id AS predecessor_id,
receipt_receiver_id AS receiver_id,
action_data :method_name :: STRING AS method_name,
action_data :args AS args,
FLOOR(block_id, -3) AS _partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
{{ ref('core__ez_actions') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND method_name = 'after_ft_transfer'
AND receipt_succeeded = TRUE
receipt_receiver_id = 'contract.main.burrow.near'
AND action_name = 'FunctionCall'
AND action_data :method_name :: STRING = 'after_ft_transfer'
AND receipt_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
@ -44,32 +42,61 @@ withdraws AS (
{% endif %}
{% endif %}
),
logs AS (
SELECT
tx_hash,
receipt_id,
TRY_PARSE_JSON(clean_log) AS parsed_log
FROM {{ ref('silver__logs_s3') }}
WHERE
receiver_id = 'contract.main.burrow.near'
AND log_index = 0 -- withdraws always use logs[0]
AND receipt_id IN (SELECT receipt_id FROM actions)
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
FINAL AS (
SELECT
*,
a.*,
receiver_id AS contract_address,
PARSE_JSON(SUBSTRING(logs [0], 12)) :: OBJECT AS segmented_data,
l.parsed_log AS segmented_data,
segmented_data :data [0] :account_id AS sender_id,
segmented_data :data [0] :token_id AS token_contract_address,
segmented_data :data [0] :amount :: NUMBER AS amount_raw,
segmented_data :event :: STRING AS actions
FROM
withdraws
actions a
LEFT JOIN logs l
ON a.tx_hash = l.tx_hash
AND a.receipt_id = l.receipt_id
)
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
receipt_id,
action_index,
predecessor_id,
sender_id,
actions,
contract_address,
amount_raw,
token_contract_address,
_inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['action_id']
['receipt_id', 'action_index']
) }} AS burrow_withdraws_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -2,18 +2,12 @@ version: 2
models:
- name: silver__burrow_withdraws
columns:
- name: action_id
description: "{{ doc('action_id')}}"
tests:
- not_null
- unique
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
tests:
- not_null
- name: block_id
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
@ -21,8 +15,18 @@ models:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id')}}"
tests:
- not_null
- name: ACTION_INDEX
description: "{{ doc('action_index')}}"
- name: PREDECESSOR_ID
description: "{{ doc('predecessor_id')}}"
- name: SENDER_ID
description: "{{ doc('sender_id')}}"
@ -44,7 +48,7 @@ models:
- name: BURROW_WITHDRAWS_ID
description: "{{doc('id')}}"
test:
tests:
- not_null
- unique