An 4038/precise token transfers (#699)

* ez token transfers

* transfers
This commit is contained in:
Austin 2023-10-23 20:19:43 -04:00 committed by GitHub
parent 2616a8993a
commit d7bea4356a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 297 additions and 234 deletions

View File

@ -66,3 +66,4 @@ vars:
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False
HEAL_MODEL: False

View File

@ -2,4 +2,10 @@
The decimal transformed amount for this token. Tokens without a decimal adjustment will be nulled out here.
{% enddocs %}
{% docs eth_transfer_amount_precise %}
The decimal transformed amount for this token returned as a string to preserve precision. Tokens without a decimal adjustment will be nulled out here.
{% enddocs %}

View File

@ -2,4 +2,11 @@
The amount of tokens transferred. This value is not decimal adjusted.
{% enddocs %}
{% docs eth_transfer_raw_amount_precise %}
The amount of tokens transferred returned as a string to preserve precision. This value is not decimal adjusted.
{% enddocs %}

View File

@ -1,122 +1,31 @@
{{ config(
materialized = 'incremental',
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
post_hook = "{{ grant_data_share_statement('EZ_TOKEN_TRANSFERS', 'TABLE') }}",
tags = ['realtime','reorg']
"columns": true }
) }}
WITH metadata AS (
SELECT
LOWER(address) AS address,
symbol,
NAME,
decimals
FROM
{{ ref('silver__contracts') }}
WHERE
decimals IS NOT NULL
),
transfers AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
LOWER(contract_address) AS contract_address,
from_address,
to_address,
raw_amount,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__transfers') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '72 hours'
FROM
{{ this }}
)
{% endif %}
),
hourly_prices AS (
SELECT
HOUR,
LOWER(token_address) AS token_address,
AVG(price) AS price
FROM
{{ ref('price__ez_hourly_token_prices') }}
WHERE
1 = 1
{% if is_incremental() %}
AND HOUR :: DATE IN (
SELECT
DISTINCT block_timestamp :: DATE
FROM
transfers
)
{% else %}
AND HOUR :: DATE >= '2020-05-05'
{% endif %}
GROUP BY
1,
2
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
price AS token_price,
CASE
WHEN decimals IS NOT NULL THEN raw_amount / pow(
10,
decimals
)
ELSE NULL
END AS amount,
CASE
WHEN decimals IS NOT NULL
AND price IS NOT NULL THEN amount * price
ELSE NULL
END AS amount_usd,
CASE
WHEN decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
transfers
LEFT JOIN metadata
ON contract_address = address
LEFT JOIN hourly_prices
ON contract_address = token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = HOUR
{{ ref('silver__transfers') }}

View File

@ -3,124 +3,46 @@ models:
- name: core__ez_token_transfers
description: '{{ doc("eth_ez_transfer_table_doc") }}'
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
columns:
- name: BLOCK_NUMBER
description: '{{ doc("eth_block_number") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: '{{ doc("eth_block_timestamp") }}'
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
description: '{{ doc("eth_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("eth_transfer_tx_hash") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: CONTRACT_ADDRESS
description: '{{ doc("eth_transfer_contract_address") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: FROM_ADDRESS
description: '{{ doc("eth_transfer_from_address") }}'
tests:
- not_null:
where: BLOCK_TIMESTAMP > '2021-08-01'
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: TO_ADDRESS
description: '{{ doc("eth_transfer_to_address") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: RAW_AMOUNT
description: '{{ doc("eth_transfer_raw_amount") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: DECIMALS
description: '{{ doc("eth_decimals") }}'
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: SYMBOL
description: '{{ doc("eth_contracts_symbol") }}'
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TOKEN_PRICE
description: '{{ doc("eth_transfer_token_price") }}'
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT
description: '{{ doc("eth_transfer_amount") }}'
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: AMOUNT_USD
description: '{{ doc("eth_transfer_amount_usd") }}'
tests:
- not_null:
where: CONTRACT_ADDRESS in ('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2', '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48') AND block_timestamp >= '2020-05-06' AND BLOCK_TIMESTAMP < CURRENT_DATE - 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: HAS_PRICE
description: '{{ doc("eth_transfer_has_price") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['true', 'false']
- name: HAS_DECIMAL
description: '{{ doc("eth_transfer_has_decimal") }}'
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_set:
value_set: ['true', 'false']
- name: _LOG_ID
description: '{{ doc("eth_log_id_transfers") }}'
tests:
- not_null
- name: EVENT_INDEX
description: '{{ doc("eth_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("nft_origin_sig") }}'
tests:
- not_null
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("eth_origin_from") }}'
tests:
- not_null
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("eth_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("eth_transfer_contract_address") }}'
- name: FROM_ADDRESS
description: '{{ doc("eth_transfer_from_address") }}'
- name: TO_ADDRESS
description: '{{ doc("eth_transfer_to_address") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("eth_transfer_raw_amount_precise") }}'
- name: RAW_AMOUNT
description: '{{ doc("eth_transfer_raw_amount") }}'
- name: AMOUNT_PRECISE
description: '{{ doc("eth_transfer_amount_precise") }}'
- name: AMOUNT
description: '{{ doc("eth_transfer_amount") }}'
- name: AMOUNT_USD
description: '{{ doc("eth_transfer_amount_usd") }}'
- name: DECIMALS
description: '{{ doc("eth_decimals") }}'
- name: SYMBOL
description: '{{ doc("eth_contracts_symbol") }}'
- name: TOKEN_PRICE
description: '{{ doc("eth_transfer_token_price") }}'
- name: HAS_PRICE
description: '{{ doc("eth_transfer_has_price") }}'
- name: HAS_DECIMAL
description: '{{ doc("eth_transfer_has_decimal") }}'
- name: _LOG_ID
description: '{{ doc("eth_log_id_transfers") }}'

View File

@ -8,6 +8,7 @@ SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
@ -15,6 +16,7 @@ SELECT
from_address,
to_address,
raw_amount,
raw_amount_precise,
_log_id
FROM
{{ ref('silver__transfers') }}

View File

@ -10,6 +10,14 @@ models:
description: '{{ doc("eth_block_timestamp") }}'
- name: TX_HASH
description: '{{ doc("eth_transfer_tx_hash") }}'
- name: EVENT_INDEX
description: '{{ doc("eth_event_index") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("eth_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("eth_origin_to") }}'
- name: CONTRACT_ADDRESS
description: '{{ doc("eth_transfer_contract_address") }}'
- name: FROM_ADDRESS
@ -18,11 +26,7 @@ models:
description: '{{ doc("eth_transfer_to_address") }}'
- name: RAW_AMOUNT
description: '{{ doc("eth_transfer_raw_amount") }}'
- name: RAW_AMOUNT_PRECISE
description: '{{ doc("eth_transfer_raw_amount_precise") }}'
- name: _LOG_ID
description: '{{ doc("eth_log_id_transfers") }}'
- name: ORIGIN_FUNCTION_SIGNATURE
description: '{{ doc("nft_origin_sig") }}'
- name: ORIGIN_FROM_ADDRESS
description: '{{ doc("eth_origin_from") }}'
- name: ORIGIN_TO_ADDRESS
description: '{{ doc("eth_origin_to") }}'
description: '{{ doc("eth_log_id_transfers") }}'

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['realtime','reorg']
tags = ['realtime','reorg','heal']
) }}
WITH logs AS (
@ -19,9 +19,10 @@ WITH logs AS (
contract_address :: STRING AS contract_address,
CONCAT('0x', SUBSTR(topics [1], 27, 40)) :: STRING AS from_address,
CONCAT('0x', SUBSTR(topics [2], 27, 40)) :: STRING AS to_address,
utils.udf_hex_to_int(SUBSTR(DATA, 3, 64)) :: FLOAT AS raw_amount,
event_index :: FLOAT AS event_index,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
utils.udf_hex_to_int(SUBSTR(DATA, 3, 64)) AS raw_amount_precise,
raw_amount_precise :: FLOAT AS raw_amount,
event_index,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
@ -33,31 +34,242 @@ AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
) - INTERVAL '36 hours'
FROM
{{ this }}
)
{% endif %}
),
token_transfers AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
t.contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
IFF(
C.decimals IS NOT NULL,
utils.udf_decimal_adjust(
raw_amount_precise,
C.decimals
),
NULL
) AS amount_precise,
amount_precise :: FLOAT AS amount,
IFF(
C.decimals IS NOT NULL
AND price IS NOT NULL,
amount * price,
NULL
) AS amount_usd,
C.decimals AS decimals,
C.symbol AS symbol,
price AS token_price,
CASE
WHEN C.decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
t._log_id,
t._inserted_timestamp
FROM
logs t
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON t.contract_address = C.address
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL
)
{% if is_incremental() and var(
'HEAL_MODEL'
) %},
heal_model AS (
SELECT
t0.block_number,
t0.block_timestamp,
t0.tx_hash,
t0.event_index,
t0.origin_function_signature,
t0.origin_from_address,
t0.origin_to_address,
t0.contract_address,
t0.from_address,
t0.to_address,
t0.raw_amount_precise,
t0.raw_amount,
IFF(
C.decimals IS NOT NULL,
utils.udf_decimal_adjust(
t0.raw_amount_precise,
C.decimals
),
NULL
) AS amount_precise_heal,
amount_precise_heal :: FLOAT AS amount_heal,
IFF(
C.decimals IS NOT NULL
AND price IS NOT NULL,
amount_heal * p.price,
NULL
) AS amount_usd,
C.decimals AS decimals,
C.symbol AS symbol,
p.price AS token_price,
CASE
WHEN C.decimals IS NULL THEN 'false'
ELSE 'true'
END AS has_decimal,
CASE
WHEN p.price IS NULL THEN 'false'
ELSE 'true'
END AS has_price,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_hourly_token_prices') }}
p
ON t0.contract_address = p.token_address
AND DATE_TRUNC(
'hour',
t0.block_timestamp
) = HOUR
LEFT JOIN {{ ref('silver__contracts') }} C
ON C.address = t0.contract_address
WHERE
t0.block_number IN (
SELECT
DISTINCT t1.block_number AS block_number
FROM
{{ this }}
t1
WHERE
t1.decimals IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.decimals IS NOT NULL
AND C.address = t1.contract_address)
)
OR t0.block_number IN (
SELECT
DISTINCT t2.block_number
FROM
{{ this }}
t2
WHERE
t2.token_price IS NULL
AND _inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__hourly_prices_priority') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t2.contract_address
AND p.hour = DATE_TRUNC(
'hour',
t2.block_timestamp
)
)
)
)
{% endif %}
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
amount_precise,
amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
token_transfers
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
_log_id,
block_number,
block_timestamp,
tx_hash,
event_index,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
contract_address,
from_address,
to_address,
raw_amount_precise,
raw_amount,
_inserted_timestamp,
event_index
amount_precise_heal AS amount_precise,
amount_heal AS amount,
amount_usd,
decimals,
symbol,
token_price,
has_decimal,
has_price,
_log_id,
_inserted_timestamp
FROM
logs
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1
heal_model
{% endif %}