diff --git a/models/silver/accounts/silver__token_account_owners.sql b/models/silver/accounts/silver__token_account_owners.sql index 01f222be..add348e3 100644 --- a/models/silver/accounts/silver__token_account_owners.sql +++ b/models/silver/accounts/silver__token_account_owners.sql @@ -1,59 +1,123 @@ {{ config( - materialized = 'table', - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(account_address,owner);", - tags = ['daily'] + materialized = 'incremental', + incremental_strategy = "delete+insert", + incremental_predicates = ['min_value_predicate', 'start_block_id', generate_view_name(this) ~ ".start_block_id >= " ~ generate_tmp_view_name(this) ~ ".start_block_id"], + unique_key = ["account_address"], + cluster_by = ["round(start_block_id,-5)"], + post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(account_address, owner)'), + tags = ['scheduled_non_core'] ) }} -/* need to rebucket and regroup the intermediate model due to possibility of change events coming in out of order */ -WITH rebucket AS ( + +WITH new_events AS ( SELECT account_address, owner, start_block_id, - conditional_change_event(owner) over ( - PARTITION BY account_address - ORDER BY - start_block_id - ) AS bucket + _inserted_timestamp, FROM {{ ref('silver__token_account_owners_intermediate') }} + WHERE + {% if is_incremental() %} + _inserted_timestamp > ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + {% else %} + _inserted_timestamp :: DATE = '2022-09-01' + {% endif %} + AND start_block_id <> coalesce(end_block_id,-1) ), -regroup AS ( + +{% if is_incremental() %} +distinct_states AS ( SELECT account_address, - owner, - bucket, - MIN(start_block_id) AS start_block_id + MIN(start_block_id) AS min_block_id FROM - rebucket + new_events GROUP BY - account_address, - owner, - bucket + 1 ), -pre_final AS ( +events_to_reprocess AS ( SELECT - account_address, - owner, - start_block_id, - LEAD(start_block_id) ignore nulls over ( - PARTITION BY account_address - ORDER BY - bucket - ) AS end_block_id + C.account_address, + C.owner, + C.start_block_id, + C._inserted_timestamp, FROM - regroup + {{ ref('silver__token_account_owners_intermediate') }} C + JOIN + distinct_states d + USING(account_address) + WHERE + C.start_block_id >= d.min_block_id + AND start_block_id <> coalesce(end_block_id,-1) + AND _inserted_timestamp <= (SELECT max(_inserted_timestamp) FROM new_events) +), +current_state AS ( + select + C.account_address, + C.owner, + C.start_block_id, + C._inserted_timestamp, + from + {{ this }} C + JOIN + distinct_states d + USING(account_address) + WHERE + ( + C.end_block_id >= d.min_block_id + OR + C.end_block_id IS NULL + ) + QUALIFY + row_number() over (partition by account_address order by start_block_id) = 1 + +), +{% endif %} +all_states AS ( + {% if is_incremental() %} + SELECT + * + FROM + current_state + UNION ALL + SELECT + * + FROM + events_to_reprocess + {% else %} + SELECT + * + FROM + new_events + {% endif %} +), +changed_states AS ( + SELECT + * + FROM + all_states + QUALIFY + coalesce(lag(owner) OVER (PARTITION BY account_address ORDER BY start_block_id),'abc') <> owner ) SELECT - *, - {{ dbt_utils.generate_surrogate_key( - ['account_address','start_block_id'] - ) }} AS token_account_owners_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, + account_address, + owner, + start_block_id, + LEAD(start_block_id) OVER ( + PARTITION BY account_address + ORDER BY + start_block_id + ) AS end_block_id, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['account_address','start_block_id']) }} AS token_account_owners_id, + sysdate() AS inserted_timestamp, + sysdate() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - pre_final -WHERE - start_block_id <> end_block_id - OR end_block_id IS NULL + changed_states diff --git a/models/silver/accounts/silver__token_account_owners.yml b/models/silver/accounts/silver__token_account_owners.yml index b6f1cd71..6ab1a14f 100644 --- a/models/silver/accounts/silver__token_account_owners.yml +++ b/models/silver/accounts/silver__token_account_owners.yml @@ -1,24 +1,50 @@ version: 2 models: - name: silver__token_account_owners + recent_date_filter: &recent_date_filter + config: + where: inserted_timestamp >= current_date - 7 tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: - ACCOUNT_ADDRESS - OWNER - START_BLOCK_ID + <<: *recent_date_filter columns: - name: ACCOUNT_ADDRESS description: address of token account tests: - - not_null + - not_null: *recent_date_filter - name: OWNER description: address of owner tests: - - not_null + - not_null: *recent_date_filter - name: START_BLOCK_ID description: block where this ownership begins tests: - - not_null + - not_null: *recent_date_filter - name: END_BLOCK_ID - description: block where this ownership ends, null value represents current ownership \ No newline at end of file + description: block where this ownership ends, null value represents current ownership + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + tests: + - not_null: *recent_date_filter + - name: TOKEN_ACCOUNT_OWNERS_ID + description: '{{ doc("pk") }}' + tests: + - unique: *recent_date_filter + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: _INVOCATION_ID + description: '{{ doc("_invocation_id") }}' + tests: + - not_null: + name: test_silver__not_null_token_account_owners__invocation_id + <<: *recent_date_filter \ No newline at end of file diff --git a/models/silver/accounts/silver__token_account_owners_2.sql b/models/silver/accounts/silver__token_account_owners_2.sql deleted file mode 100644 index ce725c77..00000000 --- a/models/silver/accounts/silver__token_account_owners_2.sql +++ /dev/null @@ -1,130 +0,0 @@ -{{ config( - materialized = 'incremental', - incremental_strategy = "delete+insert", - incremental_predicates = ['min_value_predicate', 'start_block_id', generate_view_name(this) ~ ".start_block_id >= " ~ generate_tmp_view_name(this) ~ ".start_block_id"], - unique_key = ["account_address"], - cluster_by = ["round(start_block_id,-5)"], - post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(account_address)'), - tags = ['scheduled_non_core'] -) }} - - -WITH new_events AS ( - SELECT - account_address, - owner, - start_block_id, - _inserted_timestamp, - FROM - {{ ref('silver__token_account_owners_intermediate') }} - WHERE - {% if is_incremental() %} - _inserted_timestamp > ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} - ) - /* TODO remove upper bound after backfill is done */ - AND _inserted_timestamp < ( - SELECT - MAX(_inserted_timestamp) + INTERVAL '120 day' - FROM - {{ this }} - ) - {% else %} - _inserted_timestamp :: DATE = '2022-09-01' - {% endif %} - AND start_block_id <> coalesce(end_block_id,-1) -), - -{% if is_incremental() %} -distinct_states AS ( - SELECT - account_address, - MIN(start_block_id) AS min_block_id - FROM - new_events - GROUP BY - 1 -), -events_to_reprocess AS ( - SELECT - C.account_address, - C.owner, - C.start_block_id, - C._inserted_timestamp, - FROM - {{ ref('silver__token_account_owners_intermediate') }} C - JOIN - distinct_states d - USING(account_address) - WHERE - C.start_block_id >= d.min_block_id - AND start_block_id <> coalesce(end_block_id,-1) - AND _inserted_timestamp <= (SELECT max(_inserted_timestamp) FROM new_events) -), -current_state AS ( - select - C.account_address, - C.owner, - C.start_block_id, - C._inserted_timestamp, - from - {{ this }} C - JOIN - distinct_states d - USING(account_address) - WHERE - ( - C.end_block_id >= d.min_block_id - OR - C.end_block_id IS NULL - ) - QUALIFY - row_number() over (partition by account_address order by start_block_id) = 1 - -), -{% endif %} -all_states AS ( - {% if is_incremental() %} - SELECT - * - FROM - current_state - UNION ALL - SELECT - * - FROM - events_to_reprocess - {% else %} - SELECT - * - FROM - new_events - {% endif %} -), -changed_states AS ( - SELECT - * - FROM - all_states - QUALIFY - coalesce(lag(owner) OVER (PARTITION BY account_address ORDER BY start_block_id),'abc') <> owner -) -SELECT - account_address, - owner, - start_block_id, - LEAD(start_block_id) OVER ( - PARTITION BY account_address - ORDER BY - start_block_id - ) AS end_block_id, - _inserted_timestamp, - {{ dbt_utils.generate_surrogate_key(['account_address','start_block_id']) }} AS token_account_owners_id, - sysdate() AS inserted_timestamp, - sysdate() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - changed_states diff --git a/models/silver/accounts/silver__token_account_owners_2.yml b/models/silver/accounts/silver__token_account_owners_2.yml deleted file mode 100644 index bb5580d3..00000000 --- a/models/silver/accounts/silver__token_account_owners_2.yml +++ /dev/null @@ -1,50 +0,0 @@ -version: 2 -models: - - name: silver__token_account_owners_2 - recent_date_filter: &recent_date_filter - config: - where: inserted_timestamp >= current_date - 7 - tests: - - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - ACCOUNT_ADDRESS - - OWNER - - START_BLOCK_ID - <<: *recent_date_filter - columns: - - name: ACCOUNT_ADDRESS - description: address of token account - tests: - - not_null: *recent_date_filter - - name: OWNER - description: address of owner - tests: - - not_null: *recent_date_filter - - name: START_BLOCK_ID - description: block where this ownership begins - tests: - - not_null: *recent_date_filter - - name: END_BLOCK_ID - description: block where this ownership ends, null value represents current ownership - - name: _INSERTED_TIMESTAMP - description: "{{ doc('_inserted_timestamp') }}" - tests: - - not_null: *recent_date_filter - - name: TOKEN_ACCOUNT_OWNERS_ID - description: '{{ doc("pk") }}' - tests: - - unique: *recent_date_filter - - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' - tests: - - not_null: *recent_date_filter - - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' - tests: - - not_null: *recent_date_filter - - name: _INVOCATION_ID - description: '{{ doc("_invocation_id") }}' - tests: - - not_null: - name: test_silver__not_null_token_account_owners__invocation_id - <<: *recent_date_filter \ No newline at end of file