An 2134/migrate external tables (#103)

* reference new external table location

* delete line

* change warehouse
desmond-hui 2022-09-15 07:38:07 -07:00 committed by GitHub
parent af2ce900af
commit 8a00a94959
20 changed files with 252 additions and 232 deletions
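
The change repeated across these files: external tables that previously resolved inside the current database (bronze.<table>) now live in the dedicated streamline database, with one schema per target, and dbt models point at a renamed source. A minimal sketch of the before/after, with blocks_api as the illustrative table:

-- before: external table in the bronze schema of the current database
select block_id from bronze.blocks_api;

-- after: streamline database, schema named for the target (streamline.solana.* in prod)
select block_id from streamline.{{ target.database }}.blocks_api;

-- in dbt models, the source is renamed from solana_external to bronze_streamline
select block_id from {{ source('bronze_streamline', 'blocks_api') }};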

View File

@@ -34,7 +34,6 @@ on-run-start:
- '{{ sp_create_bulk_get_validator_metadata() }}'
- '{{ sp_create_bulk_get_stake_account_tx_ids() }}'
- '{{ sp_create_bulk_get_txs() }}'
- '{{ sp_refresh_external_table_batch() }}'
# Configuring models

View File

@@ -14,8 +14,7 @@ BEGIN
SELECT
block_id,
_partition_id
FROM
bronze.block_rewards_api AS s
FROM streamline.{{ target.database }}.block_rewards_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
@@ -53,10 +52,16 @@ END;'
{% endset %}
{% do run_query(sql) %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_block_rewards_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
/* no backfills atm so we can suspend in prod also */
{% set sql %}
alter task streamline.bulk_get_block_rewards_historical suspend;
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_block_rewards_historical resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
{% endmacro %}
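
With no backfills running, the historical task is now created suspended in all environments; the resume guard is kept above as a comment. Should backfills restart, resuming mirrors that commented block, e.g. run manually in prod:

alter task streamline.bulk_get_block_rewards_historical resume;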

View File

@@ -1,25 +1,55 @@
-- {% macro task_bulk_get_block_rewards_real_time() %}
-- {% set sql %}
-- execute immediate 'create or replace task streamline.bulk_get_block_rewards_real_time
-- warehouse = dbt_cloud_solana
-- allow_overlapping_execution = false
-- schedule = \'USING CRON */15 * * * * UTC\'
-- as
-- BEGIN
-- select streamline.udf_bulk_get_block_rewards(TRUE)
-- where exists (
-- select 1
-- from streamline.all_unknown_block_rewards_real_time
-- limit 1
-- );
-- END;'
-- {% endset %}
-- {% do run_query(sql) %}
{% macro task_bulk_get_block_rewards_real_time() %}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_rewards_real_time
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */15 * * * * UTC\'
as
BEGIN
call streamline.refresh_external_table_next_batch(\'block_rewards_api\',\'complete_block_rewards\');
create or replace temporary table streamline.complete_block_rewards__dbt_tmp as
(
select *
from (
SELECT
block_id,
_partition_id
FROM streamline.{{ target.database }}.block_rewards_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_rewards
)
group by 1,2
)
order by (_partition_id)
);
merge into streamline.complete_block_rewards as DBT_INTERNAL_DEST
using streamline.complete_block_rewards__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set
_partition_id = DBT_INTERNAL_SOURCE._partition_id
when not matched then
insert ("BLOCK_ID", "_PARTITION_ID")
values ("BLOCK_ID", "_PARTITION_ID");
select streamline.udf_bulk_get_block_rewards(TRUE)
where exists (
select 1
from streamline.all_unknown_block_rewards_real_time
limit 1
);
END;'
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_block_rewards_real_time resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
-- {% endmacro %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_block_rewards_real_time resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
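
After the macro runs, a quick way to confirm the recreated task is scheduled and succeeding (a sketch, assuming the usual MONITOR privilege and the current database set to the target):

show tasks like 'BULK_GET_BLOCK_REWARDS_REAL_TIME' in schema streamline;

select name, state, scheduled_time, error_message
from table(information_schema.task_history(task_name => 'BULK_GET_BLOCK_REWARDS_REAL_TIME'))
order by scheduled_time desc
limit 10;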

View File

@@ -1,7 +1,7 @@
{% macro task_bulk_get_block_txs_historical() %}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_txs_historical
warehouse = dbt_emergency
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */20 * * * * UTC\'
as
@@ -15,7 +15,7 @@ BEGIN
block_id,
_partition_id
FROM
bronze.block_txs_api AS s
streamline.{{ target.database }}.block_txs_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
@@ -52,10 +52,16 @@ END;'
{% endset %}
{% do run_query(sql) %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_block_txs_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
/* no backfills atm so we can suspend in prod also */
{% set sql %}
alter task streamline.bulk_get_block_txs_historical suspend;
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_block_txs_historical resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
{% endmacro %}

View File

@@ -1,25 +1,55 @@
-- {% macro task_bulk_get_block_txs_real_time() %}
-- {% set sql %}
-- execute immediate 'create or replace task streamline.bulk_get_block_txs_real_time
-- warehouse = dbt_cloud_solana
-- allow_overlapping_execution = false
-- schedule = \'USING CRON */15 * * * * UTC\'
-- as
-- BEGIN
-- select streamline.udf_bulk_get_block_txs(TRUE)
-- where exists (
-- select 1
-- from streamline.all_unknown_block_txs_real_time
-- limit 1
-- );
-- END;'
-- {% endset %}
-- {% do run_query(sql) %}
{% macro task_bulk_get_block_txs_real_time() %}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_txs_real_time
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */20 * * * * UTC\'
as
BEGIN
call streamline.refresh_external_table_next_batch(\'block_txs_api\',\'complete_block_txs\');
create or replace temporary table streamline.complete_block_txs__dbt_tmp as
(
select *
from (
SELECT
block_id,
_partition_id
FROM
streamline.{{ target.database }}.block_txs_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_txs
)
group by 1,2
)
order by (_partition_id)
);
merge into streamline.complete_block_txs as DBT_INTERNAL_DEST
using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set _partition_id = DBT_INTERNAL_SOURCE._partition_id
when not matched then
insert ("BLOCK_ID", "_PARTITION_ID")
values ("BLOCK_ID", "_PARTITION_ID");
select streamline.udf_bulk_get_block_txs(TRUE)
where exists (
select 1
from streamline.all_unknown_block_txs_real_time
limit 1
);
END;'
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_block_txs_real_time resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
-- {% endmacro %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_block_txs_real_time resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
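
To smoke-test the recreated task without waiting for the 20-minute cron, Snowflake can trigger a single run directly (requires OPERATE on the task; a sketch):

execute task streamline.bulk_get_block_txs_real_time;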

View File

@@ -6,7 +6,7 @@ execute immediate 'create or replace task streamline.bulk_get_blocks_historical
schedule = \'USING CRON */15 * * * * UTC\'
as
BEGIN
alter external table bronze.blocks_api refresh;
alter external table streamline.{{ target.database }}.blocks_api refresh;
create or replace temporary table streamline.complete_blocks__dbt_tmp as
(
select *
@@ -18,7 +18,7 @@ BEGIN
FROM
TABLE(
information_schema.external_table_files(
table_name => \'bronze.blocks_api\'
table_name => \'streamline.{{ target.database }}.blocks_api\'
)
) A
WHERE
@@ -34,7 +34,7 @@ BEGIN
_inserted_date,
m.registered_on as _inserted_timestamp
FROM
bronze.blocks_api AS s
streamline.{{ target.database }}.blocks_api AS s
JOIN meta m
ON m.file_name = metadata$filename
WHERE
@@ -78,10 +78,17 @@ END;'
{% endset %}
{% do run_query(sql) %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_blocks_historical resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
/* no backfills atm so we can suspend in prod also */
{% set sql %}
alter task streamline.bulk_get_blocks_historical suspend;
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_blocks_historical resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
{% endmacro %}

View File

@@ -1,25 +1,81 @@
-- {% macro task_bulk_get_blocks_real_time() %}
-- {% set sql %}
-- execute immediate 'create or replace task streamline.bulk_get_blocks_real_time
-- warehouse = dbt_cloud_solana
-- allow_overlapping_execution = false
-- schedule = \'USING CRON */15 * * * * UTC\'
-- as
-- BEGIN
-- select streamline.udf_bulk_get_blocks(TRUE)
-- where exists (
-- select 1
-- from streamline.all_unknown_blocks_real_time
-- limit 1
-- );
-- END;'
-- {% endset %}
-- {% do run_query(sql) %}
{% macro task_bulk_get_blocks_real_time() %}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_blocks_real_time
warehouse = dbt_cloud_solana
allow_overlapping_execution = false
schedule = \'USING CRON */15 * * * * UTC\'
as
BEGIN
alter external table streamline.{{ target.database }}.blocks_api refresh;
create or replace temporary table streamline.complete_blocks__dbt_tmp as
(
select *
from (
WITH meta AS (
SELECT
registered_on,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => \'streamline.{{ target.database }}.blocks_api\'
)
) A
WHERE
registered_on >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), \'1970-01-01\' :: DATE) max_INSERTED_TIMESTAMP
FROM
streamline.complete_blocks
)
)
SELECT
block_id,
_inserted_date,
m.registered_on as _inserted_timestamp
FROM
streamline.{{ target.database }}.blocks_api AS s
JOIN meta m
ON m.file_name = metadata$filename
WHERE
s.block_id IS NOT NULL
AND s._inserted_date >= CURRENT_DATE
AND m.registered_on > (
SELECT
coalesce(max(_inserted_timestamp),\'2022-01-01 00:00:00\'::timestamp_ntz)
FROM
streamline.complete_blocks
)
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
_inserted_date, _inserted_timestamp DESC)) = 1
)
order by (_inserted_date)
);
merge into streamline.complete_blocks as DBT_INTERNAL_DEST
using streamline.complete_blocks__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set
_inserted_date = DBT_INTERNAL_SOURCE._inserted_date,
_inserted_timestamp = DBT_INTERNAL_SOURCE._inserted_timestamp
when not matched then
insert ("BLOCK_ID", "_INSERTED_DATE", "_INSERTED_TIMESTAMP")
values ("BLOCK_ID", "_INSERTED_DATE", "_INSERTED_TIMESTAMP");
select streamline.udf_bulk_get_blocks(TRUE)
where exists (
select 1
from streamline.all_unknown_blocks_real_time
limit 1
);
END;'
{% endset %}
{% do run_query(sql) %}
-- {% if target.database == 'SOLANA' %}
-- {% set sql %}
-- alter task streamline.bulk_get_blocks_real_time resume;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
-- {% endmacro %}
{% if target.database == 'SOLANA' %}
{% set sql %}
alter task streamline.bulk_get_blocks_real_time resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@@ -17,7 +17,7 @@ $$
for row_variable in c1 do
path := row_variable.path;
end for;
refresh_stmt := 'alter external table bronze.' || :external_table_name || ' refresh \'' || :PATH || '\'';
refresh_stmt := 'alter external table streamline.{{ target.database }}.' || :external_table_name || ' refresh \'' || :PATH || '\'';
res := (execute immediate :refresh_stmt);
return 'table refreshed with ' || :refresh_stmt;
end;
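
This looks to be the batch-refresh procedure the real-time tasks call before merging; if so, the invocation takes the external table name and the completion table used to pick the next partition path, as seen earlier in this commit:

call streamline.refresh_external_table_next_batch('block_rewards_api', 'complete_block_rewards');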

View File

@@ -15,6 +15,6 @@ SELECT
) as _inserted_timestamp
FROM
{{ source(
'solana_external',
'bronze_streamline',
'blocks_api'
)}}

View File

@@ -16,6 +16,6 @@ SELECT
) as _inserted_timestamp
FROM
{{ source(
'solana_external',
'bronze_streamline',
'block_txs_api'
)}}

View File

@@ -187,7 +187,7 @@ validators AS (
VALUE :number :: INTEGER AS validator_rank
FROM
{{ source(
'solana_external',
'bronze_streamline',
'validator_metadata_api'
) }}
),

View File

@@ -46,7 +46,7 @@ SELECT
FROM
possible_undecoded_instructions p
LEFT OUTER JOIN {{ source(
'solana_external',
'bronze_streamline',
'decoded_instructions_data_api'
) }}
d

View File

@@ -23,7 +23,7 @@ validator_keys as (
VALUE :nodePubkey :: STRING AS node_pubkey
FROM
{{ source(
'solana_external',
'bronze_streamline',
'validator_metadata_api'
) }}
)
@@ -40,6 +40,6 @@ SELECT
account
FROM
{{ source(
'solana_external',
'bronze_streamline',
'stake_account_tx_ids_api'
) }}

View File

@@ -6,7 +6,7 @@ SELECT
signature as tx_id
FROM
{{ source(
'solana_external',
'bronze_streamline',
'stake_account_tx_ids_api'
) }}
EXCEPT
@@ -14,6 +14,6 @@ SELECT
tx_id
FROM
{{ source(
'solana_external',
'bronze_streamline',
'txs_api'
) }}

View File

@@ -15,7 +15,7 @@ WITH instructs AS (
i.value AS instruction
FROM
{{ source(
'solana_external',
'bronze_streamline',
'txs_api'
) }},
TABLE(
@@ -51,7 +51,7 @@ SELECT
C.value AS instruction
FROM
{{ source(
'solana_external',
'bronze_streamline',
'txs_api'
) }} A,
TABLE(FLATTEN (DATA :result :meta :innerInstructions)) b,
@@ -102,7 +102,7 @@ tx_base AS (
) AS _inserted_timestamp
FROM
{{ source(
'solana_external',
'bronze_streamline',
'txs_api'
) }}
t

View File

@@ -17,7 +17,7 @@ SELECT
) AS _inserted_timestamp
FROM
{{ source(
'solana_external',
'bronze_streamline',
'decoded_instructions_data_api'
) }}

View File

@@ -22,127 +22,14 @@ sources:
schema: bronze
tables:
- name: prod_nft_metadata_uploads_1828572827
- name: solana_external
schema: bronze
loader: S3
- name: bronze_streamline
database: streamline
schema: solana
tables:
- name: decoded_instructions_data_api
description: "External table of solana decoded instructions data"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/DECODED_INSTRUCTIONS_DATA_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
columns:
- name: tx_id
data_type: string
description: ""
- name: event_index
data_type: number
description: ""
- name: program_id
data_type: string
description: ""
- name: instruction_type
data_type: string
description: ""
- name: data
data_type: variant
description: ""
- name: validator_metadata_api
description: "External table of solana validator metadata"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/VALIDATOR_METADATA_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
- name: stake_account_tx_ids_api
description: "External table of solana stake account tx ids"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/STAKE_ACCOUNT_TX_IDS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
columns:
- name: account
data_type: string
- name: slot
data_type: number
- name: blockTime
data_type: number
- name: signature
data_type: string
- name: txs_api
description: "External table of solana stake account tx ids"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/TXS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _inserted_date
data_type: string
expression: substr((split_part(METADATA$FILENAME,'/',3)),16,10)
columns:
- name: metadata
data_type: variant
- name: tx_id
data_type: string
- name: data
data_type: variant
- name: blocks_api
description: "External table of solana stake account tx ids"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCKS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _inserted_date
data_type: string
expression: substr((split_part(METADATA$FILENAME,'/',3)),16,10)
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: data
data_type: variant
- name: error
data_type: variant
- name: block_rewards_api
description: "External table of solana stake account tx ids"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_REWARDS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _partition_id
data_type: number
expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2))
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: data
data_type: variant
- name: error
data_type: variant
- name: block_txs_api
description: "External table of solana stake account tx ids"
external:
location: "@solana.bronze.analytics_external_tables/{{target.database}}/BLOCK_TXS_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _partition_id
data_type: number
expression: to_number(split_part(split_part(METADATA$FILENAME,'/',3),'=',2))
columns:
- name: metadata
data_type: variant
- name: block_id
data_type: integer
- name: tx_id
data_type: string
- name: data
data_type: variant
- name: error
data_type: variant
- name: block_txs_api
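
For reference, the _partition_id expression in the removed table definitions parses the partition number out of the S3 key. A worked example, assuming a key layout like SOLANA/BLOCK_REWARDS_API/_partition_id=42/data.json:

select to_number(
    split_part(split_part('SOLANA/BLOCK_REWARDS_API/_partition_id=42/data.json', '/', 3), '=', 2)
) as _partition_id;
-- slash-part 3 is '_partition_id=42'; equals-part 2 is '42'; to_number gives 42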

View File

@@ -10,7 +10,7 @@ SELECT
_partition_id
FROM
{{ source(
"solana_external",
"bronze_streamline",
"block_rewards_api"
) }} AS s
WHERE

View File

@@ -10,7 +10,7 @@ SELECT
_partition_id
FROM
{{ source(
"solana_external",
"bronze_streamline",
"block_txs_api"
) }} AS s
WHERE

View File

@@ -13,7 +13,7 @@ WITH meta AS (
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "solana_external", "blocks_api") }}'
table_name => '{{ source( "bronze_streamline", "blocks_api") }}'
)
) A
{% if is_incremental() %}
@@ -31,7 +31,7 @@ SELECT
m.registered_on as _inserted_timestamp
FROM
{{ source(
"solana_external",
"bronze_streamline",
"blocks_api"
) }} AS s
JOIN meta m