An 4323/initial streamline 2.0 setup (#428)

* wip

* change udf name, fix surrogate key for inner instruction calls

* (wip) decoded instructions backfill helpers

* complete backfill helpers, clean up views with 0 requests

* rename

* revert

* temp change will revert

* rename

* use streamline 2.0 verify idl api endpoint

* this is a placeholder model, will replace existing when ready

* reorg files

* add workflow for decode instructions real time

* use pip cache

* update with prod endpoints

* update sql limit to real value, should be union all
desmond-hui 2024-01-03 09:24:43 -08:00 committed by GitHub
parent 08da1cc3c4
commit 5edc2fcf22
12 changed files with 426 additions and 5 deletions

@@ -0,0 +1,48 @@
name: dbt_run_decode_instructions
run-name: dbt_run_decode_instructions

on:
  workflow_dispatch:
    branches:
      - "main"
  # schedule:
  #   # Runs every 30 mins (see https://crontab.guru)
  #   - cron: '*/30 * * * *'

env:
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -s models/streamline/decode_instructions/streamline__complete_decoded_instructions_2.sql
          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s models/streamline/decode_instructions/streamline__decode_instructions_2_realtime.sql
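
The second dbt run passes STREAMLINE_INVOKE_STREAMS because the realtime model only selects candidate rows; its post-hook is what actually calls the decoder. if_data_call_function is not defined in this commit, so the gate below is only a minimal sketch of the conventional Streamline pattern it is assumed to follow:

-- Sketch only: assumed shape of if_data_call_function, not part of this commit.
{% macro if_data_call_function(func, target) %}
    {% if var('STREAMLINE_INVOKE_STREAMS', false) %}
        SELECT
            {{ func }}
        WHERE
            EXISTS (SELECT 1 FROM {{ target }} LIMIT 1)
    {% endif %}
{% endmacro %}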

@@ -15,6 +15,8 @@
 {{ udf_decode_instructions() }};
 {{ udf_bulk_parse_compressed_nft_mints() }};
 {{ udf_bulk_get_solscan_blocks() }};
+{{ create_udf_bulk_instructions_decoder() }};
+{{ create_udf_verify_idl() }};
 {% endif %}
 {{ create_udf_ordered_signers(

@@ -0,0 +1,125 @@
{% macro decoded_instructions_backfill_generate_views(program_id) %}
    {% set result_cols = run_query("""select
            first_block_id,
            default_backfill_start_block_id
        from solana.streamline.idls_history
        where program_id = '""" ~ program_id ~ """';""").columns %}
    {% set min_block_id = result_cols[0].values()[0] | int %}
    {% set max_block_id = result_cols[1].values()[0] | int %}
    {% set step = 1000000 %}
    {% for i in range(min_block_id, max_block_id, step) %}
        {% if i == min_block_id %}
            {% set start_block = i %}
        {% else %}
            {% set start_block = i + 1 %}
        {% endif %}
        {% if i + step >= max_block_id %}
            {% set end_block = max_block_id %}
        {% else %}
            {% set end_block = i + step %}
        {% endif %}
        {% set query = """create or replace view streamline.decoded_instructions_backfill_""" ~ start_block ~ """_""" ~ end_block ~ """_""" ~ program_id ~ """ AS
            with completed_subset AS (
                SELECT
                    block_id,
                    program_id,
                    complete_decoded_instructions_2_id as id
                FROM
                    """ ~ ref('streamline__complete_decoded_instructions_2') ~ """
                WHERE
                    program_id = '""" ~ program_id ~ """'
                    AND block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """
            ),
            event_subset as (
                select
                    e.block_id,
                    e.tx_id,
                    e.index,
                    NULL as inner_index,
                    e.instruction,
                    e.program_id,
                    e.block_timestamp,
                    """ ~ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','e.program_id']) ~ """ as id
                from solana.silver.events e
                where program_id = '""" ~ program_id ~ """'
                    and block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """
                    and succeeded
                union all
                select
                    e.block_id,
                    e.tx_id,
                    e.index,
                    i.index as inner_index,
                    i.value as instruction,
                    i.value :programId :: STRING AS inner_program_id,
                    e.block_timestamp,
                    """ ~ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','inner_program_id']) ~ """ as id
                from solana.silver.events e,
                    table(flatten(inner_instruction:instructions)) i
                where array_contains('""" ~ program_id ~ """' :: variant, inner_instruction_program_ids)
                    and inner_program_id = '""" ~ program_id ~ """'
                    and e.block_id between """ ~ start_block ~ """ and """ ~ end_block ~ """
                    and e.succeeded
            )
            select
                e.*
            from event_subset e
                left outer join completed_subset c
                    on c.program_id = e.program_id
                    and c.block_id = e.block_id
                    and c.id = e.id
            where c.block_id is null""" %}
        {% do run_query(query) %}
    {% endfor %}
{% endmacro %}
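
For reference, the loop above carves the range [first_block_id, default_backfill_start_block_id] into roughly one-million-block chunks, one view per chunk. The same start/end arithmetic as a set-based preview, with illustrative bounds:

-- Illustrative only: previews the (start_block, end_block) pairs the macro loops over.
WITH bounds AS (
    SELECT 1000000 AS min_block_id, 3500000 AS max_block_id, 1000000 AS step
),
seqs AS (
    SELECT ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1 AS seq
    FROM TABLE(GENERATOR(ROWCOUNT => 100))
)
SELECT
    IFF(seq = 0, min_block_id, min_block_id + seq * step + 1) AS start_block,
    LEAST(min_block_id + (seq + 1) * step, max_block_id) AS end_block
FROM bounds
JOIN seqs ON min_block_id + seq * step < max_block_id;
-- returns (1000000, 2000000), (2000001, 3000000), (3000001, 3500000)
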
{% macro decoded_instructions_backfill_cleanup_views() %}
    {% set results = run_query("""select
            table_schema,
            table_name
        from information_schema.views
        where table_name like 'DECODED_INSTRUCTIONS_BACKFILL_%'
        order by 2 desc
        limit 20;""").columns %}
    {% set schema_names = results[0].values() %}
    {% set table_names = results[1].values() %}
    {% for table_name in table_names %}
        {% set has_requests = run_query("""select 1 from """ ~ schema_names[0] ~ """.""" ~ table_name ~ """ limit 1""").columns[0].values()[0] %}
        {% if not has_requests %}
            {% do run_query("""drop view """ ~ schema_names[0] ~ """.""" ~ table_name) %}
            {% do run_query("""insert into """ ~ ref('streamline__complete_decoded_instructions_2_backfill') ~ """ values ('""" ~ schema_names[0] ~ """','""" ~ table_name ~ """')""") %}
        {% endif %}
    {% endfor %}
{% endmacro %}
{% macro decoded_instructions_backfill_calls() %}
    {% set sql_limit = 2500000 %}
    {% set producer_batch_size = 1000000 %}
    {% set worker_batch_size = 50000 %}
    {% set batch_call_limit = 1000 %}
    {% set results = run_query("""select
            table_schema,
            table_name
        from information_schema.views
        where table_name like 'DECODED_INSTRUCTIONS_BACKFILL_%'
        except
        select
            schema_name,
            table_name
        from """ ~ ref('streamline__complete_decoded_instructions_2_backfill') ~ """
        order by 2 desc
        limit 10;""").columns %}
    {% set schema_names = results[0].values() %}
    {% set table_names = results[1].values() %}
    {% for table_name in table_names %}
        {% set udf_call = if_data_call_function(
            func = schema_names[0] ~ ".udf_bulk_instructions_decoder(object_construct('sql_source', '" ~ table_name ~ "', 'external_table', 'decoded_instructions_2', 'sql_limit', '" ~ sql_limit ~ "', 'producer_batch_size', '" ~ producer_batch_size ~ "', 'worker_batch_size', '" ~ worker_batch_size ~ "', 'batch_call_limit', '" ~ batch_call_limit ~ "', 'call_type', 'backfill'))",
            target = schema_names[0] ~ "." ~ table_name
        ) %}
        {% do run_query(udf_call) %}
    {% endfor %}
{% endmacro %}
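
The object_construct argument above is the whole contract with the decoder endpoint. Assuming one of the generated backfill views exists, the request the macro builds looks like this (the view name is illustrative):

-- Illustrative only: the request payload sent to the decoder, with macro defaults.
SELECT OBJECT_CONSTRUCT(
    'sql_source', 'DECODED_INSTRUCTIONS_BACKFILL_1000000_2000000_JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
    'external_table', 'decoded_instructions_2',
    'sql_limit', '2500000',
    'producer_batch_size', '1000000',
    'worker_batch_size', '50000',
    'batch_call_limit', '1000',
    'call_type', 'backfill'
) AS request;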

@@ -0,0 +1,19 @@
{% macro create_udf_bulk_instructions_decoder() %}
    CREATE
    OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_instructions_decoder(
        json variant
    ) returns text api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
    'https://l426aqju0g.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_instructions_decoder'
    {% else %}
    'https://7938mznoq8.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_instructions_decoder'
    {%- endif %}
{% endmacro %}

{% macro create_udf_verify_idl() %}
    CREATE
    OR REPLACE EXTERNAL FUNCTION streamline.udf_verify_idl(
        "JSON" ARRAY
    ) returns VARIANT api_integration = aws_solana_api_dev AS {% if target.database == 'SOLANA' -%}
    'https://l426aqju0g.execute-api.us-east-1.amazonaws.com/prod/verify_idl'
    {% else %}
    'https://7938mznoq8.execute-api.us-east-1.amazonaws.com/dev/verify_idl'
    {%- endif %}
{% endmacro %}
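
Once registered, the functions can be exercised straight from a worksheet. A hypothetical smoke test for the verify endpoint, mirroring the request objects the verify model builds later in this commit (all field values are placeholders):

-- Hypothetical call; the accepted request shape is inferred from the verify model.
SELECT streamline.udf_verify_idl(
    ARRAY_CONSTRUCT(
        OBJECT_CONSTRUCT(
            'tx_id', 'placeholder_signature',
            'block_id', 123456789,
            'index', 0,
            'program_id', 'placeholder_program_id',
            'instruction', OBJECT_CONSTRUCT('data', 'placeholder'),
            'is_verify', TRUE
        )
    )
) AS response;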

@@ -0,0 +1,12 @@
{{ config (
    materialized = 'view'
) }}

{% set model = "decoded_instructions_2" %}

{{ streamline_external_table_FR_query(
    model,
    partition_function = "to_date(concat_ws('-', split_part(file_name, '/', 3), split_part(file_name, '/', 4), split_part(file_name, '/', 5)))",
    partition_name = "_partition_by_created_date",
    unique_key = "block_id",
    other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
) }}

@@ -0,0 +1,12 @@
{{ config (
    materialized = 'view'
) }}

{% set model = "decoded_instructions_2" %}

{{ streamline_external_table_query(
    model,
    partition_function = "to_date(concat_ws('-', split_part(file_name, '/', 3), split_part(file_name, '/', 4), split_part(file_name, '/', 5)))",
    partition_name = "_partition_by_created_date",
    unique_key = "block_id",
    other_cols = "tx_id,index,inner_index,program_id,_partition_by_block_id"
) }}
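
Both bronze views share the same partition scheme: the staged file path is assumed to carry year, month, and day as its third through fifth segments, which the partition_function folds into a date. With a made-up file name:

-- Illustrative only: the path layout is an assumption based on the split_part indices.
WITH f AS (
    SELECT 'decoded_instructions_2/parsed/2024/01/03/batch_0.json.gz' AS file_name
)
SELECT TO_DATE(CONCAT_WS('-',
    SPLIT_PART(file_name, '/', 3),
    SPLIT_PART(file_name, '/', 4),
    SPLIT_PART(file_name, '/', 5))) AS _partition_by_created_date
FROM f;
-- returns 2024-01-03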

@@ -47,10 +47,18 @@ LIMIT
), program_requests AS (
    SELECT
        e.program_id,
        ARRAY_CONSTRUCT(
            OBJECT_CONSTRUCT(
                'tx_id',
                e.tx_id,
                'block_id',
                e.block_id,
                'index',
                e.index,
                'program_id',
                e.program_id,
                'instruction',
                e.instruction,
                'is_verify',
                TRUE
            )
        ) AS request
    FROM
@@ -75,7 +83,7 @@ groupings AS (
 responses AS (
     SELECT
         program_id,
-        streamline.udf_decode_instructions(requests) AS response
+        streamline.udf_verify_idl(requests) AS response
     FROM
         groupings
 ),
@@ -83,14 +91,14 @@ results as (
 select
     program_id,
     response :status_code :: INTEGER as status_code,
-    try_parse_json(response:body):data::array as decoded_instructions
+    try_parse_json(response:body)::array as decoded_instructions
 from responses
 ),
 expanded as (
     select
         r.program_id,
         r.status_code,
-        iff(coalesce(d.value:error::string,'') = '' or status_code <> 200,false,true) is_error
+        iff(coalesce(d.value:error::string,'') = '' and coalesce(d.value:data:error::string,'') = '' and status_code = 200,false,true) is_error
     from results r,
         table(flatten(decoded_instructions)) d
 ),
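
The tightened is_error flag now requires a clean top-level error, a clean nested data.error, and a 200 status before an element counts as verified. Running the same expression over a mocked response (body contents invented):

-- Illustrative only: exercises the new is_error logic against fabricated elements.
WITH results AS (
    SELECT
        200 AS status_code,
        PARSE_JSON('[{"data": {"name": "swap"}}, {"error": "unknown instruction"}, {"data": {"error": "decode failed"}}]') :: ARRAY AS decoded_instructions
)
SELECT
    IFF(COALESCE(d.value :error :: STRING, '') = ''
        AND COALESCE(d.value :data :error :: STRING, '') = ''
        AND status_code = 200, FALSE, TRUE) AS is_error
FROM results r,
    TABLE(FLATTEN(decoded_instructions)) d;
-- returns FALSE, TRUE, TRUE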

@@ -0,0 +1,61 @@
-- depends_on: {{ ref('bronze__streamline_decoded_instructions_2') }}
-- depends_on: {{ ref('bronze__streamline_FR_decoded_instructions_2') }}
{{ config(
    materialized = 'incremental',
    unique_key = ["tx_id", "index", "inner_index"],
    cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE', 'program_id'],
    post_hook = enable_search_optimization('{{this.schema}}', '{{this.identifier}}'),
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['scheduled_non_core'],
    enabled = false
) }}

SELECT
    b.block_timestamp,
    A.block_id,
    A.tx_id,
    COALESCE(
        A.index,
        A.value :data :data [0] [0],
        A.value :data [0] [0]
    ) :: INT AS INDEX,
    A.inner_index,
    A.program_id,
    COALESCE(
        A.value :data :data [0] [1],
        A.value :data [1],
        A.value :data
    ) AS decoded_instruction,
    A._inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['A.tx_id', 'A.index', 'A.inner_index']
    ) }} AS decoded_instructions_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM

{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_instructions_2') }} A
{% else %}
{{ ref('bronze__streamline_FR_decoded_instructions_2') }} A
{% endif %}

JOIN {{ ref('silver__blocks') }} b
    ON A.block_id = b.block_id

{% if is_incremental() %}
WHERE
    A._inserted_timestamp >= (
        SELECT
            MAX(_inserted_timestamp) _inserted_timestamp
        FROM
            {{ this }}
    )
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY tx_id, INDEX, inner_index
ORDER BY
    A._inserted_timestamp DESC)) = 1
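
The COALESCE chains guard against more than one response layout. The two shapes below are an assumption inferred from those chains (the decoder's actual payloads are not documented in this commit):

-- Illustrative only: assumed payload shapes, one with the index embedded, one without.
WITH raw AS (
    SELECT NULL AS index, PARSE_JSON('{"data": {"data": [[3, {"name": "transfer"}]]}}') AS value
    UNION ALL
    SELECT 5, PARSE_JSON('{"data": {"name": "mint"}}')
)
SELECT
    COALESCE(index, value :data :data [0] [0], value :data [0] [0]) :: INT AS idx,
    COALESCE(value :data :data [0] [1], value :data [1], value :data) AS decoded_instruction
FROM raw;
-- row 1: idx = 3, decoded_instruction = {"name": "transfer"}
-- row 2: idx = 5, decoded_instruction = {"name": "mint"}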

@@ -42,7 +42,7 @@ sources:
       - name: prod_nft_metadata_uploads_1828572827
   - name: bronze_streamline
     database: streamline
-    schema: solana
+    schema: "{{ 'solana' if target.database == 'SOLANA' else 'solana_dev' }}"
     tables:
       - name: decode_instructions_idls
       - name: decoded_instructions_data_api
@@ -57,6 +57,7 @@ sources:
       - name: stake_program_accounts
       - name: validator_vote_program_accounts
       - name: program_parser
+      - name: decoded_instructions_2
   - name: bronze_api
     schema: bronze_api
     tables:

@@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__streamline_decoded_instructions_2') }}
{{ config (
    materialized = "incremental",
    unique_key = "complete_decoded_instructions_2_id",
    cluster_by = ["ROUND(block_id, -3)", "program_id"],
    post_hook = enable_search_optimization('{{this.schema}}', '{{this.identifier}}', 'ON EQUALITY(complete_decoded_instructions_2_id)'),
    tags = ['streamline_decoder']
) }}

SELECT
    block_id,
    tx_id,
    index,
    inner_index,
    program_id,
    {{ dbt_utils.generate_surrogate_key(['block_id','tx_id','index','inner_index','program_id']) }} as complete_decoded_instructions_2_id,
    _inserted_timestamp
FROM

{% if is_incremental() %}
{{ ref('bronze__streamline_decoded_instructions_2') }}
WHERE
    _inserted_timestamp >= (
        SELECT
            COALESCE(MAX(_inserted_timestamp), '2000-01-01' :: timestamp_ntz) _inserted_timestamp
        FROM
            {{ this }}
    )
{% else %}
{{ ref('bronze__streamline_FR_decoded_instructions_2') }}
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY complete_decoded_instructions_2_id
ORDER BY
    _inserted_timestamp DESC)) = 1
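
generate_surrogate_key matters here because inner_index is NULL for top-level instructions: dbt_utils swaps NULLs for a sentinel string before hashing, so the key stays distinct where raw concatenation would collapse to NULL. Approximately what the call compiles to (the exact sentinel text varies by dbt_utils version):

-- Approximation of the compiled surrogate key; row values are illustrative.
WITH r AS (
    SELECT 123456789 AS block_id, 'placeholder_sig' AS tx_id, 0 AS index,
        NULL AS inner_index, 'placeholder_program' AS program_id
)
SELECT MD5(CAST(
    COALESCE(CAST(block_id AS VARCHAR), '_dbt_utils_surrogate_key_null_') || '-' ||
    COALESCE(CAST(tx_id AS VARCHAR), '_dbt_utils_surrogate_key_null_') || '-' ||
    COALESCE(CAST(index AS VARCHAR), '_dbt_utils_surrogate_key_null_') || '-' ||
    COALESCE(CAST(inner_index AS VARCHAR), '_dbt_utils_surrogate_key_null_') || '-' ||
    COALESCE(CAST(program_id AS VARCHAR), '_dbt_utils_surrogate_key_null_')
AS VARCHAR)) AS complete_decoded_instructions_2_id
FROM r;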

@@ -0,0 +1,10 @@
{{ config (
    materialized = 'incremental',
    unique_key = 'table_name',
    full_refresh = false,
    tags = ['streamline_decoder']
) }}

select
    'placeholder' :: string as schema_name,
    'placeholder' :: string as table_name
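
With full_refresh = false and a constant SELECT, this model is effectively an append-only registry: dbt creates it once with the placeholder row, a full refresh can never drop it, and the cleanup macro above is what actually populates it, via a plain INSERT along these lines (all names illustrative):

-- Illustrative only: the cleanup macro issues the equivalent of this statement.
INSERT INTO solana_dev.streamline.streamline__complete_decoded_instructions_2_backfill
VALUES ('STREAMLINE', 'DECODED_INSTRUCTIONS_BACKFILL_1000000_2000000_JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4');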

@@ -0,0 +1,89 @@
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_bulk_instructions_decoder(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'decoded_instructions_2', 'sql_limit', {{var('sql_limit','2500000')}}, 'producer_batch_size', {{var('producer_batch_size','1000000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}))",
        target = "{{this.schema}}.{{this.identifier}}"
    ),
    tags = ['streamline_decoder']
) }}

WITH idl_in_play AS (
    SELECT
        program_id
    FROM
        {{ ref('silver__verified_idls') }}
),
event_subset AS (
    SELECT
        e.program_id,
        e.tx_id,
        e.index,
        NULL as inner_index,
        e.instruction,
        e.block_id,
        e.block_timestamp,
        {{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','e.program_id']) }} as id
    FROM
        {{ ref('silver__events') }} e
        JOIN idl_in_play b
        ON e.program_id = b.program_id
    WHERE
        e.block_timestamp >= CURRENT_DATE - 2
        AND e.succeeded
    UNION ALL
    SELECT
        i.value :programId :: STRING AS inner_program_id,
        e.tx_id,
        e.index,
        i.index AS inner_index,
        i.value AS instruction,
        e.block_id,
        e.block_timestamp,
        {{ dbt_utils.generate_surrogate_key(['e.block_id','e.tx_id','e.index','inner_index','inner_program_id']) }} as id
    FROM
        {{ ref('silver__events') }} e
        JOIN idl_in_play b
        ON ARRAY_CONTAINS(b.program_id :: variant, e.inner_instruction_program_ids)
        JOIN table(flatten(e.inner_instruction :instructions)) i
    WHERE
        e.block_timestamp >= CURRENT_DATE - 2
        AND e.succeeded
        AND i.value :programId :: STRING = b.program_id
),
completed_subset AS (
    SELECT
        block_id,
        complete_decoded_instructions_2_id as id
    FROM
        {{ ref('streamline__complete_decoded_instructions_2') }}
    WHERE
        block_id >= (
            SELECT
                MIN(block_id)
            FROM
                event_subset
        )
)
SELECT
    e.program_id,
    e.tx_id,
    e.index,
    e.inner_index,
    e.instruction,
    e.block_id,
    e.block_timestamp
FROM
    event_subset e
    LEFT OUTER JOIN completed_subset C
    ON C.block_id = e.block_id
    AND e.id = C.id
WHERE
    C.block_id IS NULL
qualify(row_number() over (order by e.block_id, e.tx_id)) <= {{ var('sql_limit', '2500000') }}
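
The final SELECT is the dispatch pattern in miniature: anti-join out ids the complete table already holds, then cap the batch deterministically with QUALIFY rather than LIMIT so the cutoff follows block order. A toy run of the same shape:

-- Illustrative only: anti-join plus QUALIFY cap on fabricated rows.
WITH event_subset AS (
    SELECT column1 AS block_id, column2 AS id FROM VALUES (1, 'a'), (2, 'b'), (3, 'c')
),
completed_subset AS (
    SELECT column1 AS block_id, column2 AS id FROM VALUES (1, 'a')
)
SELECT e.*
FROM event_subset e
LEFT OUTER JOIN completed_subset c
    ON c.block_id = e.block_id AND c.id = e.id
WHERE c.block_id IS NULL
qualify(ROW_NUMBER() over (ORDER BY e.block_id)) <= 2;
-- returns (2, 'b') and (3, 'c')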