account storage (#490)
Some checks are pending
docs_update / run_dbt_jobs (push) Waiting to run
docs_update / notify-failure (push) Blocked by required conditions

This commit is contained in:
Austin 2025-10-02 22:39:19 -04:00 committed by GitHub
parent 7549b06024
commit 0e91031c85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 460 additions and 17 deletions

1
.gitignore vendored
View File

@ -23,3 +23,4 @@ local*
venv/
.cursor/
.cursorrules
package-lock.yml

View File

@ -5,7 +5,7 @@ CREATE
OR REPLACE FUNCTION {{ target.database }}.streamline.udf_decode_hash_array(raw_array ARRAY)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'decode_hash_array'
AS
$$

View File

@ -0,0 +1,9 @@
-- Bronze realtime view over the account_storage external table.
-- The streamline macro generates the SELECT; partition_key is derived by
-- casting the leading '_'-delimited token of the file path's third segment
-- to an integer (NOTE(review): macro internals not visible here — the
-- partition_function string is passed through to the generated SQL).
{{ config (
materialized = 'view',
tags = ['streamline_realtime', 'account_storage']
) }}
{{ streamline_external_table_query_v2(
model = "account_storage",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,9 @@
-- Bronze FR (full-reload) view over the account_storage external table,
-- used by silver__account_storage on full refresh (see its {% raw %}{% else %}{% endraw %} branch).
-- Same partition derivation as the realtime bronze view; only the macro
-- differs (presumably scanning the full history rather than recent files —
-- confirm against the fsc-utils macro definition).
{{ config (
materialized = 'view',
tags = ['streamline_realtime', 'account_storage']
) }}
{{ streamline_external_table_FR_query_v2(
model = "account_storage",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,30 @@
-- Daily account-storage fact: one row per account per block_date, taken from
-- the silver layer (which captures storage at the last block of each day).
-- Incremental on modified_timestamp; merges on fact_account_storage_id.
{{ config(
    materialized = 'incremental',
    cluster_by = ['block_date'],
    unique_key = "fact_account_storage_id",
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(fact_account_storage_id, account_address)",
    tags = ['scheduled_non_core']
) }}

SELECT
    block_height,
    account_address,
    block_date,
    encoded_data,
    decoded_data,
    -- decoded payload is an array: element 0 = storage used, element 1 = capacity
    -- (matches the [account.storage.used, account.storage.capacity] request script)
    decoded_data:value[0]:value::NUMBER AS storage_used,
    decoded_data:value[1]:value::NUMBER AS storage_capacity,
    streamline_account_storage_id AS fact_account_storage_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp
FROM
    {{ ref('silver__account_storage') }}

{% if is_incremental() %}
WHERE
    -- only rows touched in silver since this model's last successful run
    modified_timestamp > (
        SELECT
            COALESCE(MAX(modified_timestamp), '1970-01-01'::TIMESTAMP)
        FROM
            {{ this }}
    )
{% endif %}

View File

@ -0,0 +1,67 @@
version: 2

models:
  - name: core__fact_account_storage
    description: This table contains account storage data as of the last block each day. Please note that the table will only update once a day.
    tests:
      # daily cadence with one day of slack
      - dbt_utils.recency:
          datepart: day
          field: block_date
          interval: 2
    columns:
      - name: BLOCK_HEIGHT
        description: The block height of the last block for the day at which storage was fetched.
        tests:
          - not_null
      - name: ACCOUNT_ADDRESS
        description: The address of the account for which storage was fetched.
        tests:
          - not_null
      - name: BLOCK_DATE
        description: The date for which storage was fetched at the last block of the day.
        tests:
          - not_null
      - name: ENCODED_DATA
        description: The encoded data of the account storage.
        tests:
          - not_null
      - name: DECODED_DATA
        description: The decoded data of the account storage.
        tests:
          - not_null
      - name: STORAGE_USED
        description: The storage used by the account.
        tests:
          - not_null
      - name: STORAGE_CAPACITY
        description: The storage capacity of the account.
        tests:
          - not_null
      - name: FACT_ACCOUNT_STORAGE_ID
        description: The surrogate key for the account storage.
        tests:
          - not_null
      - name: INSERTED_TIMESTAMP
        description: The timestamp at which the row was inserted.
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2
      - name: MODIFIED_TIMESTAMP
        description: The timestamp at which the row was last modified.
        tests:
          - not_null
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: day
              interval: 2

View File

@ -0,0 +1,47 @@
-- depends_on: {{ ref('bronze__streamline_account_storage') }}
-- depends_on: {{ ref('bronze__streamline_fr_account_storage') }}
-- Silver load of raw account-storage API responses. Incremental runs read only
-- rows landed since the previous load from the realtime bronze view; a full
-- refresh rebuilds from the FR (full-reload) bronze view instead.
{{ config (
materialized = "incremental",
incremental_predicates = ["dynamic_range_predicate", "partition_key"],
unique_key = "streamline_account_storage_id",
cluster_by = "partition_key",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(streamline_account_storage_id)",
tags = ['streamline_load', 'core', 'scheduled_core']
) }}
SELECT
value:"BLOCK_HEIGHT"::INT AS block_height, -- request metadata (VARIANT `value` column; presumably emitted by the bronze macro — confirm)
value:"ACCOUNT_ADDRESS"::STRING AS account_address,
value:"BLOCK_DATE"::string::Date as block_date,
data::string as encoded_data, -- raw API payload: base64-encoded JSON
TRY_PARSE_JSON(BASE64_DECODE_STRING(encoded_data)) as decoded_data, -- NULL rather than error on malformed payloads
file_name,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_height', 'account_address']
) }} AS streamline_account_storage_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_account_storage') }}
WHERE
-- only files landed since the latest row already in this table
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
),
'1900-01-01' :: timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_account_storage') }}
{% endif %}
-- keep only the most recently landed copy of each (block_height, account_address)
qualify(ROW_NUMBER() over (PARTITION BY streamline_account_storage_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,18 @@
version: 2

models:
  - name: silver__account_storage
    # fixed block-scalar indicator: was `-|`, which YAML parses as a plain
    # scalar beginning with the literal characters "-|", not a block scalar
    description: |-
      Raw table for rest API account storage data.
    columns:
      - name: INSERTED_TIMESTAMP
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - TIMESTAMP_LTZ
                - TIMESTAMP_NTZ
          - dbt_expectations.expect_row_values_to_have_recent_data:
              datepart: hour
              interval: 2

View File

@ -133,6 +133,7 @@ sources:
- name: evm_testnet_blocks
- name: evm_testnet_receipts
- name: evm_testnet_traces
- name: account_storage
- name: crosschain_silver
database: crosschain

View File

@ -0,0 +1,44 @@
-- depends_on: {{ ref('bronze__streamline_account_storage') }}
-- depends_on: {{ ref('bronze__streamline_fr_account_storage') }}
-- Completion tracker: records every (block_height, account_address) pair whose
-- API response has landed. The streamline request models anti-join against this
-- table to decide which requests still need to be issued.
{{ config (
materialized = "incremental",
incremental_predicates = ["dynamic_range_predicate", "partition_key"],
unique_key = "complete_account_storage_id",
cluster_by = "partition_key",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(complete_account_storage_id)",
tags = ['streamline_complete', 'account_storage']
) }}
SELECT
value:"BLOCK_HEIGHT"::INT AS block_height, -- request metadata (VARIANT `value` column from the bronze view)
value:"ACCOUNT_ADDRESS"::STRING AS account_address,
file_name,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_height', 'account_address']
) }} AS complete_account_storage_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_account_storage') }}
WHERE
-- only files landed since the latest row already in this table
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
),
'1900-01-01' :: timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_account_storage') }}
{% endif %}
-- keep only the most recently landed copy of each (block_height, account_address)
qualify(ROW_NUMBER() over (PARTITION BY complete_account_storage_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,108 @@
-- History-backfill request generator for account storage. The view body builds
-- up to 2,000,000 pending API requests; the post_hook hands them to the
-- streamline bulk REST API UDF for execution into the account_storage
-- external table.
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"account_storage",
"sql_limit" :"2000000",
"producer_batch_size" :"50000",
"worker_batch_size" :"10000",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_history', 'account_storage']
) }}
-- first block at which each contract emitted a successful event
with all_event_contracts as (
select
min(block_height) as events_start,
event_contract
from {{ ref('core__fact_events') }}
where tx_succeeded
group by all
),
-- last block of each day since 2025-01-01, excluding the current
-- (still-incomplete) day
relevant_block_heights as (
select
block_timestamp::date as block_date,
max(block_height) as block_height
from {{ ref('core__fact_blocks') }}
where block_timestamp >= '2025-01-01'
and block_timestamp::date <> (select max(block_timestamp::date) from {{ ref('core__fact_blocks') }})
group by all
),
-- every (contract, day) pair where the day's last block is at or after the
-- contract's first activity
event_contract_days as (
select
event_contract,
block_date,
block_height
from all_event_contracts a
join relevant_block_heights b
on a.events_start <= b.block_height
),
-- resolve each contract to its account address via the labels dimension
account_addresses as (
select
event_contract,
block_date,
block_height,
account_address
from event_contract_days
join {{ ref('core__dim_contract_labels') }}
using (event_contract)
),
-- one row per (block_height, account_address); keep the latest block_date
distinct_targets as (
select
block_date,
block_height,
account_address,
event_contract
from account_addresses
qualify(ROW_NUMBER() over (PARTITION BY block_height, account_address ORDER BY block_date DESC)) = 1
),
-- build the script-execution request: a Cadence script returning
-- [account.storage.used, account.storage.capacity] for the address,
-- executed at the target block height
create_requests as (
select
block_date,
block_height,
account_address,
event_contract,
concat('/v1/scripts?block_height=', block_height) as endpoint,
OBJECT_CONSTRUCT(
'script', BASE64_ENCODE('access(all) fun main(addr: Address): [UInt64] { let account = getAccount(addr); return [account.storage.used, account.storage.capacity] }'),
'arguments', ARRAY_CONSTRUCT(
BASE64_ENCODE('{"type":"Address","value":"' || account_address || '"}')
)
) as request_data
from distinct_targets
),
-- anti-join against the completion tracker: only requests not yet fulfilled
to_do as (
select
block_date,
block_height,
account_address,
event_contract,
endpoint,
request_data
from create_requests a
left join {{ ref('streamline__complete_get_account_storage') }} b
using (block_height, account_address)
where b.complete_account_storage_id is null
)
select
block_date::VARCHAR as block_date,
round(block_height, -3) :: INT AS partition_key, -- bucket heights to the nearest 1000 for partitioning
block_height,
account_address,
event_contract,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}' || endpoint,
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'Accept', 'application/json'
),
request_data,
'Vault/prod/flow/quicknode/mainnet'
) as request
from to_do
order by block_date desc -- newest days first so backfill prioritizes recent data
limit 2000000

View File

@ -0,0 +1,109 @@
-- Realtime request generator for account storage. Identical pipeline to the
-- history model, but restricted to roughly the last 4 days and capped at
-- 100,000 requests per run; the post_hook hands the pending requests to the
-- streamline bulk REST API UDF.
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"account_storage",
"sql_limit" :"100000",
"producer_batch_size" :"50000",
"worker_batch_size" :"10000",
"async_concurrent_requests" :"10",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_realtime', 'account_storage']
) }}
-- first block at which each contract emitted a successful event
with all_event_contracts as (
select
min(block_height) as events_start,
event_contract
from {{ ref('core__fact_events') }}
where tx_succeeded
group by all
),
-- last block of each of the previous ~4 days, excluding the current
-- (still-incomplete) day
relevant_block_heights as (
select
block_timestamp::date as block_date,
max(block_height) as block_height
from {{ ref('core__fact_blocks') }}
where block_timestamp >= '2025-01-01'
and block_timestamp::date <> (select max(block_timestamp::date) from {{ ref('core__fact_blocks') }})
and block_timestamp::date >= DATEADD('day', -4, SYSDATE())
group by all
),
-- every (contract, day) pair where the day's last block is at or after the
-- contract's first activity
event_contract_days as (
select
event_contract,
block_date,
block_height
from all_event_contracts a
join relevant_block_heights b
on a.events_start <= b.block_height
),
-- resolve each contract to its account address via the labels dimension
account_addresses as (
select
event_contract,
block_date,
block_height,
account_address
from event_contract_days
join {{ ref('core__dim_contract_labels') }}
using (event_contract)
),
-- one row per (block_height, account_address); keep the latest block_date
distinct_targets as (
select
block_date,
block_height,
account_address,
event_contract
from account_addresses
qualify(ROW_NUMBER() over (PARTITION BY block_height, account_address ORDER BY block_date DESC)) = 1
),
-- build the script-execution request: a Cadence script returning
-- [account.storage.used, account.storage.capacity] for the address,
-- executed at the target block height
create_requests as (
select
block_date,
block_height,
account_address,
event_contract,
concat('/v1/scripts?block_height=', block_height) as endpoint,
OBJECT_CONSTRUCT(
'script', BASE64_ENCODE('access(all) fun main(addr: Address): [UInt64] { let account = getAccount(addr); return [account.storage.used, account.storage.capacity] }'),
'arguments', ARRAY_CONSTRUCT(
BASE64_ENCODE('{"type":"Address","value":"' || account_address || '"}')
)
) as request_data
from distinct_targets
),
-- anti-join against the completion tracker: only requests not yet fulfilled
to_do as (
select
block_date,
block_height,
account_address,
event_contract,
endpoint,
request_data
from create_requests a
left join {{ ref('streamline__complete_get_account_storage') }} b
using (block_height, account_address)
where b.complete_account_storage_id is null
)
select
block_date::VARCHAR as block_date,
round(block_height, -3) :: INT AS partition_key, -- bucket heights to the nearest 1000 for partitioning
block_height,
account_address,
event_contract,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}' || endpoint,
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'Accept', 'application/json'
),
request_data,
'Vault/prod/flow/quicknode/mainnet'
) as request
from to_do
order by block_date desc -- newest days first
limit 100000

View File

@ -6,11 +6,11 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: 87e00eb90acddcc7a34aa8e67e3b3bac86b262e6
revision: 6415fc4873bb609ca8de2a9f612e8462bb144caf
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
version: 2.5.3
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: 2651a45b7e123f7bd421bcc0e7e2a7bcbaf7652f
sha1_hash: a1cc3545d7ef13fcf5b3908a9e888b4421018792
revision: 430bc25db1cf236da3ea2df14b0844d3d156fc59
sha1_hash: 2e5c67859a1a4bba5619c3bfb4deda06f050608b

View File

@ -6,6 +6,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.35.1
revision: v1.39.1
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]