diff --git a/.github/workflows/dbt_run_decode_instructions.yml b/.github/workflows/dbt_run_decode_instructions.yml
index 8b3f1da8..9e3cd289 100644
--- a/.github/workflows/dbt_run_decode_instructions.yml
+++ b/.github/workflows/dbt_run_decode_instructions.yml
@@ -41,4 +41,5 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
\ No newline at end of file
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql
\ No newline at end of file
diff --git a/.github/workflows/dbt_run_decode_instructions_orchestrator.yml b/.github/workflows/dbt_run_decode_instructions_orchestrator.yml
index e1ed4c29..1e474a07 100644
--- a/.github/workflows/dbt_run_decode_instructions_orchestrator.yml
+++ b/.github/workflows/dbt_run_decode_instructions_orchestrator.yml
@@ -53,7 +53,7 @@ jobs:

       - name: Run DBT Jobs
         run: |
-          dbt run -s models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
+          dbt run -s streamline__complete_decoded_instructions_2 streamline__complete_decoded_instructions_3

       - name: Run Real Time Pyth Every 15 minutes
         run: |
diff --git a/.github/workflows/dbt_run_decode_instructions_pyth.yml b/.github/workflows/dbt_run_decode_instructions_pyth.yml
index c0446eb0..cb968715 100644
--- a/.github/workflows/dbt_run_decode_instructions_pyth.yml
+++ b/.github/workflows/dbt_run_decode_instructions_pyth.yml
@@ -41,4 +41,5 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_pyth_realtime.sql
\ No newline at end of file
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_pyth_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_3_pyth_realtime.sql
diff --git a/.github/workflows/dbt_run_decode_logs.yml b/.github/workflows/dbt_run_decode_logs.yml
index aa5b0757..92cad48a 100644
--- a/.github/workflows/dbt_run_decode_logs.yml
+++ b/.github/workflows/dbt_run_decode_logs.yml
@@ -41,4 +41,5 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_realtime.sql
\ No newline at end of file
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
diff --git a/.github/workflows/dbt_run_decode_logs_orchestrator.yml b/.github/workflows/dbt_run_decode_logs_orchestrator.yml
index eb8049e2..355028a9 100644
--- a/.github/workflows/dbt_run_decode_logs_orchestrator.yml
+++ b/.github/workflows/dbt_run_decode_logs_orchestrator.yml
@@ -53,7 +53,7 @@ jobs:

       - name: Run DBT Jobs
         run: |
-          dbt run -s models/streamline/decode_logs/streamline__complete_decoded_logs.sql
+          dbt run -s streamline__complete_decoded_logs streamline__complete_decoded_logs_2

       - name: Run Real Time Core on minutes 0 and 30 every hour
         run: |
diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql
index 8cdc98b3..b843ffdf 100644
--- a/macros/create_udfs.sql
+++ b/macros/create_udfs.sql
@@ -10,8 +10,11 @@
         {{ udf_bulk_parse_compressed_nft_mints() }};
         {{ udf_bulk_get_solscan_blocks() }};
         {{ create_udf_bulk_instructions_decoder() }};
+        {{ create_udf_bulk_instructions_decoder_v2() }};
         {{ create_udf_verify_idl() }};
+        {{ create_udf_verify_idl_v2() }};
         {{ create_udf_decode_compressed_mint_change_logs() }};
+        {{ create_udf_decode_compressed_mint_change_logs_v2() }};
         {{ create_udf_bulk_rest_api_v2() }};
     {% endif %}

diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql
index 3a86f88a..093b323a 100644
--- a/macros/streamline/streamline_udfs.sql
+++ b/macros/streamline/streamline_udfs.sql
@@ -45,4 +45,51 @@
     AS
     'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
     {%- endif %}
+{% endmacro %}
+
+{% macro create_udf_bulk_instructions_decoder_v2() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_instructions_decoder_v2(
+        json variant
+    ) returns text api_integration =
+    {% if target.database == 'SOLANA' -%}
+    AWS_SOLANA_API_PROD_V2
+    AS
+    'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_instructions_decoder'
+    {% else %}
+    AWS_SOLANA_API_STG_V2
+    AS
+    'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_instructions_decoder'
+    {%- endif %}
+{% endmacro %}
+
+{% macro create_udf_verify_idl_v2() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_verify_idl_v2("JSON" ARRAY) returns VARIANT
+    api_integration =
+    {% if target.database == 'SOLANA' -%}
+    AWS_SOLANA_API_PROD_V2
+    AS
+    'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/verify_idl'
+    {% else %}
+    AWS_SOLANA_API_STG_V2
+    AS
+    'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/verify_idl'
+    {%- endif %}
+{% endmacro %}
+
+{% macro create_udf_decode_compressed_mint_change_logs_v2() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_decode_compressed_mint_change_logs_v2("JSON" ARRAY)
+    returns VARIANT
+    api_integration =
+    {% if target.database == 'SOLANA' -%}
+    AWS_SOLANA_API_PROD_V2
+    AS
+    'https://eurlntbb7k.execute-api.us-east-1.amazonaws.com/prod/udf_decode_compressed_mint_change_logs'
+    {% else %}
+    AWS_SOLANA_API_STG_V2
+    AS
+    'https://vmax5o4p1a.execute-api.us-east-1.amazonaws.com/stg/udf_decode_compressed_mint_change_logs'
+    {%- endif %}
 {% endmacro %}
\ No newline at end of file
diff --git a/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_3.sql b/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_3.sql
new file mode 100644
index 00000000..2c72cf02
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_3.sql
@@ -0,0 +1,12 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set model = "decoded_instructions_3" %}
+{{ streamline_external_table_FR_query(
+    model,
+    partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
+    partition_name = "_partition_by_created_date_hour",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
+) }}
diff --git a/models/bronze/streamline/bronze__streamline_FR_decoded_logs_2.sql b/models/bronze/streamline/bronze__streamline_FR_decoded_logs_2.sql
new file mode 100644
index 00000000..1757ad7a
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_FR_decoded_logs_2.sql
@@ -0,0 +1,12 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set model = "decoded_logs_2" %}
+{{ streamline_external_table_FR_query(
+    model,
+    partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
+    partition_name = "_partition_by_created_date_hour",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,inner_index,log_index,program_id,_partition_by_block_id"
+) }}
diff --git a/models/bronze/streamline/bronze__streamline_decoded_instructions_3.sql b/models/bronze/streamline/bronze__streamline_decoded_instructions_3.sql
new file mode 100644
index 00000000..41ed8b67
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_decoded_instructions_3.sql
@@ -0,0 +1,12 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set model = "decoded_instructions_3" %}
+{{ streamline_external_table_query(
+    model,
+    partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
+    partition_name = "_partition_by_created_date_hour",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
+) }}
diff --git a/models/bronze/streamline/bronze__streamline_decoded_logs_2.sql b/models/bronze/streamline/bronze__streamline_decoded_logs_2.sql
new file mode 100644
index 00000000..62a8b557
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_decoded_logs_2.sql
@@ -0,0 +1,12 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set model = "decoded_logs_2" %}
+{{ streamline_external_table_query(
+    model,
+    partition_function = "to_timestamp_ntz(concat(split_part(file_name, '/', 3),'-',split_part(file_name, '/', 4),'-',split_part(file_name, '/', 5),' ',split_part(file_name, '/', 6),':00:00.000'))",
+    partition_name = "_partition_by_created_date_hour",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,inner_index,log_index,program_id,_partition_by_block_id"
+) }}
diff --git a/models/sources.yml b/models/sources.yml
index 64ab4554..95c00fd0 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -65,6 +65,8 @@ sources:
       - name: validators_list_2
       - name: solscan_blocks_2
       - name: solscan_token_list
+      - name: decoded_instructions_3
+      - name: decoded_logs_2
  - name: bronze_api
    schema: bronze_api
    tables:
diff --git a/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3.sql b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3.sql
new file mode 100644
index 00000000..c3806265
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3.sql
@@ -0,0 +1,46 @@
+-- depends_on: {{ ref('bronze__streamline_decoded_instructions_3') }}
+{{ config (
+    materialized = "incremental",
+    incremental_predicates = ['DBT_INTERNAL_DEST._inserted_timestamp::date >= LEAST(current_date-2,(select min(_inserted_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
+    unique_key = "complete_decoded_instructions_3_id",
+    cluster_by = ["ROUND(block_id, -3)","program_id"],
+    post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(complete_decoded_instructions_3_id)'),
+    tags = ['streamline_decoder'],
+) }}
+
+/* run incremental timestamp value first then use it as a static value */
+{% if execute %}
+    {% if is_incremental() %}
+        {% set query %}
+            SELECT
+                COALESCE(MAX(_inserted_timestamp),'2000-01-01'::timestamp_ntz) _inserted_timestamp
+            FROM
+                {{ this }}
+        {% endset %}
+
+        {% set max_inserted_timestamp = run_query(query).columns[0].values()[0] %}
+    {% endif %}
+{% endif %}
+
+SELECT
+    block_id,
+    tx_id,
+    index,
+    inner_index,
+    program_id,
+    {{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','program_id']) }} as complete_decoded_instructions_3_id,
+    _inserted_timestamp
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__streamline_decoded_instructions_3') }}
+WHERE
+    _inserted_timestamp >= dateadd('minute', -5, '{{ max_inserted_timestamp }}')
+AND
+    _partition_by_created_date_hour >= dateadd('hour', -1, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
+{% else %}
+    {{ ref('bronze__streamline_FR_decoded_instructions_3') }}
+{% endif %}
+qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_instructions_3_id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
diff --git a/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3_backfill.sql b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3_backfill.sql
new file mode 100644
index 00000000..8a2a5ed1
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_3_backfill.sql
@@ -0,0 +1,10 @@
+{{ config (
+    materialized = 'incremental',
+    unique_key = 'table_name',
+    full_refresh = false,
+    tags = ['streamline_decoder'],
+) }}
+
+select
+    'placeholder'::string as schema_name,
+    'placeholder'::string as table_name
\ No newline at end of file
diff --git a/models/streamline/decode_instructions/streamline__decode_instructions_3_pyth_realtime.sql b/models/streamline/decode_instructions/streamline__decode_instructions_3_pyth_realtime.sql
new file mode 100644
index 00000000..d22add79
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__decode_instructions_3_pyth_realtime.sql
@@ -0,0 +1,18 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_2', 'sql_limit', {{var('sql_limit','10000000')}}, 'producer_batch_size', {{var('producer_batch_size','10000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'RT_PYTH'))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_decoder']
+) }}
+
+/*
+while we are running in parallel, can just select from the existing table
+once we are done, we can move the existing code into this table
+and it should be mostly the same except for the completed table references
+*/
+SELECT
+    *
+FROM
+    {{ ref('streamline__decode_instructions_2_pyth_realtime') }}
\ No newline at end of file
diff --git a/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql b/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql
new file mode 100644
index 00000000..61675dfe
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__decode_instructions_3_realtime.sql
@@ -0,0 +1,18 @@
+{{ config (
+    materialized = "table",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_3', 'sql_limit', {{var('sql_limit','5000000')}}, 'producer_batch_size', {{var('producer_batch_size','2000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'RT'))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_decoder']
+) }}
+
+/*
+while we are running in parallel, can just select from the existing table
+once we are done, we can move the existing code into this table
+and it should be mostly the same except for the completed table references
+*/
+SELECT
+    *
+FROM
+    {{ ref('streamline__decode_instructions_2_realtime') }}
diff --git a/models/streamline/decode_logs/streamline__complete_decoded_logs_2.sql b/models/streamline/decode_logs/streamline__complete_decoded_logs_2.sql
new file mode 100644
index 00000000..0a8f96bc
--- /dev/null
+++ b/models/streamline/decode_logs/streamline__complete_decoded_logs_2.sql
@@ -0,0 +1,47 @@
+-- depends_on: {{ ref('bronze__streamline_decoded_logs_2') }}
+{{ config (
+    materialized = "incremental",
+    incremental_predicates = ['DBT_INTERNAL_DEST._inserted_timestamp::date >= LEAST(current_date-2,(select min(_inserted_timestamp)::date from ' ~ generate_tmp_view_name(this) ~ '))'],
+    unique_key = "complete_decoded_logs_2_id",
+    cluster_by = ["ROUND(block_id, -3)","program_id"],
+    post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(complete_decoded_logs_2_id)'),
+    tags = ['streamline_decoder_logs'],
+) }}
+
+/* run incremental timestamp value first then use it as a static value */
+{% if execute %}
+    {% if is_incremental() %}
+        {% set query %}
+            SELECT
+                COALESCE(MAX(_inserted_timestamp),'2000-01-01'::timestamp_ntz) _inserted_timestamp
+            FROM
+                {{ this }}
+        {% endset %}
+
+        {% set max_inserted_timestamp = run_query(query).columns[0].values()[0] %}
+    {% endif %}
+{% endif %}
+
+SELECT
+    block_id,
+    tx_id,
+    index,
+    inner_index,
+    log_index,
+    program_id,
+    {{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','log_index','program_id']) }} as complete_decoded_logs_2_id,
+    _inserted_timestamp
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__streamline_decoded_logs_2') }}
+WHERE
+    _inserted_timestamp >= dateadd('minute', -5, '{{ max_inserted_timestamp }}')
+AND
+    _partition_by_created_date_hour >= dateadd('hour', -1, date_trunc('hour','{{ max_inserted_timestamp }}'::timestamp_ntz))
+{% else %}
+    {{ ref('bronze__streamline_FR_decoded_logs_2') }}
+{% endif %}
+qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_logs_2_id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
diff --git a/models/streamline/decode_logs/streamline__complete_decoded_logs_2_backfill.sql b/models/streamline/decode_logs/streamline__complete_decoded_logs_2_backfill.sql
new file mode 100644
index 00000000..cb3fc389
--- /dev/null
+++ b/models/streamline/decode_logs/streamline__complete_decoded_logs_2_backfill.sql
@@ -0,0 +1,10 @@
+{{ config (
+    materialized = 'incremental',
+    unique_key = 'table_name',
+    full_refresh = false,
+    tags = ['streamline_decoder_logs'],
+) }}
+
+select
+    'placeholder'::string as schema_name,
+    'placeholder'::string as table_name
\ No newline at end of file
diff --git a/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql b/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
new file mode 100644
index 00000000..2a3b25a3
--- /dev/null
+++ b/models/streamline/decode_logs/streamline__decode_logs_2_realtime.sql
@@ -0,0 +1,19 @@
+-- depends_on: {{ ref('silver__blocks') }}
+{{ config (
+    materialized = "table",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_instructions_decoder_v2(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_logs_2', 'sql_limit', {{var('sql_limit','5000000')}}, 'producer_batch_size', {{var('producer_batch_size','2000000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}, 'call_type', 'logs'))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_decoder_logs']
+) }}
+
+/*
+while we are running in parallel, can just select from the existing table
+once we are done, we can move the existing code into this table
+and it should be mostly the same except for the completed table references
+*/
+SELECT
+    *
+FROM
+    {{ ref('streamline__decode_logs_realtime') }}
\ No newline at end of file