contracts model + tasks (#32)

* contracts model + tasks

* docs

* back off
This commit is contained in:
Austin 2022-11-30 10:57:34 -05:00 committed by GitHub
parent 626fc88017
commit f65da7e9a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 389 additions and 2 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,69 @@
{% macro task_get_abis() %}
{% set sql %}
EXECUTE IMMEDIATE
'create or replace task bronze_api.get_block_explorer_abis
warehouse = DBT_CLOUD_AVALANCHE
allow_overlapping_execution = false
schedule = \'300 minute\'
as
BEGIN
INSERT INTO
bronze_api.contract_abis(
contract_address,
abi_data,
_inserted_timestamp
)
WITH api_keys AS (
SELECT
api_key
FROM
crosschain.silver.apis_keys
WHERE
api_name = \'snowtrace\'
),
base AS (
SELECT
contract_address
FROM
silver.relevant_abi_contracts
EXCEPT
SELECT
contract_address
FROM
bronze_api.contract_abis
WHERE
abi_data :data :result :: STRING <> \'Max rate limit reached\'
LIMIT
100
)
SELECT
contract_address,
ethereum.streamline.udf_api(
\'GET\',
CONCAT(
\'https://api.snowtrace.io/api?module=contract&action=getabi&address=\',
contract_address,
\'&apikey=\',
api_key
),{},{}
) AS abi_data,
SYSDATE()
FROM
base
LEFT JOIN api_keys
ON 1 = 1
where exists (select 1 from base limit 1);
END;'
{% endset %}
{% do run_query(sql) %}
{% if target.database.upper() == 'AVALANCHE' %}
{% set sql %}
alter task bronze_api.get_block_explorer_abis resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,94 @@
{% macro task_get_token_reads() %}
{% set sql %}
EXECUTE IMMEDIATE
'create or replace task bronze_api.get_token_reads
warehouse = DBT_CLOUD_AVALANCHE
allow_overlapping_execution = false
schedule = \'300 minute\'
as
BEGIN
INSERT INTO
bronze_api.token_reads(
contract_address,
block_number,
function_sig,
function_input,
read_result,
_inserted_timestamp
)
with base as (
select
contract_address,
created_block
from silver.relevant_token_contracts
where contract_address not in (select contract_address from bronze_api.token_reads)
limit 500
)
, function_sigs as (
select \'0x313ce567\' as function_sig, \'decimals\' as function_name
union select \'0x06fdde03\', \'name\'
union select \'0x95d89b41\', \'symbol\'
),
all_reads as (
select *
from base
join function_sigs on 1=1
),
ready_reads as (
select
contract_address,
created_block,
function_sig,
concat(\'[!\',contract_address,\'!,\', created_block, \',!\',function_sig,\'!,!!]\') as read_input1,
replace(read_input1, $$!$$,$$\'$$) as read_input
from all_reads
)
, batch_reads as (
select concat(\'[\',listagg(read_input,\',\'),\']\') as batch_read
from ready_reads
),
results as (
select
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
parse_json(batch_read)
) as read_output
from batch_reads
join streamline.crosschain.node_mapping
on 1=1 and chain = \'avalanche\'
where exists (select 1 from ready_reads limit 1)
)
, final as (
select
value:id::string as read_id,
value:result::string as read_result,
split(read_id,\'-\') as read_id_object,
read_id_object[0]::string as contract_address,
read_id_object[1]::string as block_number,
read_id_object[2]::string as function_sig,
read_id_object[3]::string as function_input
from results,
lateral flatten(input=> read_output[0]:data)
)
select
contract_address,
block_number,
function_sig,
function_input,
read_result,
sysdate()::timestamp as _inserted_timestamp
from final;
end;'
{% endset %}
{% do run_query(sql) %}
{% if target.database.upper() == 'AVALANCHE' %}
{% set sql %}
alter task bronze_api.get_token_reads resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,13 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
contract_address as address,
token_symbol AS symbol,
token_name AS NAME,
token_decimals AS decimals
FROM
{{ ref('silver__contracts') }}

View File

@ -0,0 +1,14 @@
version: 2
models:
- name: core__dim_contracts
description: 'This table contains the contract addresses and their associated metadata. Includes ERC20 and ERC721 tokens. Metadata is read directly from contracts on the blockchain.'
columns:
- name: ADDRESS
description: 'The address of the contract.'
- name: SYMBOL
description: 'The symbol of the contract.'
- name: NAME
description: 'The name of the contract.'
- name: DECIMALS
description: 'The number of decimals used to adjust amount for this contract.'

View File

@ -0,0 +1,19 @@
{{ config(
materialized = 'table',
unique_key = "contract_address"
) }}
SELECT
contract_address,
'avalanche' AS blockchain,
COUNT(*) AS transfers,
MIN(block_number) AS created_block
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
GROUP BY
1,
2
HAVING
COUNT(*) > 25

View File

@ -0,0 +1,7 @@
version: 2
models:
- name: silver__relevant_token_contracts
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS

View File

@ -0,0 +1,141 @@
{{ config(
materialized = 'incremental',
unique_key = 'contract_address'
) }}
WITH base_metadata AS (
SELECT
contract_address,
block_number,
function_sig AS function_signature,
read_result AS read_output,
_inserted_timestamp
FROM
{{ source(
'bronze_api',
'token_reads'
) }}
WHERE
read_result IS NOT NULL
AND read_result <> '0x'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
),
token_names AS (
SELECT
contract_address,
block_number,
function_signature,
read_output,
regexp_substr_all(SUBSTR(read_output, 3, len(read_output)), '.{64}') AS segmented_output,
PUBLIC.udf_hex_to_int(
segmented_output [1] :: STRING
) AS sub_len,
TRY_HEX_DECODE_STRING(
SUBSTR(
segmented_output [2] :: STRING,
0,
sub_len * 2
)
) AS name1,
TRY_HEX_DECODE_STRING(RTRIM(segmented_output [2] :: STRING, 0)) AS name2,
TRY_HEX_DECODE_STRING(RTRIM(segmented_output [0] :: STRING, 0)) AS name3,
TRY_HEX_DECODE_STRING(
CONCAT(RTRIM(segmented_output [0] :: STRING, 0), '0')
) AS name4,
COALESCE(
name1,
name2,
name3,
name4
) AS token_name
FROM
base_metadata
WHERE
function_signature = '0x06fdde03'
AND segmented_output [1] :: STRING IS NOT NULL
),
token_symbols AS (
SELECT
contract_address,
block_number,
function_signature,
read_output,
regexp_substr_all(SUBSTR(read_output, 3, len(read_output)), '.{64}') AS segmented_output,
PUBLIC.udf_hex_to_int(
segmented_output [1] :: STRING
) AS sub_len,
TRY_HEX_DECODE_STRING(
SUBSTR(
segmented_output [2] :: STRING,
0,
sub_len * 2
)
) AS symbol1,
TRY_HEX_DECODE_STRING(RTRIM(segmented_output [2] :: STRING, 0)) AS symbol2,
TRY_HEX_DECODE_STRING(RTRIM(segmented_output [0] :: STRING, 0)) AS symbol3,
TRY_HEX_DECODE_STRING(
CONCAT(RTRIM(segmented_output [0] :: STRING, 0), '0')
) AS symbol4,
COALESCE(
symbol1,
symbol2,
symbol3,
symbol4
) AS token_symbol
FROM
base_metadata
WHERE
function_signature = '0x95d89b41'
AND segmented_output [1] :: STRING IS NOT NULL
),
token_decimals AS (
SELECT
contract_address,
PUBLIC.udf_hex_to_int(
read_output :: STRING
) AS token_decimals,
LENGTH(token_decimals) AS dec_length
FROM
base_metadata
WHERE
function_signature = '0x313ce567'
AND read_output IS NOT NULL
AND read_output <> '0x'
),
contracts AS (
SELECT
contract_address,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
base_metadata
GROUP BY
1
)
SELECT
c1.contract_address AS contract_address,
token_name,
token_decimals::integer as token_decimals,
token_symbol,
_inserted_timestamp
FROM
contracts c1
LEFT JOIN token_names
ON c1.contract_address = token_names.contract_address
LEFT JOIN token_symbols
ON c1.contract_address = token_symbols.contract_address
LEFT JOIN token_decimals
ON c1.contract_address = token_decimals.contract_address
AND dec_length < 3 qualify(ROW_NUMBER() over(PARTITION BY c1.contract_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,24 @@
version: 2
models:
- name: silver__contracts
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS
columns:
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -28,4 +28,10 @@ sources:
schema: SILVER
tables:
- name: prices_v2
- name: bronze_api
database: avalanche
schema: bronze_api
tables:
- name: contract_abis
- name: token_reads