diff --git a/.github/workflows/dbt_run_parser_program.yml b/.github/workflows/dbt_run_parser_program.yml
new file mode 100644
index 00000000..1156259c
--- /dev/null
+++ b/.github/workflows/dbt_run_parser_program.yml
@@ -0,0 +1,47 @@
+name: dbt_run_parser_program
+run-name: dbt_run_parser_program
+
+on:
+  schedule:
+    # Runs every 30 mins (see https://crontab.guru)
+    - cron: '*/30 * * * *'
+
+# Secrets exported as environment variables to every step of every job.
+env:
+  DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}"
+
+  ACCOUNT: "${{ secrets.ACCOUNT }}"
+  ROLE: "${{ secrets.ROLE }}"
+  USER: "${{ secrets.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ secrets.REGION }}"
+  DATABASE: "${{ secrets.DATABASE }}"
+  WAREHOUSE: "${{ secrets.WAREHOUSE }}"
+  SCHEMA: "${{ secrets.SCHEMA }}"
+
+# One concurrency group per workflow: at most one run executes at a time.
+concurrency:
+  group: ${{ github.workflow }}
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      # NOTE(review): Python 3.7 is end-of-life; confirm the dbt version pinned in DBT_VERSION before bumping.
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.7.x"
+
+      - name: install dependencies
+        run: |
+          pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click
+          dbt deps
+
+      - name: Run DBT Jobs
+        run: |
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m ./models/streamline/parser/streamline__all_undecoded_instructions_history.sql
diff --git a/.user.yml b/.user.yml
new file mode 100644
index 00000000..644da680
--- /dev/null
+++ b/.user.yml
@@ -0,0 +1 @@
+id: a98e72da-488e-4e5a-980b-5c0b59143929
diff --git a/dbt_project.yml b/dbt_project.yml
index 4f084a1c..d4081379 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -60,4 +60,6 @@ models:
 vars:
   "dbt_date:time_zone": GMT
   UPDATE_SNOWFLAKE_TAGS: True
-  UPDATE_UDFS_AND_SPS: False
\ No newline at end of file
+  UPDATE_UDFS_AND_SPS: False
+  STREAMLINE_INVOKE_STREAMS: False
+  STREAMLINE_RUN_HISTORY: False
diff --git a/macros/streamline/bulk_program_parser/run_sp_program_parser.sql 
b/macros/streamline/bulk_program_parser/run_sp_program_parser.sql
new file mode 100644
index 00000000..c250bc8a
--- /dev/null
+++ b/macros/streamline/bulk_program_parser/run_sp_program_parser.sql
@@ -0,0 +1,8 @@
+{# Calls the silver.sp_udf_bulk_program_parser() stored procedure through run_query. #}
+{% macro run_sp_udf_bulk_program_parser() %}
+{% set sql %}
+call silver.sp_udf_bulk_program_parser();
+{% endset %}
+
+{% do run_query(sql) %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/bulk_program_parser/sp_create_program_parser.sql b/macros/streamline/bulk_program_parser/sp_create_program_parser.sql
new file mode 100644
index 00000000..78e67211
--- /dev/null
+++ b/macros/streamline/bulk_program_parser/sp_create_program_parser.sql
@@ -0,0 +1,24 @@
+{# Creates or replaces silver.sp_udf_bulk_program_parser(), a procedure that selects
+   silver.udf_bulk_program_parser() once and returns the result. Runs only when the
+   UPDATE_UDFS_AND_SPS var is truthy. RESULT is VARIANT to match the declared return type. #}
+{% macro sp_create_udf_bulk_program_parser() %}
+  {% if var("UPDATE_UDFS_AND_SPS") %}
+    {% set sql %}
+    CREATE OR REPLACE PROCEDURE silver.sp_udf_bulk_program_parser()
+    RETURNS variant
+    LANGUAGE SQL
+    AS
+    $$
+      DECLARE
+        RESULT VARIANT;
+      BEGIN
+        RESULT:= (
+          SELECT
+            silver.udf_bulk_program_parser()
+        );
+        RETURN RESULT;
+      END;
+    $${% endset %}
+    {% do run_query(sql) %}
+  {% endif %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/bulk_program_parser/udf_program_parser.sql b/macros/streamline/bulk_program_parser/udf_program_parser.sql
new file mode 100644
index 00000000..67b62ddd
--- /dev/null
+++ b/macros/streamline/bulk_program_parser/udf_program_parser.sql
@@ -0,0 +1,11 @@
+{# Renders the CREATE OR REPLACE EXTERNAL FUNCTION statement for silver.udf_bulk_program_parser().
+   The endpoint is the prod URL when target.database is 'SOLANA', otherwise the dev URL.
+   NOTE(review): api_integration is aws_solana_api_dev for both endpoints - confirm this is intended. #}
+{% macro udf_bulk_program_parser() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_program_parser() returns ARRAY api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
+        'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_program_parser'
+    {% else %}
+        'https://89kf6gtxr0.execute-api.us-east-1.amazonaws.com/dev/bulk_program_parser'
+    {%- endif %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql
new file mode 100644
index 00000000..ad31de36
--- /dev/null
+++ 
b/macros/streamline/models.sql
@@ -0,0 +1,110 @@
+{# Selects logs with block_number between start and stop (and no higher than the top
+   block_number in _max_block_by_date), joined to their event ABIs, excluding any _log_id
+   already present in streamline__complete_decode_logs. #}
+{% macro decode_logs_history(start, stop) %}
+WITH look_back AS (
+    SELECT block_number
+    FROM {{ ref("_max_block_by_date") }}
+    QUALIFY ROW_NUMBER() OVER (ORDER BY block_number DESC) = 1
+)
+SELECT
+    l.block_number,
+    l._log_id,
+    A.abi AS abi,
+    OBJECT_CONSTRUCT(
+        'topics', l.topics,
+        'data', l.data,
+        'address', l.contract_address
+    ) AS DATA
+FROM {{ ref("silver__logs") }} l
+INNER JOIN {{ ref("silver__complete_event_abis") }} A
+    ON A.parent_contract_address = l.contract_address
+    AND A.event_signature = l.topics[0]:: STRING
+    AND l.block_number BETWEEN A.start_block AND A.end_block
+WHERE
+    (l.block_number BETWEEN {{ start }} AND {{ stop }})
+    AND l.block_number <= (SELECT block_number FROM look_back)
+    AND _log_id NOT IN (
+        SELECT _log_id
+        FROM {{ ref("streamline__complete_decode_logs") }}
+        WHERE
+            (block_number BETWEEN {{ start }} AND {{ stop }})
+            AND block_number <= (SELECT block_number FROM look_back)
+    )
+{% endmacro %}
+
+{# Queries the bronze_streamline external table named by model, joined to the files that
+   external_table_file_registration_history reports for it over the last 7 days.
+   Rows whose DATA:error:code is not NULL are dropped. #}
+{% macro streamline_external_table_query(
+    model,
+    partition_function,
+    partition_name,
+    unique_key,
+    other_cols
+) %}
+WITH meta AS (
+    SELECT
+        job_created_time AS _inserted_timestamp,
+        file_name,
+        {{ partition_function }} AS {{ partition_name }}
+    FROM
+        TABLE(
+            information_schema.external_table_file_registration_history(
+                start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()),
+                table_name => '{{ source( "bronze_streamline", model) }}'
+            )
+        ) A
+)
+SELECT
+    {{ unique_key }},
+    {{ other_cols }},
+    DATA,
+    _inserted_timestamp,
+    s.{{ partition_name }},
+    s.value AS VALUE
+FROM {{ source("bronze_streamline", model) }} s
+JOIN meta b
+    ON b.file_name = metadata$filename
+    AND b.{{ partition_name }} = s.{{ partition_name }}
+WHERE
+    b.{{ partition_name }} = s.{{ partition_name }}
+    AND DATA :error :code IS NULL
+{% endmacro %}
+
+{# Same shape as streamline_external_table_query, but the file list comes from
+   external_table_files (no start_time argument) and uses registered_on as _inserted_timestamp. #}
+{% macro streamline_external_table_FR_query(
+    model,
+    partition_function,
+    partition_name,
+    unique_key,
+    other_cols
+) %}
+WITH meta AS (
+    SELECT
+        registered_on AS _inserted_timestamp,
+        file_name,
+        {{ partition_function }} AS {{ partition_name }}
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", model) }}'
+            )
+        ) A
+)
+SELECT
+    {{ unique_key }},
+    {{ other_cols }},
+    DATA,
+    _inserted_timestamp,
+    s.{{ partition_name }},
+    s.value AS VALUE
+FROM {{ source("bronze_streamline", model) }} s
+JOIN meta b
+    ON b.file_name = metadata$filename
+    AND b.{{ partition_name }} = s.{{ partition_name }}
+WHERE
+    b.{{ partition_name }} = s.{{ partition_name }}
+    AND DATA :error :code IS NULL
+{% endmacro %}
diff --git a/macros/utils.sql b/macros/utils.sql
new file mode 100644
index 00000000..14de2a46
--- /dev/null
+++ b/macros/utils.sql
@@ -0,0 +1,44 @@
+{# Renders a SELECT of func guarded by an EXISTS probe on target when the
+   STREAMLINE_INVOKE_STREAMS var is truthy; otherwise renders SELECT NULL. #}
+{% macro if_data_call_function(func, target) %}
+    {% if var("STREAMLINE_INVOKE_STREAMS") %}
+        {% if execute %}
+            {{ log("Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, True) }}
+        {% endif %}
+    SELECT
+        {{ func }}
+    WHERE
+        EXISTS(SELECT 1 FROM {{ target }} LIMIT 1)
+    {% else %}
+        {% if execute %}
+            {{ log("Running macro `if_data_call_function`: NOOP", False) }}
+        {% endif %}
+    SELECT
+        NULL
+    {% endif %}
+{% endmacro %}
+
+{# When STREAMLINE_INVOKE_STREAMS is truthy, probes the current model's relation for rows;
+   if the probe result is truthy, logs "Waiting..." and runs system$wait with the WAIT var
+   (default 600). Otherwise renders SELECT NULL. #}
+{% macro if_data_call_wait() %}
+    {% if var("STREAMLINE_INVOKE_STREAMS") %}
+        {% set query %}
+    SELECT 1
+    WHERE EXISTS(SELECT 1 FROM {{ model.schema ~ "." ~ model.alias }} LIMIT 1)
+        {% endset %}
+        {% if execute %}
+            {% set results = run_query(query) %}
+            {% if results %}
+                {{ log("Waiting...", info = True) }}
+                {% set wait_query %}
+    SELECT system$wait({{ var("WAIT", 600) }})
+                {% endset %}
+                {% do run_query(wait_query) %}
+            {% else %}
+    SELECT
+        NULL;
+            {% endif %}
+        {% endif %}
+    {% endif %}
+{% endmacro %}
\ No newline at end of file
diff --git a/models/bronze/streamline/bronze__streamline_FR_program_parser.sql b/models/bronze/streamline/bronze__streamline_FR_program_parser.sql
new file mode 100644
index 00000000..3a0b38eb
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_FR_program_parser.sql
@@ -0,0 +1,14 @@
+{# View rendered by streamline_external_table_FR_query; the source table name is the last two underscore-separated parts of this model's identifier. #}
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set id_parts = this.identifier.split("_") %}
+{% set model = id_parts[-2] + '_' + id_parts[-1] %}
+{{ streamline_external_table_FR_query(
+    model,
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
+    partition_name = "_partition_by_block_id",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,program_id"
+) }}
diff --git a/models/bronze/streamline/bronze__streamline_program_parser.sql b/models/bronze/streamline/bronze__streamline_program_parser.sql
new file mode 100644
index 00000000..3f44f4c2
--- /dev/null
+++ b/models/bronze/streamline/bronze__streamline_program_parser.sql
@@ -0,0 +1,14 @@
+{# View rendered by streamline_external_table_query; the source table name is the last two underscore-separated parts of this model's identifier. #}
+{{ config (
+    materialized = 'view'
+) }}
+
+{% set id_parts = this.identifier.split("_") %}
+{% set model = id_parts[-2] + '_' + id_parts[-1] %}
+{{ streamline_external_table_query(
+    model,
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
+    partition_name = "_partition_by_block_id",
+    unique_key = "block_id",
+    other_cols = "tx_id,index,program_id"
+) }}
diff --git a/models/sources.yml b/models/sources.yml
index d441307c..132ed7a1 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -50,6 +50,7 @@ sources:
       - name: validators_app_list_api
       - name: 
stake_program_accounts
       - name: validator_vote_program_accounts
+      - name: program_parser
   - name: bronze_api
     schema: bronze_api
     tables:
diff --git a/models/silver/parser/silver___all_undecoded_instructions.sql b/models/streamline/parser/streamline__all_undecoded_instructions.sql
similarity index 83%
rename from models/silver/parser/silver___all_undecoded_instructions.sql
rename to models/streamline/parser/streamline__all_undecoded_instructions.sql
index f4c09e77..a43c5925 100644
--- a/models/silver/parser/silver___all_undecoded_instructions.sql
+++ b/models/streamline/parser/streamline__all_undecoded_instructions.sql
@@ -1,8 +1,9 @@
-{{ config(
-    materialized = 'view',
-    full_refresh = false
+{{ config (
+    materialized = "incremental",
+    unique_key = "CONCAT_WS('-', tx_id, INDEX)",
+    cluster_by = "ROUND(block_id, -3)"
 ) }}
--- post_hook = 'call silver.sp_bulk_decode_instructions()',
+
 
 WITH idl_in_play AS (
     SELECT
@@ -21,6 +22,7 @@ instr_in_play AS (
         tx_id,
         INDEX,
         instruction,
+        block_id,
         block_timestamp
     FROM
         {{ ref('silver__events') }} A
@@ -34,6 +36,7 @@ SELECT
     p.tx_id,
     p.index,
     p.instruction,
+    p.block_id,
     p.block_timestamp
 FROM
     instr_in_play p
diff --git a/models/streamline/parser/streamline__all_undecoded_instructions_history.sql b/models/streamline/parser/streamline__all_undecoded_instructions_history.sql
new file mode 100644
index 00000000..e8b765ba
--- /dev/null
+++ b/models/streamline/parser/streamline__all_undecoded_instructions_history.sql
@@ -0,0 +1,78 @@
+{# Lists instructions from streamline__all_undecoded_instructions whose key is not yet in
+   streamline__complete_decoded_instructions, restricted to one program per run (the MAX
+   program_id in scope). The post_hook renders a call to udf_bulk_program_parser() guarded by
+   an EXISTS check on this view. With STREAMLINE_RUN_HISTORY set the scan starts at block 0;
+   otherwise it starts 100000 blocks below the highest block_id.
+   NOTE(review): the hook calls the UDF in this model's schema, while the
+   udf_bulk_program_parser macro creates it in silver - confirm both resolve to the same schema.
+   NOTE(review): the anti-join key omits tx_id - confirm block_id/program_id/index is unique. #}
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_program_parser()",
+        target = "{{this.schema}}.{{this.identifier}}"
+    )
+) }}
+
+WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
+
+    SELECT
+        0 AS block_id
+    {% else %}
+    SELECT
+        MAX(block_id) - 100000 AS block_id --approx 3 days
+    FROM
+        {{ ref("streamline__all_undecoded_instructions") }}
+    {% endif %}),
+    tbl AS (
+        SELECT
+            program_id,
+            tx_id,
+            INDEX,
+            instruction,
+            block_id,
+            block_timestamp
+        FROM
+            {{ ref("streamline__all_undecoded_instructions") }}
+        WHERE
+            (
+                block_id >= (
+                    SELECT
+                        block_id
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_id IS NOT NULL
+            AND concat_ws(
+                '-',
+                block_id,
+                program_id,
+                INDEX
+            ) NOT IN (
+                SELECT
+                    id
+                FROM
+                    {{ ref("streamline__complete_decoded_instructions") }}
+                WHERE
+                    block_id >= (
+                        SELECT
+                            block_id
+                        FROM
+                            last_3_days
+                    )
+                    AND block_id IS NOT NULL
+                    -- a single NULL id would make NOT IN reject every row
+                    AND id IS NOT NULL
+            )
+    )
+SELECT
+    program_id,
+    tx_id,
+    INDEX,
+    instruction,
+    block_id,
+    block_timestamp
+FROM
+    tbl
+WHERE program_id = (SELECT MAX(program_id) AS program_id FROM tbl)
\ No newline at end of file
diff --git a/models/streamline/parser/streamline__complete_decoded_instructions.sql b/models/streamline/parser/streamline__complete_decoded_instructions.sql
new file mode 100644
index 00000000..f65e8208
--- /dev/null
+++ b/models/streamline/parser/streamline__complete_decoded_instructions.sql
@@ -0,0 +1,33 @@
+-- depends_on: {{ ref('bronze__streamline_program_parser') }}
+{# Incremental table of decoded-instruction ids, one row per id (latest _inserted_timestamp wins).
+   Incremental runs read bronze__streamline_program_parser from the newest timestamp already
+   loaded; full builds read bronze__streamline_FR_program_parser. #}
+{{ config (
+    materialized = "incremental",
+    unique_key = "id",
+    cluster_by = "ROUND(block_id, -3)",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
+) }}
+
+SELECT
+    block_id,
+    CONCAT_WS('-', block_id, data[1]:program::STRING, data[0]) AS id,
+    _inserted_timestamp
+FROM
+{% if is_incremental() %}
+{{ ref('bronze__streamline_program_parser') }}
+WHERE
+    _inserted_timestamp >= (
+        SELECT
+            {# COALESCE keeps the filter usable when the target table is still empty (MAX is NULL). #}
+            COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp
+        FROM
+            {{ this }}
+    )
+{% else %}
+    {{ ref('bronze__streamline_FR_program_parser') }}
+{% endif %}
+
+qualify(ROW_NUMBER() over (PARTITION BY id
+ORDER BY
+    _inserted_timestamp DESC)) = 1