diff --git a/dbt_project.yml b/dbt_project.yml index edbc7186..45ade5be 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -34,6 +34,7 @@ on-run-start: - '{{ sp_create_bulk_get_validator_metadata() }}' - '{{ sp_create_bulk_get_stake_account_tx_ids() }}' - '{{ sp_create_bulk_get_txs() }}' + - '{{ sp_refresh_external_table_batch() }}' # Configuring models diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index fa8f9771..2e5ecc91 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -4,6 +4,9 @@ {{ udf_bulk_get_validator_metadata() }}; {{ udf_bulk_get_stake_account_tx_ids() }}; {{ udf_bulk_get_txs() }}; + {{ udf_bulk_get_blocks() }}; + {{ udf_bulk_get_block_txs() }}; + {{ udf_bulk_get_block_rewards() }}; {{ create_udf_ordered_signers( schema = "silver" ) }} diff --git a/macros/streamline/bulk_get_block_rewards/task_bulk_get_block_rewards_historical.sql b/macros/streamline/bulk_get_block_rewards/task_bulk_get_block_rewards_historical.sql new file mode 100644 index 00000000..6ad40f26 --- /dev/null +++ b/macros/streamline/bulk_get_block_rewards/task_bulk_get_block_rewards_historical.sql @@ -0,0 +1,55 @@ +{% macro task_bulk_get_block_rewards_historical() %} +{% set sql %} +execute immediate 'create or replace task streamline.bulk_get_block_rewards_historical + warehouse = dbt_cloud_solana + allow_overlapping_execution = false + schedule = \'USING CRON */3 * * * * UTC\' +as +BEGIN + call streamline.refresh_external_table_next_batch(\'block_rewards_api\',\'complete_block_rewards\'); + create or replace temporary table streamline.complete_block_rewards__dbt_tmp as + ( + select * + from ( + SELECT + block_id, + _partition_id + FROM + bronze.block_rewards_api AS s + WHERE + s.block_id IS NOT NULL + AND s._partition_id > ( + select + coalesce(max(_partition_id),0) + from + streamline.complete_block_rewards + ) + + group by 1,2 + ) + order by (_partition_id) + ); + merge into streamline.complete_block_rewards as DBT_INTERNAL_DEST + using 
streamline.complete_block_rewards__dbt_tmp as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id + when matched then + update set + _partition_id = DBT_INTERNAL_SOURCE._partition_id + when not matched then + insert ("BLOCK_ID", "_PARTITION_ID") + values ("BLOCK_ID", "_PARTITION_ID"); + select streamline.udf_bulk_get_block_rewards() + where exists ( + select 1 + from streamline.all_unknown_block_rewards_historical + limit 1 + ); +END;' +{% endset %} +{% do run_query(sql) %} + +{% set sql %} +alter task streamline.bulk_get_block_rewards_historical resume; +{% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_block_rewards/udf_bulk_get_block_rewards.sql b/macros/streamline/bulk_get_block_rewards/udf_bulk_get_block_rewards.sql new file mode 100644 index 00000000..9d11aacf --- /dev/null +++ b/macros/streamline/bulk_get_block_rewards/udf_bulk_get_block_rewards.sql @@ -0,0 +1,8 @@ +{% macro udf_bulk_get_block_rewards() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_rewards() returns text api_integration = aws_solana_api_dev AS {% if target.name == "prod" -%} + 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_rewards' + {% else %} + 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_rewards' + {%- endif %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql b/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql new file mode 100644 index 00000000..130934fb --- /dev/null +++ b/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql @@ -0,0 +1,57 @@ +{% macro task_bulk_get_block_txs_historical() %} +{% set sql %} +execute immediate 'create or replace task streamline.bulk_get_block_txs_historical + warehouse = dbt_cloud_solana + allow_overlapping_execution = false + schedule = 
\'USING CRON */6 * * * * UTC\' +as +BEGIN + call streamline.refresh_external_table_next_batch(\'block_txs_api\',\'complete_block_txs\'); + create or replace temporary table streamline.complete_block_txs__dbt_tmp as + ( + select * + from ( + SELECT + block_id, + _partition_id + FROM + bronze.block_txs_api AS s + WHERE + s.block_id IS NOT NULL + + + AND s._partition_id > ( + select + max(_partition_id) + from + streamline.complete_block_txs + ) + group by 1,2 + ) + order by (_partition_id) + ); + + merge into streamline.complete_block_txs as DBT_INTERNAL_DEST + using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id + when matched then + update set _partition_id = DBT_INTERNAL_SOURCE._partition_id + when not matched then + insert ("BLOCK_ID", "_PARTITION_ID") + values ("BLOCK_ID", "_PARTITION_ID"); + + select streamline.udf_bulk_get_block_txs() + where exists ( + select 1 + from streamline.all_unknown_block_txs_historical + limit 1 + ); +END;' +{% endset %} +{% do run_query(sql) %} + +{% set sql %} +alter task streamline.bulk_get_block_txs_historical resume; +{% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql b/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql new file mode 100644 index 00000000..29a8d712 --- /dev/null +++ b/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql @@ -0,0 +1,8 @@ +{% macro udf_bulk_get_block_txs() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_txs() returns text api_integration = aws_solana_api_dev AS {% if target.name == "prod" -%} + 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_txs' + {% else %} + 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_txs' + {%- endif %} +{% endmacro %} \ No newline at end of file diff --git 
a/macros/streamline/bulk_get_blocks/task_bulk_get_blocks_historical.sql b/macros/streamline/bulk_get_blocks/task_bulk_get_blocks_historical.sql new file mode 100644 index 00000000..7eaf226a --- /dev/null +++ b/macros/streamline/bulk_get_blocks/task_bulk_get_blocks_historical.sql @@ -0,0 +1,81 @@ +{% macro task_bulk_get_blocks_historical() %} +{% set sql %} +execute immediate 'create or replace task streamline.bulk_get_blocks_historical + warehouse = dbt_cloud_solana + allow_overlapping_execution = false + schedule = \'USING CRON */5 * * * * UTC\' +as +BEGIN + alter external table bronze.blocks_api refresh; + create or replace temporary table streamline.complete_blocks__dbt_tmp as + ( + select * + from ( + WITH meta AS ( + SELECT + registered_on, + file_name + FROM + TABLE( + information_schema.external_table_files( + table_name => \'bronze.blocks_api\' + ) + ) A + WHERE + registered_on >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), \'1970-01-01\' :: DATE) max_INSERTED_TIMESTAMP + FROM + streamline.complete_blocks + ) + ) + SELECT + block_id, + _inserted_date, + m.registered_on as _inserted_timestamp + FROM + bronze.blocks_api AS s + JOIN meta m + ON m.file_name = metadata$filename + WHERE + s.block_id IS NOT NULL + + + AND s._inserted_date >= CURRENT_DATE + AND m.registered_on > ( + SELECT + max(_inserted_timestamp) + FROM + streamline.complete_blocks + ) + qualify(ROW_NUMBER() over (PARTITION BY block_id + ORDER BY + _inserted_date, _inserted_timestamp DESC)) = 1 + ) + order by (_inserted_date) + ); + merge into streamline.complete_blocks as DBT_INTERNAL_DEST + using streamline.complete_blocks__dbt_tmp as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id + when matched then + update set + _inserted_date = DBT_INTERNAL_SOURCE._inserted_date, + _inserted_timestamp = DBT_INTERNAL_SOURCE._inserted_timestamp + when not matched then + insert ("BLOCK_ID", "_INSERTED_DATE", "_INSERTED_TIMESTAMP") + values ("BLOCK_ID", "_INSERTED_DATE", 
"_INSERTED_TIMESTAMP"); + select streamline.udf_bulk_get_blocks() + where exists ( + select 1 + from streamline.all_unknown_blocks_historical + limit 1 + ); +END;' +{% endset %} +{% do run_query(sql) %} + +{% set sql %} +alter task streamline.bulk_get_blocks_historical resume; +{% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_blocks/udf_bulk_get_blocks.sql b/macros/streamline/bulk_get_blocks/udf_bulk_get_blocks.sql new file mode 100644 index 00000000..4e648f8a --- /dev/null +++ b/macros/streamline/bulk_get_blocks/udf_bulk_get_blocks.sql @@ -0,0 +1,8 @@ +{% macro udf_bulk_get_blocks() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_blocks() returns text api_integration = aws_solana_api_dev AS {% if target.name == "prod" -%} + 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_blocks' + {% else %} + 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_blocks' + {%- endif %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/sp_refresh_external_table_batch.sql b/macros/streamline/sp_refresh_external_table_batch.sql new file mode 100644 index 00000000..2cfa5e49 --- /dev/null +++ b/macros/streamline/sp_refresh_external_table_batch.sql @@ -0,0 +1,26 @@ +{% macro sp_refresh_external_table_batch() %} +{% set sql %} +create or replace procedure streamline.refresh_external_table_next_batch(external_table_name string, streamline_table_name string) +returns string +language sql +as +$$ + declare + path string; + select_stmt string; + refresh_stmt string; + res resultset; + begin + select_stmt := 'select concat(\'batch=\',max(_partition_id)+1,\'/\') as path from streamline.' || :streamline_table_name; + res := (execute immediate :select_stmt); + let c1 cursor for res; + for row_variable in c1 do + path := row_variable.path; + end for; + refresh_stmt := 'alter external table bronze.' 
|| :external_table_name || ' refresh \'' || :PATH || '\''; + res := (execute immediate :refresh_stmt); + return 'table refreshed with ' || :refresh_stmt; + end; +$${% endset %} +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 426057c3..91f297be 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -86,4 +86,63 @@ sources: data_type: string - name: data data_type: variant + - name: blocks_api + description: "External table of solana blocks" + external: + location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCKS_API" + file_format: "( type = json, strip_outer_array = TRUE )" + auto_refresh: true + partitions: + - name: _inserted_date + data_type: string + expression: substr((split_part(METADATA$FILENAME,'/',3)),16,10) + columns: + - name: metadata + data_type: variant + - name: block_id + data_type: integer + - name: data + data_type: variant + - name: error + data_type: variant + - name: block_rewards_api + description: "External table of solana block rewards" + external: + location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_REWARDS_API" + file_format: "( type = json, strip_outer_array = TRUE )" + auto_refresh: true + partitions: + - name: _partition_id + data_type: number + expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2)) + columns: + - name: metadata + data_type: variant + - name: block_id + data_type: integer + - name: data + data_type: variant + - name: error + data_type: variant + - name: block_txs_api + description: "External table of solana block transactions" + external: + location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_TXS_API" + file_format: "( type = json, strip_outer_array = TRUE )" + auto_refresh: true + partitions: + - name: _partition_id + data_type: number + expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2)) + 
columns: + - name: metadata + data_type: variant + - name: block_id + data_type: integer + - name: tx_id + data_type: string + - name: data + data_type: variant + - name: error + data_type: variant \ No newline at end of file diff --git a/models/streamline/streamline__all_unknown_block_rewards_historical.sql b/models/streamline/streamline__all_unknown_block_rewards_historical.sql new file mode 100644 index 00000000..514f582a --- /dev/null +++ b/models/streamline/streamline__all_unknown_block_rewards_historical.sql @@ -0,0 +1,28 @@ +{{ config( + materialized = 'view' +) }} + +WITH pre_final AS ( + + SELECT + SEQ8() AS block_id + FROM + TABLE(GENERATOR(rowcount => 1000000000)) + WHERE + block_id <= 98680445 + EXCEPT + SELECT + block_id + FROM + {{ ref('streamline__complete_block_rewards') }} +) +SELECT + block_id, + ( + SELECT + coalesce(MAX(_partition_id) + 1,1) + FROM + {{ ref('streamline__complete_block_rewards') }} + ) AS batch_id +FROM + pre_final diff --git a/models/streamline/streamline__all_unknown_block_txs_historical.sql b/models/streamline/streamline__all_unknown_block_txs_historical.sql new file mode 100644 index 00000000..615705e9 --- /dev/null +++ b/models/streamline/streamline__all_unknown_block_txs_historical.sql @@ -0,0 +1,28 @@ +{{ config( + materialized = 'view' +) }} + +WITH pre_final AS ( + + SELECT + SEQ8() AS block_id + FROM + TABLE(GENERATOR(rowcount => 1000000000)) + WHERE + block_id <= 98680445 + EXCEPT + SELECT + block_id + FROM + {{ ref('streamline__complete_block_txs') }} +) +SELECT + block_id, + ( + SELECT + coalesce(MAX(_partition_id) + 1,1) + FROM + {{ ref('streamline__complete_block_txs') }} + ) AS batch_id +FROM + pre_final diff --git a/models/streamline/streamline__all_unknown_blocks_historical.sql b/models/streamline/streamline__all_unknown_blocks_historical.sql new file mode 100644 index 00000000..4bac4b40 --- /dev/null +++ b/models/streamline/streamline__all_unknown_blocks_historical.sql @@ -0,0 +1,14 @@ +{{ config( + 
materialized = 'view' +) }} + +SELECT + SEQ8() AS block_id +FROM + TABLE(GENERATOR(rowcount => 1000000000)) +WHERE block_id <= 98680445 +EXCEPT +SELECT + block_id +from + {{ ref('streamline__complete_blocks') }} \ No newline at end of file diff --git a/models/streamline/streamline__complete_block_rewards.sql b/models/streamline/streamline__complete_block_rewards.sql new file mode 100644 index 00000000..962a35f8 --- /dev/null +++ b/models/streamline/streamline__complete_block_rewards.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = "incremental", + unique_key = "block_id", + cluster_by = "_partition_id", + merge_update_columns = ["_partition_id"] +) }} + +WITH meta AS ( + + SELECT + registered_on, + file_name + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "solana_external", "block_rewards_api") }}' + ) + ) A +) +SELECT + block_id, + _partition_id +FROM + {{ source( + "solana_external", + "block_rewards_api" + ) }} AS s +WHERE + s.block_id IS NOT NULL + +{% if is_incremental() %} +AND s._partition_id > ( + select + coalesce(max(_partition_id),0) + from + {{ this }} +) +{% endif %} +group by 1,2 diff --git a/models/streamline/streamline__complete_block_rewards.yml b/models/streamline/streamline__complete_block_rewards.yml new file mode 100644 index 00000000..9ab6e91d --- /dev/null +++ b/models/streamline/streamline__complete_block_rewards.yml @@ -0,0 +1,16 @@ +version: 2 +models: + - name: streamline__complete_block_rewards + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - BLOCK_ID + columns: + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + tests: + - not_null + - name: _PARTITION_ID + description: Value representing a single pipeline execution batch + tests: + - not_null \ No newline at end of file diff --git a/models/streamline/streamline__complete_block_txs.sql b/models/streamline/streamline__complete_block_txs.sql new file mode 100644 index 00000000..7d19a79c --- /dev/null +++ 
b/models/streamline/streamline__complete_block_txs.sql @@ -0,0 +1,44 @@ +{{ config ( + materialized = "incremental", + unique_key = "block_id", + cluster_by = "_partition_id", + merge_update_columns = ["_partition_id"] +) }} + +WITH meta AS ( + + SELECT + registered_on, + file_name + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "solana_external", "block_txs_api") }}' + ) + ) A +) +SELECT + block_id, + _partition_id +FROM + {{ source( + "solana_external", + "block_txs_api" + ) }} AS s +WHERE + s.block_id IS NOT NULL + +{% if is_incremental() %} +AND s._partition_id > ( + select + coalesce(max(_partition_id),0) + from + {{ this }} +) +{% endif %} +group by 1,2 +{% if not is_incremental() %} +qualify(ROW_NUMBER() over (PARTITION BY block_id +ORDER BY +_partition_id DESC)) = 1 +{% endif %} diff --git a/models/streamline/streamline__complete_block_txs.yml b/models/streamline/streamline__complete_block_txs.yml new file mode 100644 index 00000000..75c5e172 --- /dev/null +++ b/models/streamline/streamline__complete_block_txs.yml @@ -0,0 +1,16 @@ +version: 2 +models: + - name: streamline__complete_block_txs + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - BLOCK_ID + columns: + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + tests: + - not_null + - name: _PARTITION_ID + description: Value representing a single pipeline execution batch + tests: + - not_null \ No newline at end of file diff --git a/models/streamline/streamline__complete_blocks.sql b/models/streamline/streamline__complete_blocks.sql new file mode 100644 index 00000000..8ebc51b1 --- /dev/null +++ b/models/streamline/streamline__complete_blocks.sql @@ -0,0 +1,53 @@ +{{ config ( + materialized = "incremental", + unique_key = "block_id", + cluster_by = "_inserted_date", + merge_update_columns = ["_inserted_date","_inserted_timestamp"] +) }} + +WITH meta AS ( + + SELECT + registered_on, + file_name + FROM + TABLE( + 
information_schema.external_table_files( + table_name => '{{ source( "solana_external", "blocks_api") }}' + ) + ) A +{% if is_incremental() %} +WHERE + registered_on >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) +{% endif %} +) +SELECT + block_id, + _inserted_date, + m.registered_on as _inserted_timestamp +FROM + {{ source( + "solana_external", + "blocks_api" + ) }} AS s + JOIN meta m + ON m.file_name = metadata$filename +WHERE + s.block_id IS NOT NULL + +{% if is_incremental() %} + AND s._inserted_date >= CURRENT_DATE + AND m.registered_on > ( + SELECT + max(_inserted_timestamp) + FROM + {{ this }} + ) +{% endif %} +qualify(ROW_NUMBER() over (PARTITION BY block_id +ORDER BY +_inserted_date, _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/streamline__complete_blocks.yml b/models/streamline/streamline__complete_blocks.yml new file mode 100644 index 00000000..7c5e8802 --- /dev/null +++ b/models/streamline/streamline__complete_blocks.yml @@ -0,0 +1,18 @@ +version: 2 +models: + - name: streamline__complete_blocks + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - BLOCK_ID + columns: + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + tests: + - not_null + - name: _INSERTED_DATE + description: Date that the external file was inserted into s3 + tests: + - not_null + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" \ No newline at end of file