revert decoded logs deploy

desmond-hui 2024-12-09 10:07:48 -08:00
parent b466b9469d
commit 4102bf2b73
3 changed files with 25 additions and 136 deletions

View File

@@ -41,4 +41,5 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
+dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql

View File

@@ -2,33 +2,29 @@
-- depends_on: {{ ref('silver__transactions') }}
-- depends_on: {{ ref('bronze__streamline_decoded_logs') }}
-- depends_on: {{ ref('bronze__streamline_FR_decoded_logs') }}
--- depends_on: {{ ref('bronze__streamline_decoded_logs_2') }}
{{ config(
materialized = 'incremental',
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
unique_key = "decoded_logs_id",
-cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE', 'program_id'],
+cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = enable_search_optimization(
'{{this.schema}}',
'{{this.identifier}}',
'ON EQUALITY(tx_id, event_type, decoded_logs_id)'
),
full_refresh = false,
tags = ['scheduled_non_core']
) }}
{% set CUTOVER_DATETIME = modules.datetime.datetime.strptime("2024-07-16 17:00:00", "%Y-%m-%d %H:%M:%S") %}
{% set use_legacy_logic = False %}
-{% set streamline_2_cutover_datetime = modules.datetime.datetime.strptime("2024-12-09 00:00:00+00:00", "%Y-%m-%d %H:%M:%S%z") %}
/* run incremental timestamp value first then use it as a static value */
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
SELECT
-max(_inserted_timestamp) - INTERVAL '1 HOUR' AS _inserted_timestamp
+MAX(_inserted_timestamp) - INTERVAL '1 HOUR' AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
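The hunk ends before the statement that actually executes this query; the usual dbt continuation, sketched here under the assumption that the result is pinned into the max_inserted_timestamp variable referenced further down in the model, is:

    {# assumed continuation, outside this hunk: run the query once at compile
       time, then reuse the result as a static literal so the WHERE clause
       below can prune on a constant rather than a subquery #}
    {% set max_inserted_timestamp = run_query(max_inserted_query).columns[0].values()[0] %}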
@@ -49,12 +45,12 @@
log_index,
program_id,
data,
-_inserted_timestamp
+_inserted_timestamp,
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_logs') }} A
{% else %}
{{ ref('bronze__streamline_FR_decoded_logs') }} A
{% endif %}
JOIN
{{ ref('silver__blocks') }}
@@ -69,6 +65,7 @@
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.decoded_logs__intermediate_tmp","block_timestamp::date") %}
{% endif %}
{% endif %}
{% if use_legacy_logic %}
@@ -95,11 +92,11 @@ SELECT
t.succeeded,
d.program_id,
d.data AS decoded_log,
-decoded_log:name::STRING AS event_type,
+decoded_log :name :: STRING AS event_type,
d._inserted_timestamp,
-{{ dbt_utils.generate_surrogate_key(['d.tx_id', 'd.index', 'd.inner_index', 'd.log_index']) }} AS decoded_logs_id,
-sysdate() AS inserted_timestamp,
-sysdate() AS modified_timestamp,
+{{ dbt_utils.generate_surrogate_key(['d.tx_id', 'd.index', 'd.inner_index','d.log_index']) }} AS decoded_logs_id,
+SYSDATE() AS inserted_timestamp,
+SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
silver.decoded_logs__intermediate_tmp d
@@ -108,7 +105,7 @@ JOIN
ON d.block_id = t.block_id
AND d.tx_id = t.tx_id
QUALIFY
-row_number() OVER (
+ROW_NUMBER() over (
PARTITION BY decoded_logs_id
ORDER BY d._inserted_timestamp DESC
) = 1
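Because the incremental filter above rewinds MAX(_inserted_timestamp) by one hour, the same decoded_logs_id can land in a batch more than once; this QUALIFY keeps only the newest copy so the merge receives exactly one row per key. A minimal standalone sketch of the same Snowflake pattern, with hypothetical table and key names:

    -- keep the latest copy of each key when overlapping loads produce duplicates
    SELECT *
    FROM some_landing_table                  -- hypothetical source
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY some_unique_id          -- the merge key
        ORDER BY _inserted_timestamp DESC    -- newest row wins
    ) = 1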
@@ -126,31 +123,21 @@ SELECT
data AS decoded_log,
decoded_log:name::string AS event_type,
_inserted_timestamp,
-{{ dbt_utils.generate_surrogate_key(['tx_id', 'index', 'inner_index', 'log_index']) }} AS decoded_logs_id,
+{{ dbt_utils.generate_surrogate_key(['tx_id', 'index', 'inner_index','log_index']) }} AS decoded_logs_id,
sysdate() AS inserted_timestamp,
sysdate() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
-{% if is_incremental() and max_inserted_timestamp < streamline_2_cutover_datetime %}
+{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_logs') }}
-{% elif is_incremental() %}
-{{ ref('bronze__streamline_decoded_logs_2') }}
-{% endif %}
-/*
-No longer allow full refresh of this model.
-If we need to full refresh, manual intervention is required as we need to union both sets of raw data
-{# {% else %}
+{% else %}
{{ ref('bronze__streamline_FR_decoded_logs') }}
-{% endif %} #}
-*/
+{% endif %}
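The comment deleted above records why full_refresh = false stays in the config: a rebuild would have to stitch the legacy and new raw sets together by hand. A hypothetical sketch of that manual backfill (illustrative only; the column list is assumed from the surrounding select, and this is not part of the commit):

    -- hypothetical manual backfill: union both raw sets, then dedup as usual
    SELECT block_id, tx_id, index, inner_index, log_index, program_id, data, _inserted_timestamp
    FROM {{ ref('bronze__streamline_FR_decoded_logs') }}
    UNION ALL
    SELECT block_id, tx_id, index, inner_index, log_index, program_id, data, _inserted_timestamp
    FROM {{ ref('bronze__streamline_decoded_logs_2') }}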
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_inserted_timestamp }}'
AND _partition_by_created_date_hour >= dateadd('hour', -2, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
{% endif %}
QUALIFY
-row_number() OVER (
-PARTITION BY decoded_logs_id
-ORDER BY _inserted_timestamp DESC
-) = 1
-{% endif %}
+row_number() OVER (PARTITION BY decoded_logs_id ORDER BY _inserted_timestamp DESC) = 1
+{% endif %}
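For readability, the bronze-source logic this hunk converges on, assembled from the kept and added lines above (a reconstruction, not a further change):

    FROM
    {% if is_incremental() %}
        {{ ref('bronze__streamline_decoded_logs') }}
    {% else %}
        {{ ref('bronze__streamline_FR_decoded_logs') }}
    {% endif %}
    {% if is_incremental() %}
    WHERE
        _inserted_timestamp >= '{{ max_inserted_timestamp }}'
        AND _partition_by_created_date_hour >= dateadd('hour', -2, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
    {% endif %}
    QUALIFY
        row_number() OVER (PARTITION BY decoded_logs_id ORDER BY _inserted_timestamp DESC) = 1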

View File

@@ -7,111 +7,12 @@
),
tags = ['streamline_decoder_logs']
) }}
-{% if execute %}
-{% set min_event_block_id_query %}
-SELECT
-min(block_id)
-FROM
-{{ ref('silver__blocks') }}
-WHERE
-block_timestamp >= CURRENT_DATE - 2
-{% endset %}
-{% set min_event_block_id = run_query(min_event_block_id_query).columns[0].values()[0] %}
-{% endif %}
-WITH idl_in_play AS (
-SELECT
-program_id
-FROM
-{{ ref('silver__verified_idls') }}
-WHERE
-program_id IN (
-'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
-'PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY',
-'6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P',
-'DCA265Vj8a9CEuX1eb1LWRnDT7uK6q1xMipnNyatn23M',
-'7a4WjyR8VZ7yZz5XJAKm39BUGn5iT9CKcv2pmG9tdXVH'
-)
-),
-event_subset AS (
-SELECT
-i.value :programId :: STRING AS inner_program_id,
-e.tx_id,
-e.index,
-i.index AS inner_index,
-NULL AS log_index,
-i.value AS instruction,
-e.block_id,
-e.block_timestamp,
-e.signers,
-e.succeeded,
-{{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','log_index','inner_program_id']) }} as id
-FROM
-{{ ref('silver__events') }} e
-JOIN
-table(flatten(e.inner_instruction:instructions)) i
-JOIN
-idl_in_play b
-ON array_contains(b.program_id::variant, e.inner_instruction_program_ids)
-AND b.program_id = inner_program_id
-WHERE
-e.block_timestamp >= CURRENT_DATE - 2
-AND e.succeeded
-AND array_size(i.value:accounts::array) = 1
-UNION ALL
-SELECT
-l.program_id,
-l.tx_id,
-l.index,
-l.inner_index,
-l.log_index,
-object_construct('accounts',[],'data',l.data,'programId',l.program_id) as instruction,
-l.block_id,
-l.block_timestamp,
-t.signers,
-t.succeeded,
-{{ dbt_utils.generate_surrogate_key(['l.block_id','l.tx_id','l.index','l.inner_index','l.log_index','l.program_id']) }} as id
-FROM
-{{ ref('silver__transaction_logs_program_data') }} l
-JOIN
-{{ ref('silver__transactions') }} t
-USING(block_timestamp, tx_id)
-WHERE
-l.block_timestamp >= CURRENT_DATE - 2
-AND l.program_id in ('TSWAPaqyCSx2KABk68Shruf4rp7CxcNi8hAsbdwmHbN','JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB')
-),
-completed_subset AS (
-SELECT
-block_id,
-complete_decoded_logs_2_id as id
-FROM
-{{ ref('streamline__complete_decoded_logs_2') }}
-WHERE
-block_id >= {{ min_event_block_id }} --ensure we at least prune to last 2 days worth of blocks since the dynamic below will scan everything
-AND block_id >= (
-SELECT
-MIN(block_id)
-FROM
-event_subset
-)
-)
-/*
-while we are running in parallel, can just select from the existing table
-once we are done, we can move the existing code into this table
-and it should be mostly the same except for the completed table references
-*/
SELECT
-e.inner_program_id as program_id,
-e.tx_id,
-e.index,
-e.inner_index,
-e.log_index,
-e.instruction,
-e.block_id,
-e.block_timestamp,
-e.signers,
-e.succeeded
+*
FROM
-event_subset e
-LEFT OUTER JOIN
-completed_subset C
-ON C.block_id = e.block_id
-AND e.id = C.id
-WHERE
-C.block_id IS NULL
+{{ ref('streamline__decode_logs_realtime') }}
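With the CTE pipeline deleted, the surviving model body (assembled from the kept and added lines above) is a plain passthrough of the original realtime model, matching the removed comment's plan to run both pipelines in parallel before cutting over:

    SELECT
        *
    FROM
        {{ ref('streamline__decode_logs_realtime') }}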