vault secrets (#256)

* vault secrets

* batching
This commit is contained in:
Austin 2024-02-28 17:25:26 -05:00 committed by GitHub
parent 6c94cc2193
commit cca738dc7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 698 additions and 564 deletions

View File

@ -46,7 +46,7 @@ row_nos AS (
),
batched AS ({% for item in range(150) %}
SELECT
rn.contract_address, live.udf_api('GET', CONCAT('https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=', rn.contract_address, '&apikey={op_key}'),{ 'User-Agent': 'FlipsideStreamline' },{},'EXPLORER') AS abi_data, SYSDATE() AS _inserted_timestamp
rn.contract_address, live.udf_api('GET', CONCAT('https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=', rn.contract_address, '&apikey={key}'),{ 'User-Agent': 'FlipsideStreamline' },{}, 'Vault/prod/block_explorers/op_etherscan') AS abi_data, SYSDATE() AS _inserted_timestamp
FROM
row_nos rn
WHERE

View File

@ -50,39 +50,45 @@ ready_reads AS (
contract_address,
latest_block,
function_sig,
CONCAT(
'[\'',
contract_address,
'\',',
latest_block,
',\'',
RPAD(
function_sig,
'\',\'\']'
) AS read_input
64,
'0'
) AS input,
utils.udf_json_rpc_call(
'eth_call',
[{'to': contract_address, 'from': null, 'data': input}, utils.udf_int_to_hex(latest_block)],
concat_ws(
'-',
contract_address,
input,
latest_block
)
) AS rpc_request
FROM
all_reads
),
batch_reads AS (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
ARRAY_AGG(rpc_request) AS batch_rpc_request
FROM
ready_reads
),
results AS (
node_call AS (
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output
*,
live.udf_api(
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
batch_rpc_request,
'Vault/prod/optimism/quicknode/mainnet'
) AS response
FROM
batch_reads
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }}
ON 1 = 1
AND chain = 'optimism'
WHERE
EXISTS (
SELECT
@ -92,30 +98,30 @@ results AS (
LIMIT
1
)
), FINAL AS (
), flat_responses AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
read_id,
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS contract_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input
VALUE :id :: STRING AS call_id,
VALUE :result :: STRING AS read_result
FROM
results,
LATERAL FLATTEN(
input => read_output [0] :data
node_call,
LATERAL FLATTEN (
input => response :data
)
)
SELECT
contract_address,
block_number,
function_sig,
function_input,
SPLIT_PART(
call_id,
'-',
1
) AS contract_address,
SPLIT_PART(
call_id,
'-',
3
) AS block_number,
LEFT(SPLIT_PART(call_id, '-', 2), 10) AS function_sig,
NULL AS function_input,
read_result,
SYSDATE() :: TIMESTAMP AS _inserted_timestamp
FROM
FINAL
flat_responses

View File

@ -70,18 +70,18 @@ contract_reads AS (
[{ 'to': contract_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ]
) AS rpc_request,
live.udf_api(
node_url,
rpc_request
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
rpc_request,
'Vault/prod/optimism/quicknode/mainnet'
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM
inputs
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }}
ON 1 = 1
AND chain = 'optimism'
),
reads_flat AS (
SELECT

View File

@ -62,18 +62,18 @@ contract_reads AS (
[{ 'to': amm_wrapper_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ]
) AS rpc_request,
live.udf_api(
node_url,
rpc_request
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
rpc_request,
'Vault/prod/optimism/quicknode/mainnet'
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM
inputs
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }}
ON 1 = 1
AND chain = 'optimism'
),
reads_flat AS (
SELECT

View File

@ -75,18 +75,18 @@ contract_reads AS (
[{ 'to': contract_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ]
) AS rpc_request,
live.udf_api(
node_url,
rpc_request
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
rpc_request,
'Vault/prod/optimism/quicknode/mainnet'
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM
inputs
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }}
ON 1 = 1
AND chain = 'optimism'
),
reads_flat AS (
SELECT

View File

@ -15,15 +15,23 @@ WITH pools_registered AS (
tx_hash,
contract_address,
topics [1] :: STRING AS pool_id,
SUBSTR(topics [1] :: STRING,1,42) AS pool_address,
SUBSTR(
topics [1] :: STRING,
1,
42
) AS pool_address,
_log_id,
_inserted_timestamp,
ROW_NUMBER() OVER (ORDER BY pool_address) AS row_num
ROW_NUMBER() over (
ORDER BY
pool_address
) AS row_num
FROM
{{ ref('silver__logs') }}
WHERE
topics[0]::STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
@ -32,120 +40,117 @@ AND _inserted_timestamp >= (
{{ this }}
)
{% endif %}
),
tokens_registered AS (
SELECT
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens[0] :: STRING AS token0,
tokens[1] :: STRING AS token1,
tokens[2] :: STRING AS token2,
tokens[3] :: STRING AS token3,
tokens[4] :: STRING AS token4,
tokens[5] :: STRING AS token5,
tokens[6] :: STRING AS token6,
tokens[7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM {{ ref('silver__decoded_logs') }}
WHERE
topics[0]::STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
),
function_sigs AS (
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
inputs_pools AS (
SELECT
pool_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY pool_address
ORDER BY block_number)) - 1 AS function_input
FROM pools_registered
JOIN function_sigs ON 1=1
),
pool_token_reads AS (
{% for item in range(50) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
pool_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
pool_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
function_input,
'\']'
) AS read_input,
row_num
FROM inputs_pools
LEFT JOIN pools_registered USING(pool_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'optimism'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens [0] :: STRING AS token0,
tokens [1] :: STRING AS token1,
tokens [2] :: STRING AS token2,
tokens [3] :: STRING AS token3,
tokens [4] :: STRING AS token4,
tokens [5] :: STRING AS token5,
tokens [6] :: STRING AS token6,
tokens [7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__decoded_logs') }}
WHERE
topics [0] :: STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
),
reads_adjusted AS (
SELECT
function_sigs AS (
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
inputs_pools AS (
SELECT
pool_address,
block_number,
function_sig
FROM
pools_registered
JOIN function_sigs
ON 1 = 1
),
build_rpc_requests AS (
SELECT
pool_address,
block_number,
function_sig,
RPAD(
function_sig,
64,
'0'
) AS input,
utils.udf_json_rpc_call(
'eth_call',
[{'to': pool_address, 'from': null, 'data': input}, utils.udf_int_to_hex(block_number)],
concat_ws(
'-',
pool_address,
input,
block_number
)
) AS rpc_request
FROM
inputs_pools
),
batch_reads AS (
SELECT
ARRAY_AGG(rpc_request) AS batch_rpc_request
FROM
build_rpc_requests
),
node_call AS (
SELECT
*,
live.udf_api(
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
batch_rpc_request,
'Vault/prod/optimism/quicknode/mainnet'
) AS response
FROM
batch_reads
WHERE
EXISTS (
SELECT
1
FROM
build_rpc_requests
LIMIT
1
)
), reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
@ -153,44 +158,58 @@ SELECT
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS pool_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
read_id_object [2] :: STRING AS block_number,
LEFT(
read_id_object [1] :: STRING,
10
) AS function_sig
FROM
node_call,
LATERAL FLATTEN (
input => response :data
)
),
pool_details AS (
SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),
SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
SYSDATE() AS _inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),
FINAL AS (
SELECT
pool_address,
MIN(CASE WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_symbol,
MIN(CASE WHEN function_name = 'name' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_name,
MIN(CASE
WHEN read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result::STRING,66))
END)::INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM pool_details
GROUP BY 1
SELECT
pool_address,
MIN(
CASE
WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_symbol,
MIN(
CASE
WHEN function_name = 'name' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_name,
MIN(
CASE
WHEN read_result :: STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result :: STRING, 66))
END
) :: INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
pool_details
GROUP BY
1
)
SELECT
p.block_number,
p.block_timestamp,
@ -213,7 +232,11 @@ SELECT
t.asset_managers,
p._log_id,
f._inserted_timestamp
FROM FINAL f
LEFT JOIN pools_registered p ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t ON p.pool_id = t.pool_id
WHERE t.token0 IS NOT NULL
FROM
FINAL f
LEFT JOIN pools_registered p
ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t
ON p.pool_id = t.pool_id
WHERE
t.token0 IS NOT NULL

View File

@ -8,26 +8,32 @@
WITH contract_deployments AS (
SELECT
tx_hash,
block_number,
block_timestamp,
from_address AS deployer_address,
to_address AS contract_address,
_call_id,
_inserted_timestamp,
ROW_NUMBER() OVER (ORDER BY contract_address) AS row_num
FROM
{{ ref('silver__traces' )}}
WHERE
-- curve contract deployers
from_address IN (
'0x2db0e83599a91b508ac268a6197b8b14f5e72840',
'0x7eeac6cddbd1d0b8af061742d41877d7f707289a',
'0x745748bcfd8f9c2de519a71d789be8a63dd7d66c'
SELECT
tx_hash,
block_number,
block_timestamp,
from_address AS deployer_address,
to_address AS contract_address,
_call_id,
_inserted_timestamp,
ROW_NUMBER() over (
ORDER BY
contract_address
) AS row_num
FROM
{{ ref(
'silver__traces'
) }}
WHERE
-- curve contract deployers
from_address IN (
'0x2db0e83599a91b508ac268a6197b8b14f5e72840',
'0x7eeac6cddbd1d0b8af061742d41877d7f707289a',
'0x745748bcfd8f9c2de519a71d789be8a63dd7d66c'
)
AND TYPE ilike 'create%'
AND TX_STATUS ilike 'success'
AND TYPE ILIKE 'create%'
AND tx_status ILIKE 'success'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
@ -35,190 +41,228 @@ AND _inserted_timestamp >= (
FROM
{{ this }}
)
{% endif %}
QUALIFY(ROW_NUMBER() OVER(PARTITION BY to_address ORDER BY block_timestamp ASC)) = 1
),
qualify(ROW_NUMBER() over(PARTITION BY to_address
ORDER BY
block_timestamp ASC)) = 1
),
function_sigs AS (
SELECT
'0x87cb4f57' AS function_sig,
'base_coins' AS function_name
UNION ALL
SELECT
'0xb9947eb0' AS function_sig,
'underlying_coins' AS function_name
UNION ALL
SELECT
'0xc6610657' AS function_sig,
'coins' AS function_name
UNION ALL
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
SELECT
'0x87cb4f57' AS function_sig,
'base_coins' AS function_name
UNION ALL
SELECT
'0xb9947eb0' AS function_sig,
'underlying_coins' AS function_name
UNION ALL
SELECT
'0xc6610657' AS function_sig,
'coins' AS function_name
UNION ALL
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
function_inputs AS (
SELECT
SEQ4() AS function_input
FROM
TABLE(GENERATOR(rowcount => 8))
),
inputs_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'coins'
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'coins'
),
inputs_base_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'base_coins'
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'base_coins'
),
inputs_underlying_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'underlying_coins'
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'underlying_coins'
),
inputs_pool_details AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
NULL AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
WHERE function_name IN ('name','symbol','decimals')
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
NULL AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
WHERE
function_name IN (
'name',
'symbol',
'decimals'
)
),
all_inputs AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_base_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_underlying_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_pool_details
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_base_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_underlying_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_pool_details
),
build_rpc_requests AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input,
CONCAT(
function_sig,
LPAD(IFNULL(function_input, 0), 64, '0')
) AS input,
utils.udf_json_rpc_call(
'eth_call',
[{'to': contract_address, 'from': null, 'data':input }, utils.udf_int_to_hex(block_number)],
concat_ws(
'-',
contract_address,
input,
block_number
)
) AS rpc_request,
row_num,
CEIL(
row_num / 50
) AS batch_no
FROM
all_inputs
LEFT JOIN contract_deployments USING(contract_address)
),
pool_token_reads AS (
{% for item in range(10) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
{% if is_incremental() %}
{% for item in range(6) %}
(
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
contract_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
(CASE WHEN function_input IS NULL THEN '' ELSE function_input::STRING END),
'\']'
) AS read_input,
row_num
FROM all_inputs
LEFT JOIN contract_deployments USING(contract_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'optimism'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
),
live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
ARRAY_AGG(rpc_request) batch_rpc_request
FROM
build_rpc_requests
WHERE
batch_no = {{ item }} + 1
AND batch_no IN (
SELECT
DISTINCT batch_no
FROM
build_rpc_requests))) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
{% else %}
{% for item in range(60) %}
(
SELECT
live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
ARRAY_AGG(rpc_request) batch_rpc_request
FROM
build_rpc_requests
WHERE
batch_no = {{ item }} + 1
AND batch_no IN (
SELECT
DISTINCT batch_no
FROM
build_rpc_requests))) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
{% endif %}),
reads_adjusted AS (
SELECT
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
@ -226,117 +270,159 @@ SELECT
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS contract_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
read_id_object [2] :: STRING AS block_number,
LEFT(
read_id_object [1] :: STRING,
10
) AS function_sig,
RIGHT(
read_id_object [1],
LENGTH(
read_id_object [1] - 10
)
) :: INT AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output :data
)
),
tokens AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}')[0]AS segmented_token_address,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('coins','base_coins','underlying_coins')
AND read_result IS NOT NULL
),
pool_details AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('name','symbol','decimals')
AND read_result IS NOT NULL
),
all_pools AS (
SELECT
t.contract_address AS pool_address,
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)) AS token_address,
function_input AS token_id,
function_name AS token_type,
MIN(CASE WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING,0)) END) AS pool_symbol,
MIN(CASE WHEN p.function_name = 'name' THEN CONCAT(utils.udf_hex_to_string(p.segmented_output [2] :: STRING),
utils.udf_hex_to_string(segmented_output [3] :: STRING)) END) AS pool_name,
MIN(CASE
WHEN p.read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(p.read_result::STRING,66))
END)::INTEGER AS pool_decimals,
CONCAT(
t.contract_address,
'-',
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)),
'-',
SELECT
contract_address,
function_sig,
function_name,
function_input,
'-',
function_name
) AS pool_id,
MAX(t._inserted_timestamp) AS _inserted_timestamp
FROM tokens t
LEFT JOIN pool_details p USING(contract_address)
WHERE token_address IS NOT NULL
AND token_address <> '0x0000000000000000000000000000000000000000'
GROUP BY 1,2,3,4
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') [0] AS segmented_token_address,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE
function_name IN (
'coins',
'base_coins',
'underlying_coins'
)
AND read_result IS NOT NULL
),
pool_details AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE
function_name IN (
'name',
'symbol',
'decimals'
)
AND read_result IS NOT NULL
),
all_pools AS (
SELECT
t.contract_address AS pool_address,
CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)) AS token_address,
function_input AS token_id,
function_name AS token_type,
MIN(
CASE
WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0))
END
) AS pool_symbol,
MIN(
CASE
WHEN p.function_name = 'name' THEN CONCAT(
utils.udf_hex_to_string(
p.segmented_output [2] :: STRING
),
utils.udf_hex_to_string(
segmented_output [3] :: STRING
)
)
END
) AS pool_name,
MIN(
CASE
WHEN p.read_result :: STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(p.read_result :: STRING, 66))
END
) :: INTEGER AS pool_decimals,
CONCAT(
t.contract_address,
'-',
CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)),
'-',
function_input,
'-',
function_name
) AS pool_id,
MAX(
t._inserted_timestamp
) AS _inserted_timestamp
FROM
tokens t
LEFT JOIN pool_details p USING(contract_address)
WHERE
token_address IS NOT NULL
AND token_address <> '0x0000000000000000000000000000000000000000'
GROUP BY
1,
2,
3,
4
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address,
pool_address,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4200000000000000000000000000000000000006'
ELSE token_address
END AS token_address,
token_id,
token_type,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WETH'
WHEN pool_symbol IS NULL THEN c.token_symbol
ELSE pool_symbol
END AS pool_symbol,
pool_name,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18'
WHEN pool_decimals IS NULL THEN c.token_decimals
ELSE pool_decimals
END AS pool_decimals,
pool_id,
_call_id,
a._inserted_timestamp
FROM all_pools a
LEFT JOIN {{ ref('silver__contracts') }} c
ON a.token_address = c.contract_address
LEFT JOIN contract_deployments d
ON a.pool_address = d.contract_address
QUALIFY(ROW_NUMBER() OVER(PARTITION BY pool_address, token_address ORDER BY a._inserted_timestamp DESC)) = 1
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address,
pool_address,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4200000000000000000000000000000000000006'
ELSE token_address
END AS token_address,
token_id,
token_type,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WETH'
WHEN pool_symbol IS NULL THEN C.token_symbol
ELSE pool_symbol
END AS pool_symbol,
pool_name,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18'
WHEN pool_decimals IS NULL THEN C.token_decimals
ELSE pool_decimals
END AS pool_decimals,
pool_id,
_call_id,
A._inserted_timestamp
FROM
all_pools A
LEFT JOIN {{ ref('silver__contracts') }} C
ON A.token_address = C.contract_address
LEFT JOIN contract_deployments d
ON A.pool_address = d.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address, token_address
ORDER BY
A._inserted_timestamp DESC)) = 1
)
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY token_address ASC) AS token_num
FROM FINAL
ROW_NUMBER() over (
PARTITION BY pool_address
ORDER BY
token_address ASC
) AS token_num
FROM
FINAL

View File

@ -2,8 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'created_block',
tags = ['curated'],
full_refresh = false
full_refresh = false,
tags = ['curated']
) }}
WITH pool_creation AS (
@ -31,7 +31,12 @@ WITH pool_creation AS (
AND contract_address = '0x25cbddb98b35ab1ff77413456b31ec81a6b6b746' --velo deployer
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
function_sigs AS (
@ -52,9 +57,6 @@ all_inputs AS (
pool_address AS contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
block_number)) - 1 AS function_input,
'pool' AS address_label
FROM
pool_creation
@ -65,9 +67,6 @@ all_inputs AS (
token0_address AS contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY token0_address
ORDER BY
block_number)) - 1 AS function_input,
'token0' AS address_label
FROM
pool_creation
@ -78,64 +77,88 @@ all_inputs AS (
token1_address AS contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY token1_address
ORDER BY
block_number)) - 1 AS function_input,
'token1' AS address_label
FROM
pool_creation
JOIN function_sigs
ON 1 = 1
),
ready_reads_all AS (
build_rpc_requests AS (
SELECT
contract_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
contract_address,
'\',',
block_number,
',\'',
RPAD(
function_sig,
'\',\'',
function_input,
'\']'
) AS read_input
64,
'0'
) AS input,
utils.udf_json_rpc_call(
'eth_call',
[{'to': contract_address, 'from': null, 'data': input}, utils.udf_int_to_hex(block_number)],
concat_ws(
'-',
contract_address,
input,
block_number
)
) AS rpc_request,
ROW_NUMBER() over (
ORDER BY
block_number
) AS row_no,
CEIL(
row_no / 300
) AS batch_no
FROM
all_inputs
),
batch_reads_all AS (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM
ready_reads_all
),
all_reads AS (
{% if is_incremental() %}
{% for item in range(3) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp
FROM
batch_reads_all
JOIN streamline.crosschain.node_mapping
ON 1 = 1
AND chain = 'optimism'
(
SELECT
ARRAY_AGG(rpc_request) batch_rpc_request
FROM
build_rpc_requests
WHERE
EXISTS (
SELECT
1
FROM
ready_reads_all
LIMIT
1
)
), reads_adjusted AS (
batch_no = {{ item }} + 1
AND batch_no IN (
SELECT
DISTINCT batch_no
FROM
build_rpc_requests))) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
{% else %}
{% for item in range(20) %}
(
SELECT
live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
ARRAY_AGG(rpc_request) batch_rpc_request
FROM
build_rpc_requests
WHERE
batch_no = {{ item }} + 1
AND batch_no IN (
SELECT
DISTINCT batch_no
FROM
build_rpc_requests))) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
{% endif %}),
reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
@ -144,14 +167,16 @@ all_reads AS (
'-'
) AS read_id_object,
read_id_object [0] :: STRING AS contract_address,
read_id_object [1] :: STRING AS block_number,
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
read_id_object [2] :: STRING AS block_number,
LEFT(
read_id_object [1] :: STRING,
10
) AS function_sig,
_inserted_timestamp
FROM
all_reads,
LATERAL FLATTEN(
input => read_output [0] :data
LATERAL FLATTEN (
input => read_output :data
)
),
details AS (

View File

@ -28,18 +28,12 @@ sources:
database: "{{ 'crosschain' if target.database == 'OPTIMISM' else 'crosschain_dev' }}"
schema: silver
tables:
- name: apis_keys
- name: token_prices_priority_hourly
- name: token_prices_all_providers_hourly
- name: asset_metadata_priority
- name: asset_metadata_all_providers
- name: near_address_encoded
- name: labels_combined
- name: streamline_crosschain
database: streamline
schema: crosschain
tables:
- name: node_mapping
- name: crosschain_public
database: crosschain
schema: bronze_public

View File

@ -6,11 +6,11 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: bdc9dd02079c9f3a58c39dd45a44988cb1deb1bd
revision: 436e7a90ccb1bfa8d566ab99a807f38ec53f74b7
- package: get-select/dbt_snowflake_query_tags
version: 2.3.3
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: bca494102fbd2d621d32746e9a7fe780678044f8
sha1_hash: 09ade33483dfac0a83369b80cfd88bb6f9b52a92
revision: 992947a4eaa8fccdf2cfcd2cb73a470ff5e89fa2
sha1_hash: 342c7081a105a2da3cd7f77edf07124c8f0f1c25

View File

@ -6,6 +6,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.15.1
revision: v1.18.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]