From 818fd59939f0fbd8d4a571e0a312580a25ac0bfe Mon Sep 17 00:00:00 2001 From: tarikceric <46071768+tarikceric@users.noreply.github.com> Date: Mon, 23 Jun 2025 11:30:33 -0700 Subject: [PATCH] An 6361/cte logic txs and idls (#851) * update tx's logic * set active col in idls * same run and tag for idl tables * fix tag name --- .github/workflows/dbt_run_idls_history.yml | 2 +- models/gold/core/core__dim_idls.sql | 4 +- models/gold/core/core__dim_idls.yml | 8 +++ models/silver/core/silver__transactions.sql | 50 +++++++------ models/silver/idls/silver__idls.sql | 12 +++- models/silver/idls/silver__verified_idls.sql | 72 ++++++++++++++++--- .../idls/silver__verified_user_idls.sql | 2 +- ...amline__decode_instructions_3_realtime.sql | 1 + 8 files changed, 115 insertions(+), 36 deletions(-) diff --git a/.github/workflows/dbt_run_idls_history.yml b/.github/workflows/dbt_run_idls_history.yml index b1930044..3572d0f9 100644 --- a/.github/workflows/dbt_run_idls_history.yml +++ b/.github/workflows/dbt_run_idls_history.yml @@ -41,7 +41,7 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -s models/streamline/parser/history/streamline__idls_history.sql models/streamline/parser/history/streamline__complete_decoded_history.sql + dbt run -s "solana_models,tag:idls" models/streamline/parser/history/streamline__idls_history.sql models/streamline/parser/history/streamline__complete_decoded_history.sql notify-failure: needs: [run_dbt_jobs] diff --git a/models/gold/core/core__dim_idls.sql b/models/gold/core/core__dim_idls.sql index 2f12b373..86d90968 100644 --- a/models/gold/core/core__dim_idls.sql +++ b/models/gold/core/core__dim_idls.sql @@ -1,6 +1,6 @@ {{ config( materialized='view', - tags = ['scheduled_non_core'] + tags = ['idls'] ) }} @@ -9,6 +9,8 @@ SELECT idl, idl_hash, is_valid, + is_active, + last_activity_timestamp, submitted_by, date_submitted, first_block_id, diff --git a/models/gold/core/core__dim_idls.yml b/models/gold/core/core__dim_idls.yml index c1f93b8b..75df81a4 
100644 --- a/models/gold/core/core__dim_idls.yml +++ b/models/gold/core/core__dim_idls.yml @@ -23,6 +23,14 @@ models: description: "{{ doc('is_valid') }}" tests: - dbt_expectations.expect_column_to_exist + - name: IS_ACTIVE + description: "True if the program has decoded instructions in the last 14 days, or if its IDL is newly added (new in this run or inserted within the last 2 days)" + tests: + - dbt_expectations.expect_column_to_exist + - name: LAST_ACTIVITY_TIMESTAMP + description: "Most recent _inserted_timestamp of decoded instructions for the program in the last 14 days; falls back to the _inserted_timestamp of the IDL when there are none" + tests: + - dbt_expectations.expect_column_to_exist - name: SUBMITTED_BY description: "{{ doc('submitted_by') }}" tests: diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 79b12185..1335ac1c 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -14,8 +14,33 @@ {% set cutover_block_id = 307103862 %} {% set cutover_partition_id = 150215 %} -WITH pre_final AS ( - +WITH base AS ( + SELECT + block_timestamp, + block_id, + tx_index, + data, + _partition_id, + _inserted_timestamp, + COALESCE( + data:transaction:message:instructions[0]:programId::STRING, + '' + ) AS first_program_id, + array_size(data :transaction :message :instructions) as instruction_count + FROM + {{ ref('bronze__stage_block_txs_2') }} AS t + WHERE + block_id >= {{ cutover_block_id }} + {% if is_incremental() %} + AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }}) + AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }}) + AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) + {% else %} + AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */ + {% endif %} + AND t._partition_id >= {{ cutover_partition_id }} +), +pre_final AS ( SELECT COALESCE(TO_TIMESTAMP_NTZ(t.value :block_time), b.block_timestamp) AS block_timestamp, t.block_id, @@ -91,26 +116,11 @@ WITH pre_final AS ( 
t.tx_index, t._partition_id, t._inserted_timestamp - FROM - {{ ref('bronze__stage_block_txs_2') }} AS t + FROM base AS t WHERE - t.block_id >= {{ cutover_block_id }} + ( /* vote filter is grouped: AND binds tighter than OR, so ungrouped it let rows bypass the tx_id check below */ first_program_id <> 'Vote111111111111111111111111111111111111111' + OR instruction_count > 1 ) AND tx_id IS NOT NULL - AND ( - COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING,'') <> 'Vote111111111111111111111111111111111111111' - OR - ( - array_size(t.data :transaction :message :instructions) > 1 - ) - ) - {% if is_incremental() %} - AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }}) - AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }}) - AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) - {% else %} - AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */ - {% endif %} - AND t._partition_id >= {{ cutover_partition_id }} ), {% if is_incremental() %} prev_null_block_timestamp_txs AS ( diff --git a/models/silver/idls/silver__idls.sql b/models/silver/idls/silver__idls.sql index fd2df12f..95461e25 100644 --- a/models/silver/idls/silver__idls.sql +++ b/models/silver/idls/silver__idls.sql @@ -2,7 +2,7 @@ materialized = 'incremental', unique_key = "program_id", merge_exclude_columns = ["inserted_timestamp"], - tags = ['scheduled_non_core'] + tags = ['idls'] ) }} WITH submitted_idls AS ( @@ -11,12 +11,14 @@ WITH submitted_idls AS ( A.program_id, A.idl, A.idl_hash, - A.is_valid, + TRUE as is_valid, --all idls from verified_idls are valid A.discord_username, A._inserted_timestamp, + a.is_active, + a.last_activity_timestamp, b.first_block_id FROM - {{ ref('silver__verified_user_idls') }} A + {{ ref('silver__verified_idls') }} A LEFT JOIN {{ ref('streamline__idls_history') }} b ON A.program_id = b.program_id qualify(ROW_NUMBER() over(PARTITION BY A.program_id @@ -60,6 +62,8 @@ pre_final AS ( A.is_valid, 
A.discord_username, A._inserted_timestamp, + a.is_active, + a.last_activity_timestamp, A.first_block_id, {% if is_incremental() %} iff(b.earliest_decoded_block < d.earliest_decoded_block, b.earliest_decoded_block, d.earliest_decoded_block) AS earliest_decoded_block, @@ -85,6 +89,8 @@ SELECT idl, idl_hash, is_valid, + is_active, + last_activity_timestamp, discord_username as submitted_by, _inserted_timestamp as date_submitted, first_block_id, diff --git a/models/silver/idls/silver__verified_idls.sql b/models/silver/idls/silver__verified_idls.sql index ee7717e3..4ca643a1 100644 --- a/models/silver/idls/silver__verified_idls.sql +++ b/models/silver/idls/silver__verified_idls.sql @@ -1,17 +1,18 @@ {{ config( materialized = 'incremental', unique_key = "program_id", - tags = ['idls','scheduled_non_core'] + tags = ['idls'] ) }} -WITH user_abis AS ( +WITH new_user_idls AS ( SELECT program_id, idl, discord_username, _inserted_timestamp, 'user' AS idl_source, - idl_hash + idl_hash, + TRUE as is_new_record FROM {{ ref('silver__verified_user_idls') }} WHERE @@ -32,13 +33,64 @@ WITH user_abis AS ( idl_source = 'user' ) {% endif %} +), +all_idls AS ( + {% if is_incremental() %} + -- Include existing records + SELECT + program_id, + idl, + discord_username, + _inserted_timestamp, + idl_source, + idl_hash, + CASE + WHEN _inserted_timestamp >= CURRENT_TIMESTAMP - INTERVAL '2 days' THEN TRUE + ELSE FALSE + END as is_new_record + FROM {{ this }} + + UNION ALL + {% endif %} + + -- Include new records + SELECT + program_id, + idl, + discord_username, + _inserted_timestamp, + idl_source, + idl_hash, + is_new_record + FROM new_user_idls +), +recent_activity AS ( + SELECT + program_id, + MAX(_inserted_timestamp) as last_activity_ts, + COUNT(*) as activity_count + FROM + {{ ref('silver__decoded_instructions_combined') }} + WHERE + _inserted_timestamp >= CURRENT_DATE - 14 + GROUP BY + program_id ) SELECT - program_id, - idl, - _inserted_timestamp, - idl_source, - discord_username, - 
idl_hash + a.program_id, + a.idl, + a._inserted_timestamp, + a.idl_source, + a.discord_username, + a.idl_hash, + CASE + WHEN a.is_new_record THEN TRUE -- Records from last 2 days are always active + WHEN r.program_id IS NOT NULL AND r.activity_count > 0 THEN TRUE -- Existing records with recent activity + ELSE FALSE -- No recent activity + END as is_active, + COALESCE(r.last_activity_ts, a._inserted_timestamp) as last_activity_timestamp FROM - user_abis \ No newline at end of file + all_idls a +LEFT JOIN + recent_activity r + ON a.program_id = r.program_id \ No newline at end of file diff --git a/models/silver/idls/silver__verified_user_idls.sql b/models/silver/idls/silver__verified_user_idls.sql index ebd8faab..dc4dc5b4 100644 --- a/models/silver/idls/silver__verified_user_idls.sql +++ b/models/silver/idls/silver__verified_user_idls.sql @@ -2,7 +2,7 @@ {{ config ( materialized = "incremental", unique_key = "id", - tags = ['idls','scheduled_non_core'] + tags = ['idls'] ) }} {% if execute %} diff --git a/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql b/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql index 343f2f49..8c628d23 100644 --- a/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql +++ b/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql @@ -28,6 +28,7 @@ {{ ref('silver__verified_idls') }} WHERE program_id <> 'FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH' + and is_active {% endset %} {% set idls_to_decode = run_query(idls_to_decode_query)[0][0] %} {% endif %}