From 4102bf2b730353e080b54713bf30e73541ecdd66 Mon Sep 17 00:00:00 2001
From: desmond-hui
Date: Mon, 9 Dec 2024 10:07:48 -0800
Subject: [PATCH] revert decoded logs deploy

---
 .github/workflows/dbt_run_decode_logs.yml     |   1 +
 models/silver/parser/silver__decoded_logs.sql |  47 +++-----
 .../streamline__decode_logs_2_realtime.sql    | 113 ++----------------
 3 files changed, 25 insertions(+), 136 deletions(-)

diff --git a/.github/workflows/dbt_run_decode_logs.yml b/.github/workflows/dbt_run_decode_logs.yml
index f410434d..92cad48a 100644
--- a/.github/workflows/dbt_run_decode_logs.yml
+++ b/.github/workflows/dbt_run_decode_logs.yml
@@ -41,4 +41,5 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_realtime.sql
           dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
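
[Note] The workflow change restores the dbt invocation of the original
streamline__decode_logs_realtime model ahead of the _2 variant, so both
streamline calls fire on each scheduled run while the revert is in effect.
STREAMLINE_INVOKE_STREAMS is passed as a dbt variable; the actual gating
presumably happens inside the streamline post-hook macros (not shown in this
hunk), so the sketch below is only an illustration of the usual var-gating
pattern, not this repo's real code:

    -- hypothetical sketch; the real check lives in the post-hook macros
    {% if var('STREAMLINE_INVOKE_STREAMS', false) %}
        -- call the external decoding function here
    {% endif %}
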
diff --git a/models/silver/parser/silver__decoded_logs.sql b/models/silver/parser/silver__decoded_logs.sql
index 27e6138d..b1e5da09 100644
--- a/models/silver/parser/silver__decoded_logs.sql
+++ b/models/silver/parser/silver__decoded_logs.sql
@@ -2,33 +2,29 @@
 -- depends_on: {{ ref('silver__transactions') }}
 -- depends_on: {{ ref('bronze__streamline_decoded_logs') }}
 -- depends_on: {{ ref('bronze__streamline_FR_decoded_logs') }}
--- depends_on: {{ ref('bronze__streamline_decoded_logs_2') }}
-
 {{ config(
     materialized = 'incremental',
     incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
     unique_key = "decoded_logs_id",
-    cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE', 'program_id'],
+    cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'],
     merge_exclude_columns = ["inserted_timestamp"],
     post_hook = enable_search_optimization(
         '{{this.schema}}',
         '{{this.identifier}}',
         'ON EQUALITY(tx_id, event_type, decoded_logs_id)'
     ),
-    full_refresh = false,
     tags = ['scheduled_non_core']
 ) }}
 
 {% set CUTOVER_DATETIME = modules.datetime.datetime.strptime("2024-07-16 17:00:00", "%Y-%m-%d %H:%M:%S") %}
 {% set use_legacy_logic = False %}
-{% set streamline_2_cutover_datetime = modules.datetime.datetime.strptime("2024-12-09 00:00:00+00:00", "%Y-%m-%d %H:%M:%S%z") %}
 
 /* run incremental timestamp value first then use it as a static value */
 {% if execute %}
     {% if is_incremental() %}
         {% set max_inserted_query %}
     SELECT
-        max(_inserted_timestamp) - INTERVAL '1 HOUR' AS _inserted_timestamp
+        MAX(_inserted_timestamp) - INTERVAL '1 HOUR' AS _inserted_timestamp
     FROM
         {{ this }}
     {% endset %}
@@ -49,12 +45,12 @@
         log_index,
         program_id,
         data,
-        _inserted_timestamp
+        _inserted_timestamp,
     FROM
     {% if is_incremental() %}
-        {{ ref('bronze__streamline_decoded_logs') }} A
+    {{ ref('bronze__streamline_decoded_logs') }} A
     {% else %}
-        {{ ref('bronze__streamline_FR_decoded_logs') }} A
+    {{ ref('bronze__streamline_FR_decoded_logs') }} A
     {% endif %}
     JOIN
         {{ ref('silver__blocks') }}
@@ -69,6 +65,7 @@
     {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.decoded_logs__intermediate_tmp","block_timestamp::date") %}
     {% endif %}
+
 {% endif %}
 
 {% if use_legacy_logic %}
@@ -95,11 +92,11 @@ SELECT
     t.succeeded,
     d.program_id,
     d.data AS decoded_log,
-    decoded_log:name::STRING AS event_type,
+    decoded_log :name :: STRING AS event_type,
     d._inserted_timestamp,
-    {{ dbt_utils.generate_surrogate_key(['d.tx_id', 'd.index', 'd.inner_index', 'd.log_index']) }} AS decoded_logs_id,
-    sysdate() AS inserted_timestamp,
-    sysdate() AS modified_timestamp,
+    {{ dbt_utils.generate_surrogate_key(['d.tx_id', 'd.index', 'd.inner_index','d.log_index']) }} AS decoded_logs_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
     '{{ invocation_id }}' AS _invocation_id
 FROM
     silver.decoded_logs__intermediate_tmp d
@@ -108,7 +105,7 @@ JOIN
     ON d.block_id = t.block_id
     AND d.tx_id = t.tx_id
 QUALIFY
-    row_number() OVER (
+    ROW_NUMBER() over (
        PARTITION BY decoded_logs_id
        ORDER BY d._inserted_timestamp DESC
    ) = 1
@@ -126,31 +123,21 @@ SELECT
     data AS decoded_log,
     decoded_log:name::string AS event_type,
     _inserted_timestamp,
-    {{ dbt_utils.generate_surrogate_key(['tx_id', 'index', 'inner_index', 'log_index']) }} AS decoded_logs_id,
+    {{ dbt_utils.generate_surrogate_key(['tx_id', 'index', 'inner_index','log_index']) }} AS decoded_logs_id,
     sysdate() AS inserted_timestamp,
     sysdate() AS modified_timestamp,
     '{{ invocation_id }}' AS _invocation_id
 FROM
-    {% if is_incremental() and max_inserted_timestamp < streamline_2_cutover_datetime %}
+    {% if is_incremental() %}
         {{ ref('bronze__streamline_decoded_logs') }}
-    {% elif is_incremental() %}
-        {{ ref('bronze__streamline_decoded_logs_2') }}
-    {% endif %}
-    /*
-    No longer allow full refresh of this model.
-    If we need to full refresh, manual intervention is required as we need to union both sets of raw data
-    {# {% else %}
+    {% else %}
         {{ ref('bronze__streamline_FR_decoded_logs') }}
-    {% endif %} #}
-    */
+    {% endif %}
 {% if is_incremental() %}
 WHERE
     _inserted_timestamp >= '{{ max_inserted_timestamp }}'
     AND _partition_by_created_date_hour >= dateadd('hour', -2, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
 {% endif %}
 QUALIFY
-    row_number() OVER (
-        PARTITION BY decoded_logs_id
-        ORDER BY _inserted_timestamp DESC
-    ) = 1
-{% endif %}
+    row_number() OVER (PARTITION BY decoded_logs_id ORDER BY _inserted_timestamp DESC) = 1
+{% endif %}
\ No newline at end of file
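
[Note] The silver__decoded_logs changes undo the cutover plumbing: the
bronze__streamline_decoded_logs_2 dependency, the streamline_2_cutover_datetime
switch, and the full_refresh = false guard all come out, and the incremental
source selection goes back to a plain is_incremental() branch. The surviving
pattern is worth calling out: the model resolves MAX(_inserted_timestamp) -
INTERVAL '1 HOUR' once via run_query(), then reuses it as a static predicate
alongside a partition-pruning filter and a QUALIFY dedupe. A generic sketch of
that shape (table and column names here are placeholders, not the repo's):

    SELECT
        id,
        payload,
        _inserted_timestamp
    FROM
        raw_source
    WHERE
        _inserted_timestamp >= '{{ max_inserted_timestamp }}'  -- static value resolved at compile time
        AND _partition_by_created_date_hour >= dateadd('hour', -2, date_trunc('hour', '{{ max_inserted_timestamp }}'::timestamp_ntz))
    QUALIFY
        row_number() OVER (PARTITION BY id ORDER BY _inserted_timestamp DESC) = 1
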
diff --git a/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql b/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
index 3a5d38be..990d1d46 100644
--- a/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
+++ b/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
@@ -7,111 +7,12 @@
     ),
     tags = ['streamline_decoder_logs']
 ) }}
-
-{% if execute %}
-    {% set min_event_block_id_query %}
-    SELECT
-        min(block_id)
-    FROM
-        {{ ref('silver__blocks') }}
-    WHERE
-        block_timestamp >= CURRENT_DATE - 2
-    {% endset %}
-    {% set min_event_block_id = run_query(min_event_block_id_query).columns[0].values()[0] %}
-{% endif %}
-
-WITH idl_in_play AS (
-    SELECT
-        program_id
-    FROM
-        {{ ref('silver__verified_idls') }}
-    WHERE
-        program_id IN (
-            'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
-            'PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY',
-            '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P',
-            'DCA265Vj8a9CEuX1eb1LWRnDT7uK6q1xMipnNyatn23M',
-            '7a4WjyR8VZ7yZz5XJAKm39BUGn5iT9CKcv2pmG9tdXVH'
-        )
-),
-event_subset AS (
-    SELECT
-        i.value :programId :: STRING AS inner_program_id,
-        e.tx_id,
-        e.index,
-        i.index AS inner_index,
-        NULL AS log_index,
-        i.value AS instruction,
-        e.block_id,
-        e.block_timestamp,
-        e.signers,
-        e.succeeded,
-        {{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','log_index','inner_program_id']) }} as id
-    FROM
-        {{ ref('silver__events') }} e
-    JOIN
-        table(flatten(e.inner_instruction:instructions)) i
-    JOIN
-        idl_in_play b
-        ON array_contains(b.program_id::variant, e.inner_instruction_program_ids)
-        AND b.program_id = inner_program_id
-    WHERE
-        e.block_timestamp >= CURRENT_DATE - 2
-        AND e.succeeded
-        AND array_size(i.value:accounts::array) = 1
-    UNION ALL
-    SELECT
-        l.program_id,
-        l.tx_id,
-        l.index,
-        l.inner_index,
-        l.log_index,
-        object_construct('accounts',[],'data',l.data,'programId',l.program_id) as instruction,
-        l.block_id,
-        l.block_timestamp,
-        t.signers,
-        t.succeeded,
-        {{ dbt_utils.generate_surrogate_key(['l.block_id','l.tx_id','l.index','l.inner_index','l.log_index','l.program_id']) }} as id
-    FROM
-        {{ ref('silver__transaction_logs_program_data') }} l
-    JOIN
-        {{ ref('silver__transactions') }} t
-        USING(block_timestamp, tx_id)
-    WHERE
-        l.block_timestamp >= CURRENT_DATE - 2
-        AND l.program_id in ('TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN','JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB')
-),
-completed_subset AS (
-    SELECT
-        block_id,
-        complete_decoded_logs_2_id as id
-    FROM
-        {{ ref('streamline__complete_decoded_logs_2') }}
-    WHERE
-        block_id >= {{ min_event_block_id }} --ensure we at least prune to last 2 days worth of blocks since the dynamic below will scan everything
-        AND block_id >= (
-            SELECT
-                MIN(block_id)
-            FROM
-                event_subset
-        )
-)
+/*
+While we are running in parallel, we can just select from the existing table.
+Once we are done, we can move the existing code into this table;
+it should be mostly the same except for the completed table references.
+*/
 SELECT
-    e.inner_program_id as program_id,
-    e.tx_id,
-    e.index,
-    e.inner_index,
-    e.log_index,
-    e.instruction,
-    e.block_id,
-    e.block_timestamp,
-    e.signers,
-    e.succeeded
+    *
 FROM
-    event_subset e
-LEFT OUTER JOIN
-    completed_subset C
-    ON C.block_id = e.block_id
-    AND e.id = C.id
-WHERE
-    C.block_id IS NULL
+    {{ ref('streamline__decode_logs_realtime') }}
\ No newline at end of file
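
[Note] With this revert, streamline__decode_logs_2_realtime is back to being a
pass-through of streamline__decode_logs_realtime while the two pipelines run in
parallel, per the in-file comment. The deleted query was a work-queue style
anti-join: candidate decode requests from the last two days, minus anything
already recorded in streamline__complete_decoded_logs_2. A generic sketch of
that shape (table and column names are placeholders, not the repo's):

    SELECT
        w.block_id,
        w.id
    FROM
        candidate_work w
    LEFT OUTER JOIN
        completed_work c
        ON c.block_id = w.block_id
        AND c.id = w.id
    WHERE
        c.block_id IS NULL -- keep only rows not yet completed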