Hotfix/transfers incremental (#143)

* temp disable transfers incremental
:

* fix extremely long query run time

* reenable transfers model updates
This commit is contained in:
desmond-hui 2023-10-24 12:29:23 -07:00 committed by GitHub
parent 8e017e6d98
commit 6946b8129e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 50 deletions

View File

@ -28,7 +28,7 @@ jobs:
with:
dbt_command: |
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
dbt run -s ./models --exclude models/silver/_observability tag:balances tag:daily models/silver/core/silver__transfers.sql+
dbt run -s ./models --exclude models/silver/_observability tag:balances tag:daily
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}

View File

@ -1,3 +1,4 @@
-- depends_on: {{ ref('silver__msg_attributes') }}
{{ config(
materialized = 'incremental',
unique_key = "_unique_key",
@ -5,48 +6,61 @@
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
) }}
{% if execute %}
{% set query = """
CREATE OR REPLACE TEMPORARY TABLE silver.transfers__intermediate_tmp AS
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
attribute_key,
attribute_value,
_inserted_timestamp
FROM
""" ~ ref('silver__msg_attributes') ~ """
WHERE
(
attribute_key IN (
'acc_seq',
'amount'
)
OR msg_type IN (
'coin_spent',
'transfer',
'message',
'claim',
'ibc_transfer',
'write_acknowledgement'
)
)"""
%}
{% set incr = "" %}
{% if is_incremental() %}
{% set incr = """
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
""" ~ this ~ """
) - INTERVAL '24 HOURS'""" %}
{% endif %}
{% do run_query(query ~ incr) %}
{% endif %}
WITH base_atts AS (
SELECT
block_id,
block_timestamp,
tx_id,
tx_succeeded,
msg_group,
msg_sub_group,
msg_index,
msg_type,
attribute_key,
attribute_value,
_inserted_timestamp
*
FROM
{{ ref('silver__msg_attributes') }}
WHERE
(
attribute_key IN (
'acc_seq',
'amount'
)
OR msg_type IN (
'coin_spent',
'transfer',
'message',
'claim',
'ibc_transfer',
'write_acknowledgement'
)
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
) - INTERVAL '24 HOURS'
{% endif %}
silver.transfers__intermediate_tmp
),
sender AS (
SELECT
@ -359,17 +373,6 @@ fin AS (
ON s.tx_id = m.tx_id
LEFT OUTER JOIN {{ ref('silver__asset_metadata') }} A
ON TRY_PARSE_JSON(attribute_value) :denom :: STRING = A.denom
JOIN (
SELECT
DISTINCT block_id,
block_timestamp,
tx_succeeded,
tx_id,
_inserted_timestamp
FROM
base_atts
) t
ON s.tx_id = t.tx_id
INNER JOIN coin_sent_ibc C
ON s.tx_id = C.tx_id
AND m.msg_group = C.msg_group {# AND m.msg_sub_group = C.msg_sub_group #}