An 4281/token account owners (#501)

* wip

* wip

* add core, clean up

* update per pr comments

* update incremental config

* update logic

* FR model

* logic update for daily fr

* remove model

* recency test for token_account_owners

* refs and sysdate

---------

Co-authored-by: Eric Laurello <eric.laurello@flipsidecrypto.com>
This commit is contained in:
tarikceric 2024-04-04 19:24:02 -05:00 committed by GitHub
parent 37bbc6bce5
commit 9184b7f677
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 137 additions and 69 deletions

View File

@ -42,4 +42,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt test -s "solana_models,./models" --exclude tag:test_weekly
dbt test -s "solana_models,./models,tag:test_daily" --exclude tag:test_weekly

View File

@ -0,0 +1,15 @@
{{ config(
materialized='view',
tags = ['daily']
)
}}
SELECT
account_address,
owner,
start_block_id,
end_block_id,
token_account_owners_id AS fact_token_account_owners_id,
inserted_timestamp,
modified_timestamp
FROM {{ ref('silver__token_account_owners') }}

View File

@ -0,0 +1,33 @@
version: 2
models:
- name: core__fact_token_account_owners
description: Contains token account addresses and the range of blocks from which an address had ownership. Table updated daily.
columns:
- name: ACCOUNT_ADDRESS
description: Address of token account
tests:
- dbt_expectations.expect_column_to_exist
- name: OWNER
description: Address of owner
tests:
- dbt_expectations.expect_column_to_exist
- name: START_BLOCK_ID
description: Block where this ownership begins
tests:
- dbt_expectations.expect_column_to_exist
- name: END_BLOCK_ID
description: Block where this ownership ends, null value represents current ownership
tests:
- dbt_expectations.expect_column_to_exist
- name: FACT_TOKEN_ACCOUNT_OWNERS_ID
description: "{{ doc('id') }}"
tests:
- dbt_expectations.expect_column_to_exist
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"
tests:
- dbt_expectations.expect_column_to_exist
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
tests:
- dbt_expectations.expect_column_to_exist

View File

@ -1,57 +1,59 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ["account_address"],
cluster_by = ['_inserted_timestamp::DATE'],
post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}'),
full_refresh = false,
enabled = false,
materialized = 'table',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(account_address,owner);",
tags = ['daily']
) }}
/* need to rebucket and regroup the intermediate model due to possibility of change events coming in out of order */
with last_updated_at as (
select max(_inserted_timestamp) as _inserted_timestamp
from {{ ref('silver__token_account_owners_intermediate') }}
)
, changed_addresses as (
select distinct account_address
from {{ ref('silver__token_account_owners_intermediate') }}
{% if is_incremental() %}
where _inserted_timestamp > (select max(_inserted_timestamp) from {{ this }})
{% endif %}
),
rebucket as (
select
o.account_address,
o.owner,
o.start_block_id,
conditional_change_event(owner) over (partition by o.account_address order by o.start_block_id) as bucket
from {{ ref('silver__token_account_owners_intermediate') }} o
inner join changed_addresses c on o.account_address = c.account_address
),
regroup as (
select
account_address,
owner,
bucket,
min(start_block_id) as start_block_id
from rebucket
group by 1,2,3
),
pre_final as (
select
WITH rebucket AS (
SELECT
account_address,
owner,
start_block_id,
lead(start_block_id) ignore nulls over (
PARTITION BY account_address
ORDER BY bucket
) as end_block_id,
_inserted_timestamp
from regroup
join last_updated_at
conditional_change_event(owner) over (
PARTITION BY account_address
ORDER BY
start_block_id
) AS bucket
FROM
{{ ref('silver__token_account_owners_intermediate') }}
),
regroup AS (
SELECT
account_address,
owner,
bucket,
MIN(start_block_id) AS start_block_id
FROM
rebucket
GROUP BY
account_address,
owner,
bucket
),
pre_final AS (
SELECT
account_address,
owner,
start_block_id,
LEAD(start_block_id) ignore nulls over (
PARTITION BY account_address
ORDER BY
bucket
) AS end_block_id
FROM
regroup
)
select *
from pre_final
where start_block_id <> end_block_id
or end_block_id is null
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['account_address','start_block_id']
) }} AS token_account_owners_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final
WHERE
start_block_id <> end_block_id
OR end_block_id IS NULL

View File

@ -21,8 +21,4 @@ models:
tests:
- not_null
- name: END_BLOCK_ID
description: block where this ownership ends, null value represents current ownership
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
description: block where this ownership ends, null value represents current ownership

View File

@ -3,14 +3,14 @@
unique_key = ["account_address","owner","start_block_id"],
cluster_by = ['_inserted_timestamp::DATE'],
full_refresh = false,
enabled = false,
tags = ['scheduled_non_core']
) }}
/*
for incrementals also select all null end date accounts and combine
join to eliminate accounts that are not in the subset
remove all accounts that have the same owner + start block + end block
*/
with last_updated_at as (
select max(_inserted_timestamp) as _inserted_timestamp
from {{ ref('silver__token_account_ownership_events') }}
@ -28,7 +28,6 @@ base as (
else 2
end as same_block_order_index
from {{ ref('silver__token_account_ownership_events') }}
/* incremental condition here */
{% if is_incremental() %}
where _inserted_timestamp >= (select max(_inserted_timestamp) from {{ this }})
{% endif %}

View File

@ -1,10 +1,10 @@
{{ config(
materialized = 'incremental',
unique_key = ["block_id","tx_id","index","inner_index"],
incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::date >= LEAST(current_date-7,(select min(block_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
full_refresh = false,
enabled = false,
tags = ['scheduled_non_core'],
) }}
with base_events as (
@ -33,7 +33,7 @@ ownership_change_events as (
from base_events
where event_type in ('assign','assignWithSeed','close','closeAccount','create','createAccount','createAccountWithSeed','createIdempotent',
'initializeAccount','initializeAccount2','initializeAccount3','revoke','setAuthority')
union
union all
select
block_timestamp,
block_id,
@ -65,7 +65,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('assign','assignWithSeed')
union
union all
select
block_timestamp,
block_id,
@ -80,7 +80,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('initializeAccount','initializeAccount2','initializeAccount3')
union
union all
select
block_timestamp,
block_id,
@ -95,7 +95,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('close')
union
union all
select
block_timestamp,
block_id,
@ -110,7 +110,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('create','createIdempotent')
union
union all
select
block_timestamp,
block_id,
@ -125,7 +125,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('createAccount','createAccountWithSeed')
union
union all
select
block_timestamp,
block_id,
@ -140,7 +140,7 @@ combined as (
_inserted_timestamp
from ownership_change_events
where event_type in ('revoke')
union
union all
select
block_timestamp,
block_id,
@ -158,7 +158,7 @@ combined as (
and (instruction:parsed:info:authorityType::string is null
or instruction:parsed:info:authorityType::string = 'accountOwner')
and owner is not null /* some events have an invalid new authority object even though tx is successful, ex: 4oHAf4fmEFmdiYG6Rchh4FoMH4de97iwnZqHEYrvQ5oo3UgwumPxkkkX6KAWCwmk4e5GzsHXqFQYVa2VyoQUYyyD */
union
union all
select
block_timestamp,
block_id,

View File

@ -0,0 +1,23 @@
{{ config(
tags = ["test_daily"]
) }}
WITH most_recent_block AS (
SELECT
MAX(start_block_id) AS recent_block_id
FROM
{{ ref('silver__token_account_owners') }}
)
SELECT
A.recent_block_id,
b.block_id,
b.block_timestamp
FROM
most_recent_block A
LEFT JOIN {{ ref('silver__blocks') }} b
ON A.recent_block_id = b.block_id
WHERE
b.block_timestamp <= (
SYSDATE() - INTERVAL '12 HOUR'
)