An 3911/balances views (#143)

* stash

* updates

* wbnb
This commit is contained in:
Austin 2023-09-12 16:08:39 -04:00 committed by GitHub
parent 5c3c2b0e1a
commit 254a8252f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 771 additions and 1 deletions

View File

@ -63,4 +63,6 @@ vars:
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False
OBSERV_FULL_TEST: False
BALANCES_START: 0
BALANCES_END: 10000000

View File

@ -44,6 +44,8 @@ sources:
- name: traces
- name: decoded_logs
- name: confirm_blocks
- name: token_balances
- name: bnb_balances
- name: bsc_silver
database: bsc
schema: silver

View File

@ -0,0 +1,53 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
TO_NUMBER(SPLIT_PART(file_name, '/', 3)) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "bnb_balances") }}')
) A
)
SELECT
s.block_number :: INTEGER AS block_number,
address :: STRING AS address,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['s.block_number', 'address']
) }} AS id,
s._partition_by_block_id AS _partition_by_block_id,
s.data
FROM
{{ source(
'bronze_streamline',
'bnb_balances'
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -0,0 +1,53 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
TO_NUMBER(SPLIT_PART(file_name, '/', 3)) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "bnb_balances") }}'
)
) A
)
SELECT
s.block_number :: INTEGER AS block_number,
address :: STRING AS address,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['s.block_number', 'address']
) }} AS id,
s._partition_by_block_id AS _partition_by_block_id,
s.data
FROM
{{ source(
'bronze_streamline',
'bnb_balances'
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -0,0 +1,53 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
TO_NUMBER(SPLIT_PART(file_name, '/', 3)) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "token_balances") }}'
)
) A
)
SELECT
s.block_number :: INTEGER AS block_number,
address :: STRING AS address,
contract_address :: STRING AS contract_address,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['s.block_number','contract_address', 'address']
) }} AS id,
s.data
FROM
{{ source(
'bronze_streamline',
'token_balances'
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -0,0 +1,54 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
TO_NUMBER(SPLIT_PART(file_name, '/', 3)) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "token_balances") }}')
) A
)
SELECT
s.block_number :: INTEGER AS block_number,
address :: STRING AS address,
contract_address :: STRING AS contract_address,
b._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['s.block_number', 'contract_address', 'address']
) }} AS id,
s._partition_by_block_id AS _partition_by_block_id,
s.data
FROM
{{ source(
'bronze_streamline',
'token_balances'
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
AND (
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -0,0 +1,33 @@
-- depends on: {{ ref('bronze__bnb_balances') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
incremental_predicates = ["dynamic_range", "block_number"],
tags = ['streamline_balances_complete']
) }}
SELECT
block_number,
address,
id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bnb_balances') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__fr_bnb_balances') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,34 @@
-- depends on: {{ ref('bronze__token_balances') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
incremental_predicates = ["dynamic_range", "block_number"],
tags = ['streamline_balances_complete']
) }}
SELECT
block_number,
address,
contract_address,
id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__token_balances') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__fr_token_balances') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,106 @@
{{ config (
materialized = "view",
tags = ['streamline_balances_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
traces AS (
SELECT
block_number,
from_address,
to_address
FROM
{{ ref('silver__traces') }}
WHERE
bnb_value > 0
AND trace_status = 'SUCCESS'
AND tx_status = 'SUCCESS'
AND block_number < (
SELECT
block_number
FROM
last_3_days
)
AND block_number BETWEEN {{ var('BALANCES_START') }}
AND {{ var('BALANCES_END') }}
),
stacked AS (
SELECT
DISTINCT block_number,
from_address AS address
FROM
traces
WHERE
from_address IS NOT NULL
AND from_address <> '0x0000000000000000000000000000000000000000'
UNION
SELECT
DISTINCT block_number,
to_address AS address
FROM
traces
WHERE
to_address IS NOT NULL
AND to_address <> '0x0000000000000000000000000000000000000000'
),
FINAL AS (
SELECT
block_number,
address
FROM
stacked
WHERE
block_number IS NOT NULL
EXCEPT
SELECT
block_number,
address
FROM
{{ ref("streamline__complete_bnb_balances") }}
WHERE
block_number < (
SELECT
block_number
FROM
last_3_days
)
AND block_number BETWEEN {{ var('BALANCES_START') }}
AND {{ var('BALANCES_END') }}
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "eth_getBalance", "params": ["',
address,
'", "',
REPLACE(
CONCAT(
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"], "id": "',
block_number :: STRING,
'"}'
)
) AS request
FROM
FINAL
ORDER BY
block_number ASC

View File

@ -0,0 +1,125 @@
{{ config (
materialized = "view",
tags = ['streamline_balances_history']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
logs AS (
SELECT
CONCAT('0x', SUBSTR(l.topics [1] :: STRING, 27, 42)) AS address1,
CONCAT('0x', SUBSTR(l.topics [2] :: STRING, 27, 42)) AS address2,
l.contract_address,
l.block_number
FROM
{{ ref('silver__logs') }}
l
WHERE
(
l.topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
OR (
l.topics [0] :: STRING = '0x7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65'
AND l.contract_address = '0xbb4cdb9cbd36b01bd1cbaebf2de08d9173bc095c'
)
OR (
l.topics [0] :: STRING = '0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c'
AND l.contract_address = '0xbb4cdb9cbd36b01bd1cbaebf2de08d9173bc095c'
)
)
AND block_number < (
SELECT
block_number
FROM
last_3_days
)
AND block_number BETWEEN {{ var('BALANCES_START') }}
AND {{ var('BALANCES_END') }}
),
transfers AS (
SELECT
DISTINCT block_number,
contract_address,
address1 AS address
FROM
logs
WHERE
address1 IS NOT NULL
AND address1 <> '0x0000000000000000000000000000000000000000'
UNION
SELECT
DISTINCT block_number,
contract_address,
address2 AS address
FROM
logs
WHERE
address2 IS NOT NULL
AND address2 <> '0x0000000000000000000000000000000000000000'
),
FINAL AS (
SELECT
block_number,
address,
contract_address
FROM
transfers
WHERE
block_number IS NOT NULL
EXCEPT
SELECT
block_number,
address,
contract_address
FROM
{{ ref("streamline__complete_token_balances") }}
WHERE
block_number < (
SELECT
block_number
FROM
last_3_days
)
AND block_number IS NOT NULL
AND block_number BETWEEN {{ var('BALANCES_START') }}
AND {{ var('BALANCES_END') }}
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "eth_call", "params": [{ "to": "',
contract_address,
'", "data": "',
'0x70a08231',
-- Method ID for balanceOf(address)
CONCAT(REPEAT('0', 24), RIGHT(address, 40)),
'"}, "',
REPLACE(
CONCAT(
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"], "id": ',
block_number :: STRING,
'}'
)
) AS request
FROM
FINAL
ORDER BY
block_number ASC

View File

@ -0,0 +1,118 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'bnb_balances', 'method', 'eth_getBalance', 'producer_batch_size',5000, 'producer_limit_size', 5000000, 'worker_batch_size',500))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_balances_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
traces AS (
SELECT
block_number,
from_address,
to_address
FROM
{{ ref('silver__traces') }}
WHERE
bnb_value > 0
AND trace_status = 'SUCCESS'
AND tx_status = 'SUCCESS'
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND block_timestamp :: DATE >= DATEADD(
'day',
-5,
CURRENT_TIMESTAMP
)
),
stacked AS (
SELECT
DISTINCT block_number,
from_address AS address
FROM
traces
WHERE
from_address IS NOT NULL
AND from_address <> '0x0000000000000000000000000000000000000000'
UNION
SELECT
DISTINCT block_number,
to_address AS address
FROM
traces
WHERE
to_address IS NOT NULL
AND to_address <> '0x0000000000000000000000000000000000000000'
),
FINAL AS (
SELECT
block_number,
address
FROM
stacked
WHERE
block_number IS NOT NULL
EXCEPT
SELECT
block_number,
address
FROM
{{ ref("streamline__complete_bnb_balances") }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp :: DATE >= DATEADD(
'day',
-7,
CURRENT_TIMESTAMP
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "eth_getBalance", "params": ["',
address,
'", "',
REPLACE(
CONCAT(
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"], "id": "',
block_number :: STRING,
'"}'
)
) AS request
FROM
FINAL
ORDER BY
block_number ASC
LIMIT
10 -- TODO: remove this limit

View File

@ -0,0 +1,137 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'token_balances', 'method', 'eth_call', 'producer_batch_size',5000, 'producer_limit_size', 5000000, 'worker_batch_size',500))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_balances_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
logs AS (
SELECT
CONCAT('0x', SUBSTR(l.topics [1] :: STRING, 27, 42)) AS address1,
CONCAT('0x', SUBSTR(l.topics [2] :: STRING, 27, 42)) AS address2,
l.contract_address,
l.block_number
FROM
{{ ref('silver__logs') }}
l
WHERE
(
l.topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
OR (
l.topics [0] :: STRING = '0x7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65'
AND l.contract_address = '0xbb4cdb9cbd36b01bd1cbaebf2de08d9173bc095c'
)
OR (
l.topics [0] :: STRING = '0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c'
AND l.contract_address = '0xbb4cdb9cbd36b01bd1cbaebf2de08d9173bc095c'
)
)
AND block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND block_timestamp :: DATE >= DATEADD(
'day',
-5,
CURRENT_TIMESTAMP
)
),
transfers AS (
SELECT
DISTINCT block_number,
contract_address,
address1 AS address
FROM
logs
WHERE
address1 IS NOT NULL
AND address1 <> '0x0000000000000000000000000000000000000000'
UNION
SELECT
DISTINCT block_number,
contract_address,
address2 AS address
FROM
logs
WHERE
address2 IS NOT NULL
AND address2 <> '0x0000000000000000000000000000000000000000'
),
FINAL AS (
SELECT
block_number,
address,
contract_address
FROM
transfers
WHERE
block_number IS NOT NULL
EXCEPT
SELECT
block_number,
address,
contract_address
FROM
{{ ref("streamline__complete_token_balances") }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND block_number IS NOT NULL
AND _inserted_timestamp :: DATE >= DATEADD(
'day',
-7,
CURRENT_TIMESTAMP
)
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "eth_call", "params": [{ "to": "',
contract_address,
'", "data": "',
'0x70a08231',
-- Method ID for balanceOf(address)
CONCAT(REPEAT('0', 24), RIGHT(address, 40)),
'"}, "',
REPLACE(
CONCAT(
'0x',
to_char(
block_number :: INTEGER,
'XXXXXXXX'
)
),
' ',
''
),
'"], "id": ',
block_number :: STRING,
'}'
)
) AS request
FROM
FINAL
ORDER BY
block_number ASC
LIMIT
10 -- TODO: remove this limit