From 6946b8129e4326974c53291a8bf609fd2d147db8 Mon Sep 17 00:00:00 2001 From: desmond-hui <97470747+desmond-hui@users.noreply.github.com> Date: Tue, 24 Oct 2023 12:29:23 -0700 Subject: [PATCH] Hotfix/transfers incremental (#143) * temp disable transfers incremental : * fix extremely long query run time * reenable transfers model updates --- .github/workflows/dbt_run_incremental.yml | 2 +- models/silver/core/silver__transfers.sql | 101 +++++++++++----------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/.github/workflows/dbt_run_incremental.yml b/.github/workflows/dbt_run_incremental.yml index 4dd912d..2550c17 100644 --- a/.github/workflows/dbt_run_incremental.yml +++ b/.github/workflows/dbt_run_incremental.yml @@ -28,7 +28,7 @@ jobs: with: dbt_command: | dbt run-operation stage_external_sources --vars "ext_full_refresh: true" - dbt run -s ./models --exclude models/silver/_observability tag:balances tag:daily models/silver/core/silver__transfers.sql+ + dbt run -s ./models --exclude models/silver/_observability tag:balances tag:daily dbt run-operation stage_external_sources --vars "ext_full_refresh: true" environment: workflow_prod warehouse: ${{ vars.WAREHOUSE }} diff --git a/models/silver/core/silver__transfers.sql b/models/silver/core/silver__transfers.sql index 8076c6e..2d0214c 100644 --- a/models/silver/core/silver__transfers.sql +++ b/models/silver/core/silver__transfers.sql @@ -1,3 +1,4 @@ +-- depends_on: {{ ref('silver__msg_attributes') }} {{ config( materialized = 'incremental', unique_key = "_unique_key", @@ -5,48 +6,61 @@ cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'] ) }} +{% if execute %} + {% set query = """ + CREATE OR REPLACE TEMPORARY TABLE silver.transfers__intermediate_tmp AS + SELECT + block_id, + block_timestamp, + tx_id, + tx_succeeded, + msg_group, + msg_sub_group, + msg_index, + msg_type, + attribute_key, + attribute_value, + _inserted_timestamp + FROM + """ ~ ref('silver__msg_attributes') ~ """ + WHERE + ( + attribute_key IN ( + 'acc_seq', + 'amount' + ) + OR msg_type IN ( + 'coin_spent', + 'transfer', + 'message', + 'claim', + 'ibc_transfer', + 'write_acknowledgement' + ) + )""" + %} + {% set incr = "" %} + {% if is_incremental() %} + {% set incr = """ + AND _inserted_timestamp >= ( + SELECT + MAX( + _inserted_timestamp + ) + FROM + """ ~ this ~ """ + ) - INTERVAL '24 HOURS'""" %} + {% endif %} + {% do run_query(query ~ incr) %} +{% endif %} + WITH base_atts AS ( SELECT - block_id, - block_timestamp, - tx_id, - tx_succeeded, - msg_group, - msg_sub_group, - msg_index, - msg_type, - attribute_key, - attribute_value, - _inserted_timestamp + * FROM - {{ ref('silver__msg_attributes') }} - WHERE - ( - attribute_key IN ( - 'acc_seq', - 'amount' - ) - OR msg_type IN ( - 'coin_spent', - 'transfer', - 'message', - 'claim', - 'ibc_transfer', - 'write_acknowledgement' - ) - ) - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - FROM - {{ this }} -) - INTERVAL '24 HOURS' -{% endif %} + silver.transfers__intermediate_tmp + ), sender AS ( SELECT @@ -359,17 +373,6 @@ fin AS ( ON s.tx_id = m.tx_id LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} A ON TRY_PARSE_JSON(attribute_value) :denom :: STRING = A.denom - JOIN ( - SELECT - DISTINCT block_id, - block_timestamp, - tx_succeeded, - tx_id, - _inserted_timestamp - FROM - base_atts - ) t - ON s.tx_id = t.tx_id INNER JOIN coin_sent_ibc C ON s.tx_id = C.tx_id AND m.msg_group = C.msg_group {# AND m.msg_sub_group = C.msg_sub_group #}