task to model (#54)

This commit is contained in:
Austin 2022-12-05 15:17:47 -05:00 committed by GitHub
parent 9850f74212
commit 52fd64995c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 234 additions and 186 deletions

View File

@ -1,69 +0,0 @@
{% macro task_get_abis() %}
{% set sql %}
EXECUTE IMMEDIATE
'create or replace task bronze_api.get_block_explorer_abis
warehouse = DBT_CLOUD_OPTIMISM
allow_overlapping_execution = false
schedule = \'300 minute\'
as
BEGIN
INSERT INTO
bronze_api.contract_abis(
contract_address,
abi_data,
_inserted_timestamp
)
WITH api_keys AS (
SELECT
api_key
FROM
crosschain.silver.apis_keys
WHERE
api_name = \'op-etherscan\'
),
base AS (
SELECT
contract_address
FROM
silver.relevant_abi_contracts
EXCEPT
SELECT
contract_address
FROM
bronze_api.contract_abis
WHERE
abi_data :data :result :: STRING <> \'Max rate limit reached\'
LIMIT
100
)
SELECT
contract_address,
ethereum.streamline.udf_api(
\'GET\',
CONCAT(
\'https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=\',
contract_address,
\'&apikey=\',
api_key
),{},{}
) AS abi_data,
SYSDATE()
FROM
base
LEFT JOIN api_keys
ON 1 = 1
where exists (select 1 from base limit 1);
END;'
{% endset %}
{% do run_query(sql) %}
{% if target.database.upper() == 'OPTIMISM' %}
{% set sql %}
alter task bronze_api.get_block_explorer_abis resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -1,94 +0,0 @@
{% macro task_get_token_reads() %}
{% set sql %}
EXECUTE IMMEDIATE
'create or replace task bronze_api.get_token_reads
warehouse = DBT_CLOUD_OPTIMISM
allow_overlapping_execution = false
schedule = \'60 minute\'
as
BEGIN
INSERT INTO
bronze_api.token_reads(
contract_address,
block_number,
function_sig,
function_input,
read_result,
_inserted_timestamp
)
with base as (
select
contract_address,
created_block
from silver.relevant_token_contracts
where contract_address not in (select contract_address from bronze_api.token_reads)
limit 500
)
, function_sigs as (
select \'0x313ce567\' as function_sig, \'decimals\' as function_name
union select \'0x06fdde03\', \'name\'
union select \'0x95d89b41\', \'symbol\'
),
all_reads as (
select *
from base
join function_sigs on 1=1
),
ready_reads as (
select
contract_address,
created_block,
function_sig,
concat(\'[!\',contract_address,\'!,\', created_block, \',!\',function_sig,\'!,!!]\') as read_input1,
replace(read_input1, $$!$$,$$\'$$) as read_input
from all_reads
)
, batch_reads as (
select concat(\'[\',listagg(read_input,\',\'),\']\') as batch_read
from ready_reads
),
results as (
select
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
parse_json(batch_read)
) as read_output
from batch_reads
join streamline.crosschain.node_mapping
on 1=1 and chain = \'optimism\'
where exists (select 1 from ready_reads limit 1)
)
, final as (
select
value:id::string as read_id,
value:result::string as read_result,
split(read_id,\'-\') as read_id_object,
read_id_object[0]::string as contract_address,
read_id_object[1]::string as block_number,
read_id_object[2]::string as function_sig,
read_id_object[3]::string as function_input
from results,
lateral flatten(input=> read_output[0]:data)
)
select
contract_address,
block_number,
function_sig,
function_input,
read_result,
sysdate()::timestamp as _inserted_timestamp
from final;
end;'
{% endset %}
{% do run_query(sql) %}
{% if target.database.upper() == 'OPTIMISM' %}
{% set sql %}
alter task bronze_api.get_token_reads resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
full_refresh = false
) }}
WITH api_keys AS (
SELECT
api_key
FROM
{{ source(
'silver_crosschain',
'apis_keys'
) }}
WHERE
api_name = 'op-etherscan'
),
base AS (
SELECT
contract_address
FROM
{{ ref('silver__relevant_abi_contracts') }}
{% if is_incremental() %}
EXCEPT
SELECT
contract_address
FROM
{{ this }}
WHERE
abi_data :data :result :: STRING <> 'Max rate limit reached'
{% endif %}
LIMIT
100
)
SELECT
contract_address,
ethereum.streamline.udf_api(
'GET',
CONCAT(
'https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=',
contract_address,
'&apikey=',
api_key
),{},{}
) AS abi_data,
SYSDATE() AS _inserted_timestamp
FROM
base
LEFT JOIN api_keys
ON 1 = 1
WHERE
EXISTS (
SELECT
1
FROM
base
LIMIT
1
)

View File

@ -0,0 +1,24 @@
version: 2
models:
- name: bronze_api__contract_abis
columns:
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR
- dbt_expectations.expect_column_values_to_match_regex:
regex: "^(0x)[0-9a-fA-F]{40}$"
- unique:
where: "abi_data :data :result :: STRING <> 'Max rate limit reached'"

View File

@ -0,0 +1,120 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
full_refresh = false
) }}
WITH base AS (
SELECT
contract_address,
created_block
FROM
{{ ref('silver__relevant_token_contracts') }}
{% if is_incremental() %}
WHERE
contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
{% endif %}
LIMIT
500
), function_sigs AS (
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
UNION
SELECT
'0x06fdde03',
'name'
UNION
SELECT
'0x95d89b41',
'symbol'
),
all_reads AS (
SELECT
*
FROM
base
JOIN function_sigs
ON 1 = 1
),
ready_reads AS (
SELECT
contract_address,
created_block,
function_sig,
CONCAT(
'[\'',
contract_address,
'\',',
created_block,
',\'',
function_sig,
'\',\'\']'
) AS read_input
FROM
all_reads
),
batch_reads AS (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM
ready_reads
),
results AS (
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output
FROM
batch_reads
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }}
ON 1 = 1
AND chain = 'optimism'
WHERE
EXISTS (
SELECT
1
FROM
ready_reads
LIMIT
1
)
), FINAL AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
read_id,
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS contract_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input
FROM
results,
LATERAL FLATTEN(
input => read_output [0] :data
)
)
SELECT
contract_address,
block_number,
function_sig,
function_input,
read_result,
SYSDATE() :: TIMESTAMP AS _inserted_timestamp
FROM
FINAL

View File

@ -0,0 +1,18 @@
version: 2
models:
- name: bronze_api__token_reads
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS
- FUNCTION_SIG
columns:
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -6,16 +6,9 @@
WITH base AS (
SELECT
contract_address,
COUNT(*) AS total_events
FROM
{{ ref('silver__logs') }}
WHERE
tx_status = 'SUCCESS'
GROUP BY
contract_address
HAVING
total_events >= 25
FROM
{{ ref('silver__relevant_token_contracts') }}
),
proxies AS (
SELECT

View File

@ -7,7 +7,7 @@ SELECT
contract_address,
'optimism' AS blockchain,
COUNT(*) AS transfers,
MIN(block_number) AS created_block
MIN(block_number) + 1 AS created_block
FROM
{{ ref('silver__logs') }}
GROUP BY

View File

@ -12,10 +12,7 @@ WITH base_metadata AS (
read_result AS read_output,
_inserted_timestamp
FROM
{{ source(
'bronze_api',
'token_reads'
) }}
{{ ref('bronze_api__token_reads') }}
WHERE
read_result IS NOT NULL
AND read_result <> '0x'
@ -125,7 +122,7 @@ contracts AS (
SELECT
c1.contract_address AS contract_address,
token_name,
token_decimals::integer as token_decimals,
token_decimals :: INTEGER AS token_decimals,
token_symbol,
_inserted_timestamp
FROM

View File

@ -53,14 +53,12 @@ sources:
- name: asset_metadata_coin_market_cap
- name: hourly_prices_coin_gecko
- name: hourly_prices_coin_market_cap
- name: bronze_api
database: optimism
schema: bronze_api
tables:
- name: contract_abis
- name: token_reads
- name: apis_keys
- name: streamline_crosschain
database: streamline
schema: crosschain
tables:
- name: node_mapping