This commit is contained in:
Austin 2023-11-10 15:02:53 -05:00 committed by GitHub
parent da8351f3d7
commit 027551d8f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 890 additions and 655 deletions

View File

@ -0,0 +1,44 @@
name: dbt_run_scheduled_curated
run-name: dbt_run_scheduled_curated
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "gnosis_models,tag:curated"

View File

@ -2,4 +2,5 @@ workflow_name,workflow_schedule
dbt_run_scheduled_non_realtime,"10,40 * * * *"
dbt_run_streamline_chainhead,"0,30 * * * *"
dbt_run_streamline_decoder,"20,50 * * * *"
dbt_test_tasks,"25,55 * * * *"
dbt_run_scheduled_curated,"30 * * * *"
dbt_test_tasks,"25 * * * *"
1 workflow_name workflow_schedule
2 dbt_run_scheduled_non_realtime 10,40 * * * *
3 dbt_run_streamline_chainhead 0,30 * * * *
4 dbt_run_streamline_decoder 20,50 * * * *
5 dbt_test_tasks dbt_run_scheduled_curated 25,55 * * * * 30 * * * *
6 dbt_test_tasks 25 * * * *

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
full_refresh = false,
tags = ['non_realtime']
tags = ['curated']
) }}
WITH pools_registered AS (
@ -15,15 +15,23 @@ WITH pools_registered AS (
tx_hash,
contract_address,
topics [1] :: STRING AS pool_id,
SUBSTR(topics [1] :: STRING,1,42) AS pool_address,
SUBSTR(
topics [1] :: STRING,
1,
42
) AS pool_address,
_log_id,
_inserted_timestamp,
ROW_NUMBER() OVER (ORDER BY pool_address) AS row_num
ROW_NUMBER() over (
ORDER BY
pool_address
) AS row_num
FROM
{{ ref('silver__logs') }}
WHERE
topics[0]::STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
@ -32,120 +40,91 @@ AND _inserted_timestamp >= (
{{ this }}
)
{% endif %}
),
tokens_registered AS (
SELECT
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens[0] :: STRING AS token0,
tokens[1] :: STRING AS token1,
tokens[2] :: STRING AS token2,
tokens[3] :: STRING AS token3,
tokens[4] :: STRING AS token4,
tokens[5] :: STRING AS token5,
tokens[6] :: STRING AS token6,
tokens[7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM {{ ref('silver__decoded_logs') }}
WHERE
topics[0]::STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
),
function_sigs AS (
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
inputs_pools AS (
SELECT
pool_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY pool_address
ORDER BY block_number)) - 1 AS function_input
FROM pools_registered
JOIN function_sigs ON 1=1
),
pool_token_reads AS (
{% for item in range(50) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
pool_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
pool_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
function_input,
'\']'
) AS read_input,
row_num
FROM inputs_pools
LEFT JOIN pools_registered USING(pool_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'gnosis'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens [0] :: STRING AS token0,
tokens [1] :: STRING AS token1,
tokens [2] :: STRING AS token2,
tokens [3] :: STRING AS token3,
tokens [4] :: STRING AS token4,
tokens [5] :: STRING AS token5,
tokens [6] :: STRING AS token6,
tokens [7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__decoded_logs') }}
WHERE
topics [0] :: STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
),
reads_adjusted AS (
function_sigs AS (
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
inputs_pools AS (
SELECT
pool_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
block_number)) - 1 AS function_input
FROM
pools_registered
JOIN function_sigs
ON 1 = 1
),
pool_token_reads AS ({% for item in range(50) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(node_url, headers, PARSE_JSON(batch_read)) AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM
(
SELECT
pool_address, block_number, function_sig, function_input, CONCAT('[\'', pool_address, '\',', block_number, ',\'', function_sig, '\',\'', function_input, '\']') AS read_input, row_num
FROM
inputs_pools
LEFT JOIN pools_registered USING(pool_address)) ready_reads_pools
WHERE
row_num BETWEEN {{ item * 50 + 1 }}
AND {{(item + 1) * 50 }}) batch_reads_pools
JOIN {{ source('streamline_crosschain', 'node_mapping') }}
ON 1 = 1
AND chain = 'gnosis') {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}),
reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
@ -157,40 +136,53 @@ SELECT
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
),
pool_details AS (
SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),
SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),
FINAL AS (
SELECT
pool_address,
MIN(CASE WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_symbol,
MIN(CASE WHEN function_name = 'name' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_name,
MIN(CASE
WHEN read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result::STRING,66))
END)::INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM pool_details
GROUP BY 1
SELECT
pool_address,
MIN(
CASE
WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_symbol,
MIN(
CASE
WHEN function_name = 'name' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_name,
MIN(
CASE
WHEN read_result :: STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result :: STRING, 66))
END
) :: INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
pool_details
GROUP BY
1
)
SELECT
p.block_number,
p.block_timestamp,
@ -213,7 +205,11 @@ SELECT
t.asset_managers,
p._log_id,
f._inserted_timestamp
FROM FINAL f
LEFT JOIN pools_registered p ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t ON p.pool_id = t.pool_id
WHERE t.token0 IS NOT NULL
FROM
FINAL f
LEFT JOIN pools_registered p
ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t
ON p.pool_id = t.pool_id
WHERE
t.token0 IS NOT NULL

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH pools AS (

View File

@ -3,32 +3,38 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
full_refresh = false,
tags = ['non_realtime']
tags = ['curated']
) }}
WITH contract_deployments AS (
SELECT
tx_hash,
block_number,
block_timestamp,
from_address AS deployer_address,
to_address AS contract_address,
_call_id,
_inserted_timestamp,
ROW_NUMBER() OVER (ORDER BY contract_address) AS row_num
FROM
{{ ref('silver__traces' )}}
WHERE
-- curve contract deployers
from_address IN (
'0x7eeac6cddbd1d0b8af061742d41877d7f707289a',
'0xcbaf0a32f5a16b326f00607421857f68fc72e508',
'0xd25fcbb7b6021cf83122fcd65be88a045d5f961c',
'0xd19baeadc667cf2015e395f2b08668ef120f41f5'
SELECT
tx_hash,
block_number,
block_timestamp,
from_address AS deployer_address,
to_address AS contract_address,
_call_id,
_inserted_timestamp,
ROW_NUMBER() over (
ORDER BY
contract_address
) AS row_num
FROM
{{ ref(
'silver__traces'
) }}
WHERE
-- curve contract deployers
from_address IN (
'0x7eeac6cddbd1d0b8af061742d41877d7f707289a',
'0xcbaf0a32f5a16b326f00607421857f68fc72e508',
'0xd25fcbb7b6021cf83122fcd65be88a045d5f961c',
'0xd19baeadc667cf2015e395f2b08668ef120f41f5'
)
AND TYPE ilike 'create%'
AND TX_STATUS ilike 'success'
AND TYPE ILIKE 'create%'
AND tx_status ILIKE 'success'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
@ -43,188 +49,179 @@ AND to_address NOT IN (
{{ this }}
)
{% endif %}
QUALIFY(ROW_NUMBER() OVER(PARTITION BY to_address ORDER BY block_timestamp ASC)) = 1
),
qualify(ROW_NUMBER() over(PARTITION BY to_address
ORDER BY
block_timestamp ASC)) = 1
),
function_sigs AS (
SELECT
'0x87cb4f57' AS function_sig,
'base_coins' AS function_name
UNION ALL
SELECT
'0xb9947eb0' AS function_sig,
'underlying_coins' AS function_name
UNION ALL
SELECT
'0xc6610657' AS function_sig,
'coins' AS function_name
UNION ALL
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
SELECT
'0x87cb4f57' AS function_sig,
'base_coins' AS function_name
UNION ALL
SELECT
'0xb9947eb0' AS function_sig,
'underlying_coins' AS function_name
UNION ALL
SELECT
'0xc6610657' AS function_sig,
'coins' AS function_name
UNION ALL
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),
function_inputs AS (
SELECT
SEQ4() AS function_input
FROM
TABLE(GENERATOR(rowcount => 8))
),
inputs_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'coins'
),
inputs_base_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'base_coins'
),
inputs_underlying_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY contract_address
ORDER BY block_number)) - 1 AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
JOIN function_inputs ON 1=1
WHERE function_name = 'underlying_coins'
),
inputs_pool_details AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
NULL AS function_input
FROM contract_deployments
JOIN function_sigs ON 1=1
WHERE function_name IN ('name','symbol','decimals')
),
all_inputs AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_base_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_underlying_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM inputs_pool_details
),
pool_token_reads AS (
{% for item in range(20) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
contract_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
(CASE WHEN function_input IS NULL THEN '' ELSE function_input::STRING END),
'\']'
) AS read_input,
row_num
FROM all_inputs
LEFT JOIN contract_deployments USING(contract_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'gnosis'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'coins'
),
reads_adjusted AS (
inputs_base_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'base_coins'
),
inputs_underlying_coins AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY contract_address
ORDER BY
block_number)) - 1 AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
JOIN function_inputs
ON 1 = 1
WHERE
function_name = 'underlying_coins'
),
inputs_pool_details AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
NULL AS function_input
FROM
contract_deployments
JOIN function_sigs
ON 1 = 1
WHERE
function_name IN (
'name',
'symbol',
'decimals'
)
),
all_inputs AS (
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_base_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_underlying_coins
UNION ALL
SELECT
deployer_address,
contract_address,
block_number,
function_sig,
function_input
FROM
inputs_pool_details
),
pool_token_reads AS ({% for item in range(20) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(node_url, headers, PARSE_JSON(batch_read)) AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM
(
SELECT
deployer_address, contract_address, block_number, function_sig, function_input, CONCAT('[\'', contract_address, '\',', block_number, ',\'', function_sig, '\',\'', (CASE
WHEN function_input IS NULL THEN ''
ELSE function_input :: STRINGEND), '\']') AS read_input, row_num
FROM
all_inputs
LEFT JOIN contract_deployments USING(contract_address)) ready_reads_pools
WHERE
row_num BETWEEN {{ item * 50 + 1 }}
AND {{(item + 1) * 50 }}) batch_reads_pools
JOIN {{ source('streamline_crosschain', 'node_mapping') }}
ON 1 = 1
AND chain = 'gnosis') {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}),
reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
@ -236,113 +233,147 @@ SELECT
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
),
tokens AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}')[0]AS segmented_token_address,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('coins','base_coins','underlying_coins')
AND read_result IS NOT NULL
),
pool_details AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE function_name IN ('name','symbol','decimals')
AND read_result IS NOT NULL
),
all_pools AS (
SELECT
t.contract_address AS pool_address,
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)) AS token_address,
function_input AS token_id,
function_name AS token_type,
MIN(CASE WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0)) END) AS pool_symbol,
MIN(CASE WHEN p.function_name = 'name' THEN CONCAT(utils.udf_hex_to_string(p.segmented_output [2] :: STRING),
utils.udf_hex_to_string(segmented_output [3] :: STRING)) END) AS pool_name,
MIN(CASE
WHEN p.read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(p.read_result::STRING,66))
END)::INTEGER AS pool_decimals,
CONCAT(
t.contract_address,
'-',
CONCAT('0x',SUBSTRING(t.segmented_token_address,25,40)),
'-',
SELECT
contract_address,
function_sig,
function_name,
function_input,
'-',
function_name
) AS pool_id,
MAX(t._inserted_timestamp) AS _inserted_timestamp
FROM tokens t
LEFT JOIN pool_details p USING(contract_address)
WHERE token_address IS NOT NULL
AND token_address <> '0x0000000000000000000000000000000000000000'
GROUP BY 1,2,3,4
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') [0] AS segmented_token_address,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE
function_name IN (
'coins',
'base_coins',
'underlying_coins'
)
AND read_result IS NOT NULL
),
pool_details AS (
SELECT
contract_address,
function_sig,
function_name,
function_input,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
WHERE
function_name IN (
'name',
'symbol',
'decimals'
)
AND read_result IS NOT NULL
),
all_pools AS (
SELECT
t.contract_address AS pool_address,
CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)) AS token_address,
function_input AS token_id,
function_name AS token_type,
MIN(
CASE
WHEN p.function_name = 'symbol' THEN utils.udf_hex_to_string(RTRIM(p.segmented_output [2] :: STRING, 0))
END
) AS pool_symbol,
MIN(
CASE
WHEN p.function_name = 'name' THEN CONCAT(
utils.udf_hex_to_string(
p.segmented_output [2] :: STRING
),
utils.udf_hex_to_string(
segmented_output [3] :: STRING
)
)
END
) AS pool_name,
MIN(
CASE
WHEN p.read_result :: STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(p.read_result :: STRING, 66))
END
) :: INTEGER AS pool_decimals,
CONCAT(
t.contract_address,
'-',
CONCAT('0x', SUBSTRING(t.segmented_token_address, 25, 40)),
'-',
function_input,
'-',
function_name
) AS pool_id,
MAX(
t._inserted_timestamp
) AS _inserted_timestamp
FROM
tokens t
LEFT JOIN pool_details p USING(contract_address)
WHERE
token_address IS NOT NULL
AND token_address <> '0x0000000000000000000000000000000000000000'
GROUP BY
1,
2,
3,
4
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address,
pool_address,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0xe91d153e0b41518a2ce8dd3d7944fa863463a97d'
ELSE token_address
END AS token_address,
token_id,
token_type,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WXDAI'
WHEN pool_symbol IS NULL THEN c.token_symbol
ELSE pool_symbol
END AS pool_symbol,
pool_name,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18'
WHEN pool_decimals IS NULL THEN c.token_decimals
ELSE pool_decimals
END AS pool_decimals,
pool_id,
_call_id,
a._inserted_timestamp
FROM all_pools a
LEFT JOIN {{ ref('silver__contracts') }} c
ON a.token_address = c.contract_address
LEFT JOIN contract_deployments d
ON a.pool_address = d.contract_address
QUALIFY(ROW_NUMBER() OVER(PARTITION BY pool_address, token_address ORDER BY a._inserted_timestamp DESC)) = 1
SELECT
block_number,
block_timestamp,
tx_hash,
deployer_address,
pool_address,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '0xe91d153e0b41518a2ce8dd3d7944fa863463a97d'
ELSE token_address
END AS token_address,
token_id,
token_type,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN 'WXDAI'
WHEN pool_symbol IS NULL THEN C.token_symbol
ELSE pool_symbol
END AS pool_symbol,
pool_name,
CASE
WHEN token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' THEN '18'
WHEN pool_decimals IS NULL THEN C.token_decimals
ELSE pool_decimals
END AS pool_decimals,
pool_id,
_call_id,
A._inserted_timestamp
FROM
all_pools A
LEFT JOIN {{ ref('silver__contracts') }} C
ON A.token_address = C.contract_address
LEFT JOIN contract_deployments d
ON A.pool_address = d.contract_address qualify(ROW_NUMBER() over(PARTITION BY pool_address, token_address
ORDER BY
A._inserted_timestamp DESC)) = 1
)
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY token_address ASC) AS token_num
FROM FINAL
ROW_NUMBER() over (
PARTITION BY pool_address
ORDER BY
token_address ASC
) AS token_num
FROM
FINAL

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH pool_meta AS (
@ -13,20 +13,20 @@ WITH pool_meta AS (
pool_name,
token_address,
pool_symbol AS symbol,
token_id::INTEGER AS token_id,
token_type::STRING AS token_type
token_id :: INTEGER AS token_id,
token_type :: STRING AS token_type
FROM
{{ ref('silver_dex__curve_pools') }}
),
pools AS (
SELECT
SELECT
DISTINCT pool_address,
pool_name
FROM pool_meta
QUALIFY (ROW_NUMBER() OVER (PARTITION BY pool_address ORDER BY pool_name ASC NULLS LAST)) = 1
FROM
pool_meta qualify (ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
pool_name ASC nulls last)) = 1
),
curve_base AS (
SELECT
block_number,
@ -45,28 +45,39 @@ curve_base AS (
pool_name,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS sender,
TRY_TO_NUMBER(utils.udf_hex_to_int(
segmented_data [0] :: STRING
)) AS sold_id,
TRY_TO_NUMBER(utils.udf_hex_to_int(
segmented_data [1] :: STRING
)) AS tokens_sold,
TRY_TO_NUMBER(utils.udf_hex_to_int(
segmented_data [2] :: STRING
)) AS bought_id,
TRY_TO_NUMBER(utils.udf_hex_to_int(
segmented_data [3] :: STRING
)) AS tokens_bought,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [0] :: STRING
)
) AS sold_id,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS tokens_sold,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [2] :: STRING
)
) AS bought_id,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [3] :: STRING
)
) AS tokens_bought,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__logs') }} l
{{ ref('silver__logs') }}
l
INNER JOIN pools p
ON p.pool_address = l.contract_address
WHERE
topics [0] :: STRING IN (
'0x8b3e96f2b889fa771c53c981b40daf005f63f637f1869f707052d15a3dd97140', --TokenExchange
'0xb2e76ae99761dc136e598d4a629bb347eccb9532a5f8bbd72e18467c3c34cc98', --TokenExchange
'0x8b3e96f2b889fa771c53c981b40daf005f63f637f1869f707052d15a3dd97140',
--TokenExchange
'0xb2e76ae99761dc136e598d4a629bb347eccb9532a5f8bbd72e18467c3c34cc98',
--TokenExchange
'0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b' --TokenExchangeUnderlying
)
@ -79,21 +90,42 @@ AND _inserted_timestamp >= (
)
{% endif %}
),
token_exchange AS (
SELECT
_log_id,
MAX(CASE WHEN sold_id = token_id THEN token_address END) AS token_in,
MAX(CASE WHEN bought_id = token_id THEN token_address END) AS token_out,
MAX(CASE WHEN sold_id = token_id THEN symbol END) AS symbol_in,
MAX(CASE WHEN bought_id = token_id THEN symbol END) AS symbol_out
FROM curve_base t
LEFT JOIN pool_meta p ON p.pool_address = t.pool_address AND (p.token_id = t.sold_id OR p.token_id = t.bought_id)
WHERE token_type = 'coins'
GROUP BY 1
SELECT
_log_id,
MAX(
CASE
WHEN sold_id = token_id THEN token_address
END
) AS token_in,
MAX(
CASE
WHEN bought_id = token_id THEN token_address
END
) AS token_out,
MAX(
CASE
WHEN sold_id = token_id THEN symbol
END
) AS symbol_in,
MAX(
CASE
WHEN bought_id = token_id THEN symbol
END
) AS symbol_out
FROM
curve_base t
LEFT JOIN pool_meta p
ON p.pool_address = t.pool_address
AND (
p.token_id = t.sold_id
OR p.token_id = t.bought_id
)
WHERE
token_type = 'coins'
GROUP BY
1
),
token_transfers AS (
SELECT
tx_hash,
@ -144,49 +176,52 @@ to_transfers AS (
FROM
token_transfers
),
ready_pool_info AS (
SELECT
s.block_number,
s.block_timestamp,
s.tx_hash,
s.origin_function_signature,
s.origin_from_address,
s.origin_from_address AS tx_to,
s.origin_to_address,
event_index,
event_name,
pool_address,
pool_address AS contract_address,
pool_name,
sender,
sold_id,
tokens_sold,
COALESCE(sold.token_address,e.token_in) AS token_in,
e.symbol_in AS symbol_in,
bought_id,
tokens_bought,
COALESCE(bought.token_address,e.token_out) AS token_out,
e.symbol_out AS symbol_out,
s._log_id,
_inserted_timestamp
FROM
curve_base s
LEFT JOIN token_exchange e ON s._log_id = e._log_id
LEFT JOIN from_transfers sold
ON tokens_sold = sold.amount
AND s.tx_hash = sold.tx_hash
LEFT JOIN to_transfers bought
ON tokens_bought = bought.amount
AND s.tx_hash = bought.tx_hash
WHERE
tokens_sold <> 0
qualify(ROW_NUMBER() over(PARTITION BY s._log_id
SELECT
s.block_number,
s.block_timestamp,
s.tx_hash,
s.origin_function_signature,
s.origin_from_address,
s.origin_from_address AS tx_to,
s.origin_to_address,
event_index,
event_name,
pool_address,
pool_address AS contract_address,
pool_name,
sender,
sold_id,
tokens_sold,
COALESCE(
sold.token_address,
e.token_in
) AS token_in,
e.symbol_in AS symbol_in,
bought_id,
tokens_bought,
COALESCE(
bought.token_address,
e.token_out
) AS token_out,
e.symbol_out AS symbol_out,
s._log_id,
_inserted_timestamp
FROM
curve_base s
LEFT JOIN token_exchange e
ON s._log_id = e._log_id
LEFT JOIN from_transfers sold
ON tokens_sold = sold.amount
AND s.tx_hash = sold.tx_hash
LEFT JOIN to_transfers bought
ON tokens_bought = bought.amount
AND s.tx_hash = bought.tx_hash
WHERE
tokens_sold <> 0 qualify(ROW_NUMBER() over(PARTITION BY s._log_id
ORDER BY
_inserted_timestamp DESC)) = 1
_inserted_timestamp DESC)) = 1
)
SELECT
block_number,
block_timestamp,
@ -213,4 +248,4 @@ SELECT
_inserted_timestamp,
'curve' AS platform
FROM
ready_pool_info
ready_pool_info

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
tags = ['non_realtime']
tags = ['curated']
) }}
WITH pool_creation AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH pools AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH contracts AS (
@ -16,10 +16,8 @@ WITH contracts AS (
FROM
{{ ref('silver__contracts') }}
),
balancer AS (
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
@ -38,8 +36,9 @@ SELECT
token5,
token6,
token7
FROM
FROM
{{ ref('silver_dex__balancer_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -50,10 +49,8 @@ WHERE
)
{% endif %}
),
curve AS (
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
@ -64,16 +61,49 @@ SELECT
'v1' AS version,
_call_id AS _id,
_inserted_timestamp,
MAX(CASE WHEN token_num = 1 THEN token_address END) AS token0,
MAX(CASE WHEN token_num = 2 THEN token_address END) AS token1,
MAX(CASE WHEN token_num = 3 THEN token_address END) AS token2,
MAX(CASE WHEN token_num = 4 THEN token_address END) AS token3,
MAX(CASE WHEN token_num = 5 THEN token_address END) AS token4,
MAX(CASE WHEN token_num = 6 THEN token_address END) AS token5,
MAX(CASE WHEN token_num = 7 THEN token_address END) AS token6,
MAX(CASE WHEN token_num = 8 THEN token_address END) AS token7
FROM
MAX(
CASE
WHEN token_num = 1 THEN token_address
END
) AS token0,
MAX(
CASE
WHEN token_num = 2 THEN token_address
END
) AS token1,
MAX(
CASE
WHEN token_num = 3 THEN token_address
END
) AS token2,
MAX(
CASE
WHEN token_num = 4 THEN token_address
END
) AS token3,
MAX(
CASE
WHEN token_num = 5 THEN token_address
END
) AS token4,
MAX(
CASE
WHEN token_num = 6 THEN token_address
END
) AS token5,
MAX(
CASE
WHEN token_num = 7 THEN token_address
END
) AS token6,
MAX(
CASE
WHEN token_num = 8 THEN token_address
END
) AS token7
FROM
{{ ref('silver_dex__curve_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -83,12 +113,11 @@ WHERE
{{ this }}
)
{% endif %}
GROUP BY all
GROUP BY
ALL
),
honeyswap AS (
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
@ -101,8 +130,9 @@ SELECT
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
FROM
{{ ref('silver_dex__honeyswap_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -113,10 +143,8 @@ WHERE
)
{% endif %}
),
swapr AS (
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
@ -129,8 +157,9 @@ SELECT
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
FROM
{{ ref('silver_dex__swapr_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -141,10 +170,8 @@ WHERE
)
{% endif %}
),
sushi AS (
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
@ -157,8 +184,9 @@ SELECT
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
FROM
{{ ref('silver_dex__sushi_pools') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -169,113 +197,213 @@ WHERE
)
{% endif %}
),
all_pools_standard AS (
SELECT *
FROM honeyswap
UNION ALL
SELECT *
FROM swapr
UNION ALL
SELECT *
FROM sushi
SELECT
*
FROM
honeyswap
UNION ALL
SELECT
*
FROM
swapr
UNION ALL
SELECT
*
FROM
sushi
),
all_pools_other AS (
SELECT *
FROM balancer
UNION ALL
SELECT *
FROM curve
SELECT
*
FROM
balancer
UNION ALL
SELECT
*
FROM
curve
),
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
CASE
WHEN pool_name IS NULL
THEN CONCAT(
COALESCE(c0.symbol,CONCAT(SUBSTRING(token0, 1, 5),'...',SUBSTRING(token0, 39, 42))),
'-',
COALESCE(c1.symbol,CONCAT(SUBSTRING(token1, 1, 5),'...',SUBSTRING(token1, 39, 42)))
)
ELSE pool_name
END AS pool_name,
OBJECT_CONSTRUCT('token0',token0,'token1',token1) AS tokens,
OBJECT_CONSTRUCT('token0',c0.symbol,'token1',c1.symbol) AS symbols,
OBJECT_CONSTRUCT('token0',c0.decimals,'token1',c1.decimals) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM all_pools_standard p
LEFT JOIN contracts c0
ON c0.address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
CASE
WHEN pool_name IS NULL
THEN CONCAT(
COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) ELSE '' END,
CASE WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) ELSE '' END,
CASE WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) ELSE '' END,
CASE WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) ELSE '' END,
CASE WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) ELSE '' END,
CASE WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) ELSE '' END,
CASE WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) ELSE '' END
)
ELSE pool_name
END AS pool_name,
OBJECT_CONSTRUCT('token0', token0, 'token1', token1, 'token2', token2, 'token3', token3, 'token4', token4, 'token5', token5, 'token6', token6, 'token7', token7) AS tokens,
OBJECT_CONSTRUCT('token0', c0.symbol, 'token1', c1.symbol, 'token2', c2.symbol, 'token3', c3.symbol, 'token4', c4.symbol, 'token5', c5.symbol, 'token6', c6.symbol, 'token7', c7.symbol) AS symbols,
OBJECT_CONSTRUCT('token0', c0.decimals, 'token1', c1.decimals, 'token2', c2.decimals, 'token3', c3.decimals, 'token4', c4.decimals, 'token5', c5.decimals, 'token6', c6.decimals, 'token7', c7.decimals) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM all_pools_other p
LEFT JOIN contracts c0
ON c0.address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
LEFT JOIN contracts c2
ON c2.address = p.token2
LEFT JOIN contracts c3
ON c3.address = p.token3
LEFT JOIN contracts c4
ON c4.address = p.token4
LEFT JOIN contracts c5
ON c5.address = p.token5
LEFT JOIN contracts c6
ON c6.address = p.token6
LEFT JOIN contracts c7
ON c7.address = p.token7
)
SELECT
SELECT
block_number,
block_timestamp,
tx_hash,
platform,
version,
contract_address,
pool_address,
pool_name,
tokens,
symbols,
decimals,
CASE
WHEN pool_name IS NULL THEN CONCAT(
COALESCE(
c0.symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
)
)
ELSE pool_name
END AS pool_name,
OBJECT_CONSTRUCT(
'token0',
token0,
'token1',
token1
) AS tokens,
OBJECT_CONSTRUCT(
'token0',
c0.symbol,
'token1',
c1.symbol
) AS symbols,
OBJECT_CONSTRUCT(
'token0',
c0.decimals,
'token1',
c1.decimals
) AS decimals,
platform,
version,
_id,
_inserted_timestamp
FROM FINAL
p._inserted_timestamp
FROM
all_pools_standard p
LEFT JOIN contracts c0
ON c0.address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
CASE
WHEN pool_name IS NULL THEN CONCAT(
COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE pool_name
END AS pool_name,
OBJECT_CONSTRUCT(
'token0',
token0,
'token1',
token1,
'token2',
token2,
'token3',
token3,
'token4',
token4,
'token5',
token5,
'token6',
token6,
'token7',
token7
) AS tokens,
OBJECT_CONSTRUCT(
'token0',
c0.symbol,
'token1',
c1.symbol,
'token2',
c2.symbol,
'token3',
c3.symbol,
'token4',
c4.symbol,
'token5',
c5.symbol,
'token6',
c6.symbol,
'token7',
c7.symbol
) AS symbols,
OBJECT_CONSTRUCT(
'token0',
c0.decimals,
'token1',
c1.decimals,
'token2',
c2.decimals,
'token3',
c3.decimals,
'token4',
c4.decimals,
'token5',
c5.decimals,
'token6',
c6.decimals,
'token7',
c7.decimals
) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM
all_pools_other p
LEFT JOIN contracts c0
ON c0.address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
LEFT JOIN contracts c2
ON c2.address = p.token2
LEFT JOIN contracts c3
ON c3.address = p.token3
LEFT JOIN contracts c4
ON c4.address = p.token4
LEFT JOIN contracts c5
ON c5.address = p.token5
LEFT JOIN contracts c6
ON c6.address = p.token6
LEFT JOIN contracts c7
ON c7.address = p.token7
)
SELECT
block_number,
block_timestamp,
tx_hash,
platform,
version,
contract_address,
pool_address,
pool_name,
tokens,
symbols,
decimals,
_id,
_inserted_timestamp
FROM
FINAL

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH contracts AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
tags = ['non_realtime']
tags = ['curated']
) }}
WITH pool_creation AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH pools AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
tags = ['non_realtime']
tags = ['curated']
) }}
WITH pool_creation AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH pools AS (

View File

@ -4,7 +4,7 @@
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address, tx_hash)",
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}
WITH base AS (