An 4038/precise token transfers (#152)

* ez token transfers

* transfers
This commit is contained in:
Austin 2023-10-23 19:33:44 -04:00 committed by GitHub
parent 72fde5be08
commit 5a965601ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 325 additions and 74 deletions

View File

@ -0,0 +1,45 @@
name: dbt_run_heal_models
run-name: dbt_run_heal_models
on:
workflow_dispatch:
schedule:
# Runs at 04:20 on Wednesday (see https://crontab.guru)
- cron: '20 4 * * 3'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m tag:heal --var '{"HEAL_MODEL":True}'

View File

@ -63,3 +63,4 @@ vars:
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False
HEAL_MODEL: False

View File

@ -2,4 +2,10 @@
The decimal transformed amount for this token. Tokens without a decimal adjustment will be nulled out here.
{% enddocs %}
{% docs arb_transfer_amount_precise %}
The decimal transformed amount for this token returned as a string to preserve precision. Tokens without a decimal adjustment will be nulled out here.
{% enddocs %}

View File

@ -2,4 +2,10 @@
The amount of tokens transferred. This value is not decimal adjusted.
{% enddocs %}
{% docs arb_transfer_raw_amount_precise %}
The amount of tokens transferred returned as a string to preserve precision. This value is not decimal adjusted.
{% enddocs %}

View File

@ -8,47 +8,24 @@ SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
t.contract_address,
contract_address,
from_address,
to_address,
raw_amount,
raw_amount_precise,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
price AS token_price,
CASE
WHEN C.token_decimals IS NOT NULL THEN raw_amount / pow(
10,
C.token_decimals
)
ELSE NULL
END AS amount,
CASE
WHEN C.token_decimals IS NOT NULL
AND price IS NOT NULL THEN amount * price
ELSE NULL
END AS amount_usd,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
_log_id
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
{{ ref('core__fact_token_transfers') }}
t
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON t.contract_address = C.contract_address
{{ ref('silver__transfers') }}

View File

@ -10,37 +10,41 @@ models:
description: '{{ doc("arb_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("arb_transfer_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("arb_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("arb_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("arb_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("arb_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("arb_transfer_contract_address") }}'
- name: FROM_ADDRESS
description: '{{ doc("arb_transfer_from_address") }}'
- name: TO_ADDRESS
description: '{{ doc("arb_transfer_to_address") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("arb_transfer_raw_amount_precise") }}'
- name: RAW_AMOUNT
description: '{{ doc("arb_transfer_raw_amount") }}'
- name: DECIMALS
description: 'The number of decimal places this contract needs adjusted where token values exist.'
- name: SYMBOL
description: 'The symbol belonging to the address of the token.'
- name: AMOUNT_PRECISE
description: '{{ doc("arb_transfer_amount_precise") }}'
- name: TOKEN_PRICE
description: '{{ doc("arb_transfer_token_price") }}'
- name: AMOUNT
description: '{{ doc("arb_transfer_amount") }}'
- name: AMOUNT_USD
description: '{{ doc("arb_transfer_amount_usd") }}'
- name: DECIMALS
description: '{{ doc("arb_decimals") }}'
- name: SYMBOL
description: '{{ doc("arb_symbol") }}'
- name: TOKEN_PRICE
description: '{{ doc("arb_transfer_token_price") }}'
- name: HAS_DECIMAL
description: '{{ doc("arb_transfer_has_decimal") }}'
- name: HAS_PRICE
description: '{{ doc("arb_transfer_has_price") }}'
- name: _LOG_ID
description: '{{ doc("arb_log_id_transfers") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("arb_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("arb_eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("arb_eth_origin_to") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("precise_amount_unadjusted") }}'
- name: AMOUNT_PRECISE
description: '{{ doc("precise_amount_adjusted") }}'
description: '{{ doc("arb_log_id_transfers") }}'

View File

@ -8,6 +8,7 @@ SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,

View File

@ -10,6 +10,14 @@ models:
description: '{{ doc("arb_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("arb_transfer_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("arb_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("arb_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("arb_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("arb_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("arb_transfer_contract_address") }}'
- name: FROM_ADDRESS
@ -19,12 +27,6 @@ models:
- name: RAW_AMOUNT
description: '{{ doc("arb_transfer_raw_amount") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("precise_amount_unadjusted") }}'
description: '{{ doc("arb_transfer_raw_amount_precise") }}'
- name: _LOG_ID
description: '{{ doc("arb_log_id_transfers") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("arb_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("arb_eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("arb_eth_origin_to") }}'
description: '{{ doc("arb_log_id_transfers") }}'

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['non_realtime','reorg','heal']
) }}
WITH logs AS (
@ -34,32 +34,241 @@ AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
),
token_transfers AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
t.contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
IFF(
C.token_decimals IS NOT NULL,
utils.udf_decimal_adjust(
raw_amount_precise,
C.token_decimals
),
NULL
) AS amount_precise,
amount_precise :: FLOAT AS amount,
IFF(
C.token_decimals IS NOT NULL
AND price IS NOT NULL,
amount * price,
NULL
) AS amount_usd,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
price AS token_price,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
_log_id,
_inserted_timestamp
FROM
logs t
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address)
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL
)
{% if is_incremental() and var(
'HEAL_MODEL'
) %},
heal_model AS (
SELECT
t0.block_number,
t0.block_timestamp,
t0.tx_hash,
t0.event_index,
t0.origin_function_signature,
t0.origin_from_address,
t0.origin_to_address,
t0.contract_address,
t0.from_address,
t0.to_address,
t0.raw_amount_precise,
t0.raw_amount,
IFF(
C.token_decimals IS NOT NULL,
utils.udf_decimal_adjust(
t0.raw_amount_precise,
C.token_decimals
),
NULL
) AS amount_precise_heal,
amount_precise_heal :: FLOAT AS amount_heal,
IFF(
C.token_decimals IS NOT NULL
AND price IS NOT NULL,
amount_heal * p.price,
NULL
) AS amount_usd,
C.token_decimals AS decimals,
C.token_symbol AS symbol,
p.price AS token_price,
CASE
WHEN C.token_decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN p.price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t0.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t0.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON C.contract_address = t0.contract_address
WHERE
t0.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.decimals IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.contract_address)
)
OR t0.block_number IN (
SELECT
DISTINCT t2.block_number
FROM
{{ this }}
t2
WHERE
t2.token_price IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__hourly_prices_priority') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t2.contract_address
AND p.hour = DATE_TRUNC(
'hour',
t2.block_timestamp
)
)
)
)
{% endif %}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
token_transfers
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
_log_id,
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
_inserted_timestamp,
event_index,
raw_amount_precise
amount_precise_heal AS amount_precise,
amount_heal AS amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
logs
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1
heal_model
{% endif %}