balances to streamline

This commit is contained in:
Eric Laurello 2023-12-20 15:05:08 -05:00
parent 8f5b03d17d
commit 78bc689cf5
21 changed files with 720 additions and 53 deletions

View File

@ -27,7 +27,7 @@ jobs:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run-operation run_sp_bulk_get_balances
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/core/realtime/streamline__pool_balances_realtime.sql 1+models/streamline/core/realtime/streamline__balances_realtime.sql
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -0,0 +1,12 @@
{{ config(
materialized = 'table',
tags = ['daily']
) }}
SELECT
live.udf_api(
'GET',
'https://raw.githubusercontent.com/osmosis-labs/assetlists/main/osmosis-1/osmosis-1.assetlist.json',{},{}
) AS resp,
SYSDATE() AS _inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = 'balances',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = 'block_number'
) }}

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = 'pool_balances',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = 'block_number'
) }}

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model = 'balances',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = 'block_number'
) }}

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model = 'pool_balances',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'view',
post_hook = "call silver.sp_bulk_get_asset_metadata()",
tags = ['noncore']
tags = ['noncore'],
enabled = false
) }}
SELECT

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view',
enabled = false
) }}
WITH base AS (

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'view',
post_hook = 'call silver.sp_bulk_get_pool_balances()',
tags = ['noncore']
tags = ['noncore'],
enabled = false
) }}
WITH last_block_of_hour AS (

View File

@ -1,5 +1,7 @@
{{ config(
materialized = 'table',
materialized = 'incremental',
unique_key = "address",
incremental_strategy = 'delete+insert',
tags = ['daily']
) }}
@ -9,7 +11,8 @@ WITH base AS (
base AS address,
NAME AS label,
symbol AS project_name,
denom_units AS raw_metadata
denom_units AS raw_metadata,
'2000-01-01' :: datetime AS _inserted_timestamp
FROM
{{ source(
'bronze_streamline',
@ -20,6 +23,46 @@ WITH base AS (
2,
3,
4
),
combo AS (
SELECT
address,
label,
project_name,
raw_metadata [0] :aliases [0] :: STRING AS alias,
raw_metadata [array_size(raw_metadata)-1] :exponent :: NUMBER AS DECIMAL,
raw_metadata,
COALESCE(
raw_metadata [0] :aliases [0] :: STRING,
raw_metadata [0] :denom :: STRING
) AS denom,
address AS _unique_key,
_inserted_timestamp
FROM
base
UNION ALL
SELECT
COALESCE(
VALUE :denom :: STRING,
base
) AS address,
NAME AS label,
symbol AS project_name,
VALUE :aliases :: STRING AS alias,
VALUE :exponent :: INT AS DECIMAL,
denom_units AS raw_metadata,
COALESCE(
VALUE [0] :aliases [0] :: STRING,
VALUE [0] :denom :: STRING
) AS denom,
address AS _unique_key,
_inserted_timestamp
FROM
{{ ref('silver__github_asset_metadata') }},
LATERAL FLATTEN(
denom_units,
outer => TRUE
)
)
SELECT
'osmosis' AS blockchain,
@ -29,19 +72,12 @@ SELECT
'token_contract' AS label_subtype,
label,
project_name,
raw_metadata [0] :aliases [0] :: STRING AS alias,
raw_metadata [array_size(raw_metadata)-1] :exponent :: NUMBER AS DECIMAL,
alias,
DECIMAL,
raw_metadata,
COALESCE(
raw_metadata [0] :aliases [0] :: STRING,
raw_metadata [0] :denom :: STRING
) AS denom,
concat_ws(
'-',
address,
creator,
blockchain
) AS _unique_key,
denom,
_unique_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['_unique_key']
) }} AS asset_metadata_id,
@ -49,6 +85,6 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base qualify(ROW_NUMBER() over(PARTITION BY blockchain, creator, address
combo qualify(ROW_NUMBER() over(PARTITION BY address
ORDER BY
project_name DESC)) = 1
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,102 @@
{{ config(
materialized = 'incremental',
unique_key = ['block_id', 'address', 'currency'],
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['balances']
) }}
-- depends_on: {{ ref('bronze__streamline_balances') }}
WITH base AS (
SELECT
bal.block_id,
bal.address,
b.value :denom :: STRING AS currency,
b.value :amount :: INT AS amount,
TO_TIMESTAMP_NTZ(
SUBSTR(SPLIT_PART(metadata$filename, '/', 4), 1, 10) :: NUMBER,
0
) AS _inserted_timestamp
FROM
{{ source(
'bronze_streamline',
'balances_api'
) }}
bal,
LATERAL FLATTEN (
input => balances,
outer => TRUE
) b
{% if is_incremental() %}
WHERE
0 = 1
{% endif %}
),
sl2 AS (
SELECT
A.metadata :request :headers :"x-cosmos-block-height" :: INT AS block_id,
REPLACE(
REPLACE(
A.metadata :request :url,
' {service}/{Authentication}/cosmos/bank/v1beta1/balances/'
),
'?pagination.limit=1000'
) AS address,
b.value :denom :: STRING AS currency,
b.value :amount :: INT AS amount,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_balances') }}
{% else %}
{{ ref('bronze__streamline_FR_balances') }}
{% endif %}
A,
LATERAL FLATTEN (
input => DATA: balances,
outer => TRUE
) b
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
combo AS (
SELECT
block_id,
address,
currency,
amount,
_inserted_timestamp
FROM
base
UNION ALL
SELECT
block_id,
address,
currency,
amount,
_inserted_timestamp
FROM
sl2
)
SELECT
block_id,
address,
currency,
amount,
_inserted_timestamp
FROM
combo qualify(ROW_NUMBER() over(PARTITION BY block_id, address, currency
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,52 @@
version: 2
models:
- name: silver__balances
description: A table containing all the liquid balances
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
- ADDRESS
- CURRENCY
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: ADDRESS
description: "{{ doc('address') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- dbt_expectations.expect_column_values_to_match_regex:
regex: osmo1[0-9a-z]{38,38}
- name: CURRENCY
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: AMOUNT
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: _INSERTED_TIMESTAMP
description: "{{ doc('ingested_at') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2

View File

@ -0,0 +1,48 @@
{{ config(
materialized = 'incremental',
unique_key = "base",
incremental_strategy = 'delete+insert',
tags = ['daily']
) }}
WITH base AS (
SELECT
resp,
_inserted_timestamp
FROM
{{ ref("bronze_api__github_asset_metadata") }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (
ORDER BY
_inserted_timestamp DESC) = 1)
)
SELECT
VALUE :base :: STRING AS base,
VALUE :coingecko_id :: STRING AS coingecko_id,
VALUE :denom_units AS denom_units,
VALUE :description :: STRING AS description,
VALUE :display :: STRING AS display,
VALUE :keywords AS keywords,
VALUE :logo_URIs AS logo_URIs,
VALUE :name :: STRING AS NAME,
VALUE :symbol :: STRING AS symbol,
VALUE :traces: AS traces,
_inserted_timestamp
FROM
base,
LATERAL FLATTEN(resp :data :assets) qualify(ROW_NUMBER() over (PARTITION BY base
ORDER BY
_inserted_timestamp DESC) = 1)

View File

@ -12,22 +12,15 @@ WITH base AS (
bal.block_id,
bl.block_timestamp,
bal.address,
b.value :denom :: STRING AS currency,
b.value :amount :: INT AS balance,
TO_TIMESTAMP_NTZ(
SUBSTR(SPLIT_PART(metadata$filename, '/', 4), 1, 10) :: NUMBER,
0
) AS _inserted_timestamp
bal.currency,
bal.amount AS balance,
bal._inserted_timestamp
FROM
{{ source(
'bronze_streamline',
'balances_api'
) }}
{{ ref('silver__balances') }}
bal
LEFT OUTER JOIN {{ ref('silver__blocks') }}
bl
ON bal.block_id = bl.block_id
LEFT OUTER JOIN TABLE(FLATTEN (input => balances, outer => TRUE)) b
{% if is_incremental() %}
WHERE
@ -43,7 +36,7 @@ WHERE
FROM
{{ this }}
)
OR b.value :denom :: STRING IN (
OR currency IN (
SELECT
currency
FROM
@ -57,7 +50,7 @@ WHERE
qualify(ROW_NUMBER() over(PARTITION BY bal.block_id, bal.address, currency
ORDER BY
_inserted_timestamp DESC)) = 1
bal._inserted_timestamp DESC)) = 1
),
tbl AS (

View File

@ -6,7 +6,62 @@
cluster_by = ['block_timestamp'],
tags = ['noncore']
) }}
-- depends_on: {{ ref('bronze__streamline_pool_balances') }}
WITH base AS (
SELECT
A.block_id,
A.pools,
A._INSERTED_DATE AS _INSERTED_TIMESTAMP
FROM
{{ source(
'bronze_streamline',
'pool_balances_api'
) }} A
{% if is_incremental() %}
WHERE
0 = 1
{% endif %}
),
sl2 AS (
SELECT
VALUE :metadata :request :headers :"x-cosmos-block-height" :: INT AS block_id,
DATA :pools AS pools,
_INSERTED_TIMESTAMP
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_pool_balances') }}
{% else %}
{{ ref('bronze__streamline_FR_pool_balances') }}
{% endif %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
combo AS (
SELECT
block_id,
pools,
_INSERTED_TIMESTAMP
FROM
sl2
UNION ALL
SELECT
block_id,
pools,
_INSERTED_TIMESTAMP
FROM
base
)
SELECT
A.block_id,
C.block_timestamp,
@ -96,7 +151,7 @@ SELECT
) :: bigint AS total_weight,
b.value :scaling_factor_controller :: STRING AS scaling_factor_controller,
b.value :scaling_factors AS scaling_factors,
_INSERTED_DATE AS _inserted_timestamp,
A._inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['pool_id','a.block_id']
) }} AS pool_balances_id,
@ -104,28 +159,11 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ source(
'bronze_streamline',
'pool_balances_api'
) }} A
combo A
JOIN LATERAL FLATTEN(
A.pools
) b
JOIN {{ ref('silver__blocks') }} C
ON A.block_id = C.block_id
{% if is_incremental() %}
WHERE
_INSERTED_DATE >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
qualify (ROW_NUMBER() over (PARTITION BY A.block_id, pool_id
ON A.block_id = C.block_id qualify (ROW_NUMBER() over (PARTITION BY A.block_id, pool_id
ORDER BY
_inserted_timestamp DESC) = 1)
A._inserted_timestamp DESC) = 1)

View File

@ -29,6 +29,8 @@ sources:
- name: blocks
- name: transactions
- name: txcount
- name: pool_balances
- name: balances
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'OSMOSIS' else 'crosschain_dev' }}"
schema: silver

View File

@ -0,0 +1,42 @@
-- depends_on: {{ ref('bronze__streamline_balances') }}
{{ config (
materialized = "incremental",
unique_key = ["block_number","address"],
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number,address)"
) }}
WITH base AS (
SELECT
REPLACE(
REPLACE(
VALUE :metadata :request :url,
'{service}/{Authentication}/cosmos/bank/v1beta1/balances/'
),
'?pagination.limit=1000'
) AS address,
VALUE :metadata :request :headers :"x-cosmos-block-height" :: INT AS block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_balances') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '2000-01-01' :: datetime) _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_FR_balances') }}
{% endif %}
)
SELECT
address,
block_number,
_inserted_timestamp
FROM
base qualify(ROW_NUMBER() over (PARTITION BY block_number, address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__streamline_pool_balances') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
WITH base AS (
SELECT
VALUE :metadata :request :headers :"x-cosmos-block-height" :: INT AS block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_pool_balances') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '2000-01-01' :: datetime) _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_FR_pool_balances') }}
{% endif %}
)
SELECT
block_number,
_inserted_timestamp
FROM
base qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,47 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'balances', 'sql_limit', {{var('sql_limit','1000000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','100')}}, 'sm_secret_name','prod/osmosis/allthatnode/mainnet-archive/rest'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks_to_call AS(
SELECT
block_id AS block_number,
address
FROM
{{ ref('streamline__balances') }}
EXCEPT
SELECT
block_number,
address
FROM
{{ ref('streamline__complete_balances') }}
),
calls AS (
SELECT
'{service}/{Authentication}/cosmos/bank/v1beta1/balances/' || address || '?pagination.limit=10000' AS calls,
block_number
FROM
blocks_to_call
)
SELECT
ARRAY_CONSTRUCT(
block_number,
ARRAY_CONSTRUCT(
'GET',
calls,
OBJECT_CONSTRUCT(
'x-cosmos-block-height',
block_number :: STRING
),
PARSE_JSON('{}'),
''
)
) AS request
FROM
calls
ORDER BY
block_number

View File

@ -0,0 +1,61 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'pool_balances', 'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','200')}}, 'worker_batch_size', {{var('worker_batch_size','20')}}, 'sm_secret_name','prod/osmosis/allthatnode/mainnet-archive/rest'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH last_block_of_hour AS (
SELECT
DATE_TRUNC(
'hour',
block_timestamp
) AS block_hour,
MAX(block_id) AS block_number
FROM
{{ ref('silver__blocks') }}
WHERE
block_id >= 12791459 -- last block pulled via the old streamline
GROUP BY
1
),
blocks_to_call AS(
SELECT
block_number
FROM
last_block_of_hour
EXCEPT
SELECT
block_number
FROM
{{ ref(
'streamline__complete_pool_balances'
) }}
),
calls AS (
SELECT
'{service}/{Authentication}/osmosis/gamm/v1beta1/pools?pagination.limit=10000' AS calls,
block_number
FROM
blocks_to_call
)
SELECT
ARRAY_CONSTRUCT(
block_number,
ARRAY_CONSTRUCT(
'GET',
calls,
OBJECT_CONSTRUCT(
'x-cosmos-block-height',
block_number :: STRING
),
PARSE_JSON('{}'),
''
)
) AS request
FROM
calls
ORDER BY
block_number

View File

@ -0,0 +1,149 @@
{{ config (
materialized = "incremental",
unique_key = ["block_id","address"],
tags = ['streamline_view']
) }}
-- depends_on: {{ ref('silver__blocks') }}
WITH base AS (
SELECT
block_id,
block_timestamp,
tx_id,
msg_type,
attribute_key,
attribute_value
FROM
{{ ref('silver__msg_attributes') }}
WHERE
(
RLIKE(
attribute_value,
'osmo\\w{39}'
)
OR (
msg_type = 'message'
AND attribute_key = 'action'
AND attribute_value = 'superfluid_delegate'
)
)
AND block_id > 12782446 -- last block pulled via old process
{% if is_incremental() %}
AND block_timestamp :: DATE >= (
SELECT
MAX(
block_timestamp :: DATE
)
FROM
(
SELECT
MAX(block_id) block_id
FROM
{{ this }}
) A
JOIN {{ ref('silver__blocks') }}
b
ON A.block_id = b.block_id
)
{% endif %}
),
wallets_per_block AS (
SELECT
DISTINCT block_id,
block_timestamp :: DATE AS block_timestamp_date,
attribute_value AS address
FROM
base
WHERE
RLIKE(
attribute_value,
'osmo\\w{39}'
)
AND block_id > 2383300
AND block_timestamp_date < CURRENT_DATE
),
max_block_id_per_date AS (
SELECT
block_timestamp_date,
MAX(block_id) AS max_block_id
FROM
wallets_per_block
GROUP BY
1
),
max_block_id_per_date_all AS (
SELECT
block_timestamp :: DATE AS block_timestamp_date,
MAX(block_id) AS max_block_id
FROM
base
WHERE
block_id > 2383300
GROUP BY
1
),
unique_address_per_block_date AS (
SELECT
DISTINCT max_block_id,
address
FROM
wallets_per_block b
LEFT OUTER JOIN max_block_id_per_date d
ON d.block_timestamp_date = b.block_timestamp_date
),
all_lp_wallets AS (
SELECT
liquidity_provider_address AS address
FROM
{{ ref('silver__liquidity_provider_actions') }}
WHERE
action = 'pool_joined'
GROUP BY
1
UNION
SELECT
delegator_address AS address
FROM
{{ ref('silver__superfluid_actions') }}
GROUP BY
1
),
possible_balances_needed AS (
SELECT
max_block_id AS block_id,
address
FROM
unique_address_per_block_date
UNION
SELECT
max_block_id,
address
FROM
all_lp_wallets
CROSS JOIN (
SELECT
DISTINCT max_block_id
FROM
unique_address_per_block_date
)
UNION
SELECT
max_block_id,
address
FROM
{{ ref('bronze__balance_addresses_everyday') }}
CROSS JOIN (
SELECT
DISTINCT max_block_id
FROM
max_block_id_per_date_all
)
)
SELECT
block_id,
address,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
possible_balances_needed