An 1882/solana backfill (#84)

* wip

* streamline models and resources to run txs backfill

* models for solana blocks/txs/rewards raw data loading

* add tasks, they do not run w/ dbt run-operation cmd

* make tasks executable by dbt run-operation

* add basic tests for streamline materialized models
This commit is contained in:
desmond-hui 2022-08-18 11:18:57 -07:00 committed by GitHub
parent c37fc3ef5c
commit 8878e9f5db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 562 additions and 0 deletions

View File

@ -34,6 +34,7 @@ on-run-start:
- '{{ sp_create_bulk_get_validator_metadata() }}'
- '{{ sp_create_bulk_get_stake_account_tx_ids() }}'
- '{{ sp_create_bulk_get_txs() }}'
- '{{ sp_refresh_external_table_batch() }}'
# Configuring models

View File

@ -4,6 +4,9 @@
{{ udf_bulk_get_validator_metadata() }};
{{ udf_bulk_get_stake_account_tx_ids() }};
{{ udf_bulk_get_txs() }};
{{ udf_bulk_get_blocks() }};
{{ udf_bulk_get_block_txs() }};
{{ udf_bulk_get_block_rewards() }};
{{ create_udf_ordered_signers(
schema = "silver"
) }}

View File

@ -0,0 +1,55 @@
{% macro task_bulk_get_block_rewards_historical() %}
{#
    Creates (and resumes) the Snowflake task streamline.bulk_get_block_rewards_historical.
    Every 3 minutes the task:
      1. calls streamline.refresh_external_table_next_batch to register only the
         next batch partition of bronze.block_rewards_api;
      2. merges newly landed (block_id, _partition_id) rows into
         streamline.complete_block_rewards -- this is a hand-rolled copy of the
         compiled dbt incremental merge for that model;
      3. invokes the external function streamline.udf_bulk_get_block_rewards()
         only if streamline.all_unknown_block_rewards_historical still has rows.
    The whole task body is passed to EXECUTE IMMEDIATE as one single-quoted
    string, which is why every embedded quote is escaped as \' .
    NOTE(review): this duplicates the model's merge SQL; if the model changes,
    the task must be updated by hand -- confirm that is acceptable.
#}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_rewards_historical
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */3 * * * * UTC\'
as
BEGIN
call streamline.refresh_external_table_next_batch(\'block_rewards_api\',\'complete_block_rewards\');
create or replace temporary table streamline.complete_block_rewards__dbt_tmp as
(
select *
from (
SELECT
block_id,
_partition_id
FROM
bronze.block_rewards_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_rewards
)
group by 1,2
)
order by (_partition_id)
);
merge into streamline.complete_block_rewards as DBT_INTERNAL_DEST
using streamline.complete_block_rewards__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set
_partition_id = DBT_INTERNAL_SOURCE._partition_id
when not matched then
insert ("BLOCK_ID", "_PARTITION_ID")
values ("BLOCK_ID", "_PARTITION_ID");
select streamline.udf_bulk_get_block_rewards()
where exists (
select 1
from streamline.all_unknown_block_rewards_historical
limit 1
);
END;'
{% endset %}
{# Create/replace the task definition, then resume it so it runs on schedule. #}
{% do run_query(sql) %}
{% set sql %}
alter task streamline.bulk_get_block_rewards_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro udf_bulk_get_block_rewards() %}
{#
    Registers the Snowflake external function streamline.udf_bulk_get_block_rewards(),
    which triggers the bulk block-rewards fetch behind the API Gateway endpoint
    matching the current dbt target.
    NOTE(review): api_integration stays aws_solana_api_dev even for the prod
    endpoint -- confirm a prod integration is not required.
#}
{% if target.name == "prod" %}
    {% set api_url = 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_rewards' %}
{% else %}
    {% set api_url = 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_rewards' %}
{% endif %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_rewards() returns text api_integration = aws_solana_api_dev AS '{{ api_url }}'
{% endmacro %}

View File

@ -0,0 +1,57 @@
{% macro task_bulk_get_block_txs_historical() %}
{#
    Creates (and resumes) the Snowflake task streamline.bulk_get_block_txs_historical.
    Every 6 minutes the task:
      1. calls streamline.refresh_external_table_next_batch to register only the
         next batch partition of bronze.block_txs_api;
      2. merges newly landed (block_id, _partition_id) rows into
         streamline.complete_block_txs (hand-rolled copy of the compiled dbt
         incremental merge for that model);
      3. invokes streamline.udf_bulk_get_block_txs() only if
         streamline.all_unknown_block_txs_historical still has rows.
    The task body is one single-quoted EXECUTE IMMEDIATE string, hence the \'
    escapes.
    Fix: the inner max(_partition_id) is wrapped in coalesce(...,0) -- without it,
    an empty completeness table makes the predicate compare against NULL and the
    task loads nothing; this matches the sibling block-rewards task and the
    streamline__complete_block_txs model.
#}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_txs_historical
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */6 * * * * UTC\'
as
BEGIN
call streamline.refresh_external_table_next_batch(\'block_txs_api\',\'complete_block_txs\');
create or replace temporary table streamline.complete_block_txs__dbt_tmp as
(
select *
from (
SELECT
block_id,
_partition_id
FROM
bronze.block_txs_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_txs
)
group by 1,2
)
order by (_partition_id)
);
merge into streamline.complete_block_txs as DBT_INTERNAL_DEST
using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set _partition_id = DBT_INTERNAL_SOURCE._partition_id
when not matched then
insert ("BLOCK_ID", "_PARTITION_ID")
values ("BLOCK_ID", "_PARTITION_ID");
select streamline.udf_bulk_get_block_txs()
where exists (
select 1
from streamline.all_unknown_block_txs_historical
limit 1
);
END;'
{% endset %}
{# Create/replace the task definition, then resume it so it runs on schedule. #}
{% do run_query(sql) %}
{% set sql %}
alter task streamline.bulk_get_block_txs_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro udf_bulk_get_block_txs() %}
{#
    Registers the Snowflake external function streamline.udf_bulk_get_block_txs(),
    which triggers the bulk block-transactions fetch behind the API Gateway
    endpoint matching the current dbt target.
    NOTE(review): api_integration stays aws_solana_api_dev even for the prod
    endpoint -- confirm a prod integration is not required.
#}
{% if target.name == "prod" %}
    {% set api_url = 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_txs' %}
{% else %}
    {% set api_url = 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_txs' %}
{% endif %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_txs() returns text api_integration = aws_solana_api_dev AS '{{ api_url }}'
{% endmacro %}

View File

@ -0,0 +1,81 @@
{% macro task_bulk_get_blocks_historical() %}
{#
    Creates (and resumes) the Snowflake task streamline.bulk_get_blocks_historical.
    Every 5 minutes the task:
      1. refreshes the whole bronze.blocks_api external table (no batch path here,
         unlike the rewards/txs tasks);
      2. rebuilds a temp table of block ids from files registered since the last
         recorded _inserted_timestamp, deduped per block via QUALIFY, and merges
         it into streamline.complete_blocks -- a hand-rolled copy of the compiled
         dbt incremental merge for streamline__complete_blocks;
      3. invokes streamline.udf_bulk_get_blocks() only if
         streamline.all_unknown_blocks_historical still has rows.
    The task body is one single-quoted EXECUTE IMMEDIATE string, hence the \'
    escapes.
    NOTE(review): the second max(_inserted_timestamp) subquery has no COALESCE,
    unlike the registered_on guard above it -- with an empty completeness table
    the predicate compares against NULL and nothing loads; confirm intended.
#}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_blocks_historical
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */5 * * * * UTC\'
as
BEGIN
alter external table bronze.blocks_api refresh;
create or replace temporary table streamline.complete_blocks__dbt_tmp as
(
select *
from (
WITH meta AS (
SELECT
registered_on,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => \'bronze.blocks_api\'
)
) A
WHERE
registered_on >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), \'1970-01-01\' :: DATE) max_INSERTED_TIMESTAMP
FROM
streamline.complete_blocks
)
)
SELECT
block_id,
_inserted_date,
m.registered_on as _inserted_timestamp
FROM
bronze.blocks_api AS s
JOIN meta m
ON m.file_name = metadata$filename
WHERE
s.block_id IS NOT NULL
AND s._inserted_date >= CURRENT_DATE
AND m.registered_on > (
SELECT
max(_inserted_timestamp)
FROM
streamline.complete_blocks
)
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
_inserted_date, _inserted_timestamp DESC)) = 1
)
order by (_inserted_date)
);
merge into streamline.complete_blocks as DBT_INTERNAL_DEST
using streamline.complete_blocks__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set
_inserted_date = DBT_INTERNAL_SOURCE._inserted_date,
_inserted_timestamp = DBT_INTERNAL_SOURCE._inserted_timestamp
when not matched then
insert ("BLOCK_ID", "_INSERTED_DATE", "_INSERTED_TIMESTAMP")
values ("BLOCK_ID", "_INSERTED_DATE", "_INSERTED_TIMESTAMP");
select streamline.udf_bulk_get_blocks()
where exists (
select 1
from streamline.all_unknown_blocks_historical
limit 1
);
END;'
{% endset %}
{# Create/replace the task definition, then resume it so it runs on schedule. #}
{% do run_query(sql) %}
{% set sql %}
alter task streamline.bulk_get_blocks_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro udf_bulk_get_blocks() %}
{#
    Registers the Snowflake external function streamline.udf_bulk_get_blocks(),
    which triggers the bulk blocks fetch behind the API Gateway endpoint
    matching the current dbt target.
    NOTE(review): api_integration stays aws_solana_api_dev even for the prod
    endpoint -- confirm a prod integration is not required.
#}
{% if target.name == "prod" %}
    {% set api_url = 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_blocks' %}
{% else %}
    {% set api_url = 'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_blocks' %}
{% endif %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_blocks() returns text api_integration = aws_solana_api_dev AS '{{ api_url }}'
{% endmacro %}

View File

@ -0,0 +1,26 @@
{% macro sp_refresh_external_table_batch() %}
{#
    Creates streamline.refresh_external_table_next_batch(external_table_name, streamline_table_name):
    computes the next batch sub-path ('batch=<max(_partition_id)+1>/') from the
    given streamline completeness table, then refreshes only that sub-path of
    the bronze external table, avoiding a full external-table refresh.
    Fix: max(_partition_id)+1 is wrapped in coalesce(...,1) -- on an empty
    completeness table the concat would otherwise yield NULL and the dynamic
    ALTER ... REFRESH statement would be NULL; batch numbering starts at 1 to
    match the all_unknown_* batch_id models.
#}
{% set sql %}
create or replace procedure streamline.refresh_external_table_next_batch(external_table_name string, streamline_table_name string)
returns string
language sql
as
$$
declare
path string;
select_stmt string;
refresh_stmt string;
res resultset;
begin
-- Build and run the query that derives the next batch sub-path.
select_stmt := 'select concat(\'batch=\',coalesce(max(_partition_id)+1,1),\'/\') as path from streamline.' || :streamline_table_name;
res := (execute immediate :select_stmt);
let c1 cursor for res;
for row_variable in c1 do
path := row_variable.path;
end for;
-- Refresh only the next batch sub-path of the external table.
refresh_stmt := 'alter external table bronze.' || :external_table_name || ' refresh \'' || :PATH || '\'';
res := (execute immediate :refresh_stmt);
return 'table refreshed with ' || :refresh_stmt;
end;
$${% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -86,4 +86,63 @@ sources:
data_type: string
- name: data
data_type: variant
- name: blocks_api
description: "External table of raw solana block data"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCKS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _inserted_date
data_type: string
expression: substr((split_part(METADATA$FILENAME,'/',3)),16,10)
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: data
data_type: variant
- name: error
data_type: variant
- name: block_rewards_api
description: "External table of raw solana block rewards data"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_REWARDS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _partition_id
data_type: number
expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2))
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: data
data_type: variant
- name: error
data_type: variant
- name: block_txs_api
description: "External table of raw solana block transactions data"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_TXS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _partition_id
data_type: number
expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2))
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: tx_id
data_type: string
- name: data
data_type: variant
- name: error
data_type: variant

View File

@ -0,0 +1,28 @@
{{ config(
    materialized = 'view'
) }}
-- Block heights (0 .. 98680445) whose rewards have not yet been recorded in
-- streamline__complete_block_rewards, each tagged with the next ingestion batch id.
WITH block_range AS (
    -- Candidate block ids; the generator is oversized, then capped at the
    -- chain-tip snapshot taken when this backfill was set up.
    SELECT
        SEQ8() AS block_id
    FROM
        TABLE(GENERATOR(rowcount => 1000000000))
    WHERE
        block_id <= 98680445
),
missing_blocks AS (
    SELECT block_id FROM block_range
    EXCEPT
    SELECT block_id FROM {{ ref('streamline__complete_block_rewards') }}
),
next_batch AS (
    -- Exactly one row: the next _partition_id to write (1 on an empty table).
    SELECT
        COALESCE(MAX(_partition_id) + 1, 1) AS batch_id
    FROM
        {{ ref('streamline__complete_block_rewards') }}
)
SELECT
    mb.block_id,
    nb.batch_id
FROM
    missing_blocks mb
    CROSS JOIN next_batch nb

View File

@ -0,0 +1,28 @@
{{ config(
    materialized = 'view'
) }}
-- Block heights (0 .. 98680445) whose transactions have not yet been recorded
-- in streamline__complete_block_txs, each tagged with the next ingestion batch id.
WITH block_range AS (
    -- Candidate block ids; the generator is oversized, then capped at the
    -- chain-tip snapshot taken when this backfill was set up.
    SELECT
        SEQ8() AS block_id
    FROM
        TABLE(GENERATOR(rowcount => 1000000000))
    WHERE
        block_id <= 98680445
),
missing_blocks AS (
    SELECT block_id FROM block_range
    EXCEPT
    SELECT block_id FROM {{ ref('streamline__complete_block_txs') }}
),
next_batch AS (
    -- Exactly one row: the next _partition_id to write (1 on an empty table).
    SELECT
        COALESCE(MAX(_partition_id) + 1, 1) AS batch_id
    FROM
        {{ ref('streamline__complete_block_txs') }}
)
SELECT
    mb.block_id,
    nb.batch_id
FROM
    missing_blocks mb
    CROSS JOIN next_batch nb

View File

@ -0,0 +1,14 @@
{{ config(
materialized = 'view'
) }}
SELECT
SEQ8() AS block_id
FROM
TABLE(GENERATOR(rowcount => 1000000000))
WHERE block_id <= 98680445
EXCEPT
SELECT
block_id
from
{{ ref('streamline__complete_blocks') }}

View File

@ -0,0 +1,39 @@
{{ config (
    materialized = "incremental",
    unique_key = "block_id",
    cluster_by = "_partition_id",
    merge_update_columns = ["_partition_id"]
) }}
-- Completeness log of (block_id, _partition_id) pairs whose raw block-rewards
-- payload has landed in the bronze.block_rewards_api external table.
-- Fixes:
--   * removed the `meta` CTE -- it was copied from the blocks model but never
--     referenced anywhere in this query
--   * explicit GROUP BY column names instead of positional references
--   * full-refresh QUALIFY dedup added for consistency with
--     streamline__complete_block_txs, so the unique test on block_id holds
--     when a block appears in more than one batch
SELECT
    block_id,
    _partition_id
FROM
    {{ source(
        "solana_external",
        "block_rewards_api"
    ) }} AS s
WHERE
    s.block_id IS NOT NULL

{% if is_incremental() %}
-- only scan partitions newer than what is already recorded
AND s._partition_id > (
    SELECT
        COALESCE(MAX(_partition_id), 0)
    FROM
        {{ this }}
)
{% endif %}
GROUP BY
    block_id,
    _partition_id

{% if not is_incremental() %}
-- on a full refresh a block can appear in several batches; keep the latest
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
    _partition_id DESC)) = 1
{% endif %}

View File

@ -0,0 +1,16 @@
# Schema tests for the streamline__complete_block_rewards completeness log:
# block_id must be unique across the table and both columns must be non-null.
version: 2
models:
- name: streamline__complete_block_rewards
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: _PARTITION_ID
description: Value representing a single pipeline execution batch
tests:
- not_null

View File

@ -0,0 +1,44 @@
{{ config (
    materialized = "incremental",
    unique_key = "block_id",
    cluster_by = "_partition_id",
    merge_update_columns = ["_partition_id"]
) }}
-- Completeness log of (block_id, _partition_id) pairs whose raw transactions
-- payload has landed in the bronze.block_txs_api external table.
-- Fixes:
--   * removed the `meta` CTE -- it was copied from the blocks model but never
--     referenced anywhere in this query
--   * explicit GROUP BY column names instead of positional references
SELECT
    block_id,
    _partition_id
FROM
    {{ source(
        "solana_external",
        "block_txs_api"
    ) }} AS s
WHERE
    s.block_id IS NOT NULL

{% if is_incremental() %}
-- only scan partitions newer than what is already recorded
AND s._partition_id > (
    SELECT
        COALESCE(MAX(_partition_id), 0)
    FROM
        {{ this }}
)
{% endif %}
GROUP BY
    block_id,
    _partition_id

{% if not is_incremental() %}
-- on a full refresh a block can appear in several batches; keep the latest
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
    _partition_id DESC)) = 1
{% endif %}

View File

@ -0,0 +1,16 @@
# Schema tests for the streamline__complete_block_txs completeness log:
# block_id must be unique across the table and both columns must be non-null.
version: 2
models:
- name: streamline__complete_block_txs
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: _PARTITION_ID
description: Value representing a single pipeline execution batch
tests:
- not_null

View File

@ -0,0 +1,53 @@
{{ config (
    materialized = "incremental",
    unique_key = "block_id",
    cluster_by = "_inserted_date",
    merge_update_columns = ["_inserted_date","_inserted_timestamp"]
) }}
-- Completeness log of block heights whose raw payload has landed in the
-- bronze.blocks_api external table, keyed by the s3 file registration time.
-- `meta` limits the external-file scan to files registered since the last load.
WITH meta AS (
    SELECT
        registered_on,
        file_name
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source( "solana_external", "blocks_api") }}'
            )
        ) A

{% if is_incremental() %}
WHERE
    registered_on >= (
        SELECT
            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
        FROM
            {{ this }})
    {% endif %}
)
SELECT
    block_id,
    _inserted_date,
    m.registered_on as _inserted_timestamp
FROM
    {{ source(
        "solana_external",
        "blocks_api"
    ) }} AS s
    JOIN meta m
    ON m.file_name = metadata$filename
WHERE
    s.block_id IS NOT NULL

{% if is_incremental() %}
AND s._inserted_date >= CURRENT_DATE
-- COALESCE so an existing-but-empty table (e.g. after an aborted load) still
-- picks up all registered files; mirrors the guard in the meta CTE above.
AND m.registered_on > (
    SELECT
        COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: DATE)
    FROM
        {{ this }}
)
{% endif %}
-- keep one row per block_id
-- NOTE(review): _inserted_date sorts ascending while _inserted_timestamp is
-- DESC -- confirm the intended tie-break direction for duplicate blocks.
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
    _inserted_date, _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,18 @@
version: 2
models:
- name: streamline__complete_blocks
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCK_ID
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: _INSERTED_DATE
description: Date that the external file was inserted into s3
tests:
- not_null
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"