From 5edc2fcf22a198addced681d5bd684455cfc04dd Mon Sep 17 00:00:00 2001
From: desmond-hui <97470747+desmond-hui@users.noreply.github.com>
Date: Wed, 3 Jan 2024 09:24:43 -0800
Subject: [PATCH] An 4323/initial streamline 2.0 setup (#428)

* wip
* change udf name, fix surrogate key for inner instruct calls
* (wip) decoded instructions backfill helpers
* complete backfill helpers, clean up views with 0 requests
* rename
* revert
* temp change will revert
* rename
* use streamline 2.0 verify idl api endpoint
* this is placeholder model, will replace existing when ready
* reorg files
* add workflow for decode instructions real time
* use pip cache
* update with prod endpoints
* update sql limit to real value, should be union all
---
 .../workflows/dbt_run_decode_instructions.yml |  48 +++++++
 macros/create_udfs.sql                        |   2 +
 .../decoded_instructions_backfill_helpers.sql | 125 ++++++++++++++
 macros/streamline/streamline_udfs.sql         |  19 +++
 ...__streamline_FR_decoded_instructions_2.sql |  12 ++
 ...nze__streamline_decoded_instructions_2.sql |  12 ++
 .../idls/silver__verified_user_idls.sql       |  16 ++-
 .../parser/silver__decoded_instructions_2.sql |  61 +++++++++
 models/sources.yml                            |   3 +-
 ...mline__complete_decoded_instructions_2.sql |  34 +++++
 ...mplete_decoded_instructions_2_backfill.sql |  10 ++
 ...amline__decode_instructions_2_realtime.sql |  89 +++++++++++++
 12 files changed, 426 insertions(+), 5 deletions(-)
 create mode 100644 .github/workflows/dbt_run_decode_instructions.yml
 create mode 100644 macros/helpers/decoded_instructions_backfill_helpers.sql
 create mode 100644 macros/streamline/streamline_udfs.sql
 create mode 100644 models/bronze/streamline/bronze__streamline_FR_decoded_instructions_2.sql
 create mode 100644 models/bronze/streamline/bronze__streamline_decoded_instructions_2.sql
 create mode 100644 models/silver/parser/silver__decoded_instructions_2.sql
 create mode 100644 models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
 create mode 100644 models/streamline/decode_instructions/streamline__complete_decoded_instructions_2_backfill.sql
 create mode 100644 models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql

diff --git a/.github/workflows/dbt_run_decode_instructions.yml b/.github/workflows/dbt_run_decode_instructions.yml
new file mode 100644
index 00000000..b5c3bfeb
--- /dev/null
+++ b/.github/workflows/dbt_run_decode_instructions.yml
@@ -0,0 +1,48 @@
+name: dbt_run_decode_instructions
+run-name: dbt_run_decode_instructions
+
+on:
+  workflow_dispatch:
+    branches:
+      - "main"
+# schedule:
+#   # Runs every 30 mins (see https://crontab.guru)
+#   - cron: '*/30 * * * *'
+
+env:
+  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }}
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "${{ vars.PYTHON_VERSION }}"
+          cache: "pip"
+
+      - name: install dependencies
+        run: |
+          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run -s models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
\ No newline at end of file
'{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index f8507f03..8b51828a 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -15,6 +15,8 @@ {{ udf_decode_instructions() }}; {{ udf_bulk_parse_compressed_nft_mints() }}; {{ udf_bulk_get_solscan_blocks() }}; + {{ create_udf_bulk_instructions_decoder() }}; + {{ create_udf_verify_idl() }}; {% endif %} {{ create_udf_ordered_signers( diff --git a/macros/helpers/decoded_instructions_backfill_helpers.sql b/macros/helpers/decoded_instructions_backfill_helpers.sql new file mode 100644 index 00000000..a4874921 --- /dev/null +++ b/macros/helpers/decoded_instructions_backfill_helpers.sql @@ -0,0 +1,125 @@ +{% macro decoded_instructions_backfill_generate_views(program_id) %} + {% set result_cols = run_query("""select + first_block_id, + default_backfill_start_block_id + from solana.streamline.idls_history + where program_id = '""" ~ program_id ~ """';""").columns %} + {% set min_block_id = result_cols[0].values()[0] | int %} + {% set max_block_id = result_cols[1].values()[0] | int %} + {% set step = 1000000 %} + + {% for i in range(min_block_id, max_block_id, step) %} + {% if i == min_block_id %} + {% set start_block = i %} + {% else %} + {% set start_block = i+1 %} + {% endif %} + + {% if i+step >= max_block_id %} + {% set end_block = max_block_id %} + {% else %} + {% set end_block = i+step %} + {% endif %} + + {% set query = """create or replace view streamline.decoded_instructions_backfill_""" ~ start_block ~ """_""" ~ end_block ~ """_""" ~ program_id ~ """ AS + with completed_subset AS ( + SELECT + block_id, + program_id, + complete_decoded_instructions_2_id as id + FROM + """ ~ ref('streamline__complete_decoded_instructions_2') ~ """ + WHERE + program_id = '""" ~ program_id ~ """' + AND + block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """ + ), + event_subset as ( + select + e.block_id, + e.tx_id, + e.index, + NULL as inner_index, + e.instruction, + e.program_id, + e.block_timestamp, + """ ~ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','e.program_id']) ~ """ as id + from solana.silver.events e + where program_id = '""" ~ program_id ~ """' + and block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """ + and succeeded + union + select + e.block_id, + e.tx_id, + e.index, + i.index as inner_index, + i.value as instruction, + i.value :programId :: STRING AS inner_program_id, + e.block_timestamp, + """ ~ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','inner_program_id']) ~ """ as id + from solana.silver.events e, + table(flatten(inner_instruction:instructions)) i + where array_contains(program_id::variant, inner_instruction_program_ids) + and inner_program_id = '""" ~ program_id ~ """' + and e.block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """ + and e.succeeded + ) + select + e.* + from event_subset e + left outer join completed_subset c on c.program_id = e.program_id and c.block_id = e.block_id and c.id = e.id + where c.block_id is null""" %} + + {% do run_query(query) %} + {% endfor %} +{% endmacro %} + +{% macro decoded_instructions_backill_cleanup_views() %} + {% set results = run_query("""select + table_schema, + table_name + from information_schema.views + where table_name like 'DECODED_INSTRUCTIONS_BACKFILL_%' + order by 2 desc + limit 20;""").columns %} + + {% 
+    {% set schema_names = results[0].values() %}
+    {% set table_names = results[1].values() %}
+    {% for table_name in table_names %}
+        {% set has_requests = run_query("""select 1 from """ ~ schema_names[loop.index0] ~ """.""" ~ table_name ~ """ limit 1""").columns[0].values() | length > 0 %}
+        {% if not has_requests %}
+            {% do run_query("""drop view """ ~ schema_names[loop.index0] ~ """.""" ~ table_name) %}
+            {% do run_query("""insert into """ ~ ref('streamline__complete_decoded_instructions_2_backfill') ~ """ values('""" ~ schema_names[loop.index0] ~ """','""" ~ table_name ~ """')""") %}
+        {% endif %}
+    {% endfor %}
+{% endmacro %}
+
+{% macro decoded_instructions_backfill_calls() %}
+    {% set sql_limit = 2500000 %}
+    {% set producer_batch_size = 1000000 %}
+    {% set worker_batch_size = 50000 %}
+    {% set batch_call_limit = 1000 %}
+
+    {% set results = run_query("""select
+        table_schema,
+        table_name
+    from information_schema.views
+    where table_name like 'DECODED_INSTRUCTIONS_BACKFILL_%'
+    except
+    select
+        schema_name,
+        table_name
+    from """ ~ ref('streamline__complete_decoded_instructions_2_backfill') ~ """
+    order by 2 desc
+    limit 10;""").columns %}
+    {% set schema_names = results[0].values() %}
+    {% set table_names = results[1].values() %}
+    {% for table_name in table_names %}
+        {% set udf_call = if_data_call_function(
+            func = schema_names[loop.index0] ~ ".udf_bulk_instructions_decoder(object_construct('sql_source', '" ~ table_name ~ "', 'external_table', 'decoded_instructions_2', 'sql_limit', '" ~ sql_limit ~ "', 'producer_batch_size', '" ~ producer_batch_size ~ "', 'worker_batch_size', '" ~ worker_batch_size ~ "', 'batch_call_limit', '" ~ batch_call_limit ~ "', 'call_type', 'backfill'))",
+            target = schema_names[loop.index0] ~ "." ~ table_name) %}
+
+        {% do run_query(udf_call) %}
+    {% endfor %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql
new file mode 100644
index 00000000..f08e7590
--- /dev/null
+++ b/macros/streamline/streamline_udfs.sql
@@ -0,0 +1,19 @@
+{% macro create_udf_bulk_instructions_decoder() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_instructions_decoder(
+        json variant
+    ) returns text api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
+    'https://l426aqju0g.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_instructions_decoder'
+    {% else %}
+    'https://7938mznoq8.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_instructions_decoder'
+    {%- endif %}
+{% endmacro %}
+
+{% macro create_udf_verify_idl() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_verify_idl("JSON" ARRAY) returns VARIANT api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
+    'https://l426aqju0g.execute-api.us-east-1.amazonaws.com/prod/verify_idl'
+    {% else %}
+    'https://7938mznoq8.execute-api.us-east-1.amazonaws.com/dev/verify_idl'
+    {%- endif %}
+{% endmacro %}
\ No newline at end of file
diff --git a/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_2.sql b/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_2.sql
new file mode 100644
index 00000000..9928bec7
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_FR_decoded_instructions_2.sql
@@ -0,0 +1,12 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set model = "decoded_instructions_2" %}
+{{ streamline_external_table_FR_query(
+    model,
+    partition_function = "to_date(concat_ws('-', split_part(file_name, '/', 3),split_part(file_name, '/', 4), split_part(file_name, '/', 5)))",
"_partition_by_created_date", + unique_key = "block_id", + other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id" +) }} diff --git a/models/bronze/streamline/bronze__streamline_decoded_instructions_2.sql b/models/bronze/streamline/bronze__streamline_decoded_instructions_2.sql new file mode 100644 index 00000000..dc7f8aa9 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_decoded_instructions_2.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "decoded_instructions_2" %} +{{ streamline_external_table_query( + model, + partition_function = "to_date(concat_ws('-', split_part(file_name, '/', 3),split_part(file_name, '/', 4), split_part(file_name, '/', 5)))", + partition_name = "_partition_by_created_date", + unique_key = "block_id", + other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id" +) }} diff --git a/models/silver/idls/silver__verified_user_idls.sql b/models/silver/idls/silver__verified_user_idls.sql index 213fe9b6..fddccd3d 100644 --- a/models/silver/idls/silver__verified_user_idls.sql +++ b/models/silver/idls/silver__verified_user_idls.sql @@ -47,10 +47,18 @@ LIMIT ), program_requests AS ( SELECT e.program_id, - ARRAY_CONSTRUCT( + OBJECT_CONSTRUCT( + 'tx_id', + e.tx_id, + 'block_id', + e.block_id, + 'index', e.index, + 'program_id', e.program_id, + 'instruction', e.instruction, + 'is_verify', TRUE ) AS request FROM @@ -75,7 +83,7 @@ groupings AS ( responses AS ( SELECT program_id, - streamline.udf_decode_instructions(requests) AS response + streamline.udf_verify_idl(requests) AS response FROM groupings ), @@ -83,14 +91,14 @@ results as ( select program_id, response :status_code :: INTEGER as status_code, - try_parse_json(response:body):data::array as decoded_instructions + try_parse_json(response:body)::array as decoded_instructions from responses ), expanded as ( select r.program_id, r.status_code, - iff(coalesce(d.value:error::string,'') = '' or status_code <> 200,false,true) is_error + iff(coalesce(d.value:error::string,'') = '' and coalesce(d.value:data:error::string,'') = '' and status_code = 200,false,true) is_error from results r, table(flatten(decoded_instructions)) d ), diff --git a/models/silver/parser/silver__decoded_instructions_2.sql b/models/silver/parser/silver__decoded_instructions_2.sql new file mode 100644 index 00000000..63464bd1 --- /dev/null +++ b/models/silver/parser/silver__decoded_instructions_2.sql @@ -0,0 +1,61 @@ +-- depends_on: {{ ref('bronze__streamline_decoded_instructions_2') }} +-- depends_on: {{ ref('bronze__streamline_FR_decoded_instructions_2') }} +{{ config( + materialized = 'incremental', + unique_key = ["tx_id", "index", "inner_index" ], + cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE','program_id'], + post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}'), + merge_exclude_columns = ["inserted_timestamp"], + tags = ['scheduled_non_core'], + enabled=false, +) }} + +SELECT + b.block_timestamp, + A.block_id, + A.tx_id, + COALESCE( + A.index, + VALUE :data :data [0] [0], + VALUE :data [0] [0] + ) :: INT AS INDEX, + A.inner_index, + A.program_id, + COALESCE( + A.value :data :data [0] [1], + A.value :data [1], + A.value :data + ) AS decoded_instruction, + A._inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['A.tx_id', 'A.index', 'A.inner_index'] + ) }} AS decoded_instructions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if 
+{% if is_incremental() %}
+{{ ref('bronze__streamline_decoded_instructions_2') }} A
+{% else %}
+    {{ ref('bronze__streamline_FR_decoded_instructions_2') }} A
+{% endif %}
+JOIN {{ ref('silver__blocks') }}
+b
+ON A.block_id = b.block_id
+
+{% if is_incremental() %}
+WHERE
+    A._inserted_timestamp >= (
+        SELECT
+            MAX(
+                _inserted_timestamp
+            ) _inserted_timestamp
+        FROM
+            {{ this }}
+    )
+{% endif %}
+
+qualify(ROW_NUMBER() over (PARTITION BY tx_id, INDEX, inner_index
+ORDER BY
+    A._inserted_timestamp DESC)) = 1
diff --git a/models/sources.yml b/models/sources.yml
index a69627fd..8b12d88e 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -42,7 +42,7 @@ sources:
       - name: prod_nft_metadata_uploads_1828572827
   - name: bronze_streamline
     database: streamline
-    schema: solana
+    schema: "{{ 'solana' if target.database == 'SOLANA' else 'solana_dev' }}"
     tables:
       - name: decode_instructions_idls
      - name: decoded_instructions_data_api
@@ -57,6 +57,7 @@ sources:
       - name: stake_program_accounts
       - name: validator_vote_program_accounts
       - name: program_parser
+      - name: decoded_instructions_2
   - name: bronze_api
     schema: bronze_api
     tables:
diff --git a/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
new file mode 100644
index 00000000..16fe6be0
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
@@ -0,0 +1,34 @@
+-- depends_on: {{ ref('bronze__streamline_decoded_instructions_2') }}
+{{ config (
+    materialized = "incremental",
+    unique_key = "complete_decoded_instructions_2_id",
+    cluster_by = ["ROUND(block_id, -3)","program_id"],
+    post_hook = enable_search_optimization('{{this.schema}}','{{this.identifier}}','ON EQUALITY(complete_decoded_instructions_2_id)'),
+    tags = ['streamline_decoder'],
+) }}
+
+SELECT
+    block_id,
+    tx_id,
+    index,
+    inner_index,
+    program_id,
+    {{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','program_id']) }} as complete_decoded_instructions_2_id,
+    _inserted_timestamp
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__streamline_decoded_instructions_2') }}
+WHERE
+    _inserted_timestamp >= (
+        SELECT
+            COALESCE(MAX(_inserted_timestamp),'2000-01-01'::timestamp_ntz) _inserted_timestamp
+        FROM
+            {{ this }}
+    )
+{% else %}
+    {{ ref('bronze__streamline_FR_decoded_instructions_2') }}
+{% endif %}
+qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_instructions_2_id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
diff --git a/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2_backfill.sql b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2_backfill.sql
new file mode 100644
index 00000000..8a2a5ed1
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__complete_decoded_instructions_2_backfill.sql
@@ -0,0 +1,10 @@
+{{ config (
+    materialized = 'incremental',
+    unique_key = 'table_name',
+    full_refresh = false,
+    tags = ['streamline_decoder'],
+) }}
+
+select
+    'placeholder'::string as schema_name,
+    'placeholder'::string as table_name
\ No newline at end of file
diff --git a/models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql b/models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
new file mode 100644
index 00000000..c450fbeb
--- /dev/null
+++ b/models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
@@ -0,0 +1,89 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_instructions_decoder(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_2', 'sql_limit', {{var('sql_limit','2500000')}}, 'producer_batch_size', {{var('producer_batch_size','1000000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_decoder']
+) }}
+
+WITH idl_in_play AS (
+
+    SELECT
+        program_id
+    FROM
+        {{ ref('silver__verified_idls') }}
+),
+event_subset AS (
+    SELECT
+        e.program_id,
+        e.tx_id,
+        e.index,
+        NULL as inner_index,
+        e.instruction,
+        e.block_id,
+        e.block_timestamp,
+        {{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','e.program_id']) }} as id
+    FROM
+        {{ ref('silver__events') }}
+        e
+        JOIN idl_in_play b
+        ON e.program_id = b.program_id
+    WHERE
+        e.block_timestamp >= CURRENT_DATE - 2
+        AND
+        e.succeeded
+    UNION ALL
+    SELECT
+        i.value :programId :: STRING AS inner_program_id,
+        e.tx_id,
+        e.index,
+        i.index AS inner_index,
+        i.value AS instruction,
+        e.block_id,
+        e.block_timestamp,
+        {{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','inner_program_id']) }} as id
+    FROM
+        {{ ref('silver__events') }}
+        e
+        JOIN idl_in_play b
+        ON ARRAY_CONTAINS(b.program_id::variant, e.inner_instruction_program_ids)
+        JOIN table(flatten(e.inner_instruction:instructions)) i
+    WHERE
+        e.block_timestamp >= CURRENT_DATE - 2
+        AND
+        e.succeeded
+        AND
+        i.value :programId :: STRING = b.program_id
+
+),
+completed_subset AS (
+    SELECT
+        block_id,
+        complete_decoded_instructions_2_id as id
+    FROM
+        {{ ref('streamline__complete_decoded_instructions_2') }}
+    WHERE
+        block_id >= (
+            SELECT
+                MIN(block_id)
+            FROM
+                event_subset
+        )
+)
+SELECT
+    e.program_id,
+    e.tx_id,
+    e.index,
+    e.inner_index,
+    e.instruction,
+    e.block_id,
+    e.block_timestamp
+FROM
+    event_subset e
+    LEFT OUTER JOIN completed_subset C
+    ON C.block_id = e.block_id
+    AND e.id = C.id
+WHERE
+    C.block_id IS NULL
+qualify(row_number() over (order by e.block_id, e.tx_id)) <= {{ var('sql_limit','2500000') }}
\ No newline at end of file
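
Note on operating the backfill helpers: nothing in this patch schedules decoded_instructions_backfill_generate_views, decoded_instructions_backfill_cleanup_views, or decoded_instructions_backfill_calls, so they are presumably meant to be driven by hand. A minimal sketch of that flow with dbt run-operation, assuming the macros compile as written (the program id below is a placeholder, not a value taken from this patch):

    # 1) Create ~1M-block backfill views spanning first_block_id through
    #    default_backfill_start_block_id for one verified program
    dbt run-operation decoded_instructions_backfill_generate_views --args '{program_id: "<PROGRAM_ID>"}'

    # 2) Drop generated views with zero pending requests and record them in
    #    streamline__complete_decoded_instructions_2_backfill
    dbt run-operation decoded_instructions_backfill_cleanup_views

    # 3) Submit up to 10 of the remaining views to streamline.udf_bulk_instructions_decoder
    #    with call_type 'backfill'
    dbt run-operation decoded_instructions_backfill_calls

Step 3 reuses the same if_data_call_function pattern as the post_hook on streamline__decode_instructions_2_realtime, so backfill and realtime requests should land in the same decoded_instructions_2 external table.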