diff --git a/models/bronze/api_udf/bronze_api__contract_abis.sql b/models/bronze/api_udf/bronze_api__contract_abis.sql index 577336b6..50608427 100644 --- a/models/bronze/api_udf/bronze_api__contract_abis.sql +++ b/models/bronze/api_udf/bronze_api__contract_abis.sql @@ -46,7 +46,7 @@ row_nos AS ( ), batched AS ({% for item in range(150) %} SELECT - rn.contract_address, live.udf_api('GET', CONCAT('https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=', rn.contract_address, '&apikey={op_key}'),{ 'User-Agent': 'FlipsideStreamline' },{},'EXPLORER') AS abi_data, SYSDATE() AS _inserted_timestamp + rn.contract_address, live.udf_api('GET', CONCAT('https://api-optimistic.etherscan.io/api?module=contract&action=getabi&address=', rn.contract_address, '&apikey={key}'),{ 'User-Agent': 'FlipsideStreamline' },{}, 'Vault/prod/block_explorers/op_etherscan') AS abi_data, SYSDATE() AS _inserted_timestamp FROM row_nos rn WHERE diff --git a/models/bronze/api_udf/bronze_api__token_reads.sql b/models/bronze/api_udf/bronze_api__token_reads.sql index 63a73e40..000ce7b4 100644 --- a/models/bronze/api_udf/bronze_api__token_reads.sql +++ b/models/bronze/api_udf/bronze_api__token_reads.sql @@ -50,39 +50,45 @@ ready_reads AS ( contract_address, latest_block, function_sig, - CONCAT( - '[\'', - contract_address, - '\',', - latest_block, - ',\'', + RPAD( function_sig, - '\',\'\']' - ) AS read_input + 64, + '0' + ) AS input, + utils.udf_json_rpc_call( + 'eth_call', + [{'to': contract_address, 'from': null, 'data': input}, utils.udf_int_to_hex(latest_block)], + concat_ws( + '-', + contract_address, + input, + latest_block + ) + ) AS rpc_request FROM all_reads ), batch_reads AS ( SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read + ARRAY_AGG(rpc_request) AS batch_rpc_request FROM ready_reads ), -results AS ( +node_call AS ( SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output + *, + live.udf_api( + 'POST', + CONCAT( + '{service}', + '/', + '{Authentication}' + ),{}, + batch_rpc_request, + 'Vault/prod/optimism/quicknode/mainnet' + ) AS response FROM batch_reads - JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} - ON 1 = 1 - AND chain = 'optimism' WHERE EXISTS ( SELECT @@ -92,30 +98,30 @@ results AS ( LIMIT 1 ) -), FINAL AS ( +), flat_responses AS ( SELECT - VALUE :id :: STRING AS read_id, - VALUE :result :: STRING AS read_result, - SPLIT( - read_id, - '-' - ) AS read_id_object, - read_id_object [0] :: STRING AS contract_address, - read_id_object [1] :: STRING AS block_number, - read_id_object [2] :: STRING AS function_sig, - read_id_object [3] :: STRING AS function_input + VALUE :id :: STRING AS call_id, + VALUE :result :: STRING AS read_result FROM - results, - LATERAL FLATTEN( - input => read_output [0] :data + node_call, + LATERAL FLATTEN ( + input => response :data ) ) SELECT - contract_address, - block_number, - function_sig, - function_input, + SPLIT_PART( + call_id, + '-', + 1 + ) AS contract_address, + SPLIT_PART( + call_id, + '-', + 3 + ) AS block_number, + LEFT(SPLIT_PART(call_id, '-', 2), 10) AS function_sig, + NULL AS function_input, read_result, SYSDATE() :: TIMESTAMP AS _inserted_timestamp FROM - FINAL + flat_responses diff --git a/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql b/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql index a6e29572..5ee9fb39 100644 --- a/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql +++ b/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql @@ -70,18 +70,18 @@ contract_reads AS ( [{ 'to': contract_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ] ) AS rpc_request, live.udf_api( - node_url, - rpc_request + 'POST', + CONCAT( + '{service}', + '/', + '{Authentication}' + ),{}, + rpc_request, + 'Vault/prod/optimism/quicknode/mainnet' ) AS read_output, SYSDATE() AS _inserted_timestamp FROM inputs - JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} - ON 1 = 1 - AND chain = 'optimism' ), reads_flat AS ( SELECT diff --git a/models/silver/defi/bridge/hop/silver_bridge__hop_l2canonicaltoken.sql b/models/silver/defi/bridge/hop/silver_bridge__hop_l2canonicaltoken.sql index 2d2fa74c..5eddaa21 100644 --- a/models/silver/defi/bridge/hop/silver_bridge__hop_l2canonicaltoken.sql +++ b/models/silver/defi/bridge/hop/silver_bridge__hop_l2canonicaltoken.sql @@ -62,18 +62,18 @@ contract_reads AS ( [{ 'to': amm_wrapper_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ] ) AS rpc_request, live.udf_api( - node_url, - rpc_request + 'POST', + CONCAT( + '{service}', + '/', + '{Authentication}' + ),{}, + rpc_request, + 'Vault/prod/optimism/quicknode/mainnet' ) AS read_output, SYSDATE() AS _inserted_timestamp FROM inputs - JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} - ON 1 = 1 - AND chain = 'optimism' ), reads_flat AS ( SELECT diff --git a/models/silver/defi/bridge/stargate/silver_bridge__stargate_createpool.sql b/models/silver/defi/bridge/stargate/silver_bridge__stargate_createpool.sql index f6ee090e..ffc7a84a 100644 --- a/models/silver/defi/bridge/stargate/silver_bridge__stargate_createpool.sql +++ b/models/silver/defi/bridge/stargate/silver_bridge__stargate_createpool.sql @@ -75,18 +75,18 @@ contract_reads AS ( [{ 'to': contract_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ] ) AS rpc_request, live.udf_api( - node_url, - rpc_request + 'POST', + CONCAT( + '{service}', + '/', + '{Authentication}' + ),{}, + rpc_request, + 'Vault/prod/optimism/quicknode/mainnet' ) AS read_output, SYSDATE() AS _inserted_timestamp FROM inputs - JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} - ON 1 = 1 - AND chain = 'optimism' ), reads_flat AS ( SELECT diff --git a/models/silver/defi/dex/beethovenx/silver_dex__beethovenx_pools.sql b/models/silver/defi/dex/beethovenx/silver_dex__beethovenx_pools.sql index 5b9cefa7..15fc76fe 100644 --- a/models/silver/defi/dex/beethovenx/silver_dex__beethovenx_pools.sql +++ b/models/silver/defi/dex/beethovenx/silver_dex__beethovenx_pools.sql @@ -15,15 +15,23 @@ WITH pools_registered AS ( tx_hash, contract_address, topics [1] :: STRING AS pool_id, - SUBSTR(topics [1] :: STRING,1,42) AS pool_address, + SUBSTR( + topics [1] :: STRING, + 1, + 42 + ) AS pool_address, _log_id, _inserted_timestamp, - ROW_NUMBER() OVER (ORDER BY pool_address) AS row_num + ROW_NUMBER() over ( + ORDER BY + pool_address + ) AS row_num FROM {{ ref('silver__logs') }} WHERE - topics[0]::STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered + topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + {% if is_incremental() %} AND _inserted_timestamp >= ( SELECT @@ -32,120 +40,117 @@ AND _inserted_timestamp >= ( {{ this }} ) {% endif %} - ), - tokens_registered AS ( - -SELECT - block_number, - block_timestamp, - event_index, - tx_hash, - contract_address, - decoded_flat :poolId :: STRING AS pool_id, - decoded_flat :tokens AS tokens, - tokens[0] :: STRING AS token0, - tokens[1] :: STRING AS token1, - tokens[2] :: STRING AS token2, - tokens[3] :: STRING AS token3, - tokens[4] :: STRING AS token4, - tokens[5] :: STRING AS token5, - tokens[6] :: STRING AS token6, - tokens[7] :: STRING AS token7, - decoded_flat :assetManagers AS asset_managers, - _log_id, - _inserted_timestamp -FROM {{ ref('silver__decoded_logs') }} -WHERE - topics[0]::STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered - AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' - AND tx_hash IN ( - SELECT - tx_hash - FROM - pools_registered - ) -), - -function_sigs AS ( - -SELECT - '0x06fdde03' AS function_sig, - 'name' AS function_name -UNION ALL -SELECT - '0x95d89b41' AS function_sig, - 'symbol' AS function_name -UNION ALL -SELECT - '0x313ce567' AS function_sig, - 'decimals' AS function_name -), - -inputs_pools AS ( - -SELECT - pool_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY pool_address - ORDER BY block_number)) - 1 AS function_input -FROM pools_registered -JOIN function_sigs ON 1=1 -), - -pool_token_reads AS ( - -{% for item in range(50) %} -( -SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output, - SYSDATE() AS _inserted_timestamp -FROM ( SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read - FROM ( - SELECT - pool_address, - block_number, - function_sig, - function_input, - CONCAT( - '[\'', - pool_address, - '\',', - block_number, - ',\'', - function_sig, - '\',\'', - function_input, - '\']' - ) AS read_input, - row_num - FROM inputs_pools - LEFT JOIN pools_registered USING(pool_address) - ) ready_reads_pools - WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}} - ) batch_reads_pools -JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} ON 1=1 - AND chain = 'optimism' -) {% if not loop.last %} -UNION ALL -{% endif %} -{% endfor %} + block_number, + block_timestamp, + event_index, + tx_hash, + contract_address, + decoded_flat :poolId :: STRING AS pool_id, + decoded_flat :tokens AS tokens, + tokens [0] :: STRING AS token0, + tokens [1] :: STRING AS token1, + tokens [2] :: STRING AS token2, + tokens [3] :: STRING AS token3, + tokens [4] :: STRING AS token4, + tokens [5] :: STRING AS token5, + tokens [6] :: STRING AS token6, + tokens [7] :: STRING AS token7, + decoded_flat :assetManagers AS asset_managers, + _log_id, + _inserted_timestamp + FROM + {{ ref('silver__decoded_logs') }} + WHERE + topics [0] :: STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered + AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + AND tx_hash IN ( + SELECT + tx_hash + FROM + pools_registered + ) ), - -reads_adjusted AS ( - -SELECT +function_sigs AS ( + SELECT + '0x06fdde03' AS function_sig, + 'name' AS function_name + UNION ALL + SELECT + '0x95d89b41' AS function_sig, + 'symbol' AS function_name + UNION ALL + SELECT + '0x313ce567' AS function_sig, + 'decimals' AS function_name +), +inputs_pools AS ( + SELECT + pool_address, + block_number, + function_sig + FROM + pools_registered + JOIN function_sigs + ON 1 = 1 +), +build_rpc_requests AS ( + SELECT + pool_address, + block_number, + function_sig, + RPAD( + function_sig, + 64, + '0' + ) AS input, + utils.udf_json_rpc_call( + 'eth_call', + [{'to': pool_address, 'from': null, 'data': input}, utils.udf_int_to_hex(block_number)], + concat_ws( + '-', + pool_address, + input, + block_number + ) + ) AS rpc_request + FROM + inputs_pools +), +batch_reads AS ( + SELECT + ARRAY_AGG(rpc_request) AS batch_rpc_request + FROM + build_rpc_requests +), +node_call AS ( + SELECT + *, + live.udf_api( + 'POST', + CONCAT( + '{service}', + '/', + '{Authentication}' + ),{}, + batch_rpc_request, + 'Vault/prod/optimism/quicknode/mainnet' + ) AS response + FROM + batch_reads + WHERE + EXISTS ( + SELECT + 1 + FROM + build_rpc_requests + LIMIT + 1 + ) +), reads_adjusted AS ( + SELECT VALUE :id :: STRING AS read_id, VALUE :result :: STRING AS read_result, SPLIT( @@ -153,44 +158,58 @@ SELECT '-' ) AS read_id_object, read_id_object [0] :: STRING AS pool_address, - read_id_object [1] :: STRING AS block_number, - read_id_object [2] :: STRING AS function_sig, - read_id_object [3] :: STRING AS function_input, - _inserted_timestamp -FROM - pool_token_reads, - LATERAL FLATTEN( - input => read_output [0] :data - ) + read_id_object [2] :: STRING AS block_number, + LEFT( + read_id_object [1] :: STRING, + 10 + ) AS function_sig + FROM + node_call, + LATERAL FLATTEN ( + input => response :data + ) ), - pool_details AS ( - -SELECT - pool_address, - function_sig, - function_name, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) - ), - + SELECT + pool_address, + function_sig, + function_name, + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, + SYSDATE() AS _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) +), FINAL AS ( -SELECT - pool_address, - MIN(CASE WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_symbol, - MIN(CASE WHEN function_name = 'name' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_name, - MIN(CASE - WHEN read_result::STRING = '0x' THEN NULL - ELSE utils.udf_hex_to_int(LEFT(read_result::STRING,66)) - END)::INTEGER AS pool_decimals, - MAX(_inserted_timestamp) AS _inserted_timestamp -FROM pool_details -GROUP BY 1 + SELECT + pool_address, + MIN( + CASE + WHEN function_name = 'symbol' THEN utils.udf_hex_to_string( + segmented_output [2] :: STRING + ) + END + ) AS pool_symbol, + MIN( + CASE + WHEN function_name = 'name' THEN utils.udf_hex_to_string( + segmented_output [2] :: STRING + ) + END + ) AS pool_name, + MIN( + CASE + WHEN read_result :: STRING = '0x' THEN NULL + ELSE utils.udf_hex_to_int(LEFT(read_result :: STRING, 66)) + END + ) :: INTEGER AS pool_decimals, + MAX(_inserted_timestamp) AS _inserted_timestamp + FROM + pool_details + GROUP BY + 1 ) - SELECT p.block_number, p.block_timestamp, @@ -213,7 +232,11 @@ SELECT t.asset_managers, p._log_id, f._inserted_timestamp -FROM FINAL f -LEFT JOIN pools_registered p ON f.pool_address = p.pool_address -LEFT JOIN tokens_registered t ON p.pool_id = t.pool_id -WHERE t.token0 IS NOT NULL \ No newline at end of file +FROM + FINAL f + LEFT JOIN pools_registered p + ON f.pool_address = p.pool_address + LEFT JOIN tokens_registered t + ON p.pool_id = t.pool_id +WHERE + t.token0 IS NOT NULL diff --git a/models/silver/defi/dex/curve/silver_dex__curve_pools.sql b/models/silver/defi/dex/curve/silver_dex__curve_pools.sql index aa27afa8..a5a2abd6 100644 --- a/models/silver/defi/dex/curve/silver_dex__curve_pools.sql +++ b/models/silver/defi/dex/curve/silver_dex__curve_pools.sql @@ -8,26 +8,32 @@ WITH contract_deployments AS ( -SELECT - tx_hash, - block_number, - block_timestamp, - from_address AS deployer_address, - to_address AS contract_address, - _call_id, - _inserted_timestamp, - ROW_NUMBER() OVER (ORDER BY contract_address) AS row_num -FROM - {{ ref('silver__traces' )}} -WHERE - -- curve contract deployers - from_address IN ( - '0x2db0e83599a91b508ac268a6197b8b14f5e72840', - '0x7eeac6cddbd1d0b8af061742d41877d7f707289a', - '0x745748bcfd8f9c2de519a71d789be8a63dd7d66c' + SELECT + tx_hash, + block_number, + block_timestamp, + from_address AS deployer_address, + to_address AS contract_address, + _call_id, + _inserted_timestamp, + ROW_NUMBER() over ( + ORDER BY + contract_address + ) AS row_num + FROM + {{ ref( + 'silver__traces' + ) }} + WHERE + -- curve contract deployers + from_address IN ( + '0x2db0e83599a91b508ac268a6197b8b14f5e72840', + '0x7eeac6cddbd1d0b8af061742d41877d7f707289a', + '0x745748bcfd8f9c2de519a71d789be8a63dd7d66c' ) - AND TYPE ilike 'create%' - AND TX_STATUS ilike 'success' + AND TYPE ILIKE 'create%' + AND tx_status ILIKE 'success' + {% if is_incremental() %} AND _inserted_timestamp >= ( SELECT @@ -35,190 +41,228 @@ AND _inserted_timestamp >= ( FROM {{ this }} ) - {% endif %} -QUALIFY(ROW_NUMBER() OVER(PARTITION BY to_address ORDER BY block_timestamp ASC)) = 1 -), +qualify(ROW_NUMBER() over(PARTITION BY to_address +ORDER BY + block_timestamp ASC)) = 1 +), function_sigs AS ( - -SELECT - '0x87cb4f57' AS function_sig, - 'base_coins' AS function_name -UNION ALL -SELECT - '0xb9947eb0' AS function_sig, - 'underlying_coins' AS function_name -UNION ALL -SELECT - '0xc6610657' AS function_sig, - 'coins' AS function_name -UNION ALL -SELECT - '0x06fdde03' AS function_sig, - 'name' AS function_name -UNION ALL -SELECT - '0x95d89b41' AS function_sig, - 'symbol' AS function_name -UNION ALL -SELECT - '0x313ce567' AS function_sig, - 'decimals' AS function_name + SELECT + '0x87cb4f57' AS function_sig, + 'base_coins' AS function_name + UNION ALL + SELECT + '0xb9947eb0' AS function_sig, + 'underlying_coins' AS function_name + UNION ALL + SELECT + '0xc6610657' AS function_sig, + 'coins' AS function_name + UNION ALL + SELECT + '0x06fdde03' AS function_sig, + 'name' AS function_name + UNION ALL + SELECT + '0x95d89b41' AS function_sig, + 'symbol' AS function_name + UNION ALL + SELECT + '0x313ce567' AS function_sig, + 'decimals' AS function_name ), - function_inputs AS ( SELECT SEQ4() AS function_input FROM TABLE(GENERATOR(rowcount => 8)) ), - inputs_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'coins' + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'coins' ), - inputs_base_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'base_coins' + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'base_coins' ), - inputs_underlying_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'underlying_coins' + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'underlying_coins' ), - inputs_pool_details AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - NULL AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -WHERE function_name IN ('name','symbol','decimals') + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + NULL AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + WHERE + function_name IN ( + 'name', + 'symbol', + 'decimals' + ) ), - all_inputs AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_base_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_underlying_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_pool_details + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_base_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_underlying_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_pool_details +), +build_rpc_requests AS ( + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input, + CONCAT( + function_sig, + LPAD(IFNULL(function_input, 0), 64, '0') + ) AS input, + utils.udf_json_rpc_call( + 'eth_call', + [{'to': contract_address, 'from': null, 'data':input }, utils.udf_int_to_hex(block_number)], + concat_ws( + '-', + contract_address, + input, + block_number + ) + ) AS rpc_request, + row_num, + CEIL( + row_num / 50 + ) AS batch_no + FROM + all_inputs + LEFT JOIN contract_deployments USING(contract_address) ), - pool_token_reads AS ( -{% for item in range(10) %} -( -SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output, - SYSDATE() AS _inserted_timestamp -FROM ( +{% if is_incremental() %} +{% for item in range(6) %} + ( SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read - FROM ( - SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input, - CONCAT( - '[\'', - contract_address, - '\',', - block_number, - ',\'', - function_sig, - '\',\'', - (CASE WHEN function_input IS NULL THEN '' ELSE function_input::STRING END), - '\']' - ) AS read_input, - row_num - FROM all_inputs - LEFT JOIN contract_deployments USING(contract_address) - ) ready_reads_pools - WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}} - ) batch_reads_pools -JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} ON 1=1 - AND chain = 'optimism' -) {% if not loop.last %} -UNION ALL -{% endif %} -{% endfor %} -), - + live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp + FROM + ( + SELECT + ARRAY_AGG(rpc_request) batch_rpc_request + FROM + build_rpc_requests + WHERE + batch_no = {{ item }} + 1 + AND batch_no IN ( + SELECT + DISTINCT batch_no + FROM + build_rpc_requests))) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +{% else %} + {% for item in range(60) %} + ( + SELECT + live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp + FROM + ( + SELECT + ARRAY_AGG(rpc_request) batch_rpc_request + FROM + build_rpc_requests + WHERE + batch_no = {{ item }} + 1 + AND batch_no IN ( + SELECT + DISTINCT batch_no + FROM + build_rpc_requests))) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +{% endif %}), reads_adjusted AS ( - -SELECT + SELECT VALUE :id :: STRING AS read_id, VALUE :result :: STRING AS read_result, SPLIT( @@ -226,117 +270,159 @@ SELECT '-' ) AS read_id_object, read_id_object [0] :: STRING AS contract_address, - read_id_object [1] :: STRING AS block_number, - read_id_object [2] :: STRING AS function_sig, - read_id_object [3] :: STRING AS function_input, + read_id_object [2] :: STRING AS block_number, + LEFT( + read_id_object [1] :: STRING, + 10 + ) AS function_sig, + RIGHT( + read_id_object [1], + LENGTH( + read_id_object [1] - 10 + ) + ) :: INT AS function_input, _inserted_timestamp -FROM - pool_token_reads, - LATERAL FLATTEN( - input => read_output [0] :data - ) + FROM + pool_token_reads, + LATERAL FLATTEN( + input => read_output :data + ) ), - tokens AS ( - -SELECT - contract_address, - function_sig, - function_name, - function_input, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}')[0]AS segmented_token_address, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) -WHERE function_name IN ('coins','base_coins','underlying_coins') - AND read_result IS NOT NULL - -), - -pool_details AS ( - -SELECT - contract_address, - function_sig, - function_name, - function_input, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) -WHERE function_name IN ('name','symbol','decimals') - AND read_result IS NOT NULL - -), - -all_pools AS ( -SELECT - t.contract_address AS pool_address, - CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)) AS token_address, - function_input AS token_id, - function_name AS token_type, - MIN(CASE WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING,0)) END) AS pool_symbol, - MIN(CASE WHEN p.function_name = 'name' THEN CONCAT(utils.udf_hex_to_string(p.segmented_output [2] :: STRING), - utils.udf_hex_to_string(segmented_output [3] :: STRING)) END) AS pool_name, - MIN(CASE - WHEN p.read_result::STRING = '0x' THEN NULL - ELSE utils.udf_hex_to_int(LEFT(p.read_result::STRING,66)) - END)::INTEGER AS pool_decimals, - CONCAT( - t.contract_address, - '-', - CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)), - '-', + SELECT + contract_address, + function_sig, + function_name, function_input, - '-', - function_name - ) AS pool_id, - MAX(t._inserted_timestamp) AS _inserted_timestamp -FROM tokens t -LEFT JOIN pool_details p USING(contract_address) -WHERE token_address IS NOT NULL - AND token_address <> '0x0000000000000000000000000000000000000000' -GROUP BY 1,2,3,4 + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') [0] AS segmented_token_address, + _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) + WHERE + function_name IN ( + 'coins', + 'base_coins', + 'underlying_coins' + ) + AND read_result IS NOT NULL +), +pool_details AS ( + SELECT + contract_address, + function_sig, + function_name, + function_input, + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, + _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) + WHERE + function_name IN ( + 'name', + 'symbol', + 'decimals' + ) + AND read_result IS NOT NULL +), +all_pools AS ( + SELECT + t.contract_address AS pool_address, + CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)) AS token_address, + function_input AS token_id, + function_name AS token_type, + MIN( + CASE + WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0)) + END + ) AS pool_symbol, + MIN( + CASE + WHEN p.function_name = 'name' THEN CONCAT( + utils.udf_hex_to_string( + p.segmented_output [2] :: STRING + ), + utils.udf_hex_to_string( + segmented_output [3] :: STRING + ) + ) + END + ) AS pool_name, + MIN( + CASE + WHEN p.read_result :: STRING = '0x' THEN NULL + ELSE utils.udf_hex_to_int(LEFT(p.read_result :: STRING, 66)) + END + ) :: INTEGER AS pool_decimals, + CONCAT( + t.contract_address, + '-', + CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)), + '-', + function_input, + '-', + function_name + ) AS pool_id, + MAX( + t._inserted_timestamp + ) AS _inserted_timestamp + FROM + tokens t + LEFT JOIN pool_details p USING(contract_address) + WHERE + token_address IS NOT NULL + AND token_address <> '0x0000000000000000000000000000000000000000' + GROUP BY + 1, + 2, + 3, + 4 ), - FINAL AS ( -SELECT - block_number, - block_timestamp, - tx_hash, - deployer_address, - pool_address, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4200000000000000000000000000000000000006' - ELSE token_address - END AS token_address, - token_id, - token_type, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WETH' - WHEN pool_symbol IS NULL THEN c.token_symbol - ELSE pool_symbol - END AS pool_symbol, - pool_name, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18' - WHEN pool_decimals IS NULL THEN c.token_decimals - ELSE pool_decimals - END AS pool_decimals, - pool_id, - _call_id, - a._inserted_timestamp -FROM all_pools a -LEFT JOIN {{ ref('silver__contracts') }} c - ON a.token_address = c.contract_address -LEFT JOIN contract_deployments d - ON a.pool_address = d.contract_address -QUALIFY(ROW_NUMBER() OVER(PARTITION BY pool_address, token_address ORDER BY a._inserted_timestamp DESC)) = 1 + SELECT + block_number, + block_timestamp, + tx_hash, + deployer_address, + pool_address, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0x4200000000000000000000000000000000000006' + ELSE token_address + END AS token_address, + token_id, + token_type, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WETH' + WHEN pool_symbol IS NULL THEN C.token_symbol + ELSE pool_symbol + END AS pool_symbol, + pool_name, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18' + WHEN pool_decimals IS NULL THEN C.token_decimals + ELSE pool_decimals + END AS pool_decimals, + pool_id, + _call_id, + A._inserted_timestamp + FROM + all_pools A + LEFT JOIN {{ ref('silver__contracts') }} C + ON A.token_address = C.contract_address + LEFT JOIN contract_deployments d + ON A.pool_address = d.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address, token_address + ORDER BY + A._inserted_timestamp DESC)) = 1 ) - SELECT *, - ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY token_address ASC) AS token_num -FROM FINAL \ No newline at end of file + ROW_NUMBER() over ( + PARTITION BY pool_address + ORDER BY + token_address ASC + ) AS token_num +FROM + FINAL diff --git a/models/silver/protocols/velodrome/silver__velodrome_pools.sql b/models/silver/protocols/velodrome/silver__velodrome_pools.sql index 447c8004..90b2c370 100644 --- a/models/silver/protocols/velodrome/silver__velodrome_pools.sql +++ b/models/silver/protocols/velodrome/silver__velodrome_pools.sql @@ -2,8 +2,8 @@ materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = 'created_block', - tags = ['curated'], - full_refresh = false + full_refresh = false, + tags = ['curated'] ) }} WITH pool_creation AS ( @@ -31,7 +31,12 @@ WITH pool_creation AS ( AND contract_address = '0x25cbddb98b35ab1ff77413456b31ec81a6b6b746' --velo deployer {% if is_incremental() %} - +AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '12 hours' + FROM + {{ this }} +) {% endif %} ), function_sigs AS ( @@ -52,9 +57,6 @@ all_inputs AS ( pool_address AS contract_address, block_number, function_sig, - (ROW_NUMBER() over (PARTITION BY pool_address - ORDER BY - block_number)) - 1 AS function_input, 'pool' AS address_label FROM pool_creation @@ -65,9 +67,6 @@ all_inputs AS ( token0_address AS contract_address, block_number, function_sig, - (ROW_NUMBER() over (PARTITION BY token0_address - ORDER BY - block_number)) - 1 AS function_input, 'token0' AS address_label FROM pool_creation @@ -78,64 +77,88 @@ all_inputs AS ( token1_address AS contract_address, block_number, function_sig, - (ROW_NUMBER() over (PARTITION BY token1_address - ORDER BY - block_number)) - 1 AS function_input, 'token1' AS address_label FROM pool_creation JOIN function_sigs ON 1 = 1 ), -ready_reads_all AS ( +build_rpc_requests AS ( SELECT contract_address, block_number, function_sig, - function_input, - CONCAT( - '[\'', - contract_address, - '\',', - block_number, - ',\'', + RPAD( function_sig, - '\',\'', - function_input, - '\']' - ) AS read_input + 64, + '0' + ) AS input, + utils.udf_json_rpc_call( + 'eth_call', + [{'to': contract_address, 'from': null, 'data': input}, utils.udf_int_to_hex(block_number)], + concat_ws( + '-', + contract_address, + input, + block_number + ) + ) AS rpc_request, + ROW_NUMBER() over ( + ORDER BY + block_number + ) AS row_no, + CEIL( + row_no / 300 + ) AS batch_no FROM all_inputs ), -batch_reads_all AS ( - SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read - FROM - ready_reads_all -), all_reads AS ( + +{% if is_incremental() %} +{% for item in range(3) %} + ( SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output, - SYSDATE() AS _inserted_timestamp + live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp FROM - batch_reads_all - JOIN streamline.crosschain.node_mapping - ON 1 = 1 - AND chain = 'optimism' + ( + SELECT + ARRAY_AGG(rpc_request) batch_rpc_request + FROM + build_rpc_requests WHERE - EXISTS ( - SELECT - 1 - FROM - ready_reads_all - LIMIT - 1 - ) -), reads_adjusted AS ( + batch_no = {{ item }} + 1 + AND batch_no IN ( + SELECT + DISTINCT batch_no + FROM + build_rpc_requests))) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +{% else %} + {% for item in range(20) %} + ( + SELECT + live.udf_api('POST', CONCAT('{service}', '/', '{Authentication}'),{}, batch_rpc_request, 'Vault/prod/optimism/quicknode/mainnet') AS read_output, SYSDATE() AS _inserted_timestamp + FROM + ( + SELECT + ARRAY_AGG(rpc_request) batch_rpc_request + FROM + build_rpc_requests + WHERE + batch_no = {{ item }} + 1 + AND batch_no IN ( + SELECT + DISTINCT batch_no + FROM + build_rpc_requests))) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} +{% endif %}), +reads_adjusted AS ( SELECT VALUE :id :: STRING AS read_id, VALUE :result :: STRING AS read_result, @@ -144,14 +167,16 @@ all_reads AS ( '-' ) AS read_id_object, read_id_object [0] :: STRING AS contract_address, - read_id_object [1] :: STRING AS block_number, - read_id_object [2] :: STRING AS function_sig, - read_id_object [3] :: STRING AS function_input, + read_id_object [2] :: STRING AS block_number, + LEFT( + read_id_object [1] :: STRING, + 10 + ) AS function_sig, _inserted_timestamp FROM all_reads, - LATERAL FLATTEN( - input => read_output [0] :data + LATERAL FLATTEN ( + input => read_output :data ) ), details AS ( diff --git a/models/sources.yml b/models/sources.yml index 0e2c3059..5cba6993 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -28,18 +28,12 @@ sources: database: "{{ 'crosschain' if target.database == 'OPTIMISM' else 'crosschain_dev' }}" schema: silver tables: - - name: apis_keys - name: token_prices_priority_hourly - name: token_prices_all_providers_hourly - name: asset_metadata_priority - name: asset_metadata_all_providers - name: near_address_encoded - name: labels_combined - - name: streamline_crosschain - database: streamline - schema: crosschain - tables: - - name: node_mapping - name: crosschain_public database: crosschain schema: bronze_public diff --git a/package-lock.yml b/package-lock.yml index 175a0f96..84cd1c17 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -6,11 +6,11 @@ packages: - package: dbt-labs/dbt_utils version: 1.0.0 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: bdc9dd02079c9f3a58c39dd45a44988cb1deb1bd + revision: 436e7a90ccb1bfa8d566ab99a807f38ec53f74b7 - package: get-select/dbt_snowflake_query_tags version: 2.3.3 - package: calogica/dbt_date version: 0.7.2 - git: https://github.com/FlipsideCrypto/livequery-models.git - revision: bca494102fbd2d621d32746e9a7fe780678044f8 -sha1_hash: 09ade33483dfac0a83369b80cfd88bb6f9b52a92 + revision: 992947a4eaa8fccdf2cfcd2cb73a470ff5e89fa2 +sha1_hash: 342c7081a105a2da3cd7f77edf07124c8f0f1c25 diff --git a/packages.yml b/packages.yml index 78af77d7..544b4dd3 100644 --- a/packages.yml +++ b/packages.yml @@ -6,6 +6,6 @@ packages: - package: dbt-labs/dbt_utils version: 1.0.0 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: v1.15.1 + revision: v1.18.0 - package: get-select/dbt_snowflake_query_tags version: [">=2.0.0", "<3.0.0"] \ No newline at end of file