This commit is contained in:
Austin 2023-10-18 19:07:34 -04:00 committed by GitHub
parent 1823ce35a6
commit 84e509dcc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 316 deletions

View File

@ -1,45 +0,0 @@
name: dbt_run_heal_models
run-name: dbt_run_heal_models
on:
workflow_dispatch:
schedule:
# Runs at 04:55 on Wednesday (see https://crontab.guru)
- cron: '55 4 * * 3'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m tag:heal --var '{"HEAL_MODEL":True}'

View File

@ -63,5 +63,4 @@ vars:
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
WAIT: 0
OBSERV_FULL_TEST: False
HEAL_MODEL: False
OBSERV_FULL_TEST: False

View File

@ -8,24 +8,47 @@ SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
t.contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
raw_amount_precise,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
price AS token_price,
CASE
WHEN C.token_decimals IS NOT NULL THEN raw_amount / pow(
10,
C.token_decimals
)
ELSE NULL
END AS amount,
CASE
WHEN C.token_decimals IS NOT NULL
AND price IS NOT NULL THEN amount * price
ELSE NULL
END AS amount_usd,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
_log_id
FROM
{{ ref('silver__transfers') }}
{{ ref('core__fact_token_transfers') }}
t
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON t.contract_address = C.contract_address

View File

@ -10,41 +10,35 @@ models:
description: '{{ doc("gno_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("gno_transfer_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("gno_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("gno_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("gno_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("gno_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("gno_transfer_contract_address") }}'
- name: FROM_ADDRESS
description: '{{ doc("gno_transfer_from_address") }}'
- name: TO_ADDRESS
description: '{{ doc("gno_transfer_to_address") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("gno_transfer_raw_amount_precise") }}'
- name: RAW_AMOUNT
description: '{{ doc("gno_transfer_raw_amount") }}'
- name: AMOUNT_PRECISE
description: '{{ doc("gno_transfer_amount_precise") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("precise_amount_unadjusted") }}'
- name: DECIMALS
description: 'The number of decimal places this contract needs adjusted where token values exist.'
- name: SYMBOL
description: 'The symbol belonging to the address of the token.'
- name: TOKEN_PRICE
description: '{{ doc("gno_transfer_token_price") }}'
- name: AMOUNT
description: '{{ doc("gno_transfer_amount") }}'
- name: AMOUNT_USD
description: '{{ doc("gno_transfer_amount_usd") }}'
- name: DECIMALS
description: '{{ doc("gno_decimals") }}'
- name: SYMBOL
description: '{{ doc("gno_symbol") }}'
- name: TOKEN_PRICE
description: '{{ doc("gno_transfer_token_price") }}'
- name: HAS_DECIMAL
description: '{{ doc("gno_transfer_has_decimal") }}'
- name: HAS_PRICE
description: '{{ doc("gno_transfer_has_price") }}'
- name: _LOG_ID
description: '{{ doc("gno_log_id_transfers") }}'
description: '{{ doc("gno_log_id_transfers") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("gno_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("gno_eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("gno_eth_origin_to") }}'

View File

@ -8,7 +8,6 @@ SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,

View File

@ -10,14 +10,6 @@ models:
description: '{{ doc("gno_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("gno_transfer_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("gno_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("gno_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("gno_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("gno_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("gno_transfer_contract_address") }}'
- name: FROM_ADDRESS
@ -27,6 +19,12 @@ models:
- name: RAW_AMOUNT
description: '{{ doc("gno_transfer_raw_amount") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("gno_transfer_raw_amount_precise") }}'
description: '{{ doc("precise_amount_unadjusted") }}'
- name: _LOG_ID
description: '{{ doc("gno_log_id_transfers") }}'
description: '{{ doc("gno_log_id_transfers") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("gno_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("gno_eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("gno_eth_origin_to") }}'

View File

@ -1,9 +1,8 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_number",
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['non_realtime','reorg','heal']
tags = ['non_realtime','reorg']
) }}
WITH logs AS (
@ -34,241 +33,32 @@ AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
)
FROM
{{ this }}
)
{% endif %}
),
token_transfers AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
t.contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
IFF(
C.token_decimals IS NOT NULL,
utils.udf_decimal_adjust(
raw_amount_precise,
C.token_decimals
),
NULL
) AS amount_precise,
amount_precise :: FLOAT AS amount,
IFF(
C.token_decimals IS NOT NULL
AND price IS NOT NULL,
amount * price,
NULL
) AS amount_usd,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
price AS token_price,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
_log_id,
_inserted_timestamp
FROM
logs t
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address)
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL
)
{% if is_incremental() and var(
'HEAL_MODEL'
) %},
heal_model AS (
SELECT
t0.block_number,
t0.block_timestamp,
t0.tx_hash,
t0.event_index,
t0.origin_function_signature,
t0.origin_from_address,
t0.origin_to_address,
t0.contract_address,
t0.from_address,
t0.to_address,
t0.raw_amount_precise,
t0.raw_amount,
IFF(
C.token_decimals IS NOT NULL,
utils.udf_decimal_adjust(
t0.raw_amount_precise,
C.token_decimals
),
NULL
) AS amount_precise_heal,
amount_precise_heal :: FLOAT AS amount_heal,
IFF(
C.token_decimals IS NOT NULL
AND price IS NOT NULL,
amount_heal * p.price,
NULL
) AS amount_usd,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
p.price AS token_price,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN p.price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t0.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t0.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON C.contract_address = t0.contract_address
WHERE
t0.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.decimals IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.contract_address)
)
OR t0.block_number IN (
SELECT
DISTINCT t2.block_number
FROM
{{ this }}
t2
WHERE
t2.token_price IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__hourly_prices_priority') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t2.contract_address
AND p.hour = DATE_TRUNC(
'hour',
t2.block_timestamp
)
)
)
)
{% endif %}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
token_transfers
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
_log_id,
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise_heal AS amount_precise,
amount_heal AS amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
_inserted_timestamp,
event_index,
raw_amount_precise
FROM
heal_model
{% endif %}
logs
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1