diff --git a/.github/workflows/dbt_run_heal_models.yml b/.github/workflows/dbt_run_heal_models.yml new file mode 100644 index 0000000..4ac81d5 --- /dev/null +++ b/.github/workflows/dbt_run_heal_models.yml @@ -0,0 +1,45 @@ +name: dbt_run_heal_models +run-name: dbt_run_heal_models + +on: + workflow_dispatch: + schedule: + # Runs at 04:55 on Wednesday (see https://crontab.guru) + - cron: '55 4 * * 3' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m tag:heal --var '{"HEAL_MODEL":True}' \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index c827986..0f428a2 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -63,4 +63,5 @@ vars: STREAMLINE_INVOKE_STREAMS: False STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False WAIT: 0 - OBSERV_FULL_TEST: False \ No newline at end of file + OBSERV_FULL_TEST: False + HEAL_MODEL: False \ No newline at end of file diff --git a/models/gold/core/core__ez_token_transfers.sql b/models/gold/core/core__ez_token_transfers.sql index 4832636..37ed8eb 100644 --- a/models/gold/core/core__ez_token_transfers.sql +++ b/models/gold/core/core__ez_token_transfers.sql @@ -8,47 +8,24 @@ SELECT block_number, block_timestamp, tx_hash, + event_index, origin_function_signature, origin_from_address, origin_to_address, - t.contract_address, + contract_address, from_address, to_address, - raw_amount, raw_amount_precise, - C.token_decimals AS decimals, - C.token_symbol AS symbol, - price AS token_price, - CASE - WHEN C.token_decimals IS NOT NULL THEN raw_amount / pow( - 10, - C.token_decimals - ) - ELSE NULL - END AS amount, - CASE - WHEN C.token_decimals IS NOT NULL - AND price IS NOT NULL THEN amount * price - ELSE NULL - END AS amount_usd, - CASE - WHEN C.token_decimals IS NULL THEN 'false' - ELSE 'true' - END AS has_decimal, - CASE - WHEN price IS NULL THEN 'false' - ELSE 'true' - END AS has_price, - _log_id + raw_amount, + amount_precise, + amount, + amount_usd, + decimals, + symbol, + token_price, + has_decimal, + has_price, + _log_id, + _inserted_timestamp FROM - {{ ref('core__fact_token_transfers') }} - t - LEFT JOIN {{ ref('price__ez_hourly_token_prices') }} - p - ON t.contract_address = p.token_address - AND DATE_TRUNC( - 'hour', - t.block_timestamp - ) = HOUR - LEFT JOIN {{ ref('silver__contracts') }} C - ON t.contract_address = C.contract_address + {{ ref('silver__transfers') }} diff --git a/models/gold/core/core__ez_token_transfers.yml b/models/gold/core/core__ez_token_transfers.yml index f611d6c..3663a4f 100644 --- a/models/gold/core/core__ez_token_transfers.yml +++ b/models/gold/core/core__ez_token_transfers.yml @@ -10,35 +10,41 @@ models: description: '{{ doc("gno_block_timestamp") }}' - name: TX_HASH description: '{{ doc("gno_transfer_tx_hash") }}' + - name: EVENT_INDEX + description: '{{ doc("gno_event_index") }}' + - name: ORIGIN_FUNCTION_SIGNATURE + description: '{{ doc("gno_origin_sig") }}' + - name: ORIGIN_FROM_ADDRESS + description: '{{ doc("gno_origin_from") }}' + - name: ORIGIN_TO_ADDRESS + description: '{{ doc("gno_origin_to") }}' - name: CONTRACT_ADDRESS description: '{{ doc("gno_transfer_contract_address") }}' - name: FROM_ADDRESS description: '{{ doc("gno_transfer_from_address") }}' - name: TO_ADDRESS description: '{{ doc("gno_transfer_to_address") }}' + - name: RAW_AMOUNT_PRECISE + description: '{{ doc("gno_transfer_raw_amount_precise") }}' - name: RAW_AMOUNT description: '{{ doc("gno_transfer_raw_amount") }}' - - name: RAW_AMOUNT_PRECISE - description: '{{ doc("precise_amount_unadjusted") }}' - - name: DECIMALS - description: 'The number of decimal places this contract needs adjusted where token values exist.' - - name: SYMBOL - description: 'The symbol belonging to the address of the token.' + - name: AMOUNT_PRECISE + description: '{{ doc("gno_transfer_amount_precise") }}' - name: TOKEN_PRICE description: '{{ doc("gno_transfer_token_price") }}' - name: AMOUNT description: '{{ doc("gno_transfer_amount") }}' - name: AMOUNT_USD description: '{{ doc("gno_transfer_amount_usd") }}' + - name: DECIMALS + description: '{{ doc("gno_decimals") }}' + - name: SYMBOL + description: '{{ doc("gno_symbol") }}' + - name: TOKEN_PRICE + description: '{{ doc("gno_transfer_token_price") }}' - name: HAS_DECIMAL description: '{{ doc("gno_transfer_has_decimal") }}' - name: HAS_PRICE description: '{{ doc("gno_transfer_has_price") }}' - name: _LOG_ID - description: '{{ doc("gno_log_id_transfers") }}' - - name: ORIGIN_FUNCTION_SIGNATURE - description: '{{ doc("gno_origin_sig") }}' - - name: ORIGIN_FROM_ADDRESS - description: '{{ doc("gno_eth_origin_from") }}' - - name: ORIGIN_TO_ADDRESS - description: '{{ doc("gno_eth_origin_to") }}' \ No newline at end of file + description: '{{ doc("gno_log_id_transfers") }}' \ No newline at end of file diff --git a/models/gold/core/core__fact_token_transfers.sql b/models/gold/core/core__fact_token_transfers.sql index 178f958..9dffa5a 100644 --- a/models/gold/core/core__fact_token_transfers.sql +++ b/models/gold/core/core__fact_token_transfers.sql @@ -8,6 +8,7 @@ SELECT block_number, block_timestamp, tx_hash, + event_index, origin_function_signature, origin_from_address, origin_to_address, diff --git a/models/gold/core/core__fact_token_transfers.yml b/models/gold/core/core__fact_token_transfers.yml index d5d2dbc..5aab2fb 100644 --- a/models/gold/core/core__fact_token_transfers.yml +++ b/models/gold/core/core__fact_token_transfers.yml @@ -10,6 +10,14 @@ models: description: '{{ doc("gno_block_timestamp") }}' - name: TX_HASH description: '{{ doc("gno_transfer_tx_hash") }}' + - name: EVENT_INDEX + description: '{{ doc("gno_event_index") }}' + - name: ORIGIN_FUNCTION_SIGNATURE + description: '{{ doc("gno_origin_sig") }}' + - name: ORIGIN_FROM_ADDRESS + description: '{{ doc("gno_origin_from") }}' + - name: ORIGIN_TO_ADDRESS + description: '{{ doc("gno_origin_to") }}' - name: CONTRACT_ADDRESS description: '{{ doc("gno_transfer_contract_address") }}' - name: FROM_ADDRESS @@ -19,12 +27,6 @@ models: - name: RAW_AMOUNT description: '{{ doc("gno_transfer_raw_amount") }}' - name: RAW_AMOUNT_PRECISE - description: '{{ doc("precise_amount_unadjusted") }}' + description: '{{ doc("gno_transfer_raw_amount_precise") }}' - name: _LOG_ID - description: '{{ doc("gno_log_id_transfers") }}' - - name: ORIGIN_FUNCTION_SIGNATURE - description: '{{ doc("gno_origin_sig") }}' - - name: ORIGIN_FROM_ADDRESS - description: '{{ doc("gno_eth_origin_from") }}' - - name: ORIGIN_TO_ADDRESS - description: '{{ doc("gno_eth_origin_to") }}' \ No newline at end of file + description: '{{ doc("gno_log_id_transfers") }}' \ No newline at end of file diff --git a/models/silver/core/silver__transfers.sql b/models/silver/core/silver__transfers.sql index cb9d027..6011e39 100644 --- a/models/silver/core/silver__transfers.sql +++ b/models/silver/core/silver__transfers.sql @@ -1,8 +1,9 @@ {{ config( materialized = 'incremental', - unique_key = '_log_id', + incremental_strategy = 'delete+insert', + unique_key = "block_number", cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['non_realtime','reorg','heal'] ) }} WITH logs AS ( @@ -33,32 +34,241 @@ AND _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) + ) - INTERVAL '36 hours' FROM {{ this }} ) {% endif %} +), +token_transfers AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + origin_function_signature, + origin_from_address, + origin_to_address, + t.contract_address, + from_address, + to_address, + raw_amount_precise, + raw_amount, + IFF( + C.token_decimals IS NOT NULL, + utils.udf_decimal_adjust( + raw_amount_precise, + C.token_decimals + ), + NULL + ) AS amount_precise, + amount_precise :: FLOAT AS amount, + IFF( + C.token_decimals IS NOT NULL + AND price IS NOT NULL, + amount * price, + NULL + ) AS amount_usd, + C.token_decimals AS decimals, + C.token_symbol AS symbol, + price AS token_price, + CASE + WHEN C.token_decimals IS NULL THEN 'false' + ELSE 'true' + END AS has_decimal, + CASE + WHEN price IS NULL THEN 'false' + ELSE 'true' + END AS has_price, + _log_id, + _inserted_timestamp + FROM + logs t + LEFT JOIN {{ ref('price__ez_hourly_token_prices') }} + p + ON t.contract_address = p.token_address + AND DATE_TRUNC( + 'hour', + t.block_timestamp + ) = HOUR + LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address) + WHERE + raw_amount IS NOT NULL + AND to_address IS NOT NULL + AND from_address IS NOT NULL ) + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %}, +heal_model AS ( + SELECT + t0.block_number, + t0.block_timestamp, + t0.tx_hash, + t0.event_index, + t0.origin_function_signature, + t0.origin_from_address, + t0.origin_to_address, + t0.contract_address, + t0.from_address, + t0.to_address, + t0.raw_amount_precise, + t0.raw_amount, + IFF( + C.token_decimals IS NOT NULL, + utils.udf_decimal_adjust( + t0.raw_amount_precise, + C.token_decimals + ), + NULL + ) AS amount_precise_heal, + amount_precise_heal :: FLOAT AS amount_heal, + IFF( + C.token_decimals IS NOT NULL + AND price IS NOT NULL, + amount_heal * p.price, + NULL + ) AS amount_usd, + C.token_decimals AS decimals, + C.token_symbol AS symbol, + p.price AS token_price, + CASE + WHEN C.token_decimals IS NULL THEN 'false' + ELSE 'true' + END AS has_decimal, + CASE + WHEN p.price IS NULL THEN 'false' + ELSE 'true' + END AS has_price, + t0._log_id, + t0._inserted_timestamp + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_hourly_token_prices') }} + p + ON t0.contract_address = p.token_address + AND DATE_TRUNC( + 'hour', + t0.block_timestamp + ) = HOUR + LEFT JOIN {{ ref('silver__contracts') }} C + ON C.contract_address = t0.contract_address + WHERE + t0.block_number IN ( + SELECT + DISTINCT t1.block_number AS block_number + FROM + {{ this }} + t1 + WHERE + t1.decimals IS NULL + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.contract_address) + ) + OR t0.block_number IN ( + SELECT + DISTINCT t2.block_number + FROM + {{ this }} + t2 + WHERE + t2.token_price IS NULL + AND _inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '36 hours' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__hourly_prices_priority') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t2.contract_address + AND p.hour = DATE_TRUNC( + 'hour', + t2.block_timestamp + ) + ) + ) + ) + {% endif %} + SELECT + block_number, + block_timestamp, + tx_hash, + event_index, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + from_address, + to_address, + raw_amount_precise, + raw_amount, + amount_precise, + amount, + amount_usd, + decimals, + symbol, + token_price, + has_decimal, + has_price, + _log_id, + _inserted_timestamp + FROM + token_transfers + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL SELECT - _log_id, block_number, + block_timestamp, tx_hash, + event_index, origin_function_signature, origin_from_address, origin_to_address, - block_timestamp, contract_address, from_address, to_address, + raw_amount_precise, raw_amount, - _inserted_timestamp, - event_index, - raw_amount_precise + amount_precise_heal AS amount_precise, + amount_heal AS amount, + amount_usd, + decimals, + symbol, + token_price, + has_decimal, + has_price, + _log_id, + _inserted_timestamp FROM - logs -WHERE - raw_amount IS NOT NULL - AND to_address IS NOT NULL - AND from_address IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY _log_id -ORDER BY - _inserted_timestamp DESC)) = 1 + heal_model +{% endif %}