An 2149/account owners model (#237)

* wip

* wip

* wip

* change dev owner to use new access role

* wip

* update names

* wip

* use macro for dynamic incremental logic

* change names, split stakes acct and token acct into own tables

* workflow to backfill

* disable new models from incremental while it backfills

* include mint of token account on creation

* inner index should not have no null tests, add mint to yml

* make inner_index -1 so that it can be included in unique key merge

* pr fixes

* add dupe tests

* change to full refresh only, works faster for backfills at least

* fix typo

* fix mint null test, only for create events

* account initialization also has mint value

* add ordering based on events to fix same block event issues

* simplify temporary exclude
This commit is contained in:
desmond-hui 2023-02-09 14:37:48 -08:00 committed by GitHub
parent 7ec47cb4c3
commit 12442761fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 704 additions and 2 deletions

View File

@ -0,0 +1,44 @@
name: dbt_run_scheduled_batch_backfill
run-name: dbt_run_scheduled_batch_backfill
on:
workflow_dispatch:
schedule:
# Runs every 5 mins, adjust to appropriate schedule as needed
- cron: '*/5 0 * * *'
env:
DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ secrets.ACCOUNT }}"
ROLE: "${{ secrets.ROLE }}"
USER: "${{ secrets.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ secrets.REGION }}"
DATABASE: "${{ secrets.DATABASE }}"
WAREHOUSE: DBT_EMERGENCY
SCHEMA: "${{ secrets.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s models/silver/accounts/silver__token_account_ownership_events.sql models/silver/accounts/silver__token_account_owners.sql models/silver/accounts/silver__stake_account_ownership_events.sql

View File

@ -44,6 +44,6 @@ jobs:
dbt run-operation run_sp_refresh_external_tables_full
dbt run -s models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql
dbt run-operation run_sp_refresh_external_tables_full
dbt run -s ./models --exclude models/core models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql tag:share models/streamline models/silver/silver__daily_signers.sql models/silver/silver__signers.sql
dbt run -s ./models --exclude models/core models/silver/silver__transactions.sql models/silver/silver__blocks.sql models/silver/silver__votes.sql models/silver/silver___inner_instructions.sql models/silver/silver___instructions.sql models/silver/silver__events.sql models/silver/silver___all_undecoded_instructions_data.sql tag:share models/streamline models/silver/silver__daily_signers.sql models/silver/silver__signers.sql models/silver/accounts
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":True}' -s ./models/core --exclude models/core/core__ez_signers.sql

View File

@ -0,0 +1,15 @@
{% macro get_batch_load_logic(model_name, batch_size_days, end_date) -%}
{% set query %}
select max(_inserted_timestamp)::date::string from {{ model_name }};
{% endset %}
{% set max_date = run_query(query).columns[0].values()[0] %}
{% if max_date >= '2022-09-01' and max_date < '2022-09-05' %}
{{ "and _inserted_timestamp between (select max(_inserted_timestamp) from " ~ model_name ~ ") and (select dateadd('hour',4,max(_inserted_timestamp)) from " ~ model_name ~ ")" }}
{% elif max_date >= '2022-09-05' and max_date < end_date %}
{{ "and _inserted_timestamp between (select max(_inserted_timestamp) from " ~ model_name ~ ") and (select dateadd('day',"~ batch_size_days ~",max(_inserted_timestamp)) from " ~ model_name ~ ")" }}
{% else %}
{{ "and _inserted_timestamp >= (select max(_inserted_timestamp) from " ~ model_name}}
{% endif %}
{%- endmacro %}

View File

@ -1,6 +1,6 @@
{% macro run_sp_create_prod_clone() %}
{% set clone_query %}
call solana._internal.create_prod_clone('solana', 'solana_dev', 'internal_dev');
call solana._internal.create_prod_clone('solana', 'solana_dev', 'solana_dev_owner');
{% endset %}
{% do run_query(clone_query) %}

View File

@ -0,0 +1,164 @@
{{ config(
materialized = 'incremental',
unique_key = ["block_id","tx_id","index","inner_index","authority_type"],
incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::date >= LEAST(current_date-7,(select min(block_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
) }}
with base_events as (
select *
from {{ ref('silver__events')}}
where succeeded
{% if is_incremental() %}
{% if execute %}
{{ get_batch_load_logic(this,15,'2023-02-05') }}
{% endif %}
{% else %}
and _inserted_timestamp::date between '2022-08-12' and '2022-09-01'
{% endif %}
),
ownership_change_events as (
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
-1 as inner_index,
event_type,
instruction,
_inserted_timestamp
from base_events
where event_type in ('authorize','authorizeChecked','authorizeWithSeed','initialize','initializeChecked')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
e.index,
ii.index as inner_index,
ii.value :parsed :type,
ii.value as instruction,
_inserted_timestamp
from base_events e,
TABLE(FLATTEN(e.inner_instruction :instructions)) ii
WHERE
ii.value :parsed :type :: STRING IN ('authorize','authorizeChecked','authorizeWithSeed','initialize','initializeChecked')
),
combined as (
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
lower(instruction:parsed:info:authorityType::string) as authority_type,
instruction:parsed:info:newAuthority::string as authority,
_inserted_timestamp
from ownership_change_events
where event_type in ('authorize','authorizeChecked')
and lower(instruction:parsed:info:authorityType::string) = 'withdrawer' /* probably handle stake accounts differently and include both stake and withdraw authorities */
and instruction:parsed:info:voteAccount::string is null
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
lower(instruction:parsed:info:authorityType::string) as authority_type,
instruction:parsed:info:newAuthority::string as authority,
_inserted_timestamp
from ownership_change_events
where event_type in ('authorize','authorizeChecked')
and lower(instruction:parsed:info:authorityType::string) = 'staker' /* probably handle stake accounts differently and include both stake and withdraw authorities */
and instruction:parsed:info:voteAccount::string is null
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
lower(instruction:parsed:info:authorityType::string) as authority_type,
coalesce(instruction:parsed:info:newAuthorized::string, instruction:parsed:info:newAuthority::string),
_inserted_timestamp
from ownership_change_events
where event_type in ('authorizeWithSeed')
and instruction:parsed:info:voteAccount::string is null /* handle vote account changes elsewhere? */
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
'withdrawer' as authority_type,
coalesce(instruction:parsed:info:authorized:withdrawer::string,instruction:parsed:info:authorizedWithdrawer::string),
_inserted_timestamp
from ownership_change_events
where event_type in ('initialize')
and instruction:parsed:info:voteAccount::string is null /* handle vote account changes elsewhere? */
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
'staker' as authority_type,
coalesce(instruction:parsed:info:authorized:staker::string,instruction:parsed:info:authorizedStaker::string),
_inserted_timestamp
from ownership_change_events
where event_type in ('initialize')
and instruction:parsed:info:voteAccount::string is null /* handle vote account changes elsewhere? */
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
'withdrawer' as authority_type,
instruction:parsed:info:withdrawer::string,
_inserted_timestamp
from ownership_change_events
where event_type in ('initializeChecked')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:stakeAccount::string as account_address,
'staker' as authority_type,
instruction:parsed:info:staker::string,
_inserted_timestamp
from ownership_change_events
where event_type in ('initializeChecked')
)
select *
from combined
qualify(row_number() over (partition by tx_id, account_address, authority_type order by index desc, inner_index desc)) = 1

View File

@ -0,0 +1,58 @@
version: 2
models:
- name: silver__stake_account_ownership_events
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
- TX_ID
- INDEX
- INNER_INDEX
- AUTHORITY_TYPE
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null:
where: block_id > 39824213
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- name: INDEX
description: Location of the instruction within a transaction
tests:
- not_null
- name: INNER_INDEX
description: Specifies the inner_instruction location within the instruction identified by INDEX
- name: EVENT_TYPE
description: type of account change ownership event
tests:
- not_null
- name: ACCOUNT_ADDRESS
description: address of stake account
tests:
- not_null
- name: AUTHORITY_TYPE
description: type of ownership being transferred, can be `staker` or `withdrawer`
tests:
- not_null
- name: AUTHORITY
description: address of new staker or withdrawer
tests:
- not_null
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -0,0 +1,54 @@
{{ config(
materialized = 'table',
unique_key = ["account_address"],
cluster_by = ['_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
) }}
/* need to rebucket and regroup the intermediate model due to possibility of change events coming in out of order */
with last_updated_at as (
select max(_inserted_timestamp) as _inserted_timestamp
from {{ ref('silver__token_account_owners_intermediate') }}
)
, changed_addresses as (
select distinct account_address
from {{ ref('silver__token_account_owners_intermediate') }}
{% if is_incremental() %}
where _inserted_timestamp > (select max(_inserted_timestamp) from {{ this }})
{% endif %}
),
rebucket as (
select
o.account_address,
o.owner,
o.start_block_id,
conditional_change_event(owner) over (partition by o.account_address order by o.start_block_id) as bucket
from {{ ref('silver__token_account_owners_intermediate') }} o
inner join changed_addresses c on o.account_address = c.account_address
),
regroup as (
select
account_address,
owner,
bucket,
min(start_block_id) as start_block_id
from rebucket
group by 1,2,3
),
pre_final as (
select
account_address,
owner,
start_block_id,
lead(start_block_id) ignore nulls over (
PARTITION BY account_address
ORDER BY bucket
) as end_block_id,
_inserted_timestamp
from regroup
join last_updated_at
)
select *
from pre_final
where start_block_id <> end_block_id
or end_block_id is null

View File

@ -0,0 +1,28 @@
version: 2
models:
- name: silver__token_account_owners
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ACCOUNT_ADDRESS
- OWNER
- START_BLOCK_ID
columns:
- name: ACCOUNT_ADDRESS
description: address of token account
tests:
- not_null
- name: OWNER
description: address of owner
tests:
- not_null
- name: START_BLOCK_ID
description: block where this ownership begins
tests:
- not_null
- name: END_BLOCK_ID
description: block where this ownership ends, null value represents current ownership
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -0,0 +1,81 @@
{{ config(
materialized = 'incremental',
unique_key = ["account_address","owner","start_block_id"],
cluster_by = ['_inserted_timestamp::DATE'],
) }}
/*
for incrementals also select all null end date accounts and combine
join to eliminate accounts that are not in the subset
remove all accounts that have the same owner + start block + end block
*/
with last_updated_at as (
select max(_inserted_timestamp) as _inserted_timestamp
from {{ ref('silver__token_account_ownership_events') }}
),
base as (
select
account_address,
owner,
block_id,
case
when event_type in ('create','createIdempotent','createAccount','createAccountWithSeed') then
0
when event_type in ('initializeAccount','initializeAccount2','initializeAccount3') then
1
else 2
end as same_block_order_index
from {{ ref('silver__token_account_ownership_events') }}
/* incremental condition here */
{% if is_incremental() %}
where _inserted_timestamp >= (select max(_inserted_timestamp) from {{ this }})
{% endif %}
),
{% if is_incremental() %}
current_ownership as (
select
t.account_address,
t.owner,
t.start_block_id as block_id
from {{ this }} t
join (select distinct account_address from base) b on b.account_address = t.account_address
where t.end_block_id is null
union
select
*
from base
),
bucketed as (
select
*,
conditional_change_event(owner) over (partition by account_address order by block_id, same_block_order_index) as bucket
from current_ownership
),
{% else %}
bucketed as (
select
*,
conditional_change_event(owner) over (partition by account_address order by block_id, same_block_order_index) as bucket
from base
),
{% endif %}
c as (
select
account_address,
owner,
bucket,
min(block_id) as start_block_id
from bucketed
group by 1,2,3
)
select
account_address,
owner,
start_block_id,
lead(start_block_id) ignore nulls over (
PARTITION BY account_address
ORDER BY bucket
) as end_block_id,
_inserted_timestamp
from c
join last_updated_at

View File

@ -0,0 +1,22 @@
version: 2
models:
- name: silver__token_account_owners_intermediate
columns:
- name: ACCOUNT_ADDRESS
description: address of token account
tests:
- not_null
- name: OWNER
description: address of owner
tests:
- not_null
- name: START_BLOCK_ID
description: block where this ownership begins
tests:
- not_null
- name: END_BLOCK_ID
description: block where this ownership ends, null value represents current ownership
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -0,0 +1,178 @@
{{ config(
materialized = 'incremental',
unique_key = ["block_id","tx_id","index","inner_index"],
incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::date >= LEAST(current_date-7,(select min(block_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
full_refresh = false
) }}
with base_events as (
select *
from {{ ref('silver__events')}}
where succeeded
{% if is_incremental() %}
{% if execute %}
{{ get_batch_load_logic(this,15,'2023-02-05') }}
{% endif %}
{% else %}
and _inserted_timestamp::date between '2022-08-12' and '2022-09-01'
{% endif %}
),
ownership_change_events as (
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
-1 as inner_index,
event_type,
instruction,
_inserted_timestamp
from base_events
where event_type in ('assign','assignWithSeed','close','closeAccount','create','createAccount','createAccountWithSeed','createIdempotent',
'initializeAccount','initializeAccount2','initializeAccount3','revoke','setAuthority')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
e.index,
ii.index as inner_index,
ii.value :parsed :type,
ii.value as instruction,
_inserted_timestamp
from base_events e,
TABLE(FLATTEN(e.inner_instruction :instructions)) ii
WHERE
ii.value :parsed :type :: STRING IN ('assign','assignWithSeed','close','closeAccount','create','createAccount','createAccountWithSeed','createIdempotent',
'initializeAccount','initializeAccount2','initializeAccount3','revoke','setAuthority')
),
combined as (
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
instruction:parsed:info:owner::string as owner,
instruction:parsed:info:mint::string as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('assign','assignWithSeed')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
instruction:parsed:info:owner::string as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('initializeAccount','initializeAccount2','initializeAccount3')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
instruction:parsed:info:authority::string as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('close')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
instruction:parsed:info:wallet::string as owner,
instruction:parsed:info:mint::string as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('create','createIdempotent')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:newAccount::string as account_address,
instruction:parsed:info:owner::string as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('createAccount','createAccountWithSeed')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:source::string as account_address,
coalesce(instruction:parsed:info:owner::string,instruction:parsed:info:multisigOwner::string) as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('revoke')
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
instruction:parsed:info:newAuthority::string as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('setAuthority')
and (instruction:parsed:info:authorityType::string is null
or instruction:parsed:info:authorityType::string = 'accountOwner')
and owner is not null /* some events have an invalid new authority object even though tx is successful, ex: 4oHAf4fmEFmdiYG6Rchh4FoMH4de97iwnZqHEYrvQ5oo3UgwumPxkkkX6KAWCwmk4e5GzsHXqFQYVa2VyoQUYyyD */
union
select
block_timestamp,
block_id,
tx_id,
succeeded,
index,
inner_index,
event_type,
instruction:parsed:info:account::string as account_address,
coalesce(instruction:parsed:info:owner::string,instruction:parsed:info:multisigOwner::string) as owner,
null as mint,
_inserted_timestamp
from ownership_change_events
where event_type in ('closeAccount')
)
select *
from combined
qualify(row_number() over (partition by tx_id, account_address order by index desc, inner_index desc)) = 1

View File

@ -0,0 +1,58 @@
version: 2
models:
- name: silver__token_account_ownership_events
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
- TX_ID
- INDEX
- INNER_INDEX
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null:
where: block_id > 39824213
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- name: INDEX
description: Location of the instruction within a transaction
tests:
- not_null
- name: INNER_INDEX
description: Specifies the inner_instruction location within the instruction identified by INDEX
- name: EVENT_TYPE
description: type of account change ownership event
tests:
- not_null
- name: ACCOUNT_ADDRESS
description: address of token account
tests:
- not_null
- name: OWNER
description: address of new owner
tests:
- not_null
- name: MINT
description: mint the account is holding
tests:
- not_null:
where: event_type in ('create','createIdempotent','initializeAccount','initializeAccount2','initializeAccount3')
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null