From 027551d8f030987c8f8a7aeb49a6705b640119c7 Mon Sep 17 00:00:00 2001 From: Austin <93135983+austinFlipside@users.noreply.github.com> Date: Fri, 10 Nov 2023 15:02:53 -0500 Subject: [PATCH] latency (#106) --- .../workflows/dbt_run_scheduled_curated.yml | 44 ++ data/github_actions__workflows.csv | 3 +- .../balancer/silver_dex__balancer_pools.sql | 290 ++++----- .../balancer/silver_dex__balancer_swaps.sql | 2 +- .../dex/curve/silver_dex__curve_pools.sql | 615 +++++++++--------- .../dex/curve/silver_dex__curve_swaps.sql | 189 +++--- .../honeyswap/silver_dex__honeyswap_pools.sql | 2 +- .../honeyswap/silver_dex__honeyswap_swaps.sql | 2 +- ...lver_dex__complete_dex_liquidity_pools.sql | 386 +++++++---- .../dex/silver_dex__complete_dex_swaps.sql | 2 +- .../dex/sushi/silver_dex__sushi_pools.sql | 2 +- .../dex/sushi/silver_dex__sushi_swaps.sql | 2 +- .../dex/swapr/silver_dex__swapr_pools.sql | 2 +- .../dex/swapr/silver_dex__swapr_swaps.sql | 2 +- models/silver/nft/silver__nft_transfers.sql | 2 +- 15 files changed, 890 insertions(+), 655 deletions(-) create mode 100644 .github/workflows/dbt_run_scheduled_curated.yml diff --git a/.github/workflows/dbt_run_scheduled_curated.yml b/.github/workflows/dbt_run_scheduled_curated.yml new file mode 100644 index 0000000..cd783ae --- /dev/null +++ b/.github/workflows/dbt_run_scheduled_curated.yml @@ -0,0 +1,44 @@ +name: dbt_run_scheduled_curated +run-name: dbt_run_scheduled_curated + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m "gnosis_models,tag:curated" \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 871008a..57ed842 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -2,4 +2,5 @@ workflow_name,workflow_schedule dbt_run_scheduled_non_realtime,"10,40 * * * *" dbt_run_streamline_chainhead,"0,30 * * * *" dbt_run_streamline_decoder,"20,50 * * * *" -dbt_test_tasks,"25,55 * * * *" \ No newline at end of file +dbt_run_scheduled_curated,"30 * * * *" +dbt_test_tasks,"25 * * * *" \ No newline at end of file diff --git a/models/silver/dex/balancer/silver_dex__balancer_pools.sql b/models/silver/dex/balancer/silver_dex__balancer_pools.sql index ae110d3..43b0d71 100644 --- a/models/silver/dex/balancer/silver_dex__balancer_pools.sql +++ b/models/silver/dex/balancer/silver_dex__balancer_pools.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', full_refresh = false, - tags = ['non_realtime'] + tags = ['curated'] ) }} WITH pools_registered AS ( @@ -15,15 +15,23 @@ WITH pools_registered AS ( tx_hash, contract_address, topics [1] :: STRING AS pool_id, - SUBSTR(topics [1] :: STRING,1,42) AS pool_address, + SUBSTR( + topics [1] :: STRING, + 1, + 42 + ) AS pool_address, _log_id, _inserted_timestamp, - ROW_NUMBER() OVER (ORDER BY pool_address) AS row_num + ROW_NUMBER() over ( + ORDER BY + pool_address + ) AS row_num FROM {{ ref('silver__logs') }} WHERE - topics[0]::STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered + topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + {% if is_incremental() %} AND _inserted_timestamp >= ( SELECT @@ -32,120 +40,91 @@ AND _inserted_timestamp >= ( {{ this }} ) {% endif %} - ), - tokens_registered AS ( - -SELECT - block_number, - block_timestamp, - event_index, - tx_hash, - contract_address, - decoded_flat :poolId :: STRING AS pool_id, - decoded_flat :tokens AS tokens, - tokens[0] :: STRING AS token0, - tokens[1] :: STRING AS token1, - tokens[2] :: STRING AS token2, - tokens[3] :: STRING AS token3, - tokens[4] :: STRING AS token4, - tokens[5] :: STRING AS token5, - tokens[6] :: STRING AS token6, - tokens[7] :: STRING AS token7, - decoded_flat :assetManagers AS asset_managers, - _log_id, - _inserted_timestamp -FROM {{ ref('silver__decoded_logs') }} -WHERE - topics[0]::STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered - AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' - AND tx_hash IN ( - SELECT - tx_hash - FROM - pools_registered - ) -), - -function_sigs AS ( - -SELECT - '0x06fdde03' AS function_sig, - 'name' AS function_name -UNION ALL -SELECT - '0x95d89b41' AS function_sig, - 'symbol' AS function_name -UNION ALL -SELECT - '0x313ce567' AS function_sig, - 'decimals' AS function_name -), - -inputs_pools AS ( - -SELECT - pool_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY pool_address - ORDER BY block_number)) - 1 AS function_input -FROM pools_registered -JOIN function_sigs ON 1=1 -), - -pool_token_reads AS ( - -{% for item in range(50) %} -( -SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output, - SYSDATE() AS _inserted_timestamp -FROM ( SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read - FROM ( - SELECT - pool_address, - block_number, - function_sig, - function_input, - CONCAT( - '[\'', - pool_address, - '\',', - block_number, - ',\'', - function_sig, - '\',\'', - function_input, - '\']' - ) AS read_input, - row_num - FROM inputs_pools - LEFT JOIN pools_registered USING(pool_address) - ) ready_reads_pools - WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}} - ) batch_reads_pools -JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} ON 1=1 - AND chain = 'gnosis' -) {% if not loop.last %} -UNION ALL -{% endif %} -{% endfor %} + block_number, + block_timestamp, + event_index, + tx_hash, + contract_address, + decoded_flat :poolId :: STRING AS pool_id, + decoded_flat :tokens AS tokens, + tokens [0] :: STRING AS token0, + tokens [1] :: STRING AS token1, + tokens [2] :: STRING AS token2, + tokens [3] :: STRING AS token3, + tokens [4] :: STRING AS token4, + tokens [5] :: STRING AS token5, + tokens [6] :: STRING AS token6, + tokens [7] :: STRING AS token7, + decoded_flat :assetManagers AS asset_managers, + _log_id, + _inserted_timestamp + FROM + {{ ref('silver__decoded_logs') }} + WHERE + topics [0] :: STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered + AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + AND tx_hash IN ( + SELECT + tx_hash + FROM + pools_registered + ) ), - -reads_adjusted AS ( - +function_sigs AS ( + SELECT + '0x06fdde03' AS function_sig, + 'name' AS function_name + UNION ALL + SELECT + '0x95d89b41' AS function_sig, + 'symbol' AS function_name + UNION ALL + SELECT + '0x313ce567' AS function_sig, + 'decimals' AS function_name +), +inputs_pools AS ( + SELECT + pool_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY pool_address + ORDER BY + block_number)) - 1 AS function_input + FROM + pools_registered + JOIN function_sigs + ON 1 = 1 +), +pool_token_reads AS ({% for item in range(50) %} + ( SELECT + ethereum.streamline.udf_json_rpc_read_calls(node_url, headers, PARSE_JSON(batch_read)) AS read_output, SYSDATE() AS _inserted_timestamp +FROM + ( +SELECT + CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read +FROM + ( +SELECT + pool_address, block_number, function_sig, function_input, CONCAT('[\'', pool_address, '\',', block_number, ',\'', function_sig, '\',\'', function_input, '\']') AS read_input, row_num +FROM + inputs_pools + LEFT JOIN pools_registered USING(pool_address)) ready_reads_pools +WHERE + row_num BETWEEN {{ item * 50 + 1 }} + AND {{(item + 1) * 50 }}) batch_reads_pools + JOIN {{ source('streamline_crosschain', 'node_mapping') }} + ON 1 = 1 + AND chain = 'gnosis') {% if not loop.last %} + UNION ALL + {% endif %} +{% endfor %}), +reads_adjusted AS ( + SELECT VALUE :id :: STRING AS read_id, VALUE :result :: STRING AS read_result, SPLIT( @@ -157,40 +136,53 @@ SELECT read_id_object [2] :: STRING AS function_sig, read_id_object [3] :: STRING AS function_input, _inserted_timestamp -FROM - pool_token_reads, - LATERAL FLATTEN( - input => read_output [0] :data - ) + FROM + pool_token_reads, + LATERAL FLATTEN( + input => read_output [0] :data + ) ), - pool_details AS ( - -SELECT - pool_address, - function_sig, - function_name, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) - ), - + SELECT + pool_address, + function_sig, + function_name, + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, + _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) +), FINAL AS ( -SELECT - pool_address, - MIN(CASE WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_symbol, - MIN(CASE WHEN function_name = 'name' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_name, - MIN(CASE - WHEN read_result::STRING = '0x' THEN NULL - ELSE utils.udf_hex_to_int(LEFT(read_result::STRING,66)) - END)::INTEGER AS pool_decimals, - MAX(_inserted_timestamp) AS _inserted_timestamp -FROM pool_details -GROUP BY 1 + SELECT + pool_address, + MIN( + CASE + WHEN function_name = 'symbol' THEN utils.udf_hex_to_string( + segmented_output [2] :: STRING + ) + END + ) AS pool_symbol, + MIN( + CASE + WHEN function_name = 'name' THEN utils.udf_hex_to_string( + segmented_output [2] :: STRING + ) + END + ) AS pool_name, + MIN( + CASE + WHEN read_result :: STRING = '0x' THEN NULL + ELSE utils.udf_hex_to_int(LEFT(read_result :: STRING, 66)) + END + ) :: INTEGER AS pool_decimals, + MAX(_inserted_timestamp) AS _inserted_timestamp + FROM + pool_details + GROUP BY + 1 ) - SELECT p.block_number, p.block_timestamp, @@ -213,7 +205,11 @@ SELECT t.asset_managers, p._log_id, f._inserted_timestamp -FROM FINAL f -LEFT JOIN pools_registered p ON f.pool_address = p.pool_address -LEFT JOIN tokens_registered t ON p.pool_id = t.pool_id -WHERE t.token0 IS NOT NULL +FROM + FINAL f + LEFT JOIN pools_registered p + ON f.pool_address = p.pool_address + LEFT JOIN tokens_registered t + ON p.pool_id = t.pool_id +WHERE + t.token0 IS NOT NULL diff --git a/models/silver/dex/balancer/silver_dex__balancer_swaps.sql b/models/silver/dex/balancer/silver_dex__balancer_swaps.sql index 8565785..8f56f85 100644 --- a/models/silver/dex/balancer/silver_dex__balancer_swaps.sql +++ b/models/silver/dex/balancer/silver_dex__balancer_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH pools AS ( diff --git a/models/silver/dex/curve/silver_dex__curve_pools.sql b/models/silver/dex/curve/silver_dex__curve_pools.sql index 4620a90..c57014b 100644 --- a/models/silver/dex/curve/silver_dex__curve_pools.sql +++ b/models/silver/dex/curve/silver_dex__curve_pools.sql @@ -3,32 +3,38 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', full_refresh = false, - tags = ['non_realtime'] + tags = ['curated'] ) }} WITH contract_deployments AS ( -SELECT - tx_hash, - block_number, - block_timestamp, - from_address AS deployer_address, - to_address AS contract_address, - _call_id, - _inserted_timestamp, - ROW_NUMBER() OVER (ORDER BY contract_address) AS row_num -FROM - {{ ref('silver__traces' )}} -WHERE - -- curve contract deployers - from_address IN ( - '0x7eeac6cddbd1d0b8af061742d41877d7f707289a', - '0xcbaf0a32f5a16b326f00607421857f68fc72e508', - '0xd25fcbb7b6021cf83122fcd65be88a045d5f961c', - '0xd19baeadc667cf2015e395f2b08668ef120f41f5' + SELECT + tx_hash, + block_number, + block_timestamp, + from_address AS deployer_address, + to_address AS contract_address, + _call_id, + _inserted_timestamp, + ROW_NUMBER() over ( + ORDER BY + contract_address + ) AS row_num + FROM + {{ ref( + 'silver__traces' + ) }} + WHERE + -- curve contract deployers + from_address IN ( + '0x7eeac6cddbd1d0b8af061742d41877d7f707289a', + '0xcbaf0a32f5a16b326f00607421857f68fc72e508', + '0xd25fcbb7b6021cf83122fcd65be88a045d5f961c', + '0xd19baeadc667cf2015e395f2b08668ef120f41f5' ) - AND TYPE ilike 'create%' - AND TX_STATUS ilike 'success' + AND TYPE ILIKE 'create%' + AND tx_status ILIKE 'success' + {% if is_incremental() %} AND _inserted_timestamp >= ( SELECT @@ -43,188 +49,179 @@ AND to_address NOT IN ( {{ this }} ) {% endif %} -QUALIFY(ROW_NUMBER() OVER(PARTITION BY to_address ORDER BY block_timestamp ASC)) = 1 -), +qualify(ROW_NUMBER() over(PARTITION BY to_address +ORDER BY + block_timestamp ASC)) = 1 +), function_sigs AS ( - -SELECT - '0x87cb4f57' AS function_sig, - 'base_coins' AS function_name -UNION ALL -SELECT - '0xb9947eb0' AS function_sig, - 'underlying_coins' AS function_name -UNION ALL -SELECT - '0xc6610657' AS function_sig, - 'coins' AS function_name -UNION ALL -SELECT - '0x06fdde03' AS function_sig, - 'name' AS function_name -UNION ALL -SELECT - '0x95d89b41' AS function_sig, - 'symbol' AS function_name -UNION ALL -SELECT - '0x313ce567' AS function_sig, - 'decimals' AS function_name + SELECT + '0x87cb4f57' AS function_sig, + 'base_coins' AS function_name + UNION ALL + SELECT + '0xb9947eb0' AS function_sig, + 'underlying_coins' AS function_name + UNION ALL + SELECT + '0xc6610657' AS function_sig, + 'coins' AS function_name + UNION ALL + SELECT + '0x06fdde03' AS function_sig, + 'name' AS function_name + UNION ALL + SELECT + '0x95d89b41' AS function_sig, + 'symbol' AS function_name + UNION ALL + SELECT + '0x313ce567' AS function_sig, + 'decimals' AS function_name ), - function_inputs AS ( SELECT SEQ4() AS function_input FROM TABLE(GENERATOR(rowcount => 8)) ), - inputs_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'coins' -), - -inputs_base_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'base_coins' -), - -inputs_underlying_coins AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - (ROW_NUMBER() OVER (PARTITION BY contract_address - ORDER BY block_number)) - 1 AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -JOIN function_inputs ON 1=1 - WHERE function_name = 'underlying_coins' -), - -inputs_pool_details AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - NULL AS function_input -FROM contract_deployments -JOIN function_sigs ON 1=1 -WHERE function_name IN ('name','symbol','decimals') -), - -all_inputs AS ( - -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_base_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_underlying_coins -UNION ALL -SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input -FROM inputs_pool_details -), - -pool_token_reads AS ( - -{% for item in range(20) %} -( -SELECT - ethereum.streamline.udf_json_rpc_read_calls( - node_url, - headers, - PARSE_JSON(batch_read) - ) AS read_output, - SYSDATE() AS _inserted_timestamp -FROM ( SELECT - CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read - FROM ( - SELECT - deployer_address, - contract_address, - block_number, - function_sig, - function_input, - CONCAT( - '[\'', - contract_address, - '\',', - block_number, - ',\'', - function_sig, - '\',\'', - (CASE WHEN function_input IS NULL THEN '' ELSE function_input::STRING END), - '\']' - ) AS read_input, - row_num - FROM all_inputs - LEFT JOIN contract_deployments USING(contract_address) - ) ready_reads_pools - WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}} - ) batch_reads_pools -JOIN {{ source( - 'streamline_crosschain', - 'node_mapping' - ) }} ON 1=1 - AND chain = 'gnosis' -) {% if not loop.last %} -UNION ALL -{% endif %} -{% endfor %} + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'coins' ), - -reads_adjusted AS ( - +inputs_base_coins AS ( + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'base_coins' +), +inputs_underlying_coins AS ( + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + (ROW_NUMBER() over (PARTITION BY contract_address + ORDER BY + block_number)) - 1 AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + JOIN function_inputs + ON 1 = 1 + WHERE + function_name = 'underlying_coins' +), +inputs_pool_details AS ( + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + NULL AS function_input + FROM + contract_deployments + JOIN function_sigs + ON 1 = 1 + WHERE + function_name IN ( + 'name', + 'symbol', + 'decimals' + ) +), +all_inputs AS ( + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_base_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_underlying_coins + UNION ALL + SELECT + deployer_address, + contract_address, + block_number, + function_sig, + function_input + FROM + inputs_pool_details +), +pool_token_reads AS ({% for item in range(20) %} + ( SELECT + ethereum.streamline.udf_json_rpc_read_calls(node_url, headers, PARSE_JSON(batch_read)) AS read_output, SYSDATE() AS _inserted_timestamp +FROM + ( +SELECT + CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read +FROM + ( +SELECT + deployer_address, contract_address, block_number, function_sig, function_input, CONCAT('[\'', contract_address, '\',', block_number, ',\'', function_sig, '\',\'', (CASE + WHEN function_input IS NULL THEN '' + ELSE function_input :: STRINGEND), '\']') AS read_input, row_num +FROM + all_inputs + LEFT JOIN contract_deployments USING(contract_address)) ready_reads_pools +WHERE + row_num BETWEEN {{ item * 50 + 1 }} + AND {{(item + 1) * 50 }}) batch_reads_pools + JOIN {{ source('streamline_crosschain', 'node_mapping') }} + ON 1 = 1 + AND chain = 'gnosis') {% if not loop.last %} + UNION ALL + {% endif %} +{% endfor %}), +reads_adjusted AS ( + SELECT VALUE :id :: STRING AS read_id, VALUE :result :: STRING AS read_result, SPLIT( @@ -236,113 +233,147 @@ SELECT read_id_object [2] :: STRING AS function_sig, read_id_object [3] :: STRING AS function_input, _inserted_timestamp -FROM - pool_token_reads, - LATERAL FLATTEN( - input => read_output [0] :data - ) + FROM + pool_token_reads, + LATERAL FLATTEN( + input => read_output [0] :data + ) ), - tokens AS ( - -SELECT - contract_address, - function_sig, - function_name, - function_input, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}')[0]AS segmented_token_address, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) -WHERE function_name IN ('coins','base_coins','underlying_coins') - AND read_result IS NOT NULL - -), - -pool_details AS ( - -SELECT - contract_address, - function_sig, - function_name, - function_input, - read_result, - regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, - _inserted_timestamp -FROM reads_adjusted -LEFT JOIN function_sigs USING(function_sig) -WHERE function_name IN ('name','symbol','decimals') - AND read_result IS NOT NULL - -), - -all_pools AS ( -SELECT - t.contract_address AS pool_address, - CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)) AS token_address, - function_input AS token_id, - function_name AS token_type, - MIN(CASE WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0)) END) AS pool_symbol, - MIN(CASE WHEN p.function_name = 'name' THEN CONCAT(utils.udf_hex_to_string(p.segmented_output [2] :: STRING), - utils.udf_hex_to_string(segmented_output [3] :: STRING)) END) AS pool_name, - MIN(CASE - WHEN p.read_result::STRING = '0x' THEN NULL - ELSE utils.udf_hex_to_int(LEFT(p.read_result::STRING,66)) - END)::INTEGER AS pool_decimals, - CONCAT( - t.contract_address, - '-', - CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)), - '-', + SELECT + contract_address, + function_sig, + function_name, function_input, - '-', - function_name - ) AS pool_id, - MAX(t._inserted_timestamp) AS _inserted_timestamp -FROM tokens t -LEFT JOIN pool_details p USING(contract_address) -WHERE token_address IS NOT NULL - AND token_address <> '0x0000000000000000000000000000000000000000' -GROUP BY 1,2,3,4 + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') [0] AS segmented_token_address, + _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) + WHERE + function_name IN ( + 'coins', + 'base_coins', + 'underlying_coins' + ) + AND read_result IS NOT NULL +), +pool_details AS ( + SELECT + contract_address, + function_sig, + function_name, + function_input, + read_result, + regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output, + _inserted_timestamp + FROM + reads_adjusted + LEFT JOIN function_sigs USING(function_sig) + WHERE + function_name IN ( + 'name', + 'symbol', + 'decimals' + ) + AND read_result IS NOT NULL +), +all_pools AS ( + SELECT + t.contract_address AS pool_address, + CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)) AS token_address, + function_input AS token_id, + function_name AS token_type, + MIN( + CASE + WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0)) + END + ) AS pool_symbol, + MIN( + CASE + WHEN p.function_name = 'name' THEN CONCAT( + utils.udf_hex_to_string( + p.segmented_output [2] :: STRING + ), + utils.udf_hex_to_string( + segmented_output [3] :: STRING + ) + ) + END + ) AS pool_name, + MIN( + CASE + WHEN p.read_result :: STRING = '0x' THEN NULL + ELSE utils.udf_hex_to_int(LEFT(p.read_result :: STRING, 66)) + END + ) :: INTEGER AS pool_decimals, + CONCAT( + t.contract_address, + '-', + CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)), + '-', + function_input, + '-', + function_name + ) AS pool_id, + MAX( + t._inserted_timestamp + ) AS _inserted_timestamp + FROM + tokens t + LEFT JOIN pool_details p USING(contract_address) + WHERE + token_address IS NOT NULL + AND token_address <> '0x0000000000000000000000000000000000000000' + GROUP BY + 1, + 2, + 3, + 4 ), - FINAL AS ( -SELECT - block_number, - block_timestamp, - tx_hash, - deployer_address, - pool_address, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0xe91d153e0b41518a2ce8dd3d7944fa863463a97d' - ELSE token_address - END AS token_address, - token_id, - token_type, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WXDAI' - WHEN pool_symbol IS NULL THEN c.token_symbol - ELSE pool_symbol - END AS pool_symbol, - pool_name, - CASE - WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18' - WHEN pool_decimals IS NULL THEN c.token_decimals - ELSE pool_decimals - END AS pool_decimals, - pool_id, - _call_id, - a._inserted_timestamp -FROM all_pools a -LEFT JOIN {{ ref('silver__contracts') }} c - ON a.token_address = c.contract_address -LEFT JOIN contract_deployments d - ON a.pool_address = d.contract_address -QUALIFY(ROW_NUMBER() OVER(PARTITION BY pool_address, token_address ORDER BY a._inserted_timestamp DESC)) = 1 + SELECT + block_number, + block_timestamp, + tx_hash, + deployer_address, + pool_address, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0xe91d153e0b41518a2ce8dd3d7944fa863463a97d' + ELSE token_address + END AS token_address, + token_id, + token_type, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WXDAI' + WHEN pool_symbol IS NULL THEN C.token_symbol + ELSE pool_symbol + END AS pool_symbol, + pool_name, + CASE + WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18' + WHEN pool_decimals IS NULL THEN C.token_decimals + ELSE pool_decimals + END AS pool_decimals, + pool_id, + _call_id, + A._inserted_timestamp + FROM + all_pools A + LEFT JOIN {{ ref('silver__contracts') }} C + ON A.token_address = C.contract_address + LEFT JOIN contract_deployments d + ON A.pool_address = d.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address, token_address + ORDER BY + A._inserted_timestamp DESC)) = 1 ) - SELECT *, - ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY token_address ASC) AS token_num -FROM FINAL \ No newline at end of file + ROW_NUMBER() over ( + PARTITION BY pool_address + ORDER BY + token_address ASC + ) AS token_num +FROM + FINAL diff --git a/models/silver/dex/curve/silver_dex__curve_swaps.sql b/models/silver/dex/curve/silver_dex__curve_swaps.sql index 0fd2fda..08111c6 100644 --- a/models/silver/dex/curve/silver_dex__curve_swaps.sql +++ b/models/silver/dex/curve/silver_dex__curve_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH pool_meta AS ( @@ -13,20 +13,20 @@ WITH pool_meta AS ( pool_name, token_address, pool_symbol AS symbol, - token_id::INTEGER AS token_id, - token_type::STRING AS token_type + token_id :: INTEGER AS token_id, + token_type :: STRING AS token_type FROM {{ ref('silver_dex__curve_pools') }} ), - pools AS ( - SELECT + SELECT DISTINCT pool_address, pool_name - FROM pool_meta - QUALIFY (ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY pool_name ASC NULLS LAST)) = 1 + FROM + pool_meta qualify (ROW_NUMBER() over (PARTITION BY pool_address + ORDER BY + pool_name ASC nulls last)) = 1 ), - curve_base AS ( SELECT block_number, @@ -45,28 +45,39 @@ curve_base AS ( pool_name, regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data, CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender, - TRY_TO_NUMBER(utils.udf_hex_to_int( - segmented_data [0] :: STRING - )) AS sold_id, - TRY_TO_NUMBER(utils.udf_hex_to_int( - segmented_data [1] :: STRING - )) AS tokens_sold, - TRY_TO_NUMBER(utils.udf_hex_to_int( - segmented_data [2] :: STRING - )) AS bought_id, - TRY_TO_NUMBER(utils.udf_hex_to_int( - segmented_data [3] :: STRING - )) AS tokens_bought, + TRY_TO_NUMBER( + utils.udf_hex_to_int( + segmented_data [0] :: STRING + ) + ) AS sold_id, + TRY_TO_NUMBER( + utils.udf_hex_to_int( + segmented_data [1] :: STRING + ) + ) AS tokens_sold, + TRY_TO_NUMBER( + utils.udf_hex_to_int( + segmented_data [2] :: STRING + ) + ) AS bought_id, + TRY_TO_NUMBER( + utils.udf_hex_to_int( + segmented_data [3] :: STRING + ) + ) AS tokens_bought, _log_id, _inserted_timestamp FROM - {{ ref('silver__logs') }} l + {{ ref('silver__logs') }} + l INNER JOIN pools p ON p.pool_address = l.contract_address WHERE topics [0] :: STRING IN ( - '0x8b3e96f2b889fa771c53c981b40daf005f63f637f1869f707052d15a3dd97140', --TokenExchange - '0xb2e76ae99761dc136e598d4a629bb347eccb9532a5f8bbd72e18467c3c34cc98', --TokenExchange + '0x8b3e96f2b889fa771c53c981b40daf005f63f637f1869f707052d15a3dd97140', + --TokenExchange + '0xb2e76ae99761dc136e598d4a629bb347eccb9532a5f8bbd72e18467c3c34cc98', + --TokenExchange '0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b' --TokenExchangeUnderlying ) @@ -79,21 +90,42 @@ AND _inserted_timestamp >= ( ) {% endif %} ), - token_exchange AS ( - -SELECT - _log_id, - MAX(CASE WHEN sold_id = token_id THEN token_address END) AS token_in, - MAX(CASE WHEN bought_id = token_id THEN token_address END) AS token_out, - MAX(CASE WHEN sold_id = token_id THEN symbol END) AS symbol_in, - MAX(CASE WHEN bought_id = token_id THEN symbol END) AS symbol_out -FROM curve_base t -LEFT JOIN pool_meta p ON p.pool_address = t.pool_address AND (p.token_id = t.sold_id OR p.token_id = t.bought_id) -WHERE token_type = 'coins' -GROUP BY 1 + SELECT + _log_id, + MAX( + CASE + WHEN sold_id = token_id THEN token_address + END + ) AS token_in, + MAX( + CASE + WHEN bought_id = token_id THEN token_address + END + ) AS token_out, + MAX( + CASE + WHEN sold_id = token_id THEN symbol + END + ) AS symbol_in, + MAX( + CASE + WHEN bought_id = token_id THEN symbol + END + ) AS symbol_out + FROM + curve_base t + LEFT JOIN pool_meta p + ON p.pool_address = t.pool_address + AND ( + p.token_id = t.sold_id + OR p.token_id = t.bought_id + ) + WHERE + token_type = 'coins' + GROUP BY + 1 ), - token_transfers AS ( SELECT tx_hash, @@ -144,49 +176,52 @@ to_transfers AS ( FROM token_transfers ), - ready_pool_info AS ( - -SELECT - s.block_number, - s.block_timestamp, - s.tx_hash, - s.origin_function_signature, - s.origin_from_address, - s.origin_from_address AS tx_to, - s.origin_to_address, - event_index, - event_name, - pool_address, - pool_address AS contract_address, - pool_name, - sender, - sold_id, - tokens_sold, - COALESCE(sold.token_address,e.token_in) AS token_in, - e.symbol_in AS symbol_in, - bought_id, - tokens_bought, - COALESCE(bought.token_address,e.token_out) AS token_out, - e.symbol_out AS symbol_out, - s._log_id, - _inserted_timestamp -FROM - curve_base s - LEFT JOIN token_exchange e ON s._log_id = e._log_id - LEFT JOIN from_transfers sold - ON tokens_sold = sold.amount - AND s.tx_hash = sold.tx_hash - LEFT JOIN to_transfers bought - ON tokens_bought = bought.amount - AND s.tx_hash = bought.tx_hash -WHERE - tokens_sold <> 0 -qualify(ROW_NUMBER() over(PARTITION BY s._log_id + SELECT + s.block_number, + s.block_timestamp, + s.tx_hash, + s.origin_function_signature, + s.origin_from_address, + s.origin_from_address AS tx_to, + s.origin_to_address, + event_index, + event_name, + pool_address, + pool_address AS contract_address, + pool_name, + sender, + sold_id, + tokens_sold, + COALESCE( + sold.token_address, + e.token_in + ) AS token_in, + e.symbol_in AS symbol_in, + bought_id, + tokens_bought, + COALESCE( + bought.token_address, + e.token_out + ) AS token_out, + e.symbol_out AS symbol_out, + s._log_id, + _inserted_timestamp + FROM + curve_base s + LEFT JOIN token_exchange e + ON s._log_id = e._log_id + LEFT JOIN from_transfers sold + ON tokens_sold = sold.amount + AND s.tx_hash = sold.tx_hash + LEFT JOIN to_transfers bought + ON tokens_bought = bought.amount + AND s.tx_hash = bought.tx_hash + WHERE + tokens_sold <> 0 qualify(ROW_NUMBER() over(PARTITION BY s._log_id ORDER BY - _inserted_timestamp DESC)) = 1 + _inserted_timestamp DESC)) = 1 ) - SELECT block_number, block_timestamp, @@ -213,4 +248,4 @@ SELECT _inserted_timestamp, 'curve' AS platform FROM - ready_pool_info \ No newline at end of file + ready_pool_info diff --git a/models/silver/dex/honeyswap/silver_dex__honeyswap_pools.sql b/models/silver/dex/honeyswap/silver_dex__honeyswap_pools.sql index 7c41c83..6bfda43 100644 --- a/models/silver/dex/honeyswap/silver_dex__honeyswap_pools.sql +++ b/models/silver/dex/honeyswap/silver_dex__honeyswap_pools.sql @@ -2,7 +2,7 @@ materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = 'block_number', - tags = ['non_realtime'] + tags = ['curated'] ) }} WITH pool_creation AS ( diff --git a/models/silver/dex/honeyswap/silver_dex__honeyswap_swaps.sql b/models/silver/dex/honeyswap/silver_dex__honeyswap_swaps.sql index b30a3fc..48dfd2f 100644 --- a/models/silver/dex/honeyswap/silver_dex__honeyswap_swaps.sql +++ b/models/silver/dex/honeyswap/silver_dex__honeyswap_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH pools AS ( diff --git a/models/silver/dex/silver_dex__complete_dex_liquidity_pools.sql b/models/silver/dex/silver_dex__complete_dex_liquidity_pools.sql index c1b4800..3af78b1 100644 --- a/models/silver/dex/silver_dex__complete_dex_liquidity_pools.sql +++ b/models/silver/dex/silver_dex__complete_dex_liquidity_pools.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = ['block_number','platform','version'], cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH contracts AS ( @@ -16,10 +16,8 @@ WITH contracts AS ( FROM {{ ref('silver__contracts') }} ), - balancer AS ( - -SELECT + SELECT block_number, block_timestamp, tx_hash, @@ -38,8 +36,9 @@ SELECT token5, token6, token7 -FROM + FROM {{ ref('silver_dex__balancer_pools') }} + {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -50,10 +49,8 @@ WHERE ) {% endif %} ), - curve AS ( - -SELECT + SELECT block_number, block_timestamp, tx_hash, @@ -64,16 +61,49 @@ SELECT 'v1' AS version, _call_id AS _id, _inserted_timestamp, - MAX(CASE WHEN token_num = 1 THEN token_address END) AS token0, - MAX(CASE WHEN token_num = 2 THEN token_address END) AS token1, - MAX(CASE WHEN token_num = 3 THEN token_address END) AS token2, - MAX(CASE WHEN token_num = 4 THEN token_address END) AS token3, - MAX(CASE WHEN token_num = 5 THEN token_address END) AS token4, - MAX(CASE WHEN token_num = 6 THEN token_address END) AS token5, - MAX(CASE WHEN token_num = 7 THEN token_address END) AS token6, - MAX(CASE WHEN token_num = 8 THEN token_address END) AS token7 -FROM + MAX( + CASE + WHEN token_num = 1 THEN token_address + END + ) AS token0, + MAX( + CASE + WHEN token_num = 2 THEN token_address + END + ) AS token1, + MAX( + CASE + WHEN token_num = 3 THEN token_address + END + ) AS token2, + MAX( + CASE + WHEN token_num = 4 THEN token_address + END + ) AS token3, + MAX( + CASE + WHEN token_num = 5 THEN token_address + END + ) AS token4, + MAX( + CASE + WHEN token_num = 6 THEN token_address + END + ) AS token5, + MAX( + CASE + WHEN token_num = 7 THEN token_address + END + ) AS token6, + MAX( + CASE + WHEN token_num = 8 THEN token_address + END + ) AS token7 + FROM {{ ref('silver_dex__curve_pools') }} + {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -83,12 +113,11 @@ WHERE {{ this }} ) {% endif %} -GROUP BY all +GROUP BY + ALL ), - honeyswap AS ( - -SELECT + SELECT block_number, block_timestamp, tx_hash, @@ -101,8 +130,9 @@ SELECT 'v1' AS version, _log_id AS _id, _inserted_timestamp -FROM + FROM {{ ref('silver_dex__honeyswap_pools') }} + {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -113,10 +143,8 @@ WHERE ) {% endif %} ), - swapr AS ( - -SELECT + SELECT block_number, block_timestamp, tx_hash, @@ -129,8 +157,9 @@ SELECT 'v1' AS version, _log_id AS _id, _inserted_timestamp -FROM + FROM {{ ref('silver_dex__swapr_pools') }} + {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -141,10 +170,8 @@ WHERE ) {% endif %} ), - sushi AS ( - -SELECT + SELECT block_number, block_timestamp, tx_hash, @@ -157,8 +184,9 @@ SELECT 'v1' AS version, _log_id AS _id, _inserted_timestamp -FROM + FROM {{ ref('silver_dex__sushi_pools') }} + {% if is_incremental() %} WHERE _inserted_timestamp >= ( @@ -169,113 +197,213 @@ WHERE ) {% endif %} ), - all_pools_standard AS ( - SELECT * - FROM honeyswap - UNION ALL - SELECT * - FROM swapr - UNION ALL - SELECT * - FROM sushi + SELECT + * + FROM + honeyswap + UNION ALL + SELECT + * + FROM + swapr + UNION ALL + SELECT + * + FROM + sushi ), - all_pools_other AS ( - SELECT * - FROM balancer - UNION ALL - SELECT * - FROM curve + SELECT + * + FROM + balancer + UNION ALL + SELECT + * + FROM + curve ), - FINAL AS ( - SELECT - block_number, - block_timestamp, - tx_hash, - contract_address, - pool_address, - CASE - WHEN pool_name IS NULL - THEN CONCAT( - COALESCE(c0.symbol,CONCAT(SUBSTRING(token0, 1, 5),'...',SUBSTRING(token0, 39, 42))), - '-', - COALESCE(c1.symbol,CONCAT(SUBSTRING(token1, 1, 5),'...',SUBSTRING(token1, 39, 42))) - ) - ELSE pool_name - END AS pool_name, - OBJECT_CONSTRUCT('token0',token0,'token1',token1) AS tokens, - OBJECT_CONSTRUCT('token0',c0.symbol,'token1',c1.symbol) AS symbols, - OBJECT_CONSTRUCT('token0',c0.decimals,'token1',c1.decimals) AS decimals, - platform, - version, - _id, - p._inserted_timestamp - FROM all_pools_standard p - LEFT JOIN contracts c0 - ON c0.address = p.token0 - LEFT JOIN contracts c1 - ON c1.address = p.token1 - UNION ALL - SELECT - block_number, - block_timestamp, - tx_hash, - contract_address, - pool_address, - CASE - WHEN pool_name IS NULL - THEN CONCAT( - COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), - CASE WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) ELSE '' END, - CASE WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) ELSE '' END, - CASE WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) ELSE '' END, - CASE WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) ELSE '' END, - CASE WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) ELSE '' END, - CASE WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) ELSE '' END, - CASE WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) ELSE '' END - ) - ELSE pool_name - END AS pool_name, - OBJECT_CONSTRUCT('token0', token0, 'token1', token1, 'token2', token2, 'token3', token3, 'token4', token4, 'token5', token5, 'token6', token6, 'token7', token7) AS tokens, - OBJECT_CONSTRUCT('token0', c0.symbol, 'token1', c1.symbol, 'token2', c2.symbol, 'token3', c3.symbol, 'token4', c4.symbol, 'token5', c5.symbol, 'token6', c6.symbol, 'token7', c7.symbol) AS symbols, - OBJECT_CONSTRUCT('token0', c0.decimals, 'token1', c1.decimals, 'token2', c2.decimals, 'token3', c3.decimals, 'token4', c4.decimals, 'token5', c5.decimals, 'token6', c6.decimals, 'token7', c7.decimals) AS decimals, - platform, - version, - _id, - p._inserted_timestamp - FROM all_pools_other p - LEFT JOIN contracts c0 - ON c0.address = p.token0 - LEFT JOIN contracts c1 - ON c1.address = p.token1 - LEFT JOIN contracts c2 - ON c2.address = p.token2 - LEFT JOIN contracts c3 - ON c3.address = p.token3 - LEFT JOIN contracts c4 - ON c4.address = p.token4 - LEFT JOIN contracts c5 - ON c5.address = p.token5 - LEFT JOIN contracts c6 - ON c6.address = p.token6 - LEFT JOIN contracts c7 - ON c7.address = p.token7 -) - -SELECT + SELECT block_number, block_timestamp, tx_hash, - platform, - version, contract_address, pool_address, - pool_name, - tokens, - symbols, - decimals, + CASE + WHEN pool_name IS NULL THEN CONCAT( + COALESCE( + c0.symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ) + ) + ELSE pool_name + END AS pool_name, + OBJECT_CONSTRUCT( + 'token0', + token0, + 'token1', + token1 + ) AS tokens, + OBJECT_CONSTRUCT( + 'token0', + c0.symbol, + 'token1', + c1.symbol + ) AS symbols, + OBJECT_CONSTRUCT( + 'token0', + c0.decimals, + 'token1', + c1.decimals + ) AS decimals, + platform, + version, _id, - _inserted_timestamp -FROM FINAL \ No newline at end of file + p._inserted_timestamp + FROM + all_pools_standard p + LEFT JOIN contracts c0 + ON c0.address = p.token0 + LEFT JOIN contracts c1 + ON c1.address = p.token1 + UNION ALL + SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + CASE + WHEN pool_name IS NULL THEN CONCAT( + COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), + CASE + WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) + ELSE '' + END, + CASE + WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) + ELSE '' + END, + CASE + WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) + ELSE '' + END, + CASE + WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) + ELSE '' + END, + CASE + WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) + ELSE '' + END, + CASE + WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) + ELSE '' + END, + CASE + WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) + ELSE '' + END + ) + ELSE pool_name + END AS pool_name, + OBJECT_CONSTRUCT( + 'token0', + token0, + 'token1', + token1, + 'token2', + token2, + 'token3', + token3, + 'token4', + token4, + 'token5', + token5, + 'token6', + token6, + 'token7', + token7 + ) AS tokens, + OBJECT_CONSTRUCT( + 'token0', + c0.symbol, + 'token1', + c1.symbol, + 'token2', + c2.symbol, + 'token3', + c3.symbol, + 'token4', + c4.symbol, + 'token5', + c5.symbol, + 'token6', + c6.symbol, + 'token7', + c7.symbol + ) AS symbols, + OBJECT_CONSTRUCT( + 'token0', + c0.decimals, + 'token1', + c1.decimals, + 'token2', + c2.decimals, + 'token3', + c3.decimals, + 'token4', + c4.decimals, + 'token5', + c5.decimals, + 'token6', + c6.decimals, + 'token7', + c7.decimals + ) AS decimals, + platform, + version, + _id, + p._inserted_timestamp + FROM + all_pools_other p + LEFT JOIN contracts c0 + ON c0.address = p.token0 + LEFT JOIN contracts c1 + ON c1.address = p.token1 + LEFT JOIN contracts c2 + ON c2.address = p.token2 + LEFT JOIN contracts c3 + ON c3.address = p.token3 + LEFT JOIN contracts c4 + ON c4.address = p.token4 + LEFT JOIN contracts c5 + ON c5.address = p.token5 + LEFT JOIN contracts c6 + ON c6.address = p.token6 + LEFT JOIN contracts c7 + ON c7.address = p.token7 +) +SELECT + block_number, + block_timestamp, + tx_hash, + platform, + version, + contract_address, + pool_address, + pool_name, + tokens, + symbols, + decimals, + _id, + _inserted_timestamp +FROM + FINAL diff --git a/models/silver/dex/silver_dex__complete_dex_swaps.sql b/models/silver/dex/silver_dex__complete_dex_swaps.sql index 4c5383b..71e4145 100644 --- a/models/silver/dex/silver_dex__complete_dex_swaps.sql +++ b/models/silver/dex/silver_dex__complete_dex_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = ['block_number','platform','version'], cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH contracts AS ( diff --git a/models/silver/dex/sushi/silver_dex__sushi_pools.sql b/models/silver/dex/sushi/silver_dex__sushi_pools.sql index 296051b..13b90e2 100644 --- a/models/silver/dex/sushi/silver_dex__sushi_pools.sql +++ b/models/silver/dex/sushi/silver_dex__sushi_pools.sql @@ -2,7 +2,7 @@ materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = 'block_number', - tags = ['non_realtime'] + tags = ['curated'] ) }} WITH pool_creation AS ( diff --git a/models/silver/dex/sushi/silver_dex__sushi_swaps.sql b/models/silver/dex/sushi/silver_dex__sushi_swaps.sql index b145569..cb41e8f 100644 --- a/models/silver/dex/sushi/silver_dex__sushi_swaps.sql +++ b/models/silver/dex/sushi/silver_dex__sushi_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH pools AS ( diff --git a/models/silver/dex/swapr/silver_dex__swapr_pools.sql b/models/silver/dex/swapr/silver_dex__swapr_pools.sql index 95c68cc..91e74ed 100644 --- a/models/silver/dex/swapr/silver_dex__swapr_pools.sql +++ b/models/silver/dex/swapr/silver_dex__swapr_pools.sql @@ -2,7 +2,7 @@ materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = 'block_number', - tags = ['non_realtime'] + tags = ['curated'] ) }} WITH pool_creation AS ( diff --git a/models/silver/dex/swapr/silver_dex__swapr_swaps.sql b/models/silver/dex/swapr/silver_dex__swapr_swaps.sql index dc003ce..b1d2751 100644 --- a/models/silver/dex/swapr/silver_dex__swapr_swaps.sql +++ b/models/silver/dex/swapr/silver_dex__swapr_swaps.sql @@ -3,7 +3,7 @@ incremental_strategy = 'delete+insert', unique_key = 'block_number', cluster_by = ['block_timestamp::DATE'], - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH pools AS ( diff --git a/models/silver/nft/silver__nft_transfers.sql b/models/silver/nft/silver__nft_transfers.sql index b6ca608..a895d17 100644 --- a/models/silver/nft/silver__nft_transfers.sql +++ b/models/silver/nft/silver__nft_transfers.sql @@ -4,7 +4,7 @@ unique_key = 'block_number', cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address, tx_hash)", - tags = ['non_realtime','reorg'] + tags = ['curated','reorg'] ) }} WITH base AS (