diff --git a/.github/workflows/dbt_test.yml b/.github/workflows/dbt_test.yml index cd500671..93da9ef5 100644 --- a/.github/workflows/dbt_test.yml +++ b/.github/workflows/dbt_test.yml @@ -42,4 +42,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt test -s "solana_models,./models" "solana_models,tag:test_daily" --exclude tag:test_weekly tag:test_hourly tag:exclude_test_daily "test_silver__transactions_missing_partitions" + dbt test -s "solana_models,./models" "solana_models,tag:test_daily" --exclude tag:test_weekly tag:test_hourly tag:exclude_test_daily diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index b843ffdf..4a6be990 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -3,7 +3,6 @@ {% set sql %} {% if target.database != "SOLANA_COMMUNITY_DEV" %} {{ udf_bulk_get_decoded_instructions_data() }}; - {{ udf_bulk_get_block_txs() }}; {{ udf_snapshot_get_stake_accounts() }}; {{ udf_bulk_program_parser() }}; {{ udf_decode_instructions() }}; diff --git a/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql b/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql deleted file mode 100644 index 95f97260..00000000 --- a/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_historical.sql +++ /dev/null @@ -1,58 +0,0 @@ -{% macro task_bulk_get_block_txs_historical() %} -{% set sql %} -execute immediate 'create or replace task streamline.bulk_get_block_txs_historical - warehouse = dbt_cloud - allow_overlapping_execution = false - schedule = \'USING CRON */20 * * * * UTC\' -as -BEGIN - create or replace temporary table streamline.complete_block_txs__dbt_tmp as - ( - select * - from ( - SELECT - block_id, - _partition_id - FROM - streamline.{{ target.database }}.block_txs_api AS s - WHERE - s.block_id IS NOT NULL - AND s._partition_id > ( - select - coalesce(max(_partition_id),0) - from - streamline.complete_block_txs - ) - group by 1,2 - ) - order by (_partition_id) - ); - merge into streamline.complete_block_txs as DBT_INTERNAL_DEST - using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id - when matched then - update set _partition_id = DBT_INTERNAL_SOURCE._partition_id - when not matched then - insert ("BLOCK_ID", "_PARTITION_ID") - values ("BLOCK_ID", "_PARTITION_ID"); - select streamline.udf_bulk_get_block_txs(FALSE) - where exists ( - select 1 - from streamline.all_unknown_block_txs_historical - limit 1 - ); - select streamline.udf_bulk_get_block_txs(TRUE) - where exists ( - select 1 - from streamline.all_unknown_block_txs_real_time - limit 1 - ); -END;' -{% endset %} -{% do run_query(sql) %} - -{% set sql %} - alter task streamline.bulk_get_block_txs_historical suspend; -{% endset %} -{% do run_query(sql) %} -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_real_time.sql b/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_real_time.sql deleted file mode 100644 index cfa68eb6..00000000 --- a/macros/streamline/bulk_get_block_txs/task_bulk_get_block_txs_real_time.sql +++ /dev/null @@ -1,58 +0,0 @@ -{% macro task_bulk_get_block_txs_real_time() %} -{% set sql %} -execute immediate 'create or replace task streamline.bulk_get_block_txs_real_time - warehouse = dbt_cloud - allow_overlapping_execution = false - schedule = \'USING CRON */20 * * * * UTC\' -as -BEGIN - call streamline.refresh_external_table_next_batch(\'block_txs_api\',\'complete_block_txs\'); - create or replace temporary table streamline.complete_block_txs__dbt_tmp as - ( - select * - from ( - SELECT - block_id, - error, - _partition_id - FROM - streamline.{{ target.database }}.block_txs_api AS s - WHERE - s.block_id IS NOT NULL - AND s._partition_id > ( - select - coalesce(max(_partition_id),0) - from - streamline.complete_block_txs - ) - group by 1,2,3 - ) - order by (_partition_id) - ); - merge into streamline.complete_block_txs as DBT_INTERNAL_DEST - using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id - when matched then - update - set _partition_id = DBT_INTERNAL_SOURCE._partition_id, - error = DBT_INTERNAL_SOURCE.error - when not matched then - insert ("BLOCK_ID", "ERROR", "_PARTITION_ID") - values ("BLOCK_ID", "ERROR", "_PARTITION_ID"); - select streamline.udf_bulk_get_block_txs(TRUE) - where exists ( - select 1 - from streamline.all_unknown_block_txs_real_time - limit 1 - ); -END;' -{% endset %} -{% do run_query(sql) %} - -{% if target.database == 'SOLANA' %} - {% set sql %} - alter task streamline.bulk_get_block_txs_real_time resume; - {% endset %} - {% do run_query(sql) %} -{% endif %} -{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql b/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql deleted file mode 100644 index c3ae31f4..00000000 --- a/macros/streamline/bulk_get_block_txs/udf_bulk_get_block_txs.sql +++ /dev/null @@ -1,8 +0,0 @@ -{% macro udf_bulk_get_block_txs() %} - CREATE - OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_txs(is_real_time boolean) returns text api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%} - 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_txs' - {% else %} - 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_txs' - {%- endif %} -{% endmacro %} \ No newline at end of file diff --git a/models/silver/_observability/silver_observability__transactions_completeness.sql b/models/silver/_observability/silver_observability__transactions_completeness.sql index 9d19bd40..c0d916ce 100644 --- a/models/silver/_observability/silver_observability__transactions_completeness.sql +++ b/models/silver/_observability/silver_observability__transactions_completeness.sql @@ -5,6 +5,9 @@ tags = ['observability'] ) }} +{% set cutover_block_id = 307103862 %} +{% set cutover_partition_id = 150215 %} + WITH summary_stats AS ( SELECT @@ -133,12 +136,25 @@ broken_blocks AS ( m.block_id FROM potential_missing_txs m - LEFT OUTER JOIN {{ ref('streamline__complete_block_txs') }} - cmp - ON m.block_id = cmp.block_id + LEFT OUTER JOIN + {{ ref('streamline__complete_block_txs') }} cmp + ON m.block_id = cmp.block_id WHERE - cmp.error IS NOT NULL - OR cmp.block_id IS NULL + (cmp.error IS NOT NULL + OR cmp.block_id IS NULL) + AND cmp.block_id < {{ cutover_block_id }} + UNION ALL + SELECT + m.block_id + FROM + potential_missing_txs m + LEFT OUTER JOIN + {{ ref('streamline__complete_block_txs_2') }} cmp2 + ON m.block_id = cmp2.block_id + WHERE + (cmp2.block_id IS NULL) + AND cmp2._partition_id >= {{ cutover_partition_id }} + AND cmp2.block_id >= {{ cutover_block_id }} ), impacted_blocks AS ( SELECT diff --git a/models/silver/core/silver__transactions_votes_with_non_votes.sql b/models/silver/backfill/silver__transactions_votes_with_non_votes.sql similarity index 100% rename from models/silver/core/silver__transactions_votes_with_non_votes.sql rename to models/silver/backfill/silver__transactions_votes_with_non_votes.sql diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 0b925fe6..a2853c53 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -1,3 +1,5 @@ +-- depends_on: {{ ref('streamline__complete_block_txs_2') }} + {{ config( materialized = 'incremental', unique_key = "tx_id", @@ -9,6 +11,9 @@ tags = ['scheduled_core'] ) }} +{% set cutover_block_id = 307103862 %} +{% set cutover_partition_id = 150215 %} + WITH pre_final AS ( SELECT @@ -37,13 +42,12 @@ WITH pre_final AS ( t._partition_id, t._inserted_timestamp FROM - {{ ref('bronze__transactions2') }} - t - LEFT OUTER JOIN {{ ref('silver__blocks') }} - b + {{ ref('bronze__transactions2') }} AS t + LEFT OUTER JOIN {{ ref('silver__blocks') }} AS b ON b.block_id = t.block_id WHERE - tx_id IS NOT NULL + t.block_id < {{cutover_block_id}} + AND tx_id IS NOT NULL AND ( COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING,'') <> 'Vote111111111111111111111111111111111111111' OR @@ -51,32 +55,60 @@ WITH pre_final AS ( array_size(t.data :transaction :message :instructions) > 1 ) ) - -{% if is_incremental() %} - AND _partition_id >= ( - SELECT - MAX(_partition_id) -1 - FROM - {{ this }} - ) - AND _partition_id <= ( - SELECT - MAX(_partition_id) - FROM - {{ source('solana_streamline','complete_block_txs') }} - ) - AND t._inserted_timestamp > ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} - ) -{% else %} - AND _partition_id IN ( - 1, - 2 - ) -{% endif %} + {% if is_incremental() %} + AND _partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }}) + AND _partition_id <= (SELECT max(_partition_id) FROM {{ source('solana_streamline','complete_block_txs') }}) + AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) + {% else %} + AND _partition_id IN (1,2) + {% endif %} + AND _partition_id < {{cutover_partition_id}} + UNION ALL + SELECT + to_timestamp_ntz(t.value:"result.blockTime"::int) AS block_timestamp, + t.block_id, + t.data:transaction:signatures[0]::string AS tx_id, + t.data :transaction :message :recentBlockhash :: STRING AS recent_block_hash, + t.data :meta :fee :: NUMBER AS fee, + CASE + WHEN IS_NULL_VALUE( + t.data :meta :err + ) THEN TRUE + ELSE FALSE + END AS succeeded, + t.data :transaction :message :accountKeys :: ARRAY AS account_keys, + t.data :meta :preBalances :: ARRAY AS pre_balances, + t.data :meta :postBalances :: ARRAY AS post_balances, + t.data :meta :preTokenBalances :: ARRAY AS pre_token_balances, + t.data :meta :postTokenBalances :: ARRAY AS post_token_balances, + t.data :transaction :message :instructions :: ARRAY AS instructions, + t.data :meta :innerInstructions :: ARRAY AS inner_instructions, + t.data :meta :logMessages :: ARRAY AS log_messages, + t.data:transaction:message:addressTableLookups::array as address_table_lookups, + t.data :meta :computeUnitsConsumed :: NUMBER as compute_units_consumed, + t.data :version :: STRING as version, + t._partition_id, + t._inserted_timestamp + FROM + {{ ref('bronze__streamline_block_txs_2') }} AS t + WHERE + t.block_id >= {{ cutover_block_id }} + AND tx_id IS NOT NULL + AND ( + COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING,'') <> 'Vote111111111111111111111111111111111111111' + OR + ( + array_size(t.data :transaction :message :instructions) > 1 + ) + ) + {% if is_incremental() %} + AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }}) + AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }}) + AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) + {% else %} + AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */ + {% endif %} + AND t._partition_id >= {{ cutover_partition_id }} ), {% if is_incremental() %} prev_null_block_timestamp_txs AS ( diff --git a/models/silver/non_core/silver__votes.sql b/models/silver/non_core/silver__votes.sql index 607655eb..117e23aa 100644 --- a/models/silver/non_core/silver__votes.sql +++ b/models/silver/non_core/silver__votes.sql @@ -8,6 +8,9 @@ tags = ['scheduled_non_core'] ) }} +{% set cutover_block_id = 307103862 %} +{% set cutover_partition_id = 150215 %} + WITH pre_final AS ( SELECT COALESCE(TO_TIMESTAMP_NTZ(t.value :block_time), b.block_timestamp) AS block_timestamp, @@ -33,34 +36,57 @@ WITH pre_final AS ( LEFT OUTER JOIN {{ ref('silver__blocks') }} b on b.block_id = t.block_id WHERE - tx_id is not null - AND - COALESCE( + t.block_id < {{cutover_block_id}} + AND tx_id is not null + AND coalesce( t.data :transaction :message :instructions [0] :programId :: STRING, '' ) = 'Vote111111111111111111111111111111111111111' - {% if is_incremental() %} - AND - _partition_id >= ( - select max(_partition_id)-1 - from {{this}} - ) - AND - _partition_id <= ( - SELECT - MAX(_partition_id) - FROM - {{ source('solana_streamline','complete_block_txs') }} - ) - AND - t._inserted_timestamp > ( - select max(_inserted_timestamp) - from {{this}} - ) - {% else %} - AND - _partition_id in (1,2) - {% endif %} + {% if is_incremental() %} + AND _partition_id >= (select max(_partition_id)-1 from {{this}}) + AND _partition_id <= (SELECT MAX(_partition_id) FROM {{ source('solana_streamline','complete_block_txs') }}) + AND t._inserted_timestamp > (select max(_inserted_timestamp) from {{this}}) + {% else %} + AND _partition_id in (1,2) + {% endif %} + AND _partition_id < {{cutover_partition_id}} + UNION ALL + SELECT + to_timestamp_ntz(t.value:"result.blockTime"::int) AS block_timestamp, + t.block_id, + t.data:transaction:signatures[0]::string AS tx_id, + t.data :transaction :message :recentBlockhash :: STRING AS recent_block_hash, + t.data :meta :fee :: NUMBER AS fee, + CASE + WHEN IS_NULL_VALUE( + t.data :meta :err + ) THEN TRUE + ELSE FALSE + END AS succeeded, + t.data :transaction :message :accountKeys :: ARRAY AS account_keys, + t.data :transaction :message :instructions [0] :parsed :info :voteAccount :: STRING AS vote_account, + t.data :transaction :message :instructions [0] :parsed :info :voteAuthority :: STRING AS vote_authority, + t.data :transaction :message :instructions [0] :parsed :info :vote :hash :: STRING AS vote_hash, + t.data :transaction :message :instructions [0] :parsed :info :vote :slots :: ARRAY AS vote_slots, + t._partition_id, + t._inserted_timestamp + FROM + {{ ref('bronze__streamline_block_txs_2') }} AS t + WHERE + t.block_id >= {{ cutover_block_id }} + AND tx_id IS NOT NULL + AND coalesce( + t.data :transaction :message :instructions [0] :programId :: STRING, + '' + ) = 'Vote111111111111111111111111111111111111111' + {% if is_incremental() %} + AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }}) + AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }}) + AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) + {% else %} + AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */ + {% endif %} + AND t._partition_id >= {{ cutover_partition_id }} ) {% if is_incremental() %} , prev_null_block_timestamp_txs as ( diff --git a/tests/test_silver__transactions_and_votes_missing_7_days.sql b/tests/test_silver__transactions_and_votes_missing_7_days.sql index b5aae2c9..8d7f7e6e 100644 --- a/tests/test_silver__transactions_and_votes_missing_7_days.sql +++ b/tests/test_silver__transactions_and_votes_missing_7_days.sql @@ -45,7 +45,7 @@ SELECT e.transaction_count AS ect, A.transaction_count AS act, e.transaction_count - A.transaction_count AS delta, - coalesce(c._partition_id, 0) AS _partition_id + coalesce(c._partition_id, c2._partition_id, 0) AS _partition_id FROM solscan_counts e LEFT OUTER JOIN @@ -54,6 +54,9 @@ LEFT OUTER JOIN LEFT OUTER JOIN streamline.complete_block_txs c ON e.block_id = c.block_id +LEFT OUTER JOIN + streamline.complete_block_txs_2 c2 + ON e.block_id = c2.block_id WHERE ect <> 0 AND ( diff --git a/tests/test_silver__transactions_missing_partitions.sql b/tests/test_silver__transactions_missing_partitions.sql deleted file mode 100644 index 6da10676..00000000 --- a/tests/test_silver__transactions_missing_partitions.sql +++ /dev/null @@ -1,65 +0,0 @@ -WITH max_part_id_tmp AS ( - SELECT - MAX(_partition_id) AS _partition_id - FROM - {% if target.database == 'SOLANA' %} - solana.silver.votes - {% else %} - solana_dev.silver.votes - {% endif %} - UNION - SELECT - MAX(_partition_id) - FROM - {% if target.database == 'SOLANA' %} - solana.silver.transactions - {% else %} - solana_dev.silver.transactions - {% endif %} -), -base AS ( - SELECT - DISTINCT _partition_id - FROM - {% if target.database == 'SOLANA' %} - solana.streamline.complete_block_txs - {% else %} - solana_dev.streamline.complete_block_txs - {% endif %} - WHERE - _partition_id <= ( - SELECT - MAX(_partition_id) - FROM - max_part_id_tmp - ) -), -base_txs AS ( - SELECT - DISTINCT _partition_id - FROM - {{ ref('silver__transactions') }} - UNION - SELECT - DISTINCT _partition_id - FROM - {% if target.database == 'SOLANA' %} - solana.silver.votes - {% else %} - solana_dev.silver.votes - {% endif %} -) -SELECT - b._partition_id -FROM - base b - LEFT OUTER JOIN base_txs t - ON b._partition_id = t._partition_id -WHERE - t._partition_id IS NULL - AND b._partition_id <> 1877 -- seems like this whole partition is skipped slots - AND b._partition_id > 95526 /* some old partitions never got loaded into silver, - the data has made it into silver through other partitions - and confirmed via other checks. - We can start checking for new instances after this partition - and consider everything before reconciled. */ diff --git a/tests/test_streamline__complete_block_txs__gaps.sql b/tests/test_streamline__complete_block_txs__gaps.sql index 30da3b3a..e4f96620 100644 --- a/tests/test_streamline__complete_block_txs__gaps.sql +++ b/tests/test_streamline__complete_block_txs__gaps.sql @@ -1,3 +1,7 @@ +{% set cutover_block_id = 307103862 %} +{% set cutover_partition_id = 150215 %} +{% set start_block_id = 306000000 %} + WITH base_blocks AS ( SELECT * @@ -8,7 +12,7 @@ WITH base_blocks AS ( solana_dev.silver.blocks {% endif %} WHERE - block_id >= 154195836 -- this query wont give correct results prior to this block_id + block_id >= {{ start_block_id }} AND _inserted_date < CURRENT_DATE ), base_txs AS ( @@ -21,7 +25,7 @@ base_txs AS ( solana_dev.silver.transactions {% endif %} WHERE - block_id >= 154195836 + block_id >= {{ start_block_id }} UNION SELECT DISTINCT block_id @@ -32,7 +36,7 @@ base_txs AS ( solana_dev.silver.votes {% endif %} WHERE - block_id >= 154195836 + block_id >= {{ start_block_id }} ), potential_missing_txs AS ( SELECT @@ -45,11 +49,20 @@ potential_missing_txs AS ( base_txs.block_id IS NULL ) SELECT - m.block_id + m.block_id FROM potential_missing_txs m LEFT OUTER JOIN {{ ref('streamline__complete_block_txs') }} cmp ON m.block_id = cmp.block_id + LEFT OUTER JOIN {{ ref('streamline__complete_block_txs_2') }} cmp2 + ON m.block_id = cmp2.block_id WHERE - cmp.error IS NOT NULL - OR cmp.block_id IS NULL \ No newline at end of file + ( + m.block_id < {{ cutover_block_id }} -- cutover block id from silver__transactions + AND (cmp.error IS NOT NULL OR cmp.block_id IS NULL) + ) + OR ( + m.block_id >= {{ cutover_block_id }} + AND cmp2._partition_id >= {{ cutover_partition_id }} + AND (cmp2.block_id IS NULL) + )