Deprecate legacy block txs pipeline (#735)

* move to backfill folder

* remove legacy udfs and calls

* use streamline 2.0 data after cutover block & partition

* update start block to do checks, add streamline 2.0 data

* fix ambiguous reference

* this test isn't used; it is currently ignored in the test workflow

* get partition from streamline 2.0 upstream when available

* add TODOs

* update partition and block cutoffs
This commit is contained in:
desmond-hui 2024-12-12 15:40:01 -08:00 committed by GitHub
parent cfcb06c56f
commit a3b8cf8458
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 159 additions and 259 deletions

View File

@ -42,4 +42,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt test -s "solana_models,./models" "solana_models,tag:test_daily" --exclude tag:test_weekly tag:test_hourly tag:exclude_test_daily "test_silver__transactions_missing_partitions"
dbt test -s "solana_models,./models" "solana_models,tag:test_daily" --exclude tag:test_weekly tag:test_hourly tag:exclude_test_daily

View File

@ -3,7 +3,6 @@
{% set sql %}
{% if target.database != "SOLANA_COMMUNITY_DEV" %}
{{ udf_bulk_get_decoded_instructions_data() }};
{{ udf_bulk_get_block_txs() }};
{{ udf_snapshot_get_stake_accounts() }};
{{ udf_bulk_program_parser() }};
{{ udf_decode_instructions() }};

View File

@ -1,58 +0,0 @@
{% macro task_bulk_get_block_txs_historical() %}
{# Creates — and immediately suspends — the Snowflake task
   streamline.bulk_get_block_txs_historical. While resumed, every 20 minutes
   (CRON */20 * * * * UTC) the task:
     1. stages block_id/_partition_id pairs from the external block_txs_api
        table for partitions newer than the current max _partition_id in
        streamline.complete_block_txs,
     2. merges them into streamline.complete_block_txs keyed on block_id,
     3. calls udf_bulk_get_block_txs(FALSE) if any unknown historical blocks
        remain, then udf_bulk_get_block_txs(TRUE) for real-time gaps.
   The entire task body is one escaped string handed to `execute immediate`,
   so the \' quoting is load-bearing — do not reformat inside the string.
   NOTE(review): `streamline.{{ target.database }}.block_txs_api` reads as
   schema.database.table rather than database.schema.table — confirm the
   intended qualification order against the warehouse. #}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_txs_historical
warehouse = dbt_cloud
allow_overlapping_execution = false
schedule = \'USING CRON */20 * * * * UTC\'
as
BEGIN
create or replace temporary table streamline.complete_block_txs__dbt_tmp as
(
select *
from (
SELECT
block_id,
_partition_id
FROM
streamline.{{ target.database }}.block_txs_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_txs
)
group by 1,2
)
order by (_partition_id)
);
merge into streamline.complete_block_txs as DBT_INTERNAL_DEST
using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update set _partition_id = DBT_INTERNAL_SOURCE._partition_id
when not matched then
insert ("BLOCK_ID", "_PARTITION_ID")
values ("BLOCK_ID", "_PARTITION_ID");
select streamline.udf_bulk_get_block_txs(FALSE)
where exists (
select 1
from streamline.all_unknown_block_txs_historical
limit 1
);
select streamline.udf_bulk_get_block_txs(TRUE)
where exists (
select 1
from streamline.all_unknown_block_txs_real_time
limit 1
);
END;'
{% endset %}
{# Create (or replace) the task definition. #}
{% do run_query(sql) %}
{% set sql %}
alter task streamline.bulk_get_block_txs_historical suspend;
{% endset %}
{# Leave the historical task suspended unconditionally — presumably it is
   resumed manually only when a backfill run is needed (TODO confirm). #}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,58 +0,0 @@
{% macro task_bulk_get_block_txs_real_time() %}
{# Creates the Snowflake task streamline.bulk_get_block_txs_real_time.
   Every 20 minutes (CRON */20 * * * * UTC) while resumed, the task:
     1. refreshes the next batch of the block_txs_api external table via
        refresh_external_table_next_batch,
     2. stages block_id/error/_partition_id rows for partitions newer than
        the current max _partition_id in streamline.complete_block_txs,
     3. merges them in keyed on block_id (unlike the historical variant,
        this merge also carries the `error` column),
     4. calls udf_bulk_get_block_txs(TRUE) if any unknown real-time blocks
        remain.
   The entire task body is one escaped string handed to `execute immediate`,
   so the \' quoting is load-bearing — do not reformat inside the string.
   NOTE(review): `streamline.{{ target.database }}.block_txs_api` reads as
   schema.database.table rather than database.schema.table — confirm the
   intended qualification order against the warehouse. #}
{% set sql %}
execute immediate 'create or replace task streamline.bulk_get_block_txs_real_time
warehouse = dbt_cloud
allow_overlapping_execution = false
schedule = \'USING CRON */20 * * * * UTC\'
as
BEGIN
call streamline.refresh_external_table_next_batch(\'block_txs_api\',\'complete_block_txs\');
create or replace temporary table streamline.complete_block_txs__dbt_tmp as
(
select *
from (
SELECT
block_id,
error,
_partition_id
FROM
streamline.{{ target.database }}.block_txs_api AS s
WHERE
s.block_id IS NOT NULL
AND s._partition_id > (
select
coalesce(max(_partition_id),0)
from
streamline.complete_block_txs
)
group by 1,2,3
)
order by (_partition_id)
);
merge into streamline.complete_block_txs as DBT_INTERNAL_DEST
using streamline.complete_block_txs__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.block_id = DBT_INTERNAL_DEST.block_id
when matched then
update
set _partition_id = DBT_INTERNAL_SOURCE._partition_id,
error = DBT_INTERNAL_SOURCE.error
when not matched then
insert ("BLOCK_ID", "ERROR", "_PARTITION_ID")
values ("BLOCK_ID", "ERROR", "_PARTITION_ID");
select streamline.udf_bulk_get_block_txs(TRUE)
where exists (
select 1
from streamline.all_unknown_block_txs_real_time
limit 1
);
END;'
{% endset %}
{# Create (or replace) the task definition. #}
{% do run_query(sql) %}
{% if target.database == 'SOLANA' %}
{# Only the production SOLANA database runs this task continuously;
   dev targets leave it in its default (suspended) state. #}
{% set sql %}
alter task streamline.bulk_get_block_txs_real_time resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -1,8 +0,0 @@
{% macro udf_bulk_get_block_txs() %}
{# Registers the Snowflake external function
   streamline.udf_bulk_get_block_txs(is_real_time boolean), which triggers a
   bulk block-transaction fetch through the aws_solana_api_dev API
   integration. The prod API Gateway endpoint is used when targeting the
   SOLANA database; every other target gets the dev endpoint.
   NOTE(review): the whitespace-control tags ({%- ... -%}) are deliberate —
   they keep the URL literal adjacent to the AS clause in the rendered SQL. #}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_block_txs(is_real_time boolean) returns text api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_get_block_txs'
{% else %}
'https://11zlwk4fm3.execute-api.us-east-1.amazonaws.com/dev/bulk_get_block_txs'
{%- endif %}
{% endmacro %}

View File

@ -5,6 +5,9 @@
tags = ['observability']
) }}
{% set cutover_block_id = 307103862 %}
{% set cutover_partition_id = 150215 %}
WITH summary_stats AS (
SELECT
@ -133,12 +136,25 @@ broken_blocks AS (
m.block_id
FROM
potential_missing_txs m
LEFT OUTER JOIN {{ ref('streamline__complete_block_txs') }}
cmp
ON m.block_id = cmp.block_id
LEFT OUTER JOIN
{{ ref('streamline__complete_block_txs') }} cmp
ON m.block_id = cmp.block_id
WHERE
cmp.error IS NOT NULL
OR cmp.block_id IS NULL
(cmp.error IS NOT NULL
OR cmp.block_id IS NULL)
AND cmp.block_id < {{ cutover_block_id }}
UNION ALL
SELECT
m.block_id
FROM
potential_missing_txs m
LEFT OUTER JOIN
{{ ref('streamline__complete_block_txs_2') }} cmp2
ON m.block_id = cmp2.block_id
WHERE
(cmp2.block_id IS NULL)
AND cmp2._partition_id >= {{ cutover_partition_id }}
AND cmp2.block_id >= {{ cutover_block_id }}
),
impacted_blocks AS (
SELECT

View File

@ -1,3 +1,5 @@
-- depends_on: {{ ref('streamline__complete_block_txs_2') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
@ -9,6 +11,9 @@
tags = ['scheduled_core']
) }}
{% set cutover_block_id = 307103862 %}
{% set cutover_partition_id = 150215 %}
WITH pre_final AS (
SELECT
@ -37,13 +42,12 @@ WITH pre_final AS (
t._partition_id,
t._inserted_timestamp
FROM
{{ ref('bronze__transactions2') }}
t
LEFT OUTER JOIN {{ ref('silver__blocks') }}
b
{{ ref('bronze__transactions2') }} AS t
LEFT OUTER JOIN {{ ref('silver__blocks') }} AS b
ON b.block_id = t.block_id
WHERE
tx_id IS NOT NULL
t.block_id < {{cutover_block_id}}
AND tx_id IS NOT NULL
AND (
COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING,'') <> 'Vote111111111111111111111111111111111111111'
OR
@ -51,32 +55,60 @@ WITH pre_final AS (
array_size(t.data :transaction :message :instructions) > 1
)
)
{% if is_incremental() %}
AND _partition_id >= (
SELECT
MAX(_partition_id) -1
FROM
{{ this }}
)
AND _partition_id <= (
SELECT
MAX(_partition_id)
FROM
{{ source('solana_streamline','complete_block_txs') }}
)
AND t._inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
AND _partition_id IN (
1,
2
)
{% endif %}
{% if is_incremental() %}
AND _partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }})
AND _partition_id <= (SELECT max(_partition_id) FROM {{ source('solana_streamline','complete_block_txs') }})
AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }})
{% else %}
AND _partition_id IN (1,2)
{% endif %}
AND _partition_id < {{cutover_partition_id}}
UNION ALL
SELECT
to_timestamp_ntz(t.value:"result.blockTime"::int) AS block_timestamp,
t.block_id,
t.data:transaction:signatures[0]::string AS tx_id,
t.data :transaction :message :recentBlockhash :: STRING AS recent_block_hash,
t.data :meta :fee :: NUMBER AS fee,
CASE
WHEN IS_NULL_VALUE(
t.data :meta :err
) THEN TRUE
ELSE FALSE
END AS succeeded,
t.data :transaction :message :accountKeys :: ARRAY AS account_keys,
t.data :meta :preBalances :: ARRAY AS pre_balances,
t.data :meta :postBalances :: ARRAY AS post_balances,
t.data :meta :preTokenBalances :: ARRAY AS pre_token_balances,
t.data :meta :postTokenBalances :: ARRAY AS post_token_balances,
t.data :transaction :message :instructions :: ARRAY AS instructions,
t.data :meta :innerInstructions :: ARRAY AS inner_instructions,
t.data :meta :logMessages :: ARRAY AS log_messages,
t.data:transaction:message:addressTableLookups::array as address_table_lookups,
t.data :meta :computeUnitsConsumed :: NUMBER as compute_units_consumed,
t.data :version :: STRING as version,
t._partition_id,
t._inserted_timestamp
FROM
{{ ref('bronze__streamline_block_txs_2') }} AS t
WHERE
t.block_id >= {{ cutover_block_id }}
AND tx_id IS NOT NULL
AND (
COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING,'') <> 'Vote111111111111111111111111111111111111111'
OR
(
array_size(t.data :transaction :message :instructions) > 1
)
)
{% if is_incremental() %}
AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }})
AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }})
AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }})
{% else %}
AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */
{% endif %}
AND t._partition_id >= {{ cutover_partition_id }}
),
{% if is_incremental() %}
prev_null_block_timestamp_txs AS (

View File

@ -8,6 +8,9 @@
tags = ['scheduled_non_core']
) }}
{% set cutover_block_id = 307103862 %}
{% set cutover_partition_id = 150215 %}
WITH pre_final AS (
SELECT
COALESCE(TO_TIMESTAMP_NTZ(t.value :block_time), b.block_timestamp) AS block_timestamp,
@ -33,34 +36,57 @@ WITH pre_final AS (
LEFT OUTER JOIN
{{ ref('silver__blocks') }} b on b.block_id = t.block_id
WHERE
tx_id is not null
AND
COALESCE(
t.block_id < {{cutover_block_id}}
AND tx_id is not null
AND coalesce(
t.data :transaction :message :instructions [0] :programId :: STRING,
''
) = 'Vote111111111111111111111111111111111111111'
{% if is_incremental() %}
AND
_partition_id >= (
select max(_partition_id)-1
from {{this}}
)
AND
_partition_id <= (
SELECT
MAX(_partition_id)
FROM
{{ source('solana_streamline','complete_block_txs') }}
)
AND
t._inserted_timestamp > (
select max(_inserted_timestamp)
from {{this}}
)
{% else %}
AND
_partition_id in (1,2)
{% endif %}
{% if is_incremental() %}
AND _partition_id >= (select max(_partition_id)-1 from {{this}})
AND _partition_id <= (SELECT MAX(_partition_id) FROM {{ source('solana_streamline','complete_block_txs') }})
AND t._inserted_timestamp > (select max(_inserted_timestamp) from {{this}})
{% else %}
AND _partition_id in (1,2)
{% endif %}
AND _partition_id < {{cutover_partition_id}}
UNION ALL
SELECT
to_timestamp_ntz(t.value:"result.blockTime"::int) AS block_timestamp,
t.block_id,
t.data:transaction:signatures[0]::string AS tx_id,
t.data :transaction :message :recentBlockhash :: STRING AS recent_block_hash,
t.data :meta :fee :: NUMBER AS fee,
CASE
WHEN IS_NULL_VALUE(
t.data :meta :err
) THEN TRUE
ELSE FALSE
END AS succeeded,
t.data :transaction :message :accountKeys :: ARRAY AS account_keys,
t.data :transaction :message :instructions [0] :parsed :info :voteAccount :: STRING AS vote_account,
t.data :transaction :message :instructions [0] :parsed :info :voteAuthority :: STRING AS vote_authority,
t.data :transaction :message :instructions [0] :parsed :info :vote :hash :: STRING AS vote_hash,
t.data :transaction :message :instructions [0] :parsed :info :vote :slots :: ARRAY AS vote_slots,
t._partition_id,
t._inserted_timestamp
FROM
{{ ref('bronze__streamline_block_txs_2') }} AS t
WHERE
t.block_id >= {{ cutover_block_id }}
AND tx_id IS NOT NULL
AND coalesce(
t.data :transaction :message :instructions [0] :programId :: STRING,
''
) = 'Vote111111111111111111111111111111111111111'
{% if is_incremental() %}
AND t._partition_id >= (SELECT max(_partition_id)-1 FROM {{ this }})
AND t._partition_id <= (SELECT max(_partition_id) FROM {{ ref('streamline__complete_block_txs_2') }})
AND t._inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }})
{% else %}
AND t._partition_id < 0 /* keep this here, if we ever do a full refresh this should select no data from streamline 2.0 data */
{% endif %}
AND t._partition_id >= {{ cutover_partition_id }}
)
{% if is_incremental() %}
, prev_null_block_timestamp_txs as (

View File

@ -45,7 +45,7 @@ SELECT
e.transaction_count AS ect,
A.transaction_count AS act,
e.transaction_count - A.transaction_count AS delta,
coalesce(c._partition_id, 0) AS _partition_id
coalesce(c._partition_id, c2._partition_id, 0) AS _partition_id
FROM
solscan_counts e
LEFT OUTER JOIN
@ -54,6 +54,9 @@ LEFT OUTER JOIN
LEFT OUTER JOIN
streamline.complete_block_txs c
ON e.block_id = c.block_id
LEFT OUTER JOIN
streamline.complete_block_txs_2 c2
ON e.block_id = c2.block_id
WHERE
ect <> 0
AND (

View File

@ -1,65 +0,0 @@
-- Data-quality test: report _partition_id values that were fetched into
-- streamline.complete_block_txs but never landed in the silver layer
-- (transactions or votes). Each returned row is a partition with no
-- corresponding silver data — i.e. a load gap.
-- Only partitions at or below the silver high-water mark are checked, so
-- partitions still in flight are not flagged as missing.
-- max_part_id_tmp: high-water mark — the max _partition_id already loaded
-- into silver votes and silver transactions (UNION of the two maxima).
WITH max_part_id_tmp AS (
SELECT
MAX(_partition_id) AS _partition_id
FROM
{% if target.database == 'SOLANA' %}
solana.silver.votes
{% else %}
solana_dev.silver.votes
{% endif %}
UNION
SELECT
MAX(_partition_id)
FROM
{% if target.database == 'SOLANA' %}
solana.silver.transactions
{% else %}
solana_dev.silver.transactions
{% endif %}
),
-- base: every fetched partition (streamline.complete_block_txs) that silver
-- has had a chance to load, i.e. at or below the high-water mark.
base AS (
SELECT
DISTINCT _partition_id
FROM
{% if target.database == 'SOLANA' %}
solana.streamline.complete_block_txs
{% else %}
solana_dev.streamline.complete_block_txs
{% endif %}
WHERE
_partition_id <= (
SELECT
MAX(_partition_id)
FROM
max_part_id_tmp
)
),
-- base_txs: every partition actually present in silver (transactions or votes).
base_txs AS (
SELECT
DISTINCT _partition_id
FROM
{{ ref('silver__transactions') }}
UNION
SELECT
DISTINCT _partition_id
FROM
{% if target.database == 'SOLANA' %}
solana.silver.votes
{% else %}
solana_dev.silver.votes
{% endif %}
)
-- Anti-join: fetched partitions with no silver counterpart, minus two known
-- exceptions (magic constants explained inline below).
SELECT
b._partition_id
FROM
base b
LEFT OUTER JOIN base_txs t
ON b._partition_id = t._partition_id
WHERE
t._partition_id IS NULL
AND b._partition_id <> 1877 -- seems like this whole partition is skipped slots
AND b._partition_id > 95526 /* some old partitions never got loaded into silver,
the data has made it into silver through other partitions
and confirmed via other checks.
We can start checking for new instances after this partition
and consider everything before reconciled. */

View File

@ -1,3 +1,7 @@
{% set cutover_block_id = 307103862 %}
{% set cutover_partition_id = 150215 %}
{% set start_block_id = 306000000 %}
WITH base_blocks AS (
SELECT
*
@ -8,7 +12,7 @@ WITH base_blocks AS (
solana_dev.silver.blocks
{% endif %}
WHERE
block_id >= 154195836 -- this query wont give correct results prior to this block_id
block_id >= {{ start_block_id }}
AND _inserted_date < CURRENT_DATE
),
base_txs AS (
@ -21,7 +25,7 @@ base_txs AS (
solana_dev.silver.transactions
{% endif %}
WHERE
block_id >= 154195836
block_id >= {{ start_block_id }}
UNION
SELECT
DISTINCT block_id
@ -32,7 +36,7 @@ base_txs AS (
solana_dev.silver.votes
{% endif %}
WHERE
block_id >= 154195836
block_id >= {{ start_block_id }}
),
potential_missing_txs AS (
SELECT
@ -45,11 +49,20 @@ potential_missing_txs AS (
base_txs.block_id IS NULL
)
SELECT
m.block_id
m.block_id
FROM
potential_missing_txs m
LEFT OUTER JOIN {{ ref('streamline__complete_block_txs') }} cmp
ON m.block_id = cmp.block_id
LEFT OUTER JOIN {{ ref('streamline__complete_block_txs_2') }} cmp2
ON m.block_id = cmp2.block_id
WHERE
cmp.error IS NOT NULL
OR cmp.block_id IS NULL
(
m.block_id < {{ cutover_block_id }} -- cutover block id from silver__transactions
AND (cmp.error IS NOT NULL OR cmp.block_id IS NULL)
)
OR (
m.block_id >= {{ cutover_block_id }}
AND cmp2._partition_id >= {{ cutover_partition_id }}
AND (cmp2.block_id IS NULL)
)