remove upper bound (#571)

* remove upper bound

* rename

* add owner to SO
This commit is contained in:
desmond-hui 2024-06-10 06:45:51 -07:00 committed by GitHub
parent 366b842346
commit 1002c5aac7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 132 additions and 222 deletions

View File

@ -1,59 +1,123 @@
{{ config(
materialized = 'table',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(account_address,owner);",
tags = ['daily']
materialized = 'incremental',
incremental_strategy = "delete+insert",
incremental_predicates = ['min_value_predicate', 'start_block_id', generate_view_name(this) ~ ".start_block_id >= " ~ generate_tmp_view_name(this) ~ ".start_block_id"],
unique_key = ["account_address"],
cluster_by = ["round(start_block_id,-5)"],
post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(account_address, owner)'),
tags = ['scheduled_non_core']
) }}
/* need to rebucket and regroup the intermediate model due to possibility of change events coming in out of order */
WITH rebucket AS (
WITH new_events AS (
SELECT
account_address,
owner,
start_block_id,
conditional_change_event(owner) over (
PARTITION BY account_address
ORDER BY
start_block_id
) AS bucket
_inserted_timestamp,
FROM
{{ ref('silver__token_account_owners_intermediate') }}
WHERE
{% if is_incremental() %}
_inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
_inserted_timestamp :: DATE = '2022-09-01'
{% endif %}
AND start_block_id <> coalesce(end_block_id,-1)
),
regroup AS (
{% if is_incremental() %}
distinct_states AS (
SELECT
account_address,
owner,
bucket,
MIN(start_block_id) AS start_block_id
MIN(start_block_id) AS min_block_id
FROM
rebucket
new_events
GROUP BY
account_address,
owner,
bucket
1
),
pre_final AS (
events_to_reprocess AS (
SELECT
account_address,
owner,
start_block_id,
LEAD(start_block_id) ignore nulls over (
PARTITION BY account_address
ORDER BY
bucket
) AS end_block_id
C.account_address,
C.owner,
C.start_block_id,
C._inserted_timestamp,
FROM
regroup
{{ ref('silver__token_account_owners_intermediate') }} C
JOIN
distinct_states d
USING(account_address)
WHERE
C.start_block_id >= d.min_block_id
AND start_block_id <> coalesce(end_block_id,-1)
AND _inserted_timestamp <= (SELECT max(_inserted_timestamp) FROM new_events)
),
current_state AS (
select
C.account_address,
C.owner,
C.start_block_id,
C._inserted_timestamp,
from
{{ this }} C
JOIN
distinct_states d
USING(account_address)
WHERE
(
C.end_block_id >= d.min_block_id
OR
C.end_block_id IS NULL
)
QUALIFY
row_number() over (partition by account_address order by start_block_id) = 1
),
{% endif %}
all_states AS (
{% if is_incremental() %}
SELECT
*
FROM
current_state
UNION ALL
SELECT
*
FROM
events_to_reprocess
{% else %}
SELECT
*
FROM
new_events
{% endif %}
),
changed_states AS (
SELECT
*
FROM
all_states
QUALIFY
coalesce(lag(owner) OVER (PARTITION BY account_address ORDER BY start_block_id),'abc') <> owner
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['account_address','start_block_id']
) }} AS token_account_owners_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
account_address,
owner,
start_block_id,
LEAD(start_block_id) OVER (
PARTITION BY account_address
ORDER BY
start_block_id
) AS end_block_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['account_address','start_block_id']) }} AS token_account_owners_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final
WHERE
start_block_id <> end_block_id
OR end_block_id IS NULL
changed_states

View File

@ -1,24 +1,50 @@
version: 2
models:
- name: silver__token_account_owners
recent_date_filter: &recent_date_filter
config:
where: inserted_timestamp >= current_date - 7
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ACCOUNT_ADDRESS
- OWNER
- START_BLOCK_ID
<<: *recent_date_filter
columns:
- name: ACCOUNT_ADDRESS
description: address of token account
tests:
- not_null
- not_null: *recent_date_filter
- name: OWNER
description: address of owner
tests:
- not_null
- not_null: *recent_date_filter
- name: START_BLOCK_ID
description: block where this ownership begins
tests:
- not_null
- not_null: *recent_date_filter
- name: END_BLOCK_ID
description: block where this ownership ends, null value represents current ownership
description: block where this ownership ends, null value represents current ownership
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null: *recent_date_filter
- name: TOKEN_ACCOUNT_OWNERS_ID
description: '{{ doc("pk") }}'
tests:
- unique: *recent_date_filter
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
tests:
- not_null:
name: test_silver__not_null_token_account_owners__invocation_id
<<: *recent_date_filter

View File

@ -1,130 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = "delete+insert",
incremental_predicates = ['min_value_predicate', 'start_block_id', generate_view_name(this) ~ ".start_block_id >= " ~ generate_tmp_view_name(this) ~ ".start_block_id"],
unique_key = ["account_address"],
cluster_by = ["round(start_block_id,-5)"],
post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(account_address)'),
tags = ['scheduled_non_core']
) }}
WITH new_events AS (
SELECT
account_address,
owner,
start_block_id,
_inserted_timestamp,
FROM
{{ ref('silver__token_account_owners_intermediate') }}
WHERE
{% if is_incremental() %}
_inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
/* TODO remove upper bound after backfill is done */
AND _inserted_timestamp < (
SELECT
MAX(_inserted_timestamp) + INTERVAL '120 day'
FROM
{{ this }}
)
{% else %}
_inserted_timestamp :: DATE = '2022-09-01'
{% endif %}
AND start_block_id <> coalesce(end_block_id,-1)
),
{% if is_incremental() %}
distinct_states AS (
SELECT
account_address,
MIN(start_block_id) AS min_block_id
FROM
new_events
GROUP BY
1
),
events_to_reprocess AS (
SELECT
C.account_address,
C.owner,
C.start_block_id,
C._inserted_timestamp,
FROM
{{ ref('silver__token_account_owners_intermediate') }} C
JOIN
distinct_states d
USING(account_address)
WHERE
C.start_block_id >= d.min_block_id
AND start_block_id <> coalesce(end_block_id,-1)
AND _inserted_timestamp <= (SELECT max(_inserted_timestamp) FROM new_events)
),
current_state AS (
select
C.account_address,
C.owner,
C.start_block_id,
C._inserted_timestamp,
from
{{ this }} C
JOIN
distinct_states d
USING(account_address)
WHERE
(
C.end_block_id >= d.min_block_id
OR
C.end_block_id IS NULL
)
QUALIFY
row_number() over (partition by account_address order by start_block_id) = 1
),
{% endif %}
all_states AS (
{% if is_incremental() %}
SELECT
*
FROM
current_state
UNION ALL
SELECT
*
FROM
events_to_reprocess
{% else %}
SELECT
*
FROM
new_events
{% endif %}
),
changed_states AS (
SELECT
*
FROM
all_states
QUALIFY
coalesce(lag(owner) OVER (PARTITION BY account_address ORDER BY start_block_id),'abc') <> owner
)
SELECT
account_address,
owner,
start_block_id,
LEAD(start_block_id) OVER (
PARTITION BY account_address
ORDER BY
start_block_id
) AS end_block_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['account_address','start_block_id']) }} AS token_account_owners_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
changed_states

View File

@ -1,50 +0,0 @@
version: 2
models:
- name: silver__token_account_owners_2
recent_date_filter: &recent_date_filter
config:
where: inserted_timestamp >= current_date - 7
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ACCOUNT_ADDRESS
- OWNER
- START_BLOCK_ID
<<: *recent_date_filter
columns:
- name: ACCOUNT_ADDRESS
description: address of token account
tests:
- not_null: *recent_date_filter
- name: OWNER
description: address of owner
tests:
- not_null: *recent_date_filter
- name: START_BLOCK_ID
description: block where this ownership begins
tests:
- not_null: *recent_date_filter
- name: END_BLOCK_ID
description: block where this ownership ends, null value represents current ownership
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null: *recent_date_filter
- name: TOKEN_ACCOUNT_OWNERS_ID
description: '{{ doc("pk") }}'
tests:
- unique: *recent_date_filter
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
tests:
- not_null:
name: test_silver__not_null_token_account_owners__invocation_id
<<: *recent_date_filter