From 0e91031c85e0b6362f9f8f5439ac1f6b0c4852da Mon Sep 17 00:00:00 2001
From: Austin <93135983+austinFlipside@users.noreply.github.com>
Date: Thu, 2 Oct 2025 22:39:19 -0400
Subject: [PATCH] account storage (#490)

---
 .gitignore                                    |   1 +
 macros/evm/udf_decode_hash_array.sql          |   2 +-
 .../bronze__streamline_account_storage.sql    |   9 ++
 .../bronze__streamline_fr_account_storage.sql |   9 ++
 .../gold/core/core__fact_account_storage.sql  |  30 +++++
 .../gold/core/core__fact_account_storage.yml  |  67 +++++++++++
 .../silver/core/silver__account_storage.sql   |  47 ++++++++
 .../silver/core/silver__account_storage.yml   |  18 +++
 models/sources.yml                            |   1 +
 ...reamline__complete_get_account_storage.sql |  44 +++++++
 ...treamline__get_account_storage_history.sql | 108 +++++++++++++++++
 ...reamline__get_account_storage_realtime.sql | 109 ++++++++++++++++++
 package-lock.yml                              |  30 ++---
 packages.yml                                  |   2 +-
 14 files changed, 460 insertions(+), 17 deletions(-)
 create mode 100644 models/bronze/streamline/realtime/bronze__streamline_account_storage.sql
 create mode 100644 models/bronze/streamline/realtime/bronze__streamline_fr_account_storage.sql
 create mode 100644 models/gold/core/core__fact_account_storage.sql
 create mode 100644 models/gold/core/core__fact_account_storage.yml
 create mode 100644 models/silver/core/silver__account_storage.sql
 create mode 100644 models/silver/core/silver__account_storage.yml
 create mode 100644 models/streamline/core/complete/streamline__complete_get_account_storage.sql
 create mode 100644 models/streamline/core/history/account/streamline__get_account_storage_history.sql
 create mode 100644 models/streamline/core/realtime/streamline__get_account_storage_realtime.sql

diff --git a/.gitignore b/.gitignore
index 4e6a53f..4b4a0c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ local*
 venv/
 .cursor/
 .cursorrules
+package-lock.yml
diff --git a/macros/evm/udf_decode_hash_array.sql b/macros/evm/udf_decode_hash_array.sql
index ab834f0..55a2658 100644
--- a/macros/evm/udf_decode_hash_array.sql
+++ b/macros/evm/udf_decode_hash_array.sql
@@ -5,7 +5,7 @@
 CREATE OR REPLACE FUNCTION {{ target.database }}.streamline.udf_decode_hash_array(raw_array ARRAY)
 RETURNS STRING
 LANGUAGE PYTHON
-RUNTIME_VERSION = '3.8'
+RUNTIME_VERSION = '3.9'
 HANDLER = 'decode_hash_array'
 AS
 $$
diff --git a/models/bronze/streamline/realtime/bronze__streamline_account_storage.sql b/models/bronze/streamline/realtime/bronze__streamline_account_storage.sql
new file mode 100644
index 0000000..971a1b0
--- /dev/null
+++ b/models/bronze/streamline/realtime/bronze__streamline_account_storage.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_realtime', 'account_storage']
+) }}
+
+{{ streamline_external_table_query_v2(
+    model = "account_storage",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
+) }}
diff --git a/models/bronze/streamline/realtime/bronze__streamline_fr_account_storage.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_account_storage.sql
new file mode 100644
index 0000000..e55536e
--- /dev/null
+++ b/models/bronze/streamline/realtime/bronze__streamline_fr_account_storage.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_realtime', 'account_storage']
+) }}
+
+{{ streamline_external_table_FR_query_v2(
+    model = "account_storage",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
+) }}
diff --git a/models/gold/core/core__fact_account_storage.sql b/models/gold/core/core__fact_account_storage.sql
new file mode 100644
index 0000000..fab3d1d
--- /dev/null
+++ b/models/gold/core/core__fact_account_storage.sql
@@ -0,0 +1,30 @@
+{{ config(
+    materialized = 'incremental',
+    cluster_by = ['block_date'],
+    unique_key = "fact_account_storage_id",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(fact_account_storage_id, account_address)",
+    tags = ['scheduled_non_core']
+) }}
+
+SELECT
+    block_height,
+    account_address,
+    block_date,
+    encoded_data,
+    decoded_data,
+    decoded_data:value[0]:value::NUMBER as storage_used, -- element 0 of the decoded array value
+    decoded_data:value[1]:value::NUMBER as storage_capacity, -- element 1 of the decoded array value
+    streamline_account_storage_id as fact_account_storage_id, -- silver surrogate key reused as the gold key
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp
+FROM
+    {{ ref('silver__account_storage') }}
+WHERE 1=1 -- always-true anchor so the incremental filter below can start with AND
+{% if is_incremental() %}
+    AND modified_timestamp > (
+        SELECT
+            COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
+        FROM
+            {{ this }}
+    )
+{% endif %}
diff --git a/models/gold/core/core__fact_account_storage.yml b/models/gold/core/core__fact_account_storage.yml
new file mode 100644
index 0000000..f455c61
--- /dev/null
+++ b/models/gold/core/core__fact_account_storage.yml
@@ -0,0 +1,67 @@
+version: 2
+
+models:
+  - name: core__fact_account_storage
+    description: This table contains account storage data as of the last block each day. Please note that the table will only update once a day.
+    tests:
+      - dbt_utils.recency:
+          datepart: day
+          field: block_date
+          interval: 2
+
+    columns:
+      - name: BLOCK_HEIGHT
+        description: The block height of the last block for the day at which storage was fetched.
+        tests:
+          - not_null
+
+      - name: ACCOUNT_ADDRESS
+        description: The address of the account for which storage was fetched.
+        tests:
+          - not_null
+
+      - name: BLOCK_DATE
+        description: The date for which storage was fetched at the last block of the day.
+        tests:
+          - not_null
+
+      - name: ENCODED_DATA
+        description: The encoded data of the account storage.
+        tests:
+          - not_null
+
+      - name: DECODED_DATA
+        description: The decoded data of the account storage.
+        tests:
+          - not_null
+
+      - name: STORAGE_USED
+        description: The storage used by the account.
+        tests:
+          - not_null
+
+      - name: STORAGE_CAPACITY
+        description: The storage capacity of the account.
+        tests:
+          - not_null
+
+      - name: FACT_ACCOUNT_STORAGE_ID
+        description: The surrogate key for the account storage.
+        tests:
+          - not_null
+
+      - name: INSERTED_TIMESTAMP
+        description: The timestamp at which the row was inserted.
+        tests:
+          - not_null
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: day
+              interval: 2
+
+      - name: MODIFIED_TIMESTAMP
+        description: The timestamp at which the row was last modified.
+        tests:
+          - not_null
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: day
+              interval: 2
\ No newline at end of file
diff --git a/models/silver/core/silver__account_storage.sql b/models/silver/core/silver__account_storage.sql
new file mode 100644
index 0000000..279e13b
--- /dev/null
+++ b/models/silver/core/silver__account_storage.sql
@@ -0,0 +1,47 @@
+-- depends_on: {{ ref('bronze__streamline_account_storage') }}
+-- depends_on: {{ ref('bronze__streamline_fr_account_storage') }}
+{{ config (
+    materialized = "incremental",
+    incremental_predicates = ["dynamic_range_predicate", "partition_key"],
+    unique_key = "streamline_account_storage_id",
+    cluster_by = "partition_key",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(streamline_account_storage_id)",
+    tags = ['streamline_load', 'core', 'scheduled_core']
+) }}
+
+SELECT
+    value:"BLOCK_HEIGHT"::INT AS block_height,
+    value:"ACCOUNT_ADDRESS"::STRING AS account_address,
+    value:"BLOCK_DATE"::string::Date as block_date,
+    data::string as encoded_data, -- raw response payload kept as text
+    TRY_PARSE_JSON(TRY_BASE64_DECODE_STRING(encoded_data)) as decoded_data, -- NULL (not a failed run) when the payload is not valid base64 or JSON
+    file_name,
+    partition_key,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['block_height', 'account_address']
+    ) }} AS streamline_account_storage_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__streamline_account_storage') }} -- incremental runs read the regular bronze view
+WHERE
+    _inserted_timestamp >= COALESCE(
+        (
+            SELECT
+                MAX(_inserted_timestamp) _inserted_timestamp
+            FROM
+                {{ this }}
+        ),
+        '1900-01-01' :: timestamp_ntz
+    )
+{% else %}
+    {{ ref('bronze__streamline_fr_account_storage') }} -- non-incremental builds read the fr bronze view
+{% endif %}
+
+qualify(ROW_NUMBER() over (PARTITION BY streamline_account_storage_id -- keep one row per surrogate key
+ORDER BY
+    _inserted_timestamp DESC)) = 1
diff --git a/models/silver/core/silver__account_storage.yml b/models/silver/core/silver__account_storage.yml
new file mode 100644
index 0000000..7d8c782
--- /dev/null
+++ b/models/silver/core/silver__account_storage.yml
@@ -0,0 +1,18 @@
+version: 2
+
+models:
+  - name: silver__account_storage
+    description: |-
+      Raw table for rest API account storage data.
+
+    columns:
+      - name: INSERTED_TIMESTAMP
+        tests:
+          - not_null
+          - dbt_expectations.expect_column_values_to_be_in_type_list:
+              column_type_list:
+                - TIMESTAMP_LTZ
+                - TIMESTAMP_NTZ
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: hour
+              interval: 2
\ No newline at end of file
diff --git a/models/sources.yml b/models/sources.yml
index 90c8535..de97511 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -133,6 +133,7 @@ sources:
       - name: evm_testnet_blocks
       - name: evm_testnet_receipts
       - name: evm_testnet_traces
+      - name: account_storage
 
   - name: crosschain_silver
     database: crosschain
diff --git a/models/streamline/core/complete/streamline__complete_get_account_storage.sql b/models/streamline/core/complete/streamline__complete_get_account_storage.sql
new file mode 100644
index 0000000..eb1ffb6
--- /dev/null
+++ b/models/streamline/core/complete/streamline__complete_get_account_storage.sql
@@ -0,0 +1,44 @@
+-- depends_on: {{ ref('bronze__streamline_account_storage') }}
+-- depends_on: {{ ref('bronze__streamline_fr_account_storage') }}
+{{ config (
+    materialized = "incremental",
+    incremental_predicates = ["dynamic_range_predicate", "partition_key"],
+    unique_key = "complete_account_storage_id",
+    cluster_by = "partition_key",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(complete_account_storage_id)",
+    tags = ['streamline_complete', 'account_storage']
+) }}
+
+SELECT
+    value:"BLOCK_HEIGHT"::INT AS block_height,
+    value:"ACCOUNT_ADDRESS"::STRING AS account_address,
+    file_name,
+    partition_key,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['block_height', 'account_address']
+    ) }} AS complete_account_storage_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__streamline_account_storage') }} -- incremental runs read the regular bronze view
+WHERE
+    _inserted_timestamp >= COALESCE(
+        (
+            SELECT
+                MAX(_inserted_timestamp) _inserted_timestamp
+            FROM
+                {{ this }}
+        ),
+        '1900-01-01' :: timestamp_ntz
+    )
+{% else %}
+    {{ ref('bronze__streamline_fr_account_storage') }} -- non-incremental builds read the fr bronze view
+{% endif %}
+
+qualify(ROW_NUMBER() over (PARTITION BY complete_account_storage_id -- keep one row per surrogate key
+ORDER BY
+    _inserted_timestamp DESC)) = 1
diff --git a/models/streamline/core/history/account/streamline__get_account_storage_history.sql b/models/streamline/core/history/account/streamline__get_account_storage_history.sql
new file mode 100644
index 0000000..e69ba6a
--- /dev/null
+++ b/models/streamline/core/history/account/streamline__get_account_storage_history.sql
@@ -0,0 +1,108 @@
+{{ config (
+    materialized = "view",
+    post_hook = fsc_utils.if_data_call_function_v2(
+        func = 'streamline.udf_bulk_rest_api_v2',
+        target = "{{this.schema}}.{{this.identifier}}",
+        params ={ "external_table" :"account_storage",
+        "sql_limit" :"2000000",
+        "producer_batch_size" :"50000",
+        "worker_batch_size" :"10000",
+        "async_concurrent_requests" :"10",
+        "sql_source" :"{{this.identifier}}" }
+    ),
+    tags = ['streamline_history', 'account_storage']
+) }}
+
+with all_event_contracts as ( -- lowest block height with a successful event, per event_contract
+    select
+        min(block_height) as events_start,
+        event_contract
+    from {{ ref('core__fact_events') }}
+    where tx_succeeded
+    group by all
+),
+relevant_block_heights as ( -- highest block height per calendar date since 2025-01-01
+    select
+        block_timestamp::date as block_date,
+        max(block_height) as block_height
+    from {{ ref('core__fact_blocks') }}
+    where block_timestamp >= '2025-01-01'
+        and block_timestamp::date <> (select max(block_timestamp::date) from {{ ref('core__fact_blocks') }}) -- skip the most recent date present in core__fact_blocks
+    group by all
+),
+event_contract_days as ( -- every contract paired with each day-end block at or after its first event
+    select
+        event_contract,
+        block_date,
+        block_height
+    from all_event_contracts a
+    join relevant_block_heights b
+        on a.events_start <= b.block_height
+),
+account_addresses as ( -- attach account_address through core__dim_contract_labels
+    select
+        event_contract,
+        block_date,
+        block_height,
+        account_address
+    from event_contract_days
+    join {{ ref('core__dim_contract_labels') }}
+        using (event_contract)
+),
+distinct_targets as ( -- one row per (block_height, account_address)
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract
+    from account_addresses
+    qualify(ROW_NUMBER() over (PARTITION BY block_height, account_address ORDER BY block_date DESC)) = 1
+),
+create_requests as ( -- endpoint plus request body: base64 script and base64 address argument
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract,
+        concat('/v1/scripts?block_height=', block_height) as endpoint,
+        OBJECT_CONSTRUCT(
+            'script', BASE64_ENCODE('access(all) fun main(addr: Address): [UInt64] { let account = getAccount(addr); return [account.storage.used, account.storage.capacity] }'),
+            'arguments', ARRAY_CONSTRUCT(
+                BASE64_ENCODE('{"type":"Address","value":"' || account_address || '"}')
+            )
+        ) as request_data
+    from distinct_targets
+),
+to_do as ( -- anti-join: keep only requests with no row in the complete model
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract,
+        endpoint,
+        request_data
+    from create_requests a
+    left join {{ ref('streamline__complete_get_account_storage') }} b
+        using (block_height, account_address)
+    where b.complete_account_storage_id is null
+)
+
+select
+    block_date::VARCHAR as block_date,
+    round(block_height, -3) :: INT AS partition_key, -- block height rounded to the nearest 1000
+    block_height,
+    account_address,
+    event_contract,
+    {{ target.database }}.live.udf_api(
+        'POST',
+        '{Service}/{Authentication}' || endpoint,
+        OBJECT_CONSTRUCT(
+            'Content-Type', 'application/json',
+            'Accept', 'application/json'
+        ),
+        request_data,
+        'Vault/prod/flow/quicknode/mainnet'
+    ) as request
+from to_do
+order by block_date desc
+limit 2000000
\ No newline at end of file
diff --git a/models/streamline/core/realtime/streamline__get_account_storage_realtime.sql b/models/streamline/core/realtime/streamline__get_account_storage_realtime.sql
new file mode 100644
index 0000000..86ca8e6
--- /dev/null
+++ b/models/streamline/core/realtime/streamline__get_account_storage_realtime.sql
@@ -0,0 +1,109 @@
+{{ config (
+    materialized = "view",
+    post_hook = fsc_utils.if_data_call_function_v2(
+        func = 'streamline.udf_bulk_rest_api_v2',
+        target = "{{this.schema}}.{{this.identifier}}",
+        params ={ "external_table" :"account_storage",
+        "sql_limit" :"100000",
+        "producer_batch_size" :"50000",
+        "worker_batch_size" :"10000",
+        "async_concurrent_requests" :"10",
+        "sql_source" :"{{this.identifier}}" }
+    ),
+    tags = ['streamline_realtime', 'account_storage']
+) }}
+
+with all_event_contracts as ( -- lowest block height with a successful event, per event_contract
+    select
+        min(block_height) as events_start,
+        event_contract
+    from {{ ref('core__fact_events') }}
+    where tx_succeeded
+    group by all
+),
+relevant_block_heights as ( -- highest block height per calendar date within the recent lookback
+    select
+        block_timestamp::date as block_date,
+        max(block_height) as block_height
+    from {{ ref('core__fact_blocks') }}
+    where block_timestamp >= '2025-01-01'
+        and block_timestamp::date <> (select max(block_timestamp::date) from {{ ref('core__fact_blocks') }}) -- skip the most recent date present in core__fact_blocks
+        and block_timestamp::date >= DATEADD('day', -4, SYSDATE()) -- only dates on or after four days before now
+    group by all
+),
+event_contract_days as ( -- every contract paired with each day-end block at or after its first event
+    select
+        event_contract,
+        block_date,
+        block_height
+    from all_event_contracts a
+    join relevant_block_heights b
+        on a.events_start <= b.block_height
+),
+account_addresses as ( -- attach account_address through core__dim_contract_labels
+    select
+        event_contract,
+        block_date,
+        block_height,
+        account_address
+    from event_contract_days
+    join {{ ref('core__dim_contract_labels') }}
+        using (event_contract)
+),
+distinct_targets as ( -- one row per (block_height, account_address)
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract
+    from account_addresses
+    qualify(ROW_NUMBER() over (PARTITION BY block_height, account_address ORDER BY block_date DESC)) = 1
+),
+create_requests as ( -- endpoint plus request body: base64 script and base64 address argument
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract,
+        concat('/v1/scripts?block_height=', block_height) as endpoint,
+        OBJECT_CONSTRUCT(
+            'script', BASE64_ENCODE('access(all) fun main(addr: Address): [UInt64] { let account = getAccount(addr); return [account.storage.used, account.storage.capacity] }'),
+            'arguments', ARRAY_CONSTRUCT(
+                BASE64_ENCODE('{"type":"Address","value":"' || account_address || '"}')
+            )
+        ) as request_data
+    from distinct_targets
+),
+to_do as ( -- anti-join: keep only requests with no row in the complete model
+    select
+        block_date,
+        block_height,
+        account_address,
+        event_contract,
+        endpoint,
+        request_data
+    from create_requests a
+    left join {{ ref('streamline__complete_get_account_storage') }} b
+        using (block_height, account_address)
+    where b.complete_account_storage_id is null
+)
+
+select
+    block_date::VARCHAR as block_date,
+    round(block_height, -3) :: INT AS partition_key, -- block height rounded to the nearest 1000
+    block_height,
+    account_address,
+    event_contract,
+    {{ target.database }}.live.udf_api(
+        'POST',
+        '{Service}/{Authentication}' || endpoint,
+        OBJECT_CONSTRUCT(
+            'Content-Type', 'application/json',
+            'Accept', 'application/json'
+        ),
+        request_data,
+        'Vault/prod/flow/quicknode/mainnet'
+    ) as request
+from to_do
+order by block_date desc
+limit 100000
\ No newline at end of file
diff --git a/package-lock.yml b/package-lock.yml
index 68f92db..d512789 100644
--- a/package-lock.yml
+++ b/package-lock.yml
@@ -1,16 +1,16 @@
 packages:
-  - package: calogica/dbt_expectations
-    version: 0.8.2
-  - package: dbt-labs/dbt_external_tables
-    version: 0.8.2
-  - package: dbt-labs/dbt_utils
-    version: 1.0.0
-  - git: https://github.com/FlipsideCrypto/fsc-utils.git
-    revision: 87e00eb90acddcc7a34aa8e67e3b3bac86b262e6
-  - package: get-select/dbt_snowflake_query_tags
-    version: 2.5.0
-  - package: calogica/dbt_date
-    version: 0.7.2
-  - git: https://github.com/FlipsideCrypto/livequery-models.git
-    revision: 2651a45b7e123f7bd421bcc0e7e2a7bcbaf7652f
-sha1_hash: a1cc3545d7ef13fcf5b3908a9e888b4421018792
+- package: calogica/dbt_expectations
+  version: 0.8.2
+- package: dbt-labs/dbt_external_tables
+  version: 0.8.2
+- package: dbt-labs/dbt_utils
+  version: 1.0.0
+- git: https://github.com/FlipsideCrypto/fsc-utils.git
+  revision: 6415fc4873bb609ca8de2a9f612e8462bb144caf
+- package: get-select/dbt_snowflake_query_tags
+  version: 2.5.3
+- package: calogica/dbt_date
+  version: 0.7.2
+- git: https://github.com/FlipsideCrypto/livequery-models.git
+  revision: 430bc25db1cf236da3ea2df14b0844d3d156fc59
+sha1_hash: 2e5c67859a1a4bba5619c3bfb4deda06f050608b
diff --git a/packages.yml b/packages.yml
index 1b272c8..f3c7012 100644
--- a/packages.yml
+++ b/packages.yml
@@ -6,6 +6,6 @@ packages:
   - package: dbt-labs/dbt_utils
     version: 1.0.0
   - git: https://github.com/FlipsideCrypto/fsc-utils.git
-    revision: v1.35.1
+    revision: v1.39.1
   - package: get-select/dbt_snowflake_query_tags
     version: [">=2.0.0", "<3.0.0"]