Migrate Solana decoder to Streamline 2 (#711)

* Add UDF to call the decoder on Streamline 2.0

* Add bronze instruction decoder models

* Add more v2 APIs

* Add bronze models for Streamline 2.0 decoded logs

* Add streamline models for 2.0 decoded instructions & logs

* Run Streamline 2.0 real-time decoding pipelines in parallel
desmond-hui 2024-11-25 08:37:24 -08:00 committed by GitHub
parent 469db641b1
commit 935ca3c728
19 changed files with 276 additions and 5 deletions

View File

@@ -41,4 +41,5 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
+ dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql

View File

@@ -53,7 +53,7 @@ jobs:
- name: Run DBT Jobs
run: |
- dbt run -s models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
+ dbt run -s streamline__complete_decoded_instructions_2 streamline__complete_decoded_instructions_3
- name: Run Real Time Pyth Every 15 minutes
run: |

View File

@@ -41,4 +41,5 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_pyth_realtime.sql
+ dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_3_pyth_realtime.sql

View File

@@ -41,4 +41,5 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_realtime.sql
+ dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql

View File

@@ -53,7 +53,7 @@ jobs:
- name: Run DBT Jobs
run: |
- dbt run -s models/streamline/decode_logs/streamline__complete_decoded_logs.sql
+ dbt run -s streamline__complete_decoded_logs streamline__complete_decoded_logs_2
- name: Run Real Time Core on minutes 0 and 30 every hour
run: |

View File

@@ -10,8 +10,11 @@
{{ udf_bulk_parse_compressed_nft_mints() }};
{{ udf_bulk_get_solscan_blocks() }};
{{ create_udf_bulk_instructions_decoder() }};
+ {{ create_udf_bulk_instructions_decoder_v2() }};
{{ create_udf_verify_idl() }};
+ {{ create_udf_verify_idl_v2() }};
{{ create_udf_decode_compressed_mint_change_logs() }};
+ {{ create_udf_decode_compressed_mint_change_logs_v2() }};
{{ create_udf_bulk_rest_api_v2() }};
{% endif %}

View File

@@ -45,4 +45,51 @@
AS
'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
{% endmacro %}
{% macro create_udf_bulk_instructions_decoder_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_instructions_decoder_v2(
json variant
) returns text api_integration =
{% if target.database == 'SOLANA' -%}
AWS_SOLANA_API_PROD_V2
AS
'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_instructions_decoder'
{% else %}
AWS_SOLANA_API_STG_V2
AS
'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_instructions_decoder'
{%- endif %}
{% endmacro %}
{% macro create_udf_verify_idl_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_verify_idl_v2("JSON" ARRAY) returns VARIANT
api_integration =
{% if target.database == 'SOLANA' -%}
AWS_SOLANA_API_PROD_V2
AS
'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/verify_idl'
{% else %}
AWS_SOLANA_API_STG_V2
AS
'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/verify_idl'
{%- endif %}
{% endmacro %}
{% macro create_udf_decode_compressed_mint_change_logs_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_decode_compressed_mint_change_logs_v2("JSON" ARRAY)
returns VARIANT
api_integration =
{% if target.database == 'SOLANA' -%}
AWS_SOLANA_API_PROD_V2
AS
'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/udf_decode_compressed_mint_change_logs'
{% else %}
AWS_SOLANA_API_STG_V2
AS
'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_decode_compressed_mint_change_logs'
{%- endif %}
{% endmacro %}
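The three macros above register Snowflake external functions for the v2 decoder API (prod and stg API Gateway stages). A minimal smoke-test sketch for one of them, assuming the payload shape and program id below; only the ARRAY-to-VARIANT signature comes from the macro:

SELECT
    streamline.udf_verify_idl_v2(
        ARRAY_CONSTRUCT(
            OBJECT_CONSTRUCT(
                -- hypothetical request body: the real payload contract lives in the
                -- external decoder service, not in this repo
                'program_id', 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
                'idl', PARSE_JSON('{"version": "0.1.0", "name": "example_idl", "instructions": []}')
            )
        )
    ) AS verification_result;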

View File

@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "decoded_instructions_3" %}
{{ streamline_external_table_FR_query(
model,
partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
partition_name = "_partition_by_created_date_hour",
unique_key = "block_id",
other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
) }}
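For reference, the partition_function argument above derives an hourly partition timestamp from positional segments of the staged file path. A minimal sketch of the same expression, assuming a hypothetical path layout:

SELECT
    TO_TIMESTAMP_NTZ(
        CONCAT(
            SPLIT_PART(file_name, '/', 3), '-',          -- year
            SPLIT_PART(file_name, '/', 4), '-',          -- month
            SPLIT_PART(file_name, '/', 5), ' ',          -- day
            SPLIT_PART(file_name, '/', 6), ':00:00.000'  -- hour
        )
    ) AS _partition_by_created_date_hour
FROM (
    -- hypothetical staged file path; only the split_part positions come from the model
    SELECT 'decoded_instructions_3/v2/2024/11/25/08/records_000.json.gz' AS file_name
);
-- returns 2024-11-25 08:00:00.000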

View File

@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "decoded_logs_2" %}
{{ streamline_external_table_FR_query(
model,
partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
partition_name = "_partition_by_created_date_hour",
unique_key = "block_id",
other_cols = "tx_id,index,inner_index,log_index,program_id,_partition_by_block_id"
) }}

View File

@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "decoded_instructions_3" %}
{{ streamline_external_table_query(
model,
partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
partition_name = "_partition_by_created_date_hour",
unique_key = "block_id",
other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
) }}

View File

@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "decoded_logs_2" %}
{{ streamline_external_table_query(
model,
partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
partition_name = "_partition_by_created_date_hour",
unique_key = "block_id",
other_cols = "tx_id,index,inner_index,log_index,program_id,_partition_by_block_id"
) }}

View File

@@ -65,6 +65,8 @@ sources:
- name: validators_list_2
- name: solscan_blocks_2
- name: solscan_token_list
+ - name: decoded_instructions_3
+ - name: decoded_logs_2
- name: bronze_api
schema: bronze_api
tables:

View File

@@ -0,0 +1,46 @@
-- depends_on: {{ ref('bronze__streamline_decoded_instructions_3') }}
{{ config (
materialized = "incremental",
incremental_predicates = ['DBT_INTERNAL_DEST._inserted_timestamp::date >= LEAST(current_date-2,(select min(_inserted_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
unique_key = "complete_decoded_instructions_3_id",
cluster_by = ["ROUND(block_id, -3)","program_id"],
post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(complete_decoded_instructions_3_id)'),
tags = ['streamline_decoder'],
) }}
/* run incremental timestamp value first then use it as a static value */
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
COALESCE(MAX(_inserted_timestamp),'2000-01-01'::timestamp_ntz) _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(query).columns[0].values()[0] %}
{% endif %}
{% endif %}
SELECT
block_id,
tx_id,
index,
inner_index,
program_id,
{{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','program_id']) }} as complete_decoded_instructions_3_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_instructions_3') }}
WHERE
_inserted_timestamp >= dateadd('minute', -5, '{{ max_inserted_timestamp }}')
AND
_partition_by_created_date_hour >= dateadd('hour', -1, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
{% else %}
{{ ref('bronze__streamline_FR_decoded_instructions_3') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_instructions_3_id
ORDER BY
_inserted_timestamp DESC)) = 1
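Because the block above resolves MAX(_inserted_timestamp) with run_query before the model compiles, the incremental filter is rendered with a static literal rather than a correlated subquery. A sketch of the two lower bounds it produces, assuming the lookup returned a hypothetical value:

SELECT
    -- rows newer than five minutes before the assumed high-water mark
    DATEADD('minute', -5, '2024-11-25 08:30:00'::timestamp_ntz) AS min_inserted_timestamp,                -- 2024-11-25 08:25:00
    -- partitions no older than one hour before that mark, truncated to the hour
    DATEADD('hour', -1, DATE_TRUNC('hour', '2024-11-25 08:30:00'::timestamp_ntz)) AS min_partition_hour;  -- 2024-11-25 07:00:00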

View File

@@ -0,0 +1,10 @@
{{ config (
materialized = 'incremental',
unique_key = 'table_name',
full_refresh = false,
tags = ['streamline_decoder'],
) }}
select
'placeholder'::string as schema_name,
'placeholder'::string as table_name

View File

@@ -0,0 +1,18 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_2', 'sql_limit', {{var('sql_limit','10000000')}}, 'producer_batch_size', {{var('producer_batch_size','10000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'RT_PYTH'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_decoder']
) }}
/*
while we are running in parallel, can just select from the existing table
once we are done, we can move the existing code into this table
and it should be mostly the same except for the completed table references
*/
SELECT
*
FROM
{{ ref('streamline__decode_instructions_2_pyth_realtime') }}

View File

@@ -0,0 +1,18 @@
{{ config (
materialized = "table",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_3', 'sql_limit', {{var('sql_limit','5000000')}}, 'producer_batch_size', {{var('producer_batch_size','2000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'RT'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_decoder']
) }}
/*
while we are running in parallel, can just select from the existing table
once we are done, we can move the existing code into this table
and it should be mostly the same except for the completed table references
*/
SELECT
*
FROM
{{ ref('streamline__decode_instructions_2_realtime') }}
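The post_hook above hands the realtime decode batch to the v2 external function after the model builds (if_data_call_function presumably fires only when the model returned rows). Roughly what the rendered call could look like; the schema and identifier are assumptions, while the payload keys and var defaults come from the config:

SELECT
    streamline.udf_bulk_instructions_decoder_v2(
        OBJECT_CONSTRUCT(
            'sql_source', 'streamline__decode_instructions_3_realtime',  -- assumed identifier
            'external_table', 'decoded_instructions_3',
            'sql_limit', 5000000,
            'producer_batch_size', 2000000,
            'worker_batch_size', 100000,
            'batch_call_limit', 1000,
            'call_type', 'RT'
        )
    );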

View File

@@ -0,0 +1,47 @@
-- depends_on: {{ ref('bronze__streamline_decoded_logs_2') }}
{{ config (
materialized = "incremental",
incremental_predicates = ['DBT_INTERNAL_DEST._inserted_timestamp::date >= LEAST(current_date-2,(select min(_inserted_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
unique_key = "complete_decoded_logs_2_id",
cluster_by = ["ROUND(block_id, -3)","program_id"],
post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(complete_decoded_logs_2_id)'),
tags = ['streamline_decoder_logs'],
) }}
/* run incremental timestamp value first then use it as a static value */
{% if execute %}
{% if is_incremental() %}
{% set query %}
SELECT
COALESCE(MAX(_inserted_timestamp),'2000-01-01'::timestamp_ntz) _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(query).columns[0].values()[0] %}
{% endif %}
{% endif %}
SELECT
block_id,
tx_id,
index,
inner_index,
log_index,
program_id,
{{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','log_index','program_id']) }} as complete_decoded_logs_2_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_logs_2') }}
WHERE
_inserted_timestamp >= dateadd('minute', -5, '{{ max_inserted_timestamp }}')
AND
_partition_by_created_date_hour >= dateadd('hour', -1, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
{% else %}
{{ ref('bronze__streamline_FR_decoded_logs_2') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_logs_2_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@@ -0,0 +1,10 @@
{{ config (
materialized = 'incremental',
unique_key = 'table_name',
full_refresh = false,
tags = ['streamline_decoder_logs'],
) }}
select
'placeholder'::string as schema_name,
'placeholder'::string as table_name

View File

@@ -0,0 +1,19 @@
-- depends_on: {{ ref('silver__blocks') }}
{{ config (
materialized = "table",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_logs_2', 'sql_limit', {{var('sql_limit','5000000')}}, 'producer_batch_size', {{var('producer_batch_size','2000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'logs'))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_decoder_logs']
) }}
/*
while we are running in parallel, can just select from the existing table
once we are done, we can move the existing code into this table
and it should be mostly the same except for the completed table references
*/
SELECT
*
FROM
{{ ref('streamline__decode_logs_realtime') }}