Add solana parser jobs and models (#316)

* updated the dbt model for adding the solana parser

xiuy001 committed 2023-07-21 12:33:08 -04:00 (via GitHub)
parent 9373eaa5c4
commit 3707093464
14 changed files with 434 additions and 5 deletions

New file (GitHub Actions workflow dbt_run_parser_program):
@@ -0,0 +1,43 @@
name: dbt_run_parser_program
run-name: dbt_run_parser_program

on:
  schedule:
    # Runs every 30 mins (see https://crontab.guru)
    - cron: '*/30 * * * *'

env:
  DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ secrets.ACCOUNT }}"
  ROLE: "${{ secrets.ROLE }}"
  USER: "${{ secrets.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ secrets.REGION }}"
  DATABASE: "${{ secrets.DATABASE }}"
  WAREHOUSE: "${{ secrets.WAREHOUSE }}"
  SCHEMA: "${{ secrets.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7.x"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m ./models/streamline/parser/streamline__all_undecoded_instructions_history.sql

.user.yml (new file, +1):
@@ -0,0 +1 @@
id: a98e72da-488e-4e5a-980b-5c0b59143929

Changed file (dbt project vars):
@@ -60,4 +60,6 @@ models:
 vars:
   "dbt_date:time_zone": GMT
   UPDATE_SNOWFLAKE_TAGS: True
-  UPDATE_UDFS_AND_SPS: False
+  UPDATE_UDFS_AND_SPS: False
+  STREAMLINE_INVOKE_STREAMS: False
+  STREAMLINE_RUN_HISTORY: False
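
Both new vars default to False, so an ad-hoc `dbt run` leaves the parser pipeline dormant; only the scheduled workflow above flips them to True. A minimal sketch of the gating pattern (mirroring `macros/utils.sql` later in this diff; the macro name here is hypothetical):

{% macro invoke_if_enabled(func) %}
    {% if var("STREAMLINE_INVOKE_STREAMS") %}
        SELECT {{ func }}  -- opted-in run: fire the external function
    {% else %}
        SELECT NULL        -- default run: compile to a no-op
    {% endif %}
{% endmacro %}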

New file (run_sp_udf_bulk_program_parser macro):
@@ -0,0 +1,7 @@
{% macro run_sp_udf_bulk_program_parser() %}
{% set sql %}
call silver.sp_udf_bulk_program_parser();
{% endset %}
{% do run_query(sql) %}
{% endmacro %}
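
Since the macro takes no arguments and just wraps a CALL, it can also be invoked ad hoc from the command line with dbt's run-operation:

dbt run-operation run_sp_udf_bulk_program_parser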

New file (sp_create_udf_bulk_program_parser macro):
@@ -0,0 +1,21 @@
{% macro sp_create_udf_bulk_program_parser() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
CREATE OR REPLACE PROCEDURE silver.sp_udf_bulk_program_parser()
RETURNS variant
LANGUAGE SQL
AS
$$
DECLARE
RESULT VARCHAR;
BEGIN
RESULT:= (
SELECT
silver.udf_bulk_program_parser()
);
RETURN RESULT;
END;
$${% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
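
The procedure is a thin SQL wrapper that selects the external function and returns its result, which lets the run_sp_udf_bulk_program_parser macro above (or a scheduler) trigger the parser with a plain CALL. Because the body is gated on UPDATE_UDFS_AND_SPS (False by default in dbt_project.yml), it is only (re)created when a run opts in explicitly:

dbt run --vars '{"UPDATE_UDFS_AND_SPS": True}'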

New file (udf_bulk_program_parser macro):
@@ -0,0 +1,8 @@
{% macro udf_bulk_program_parser() %}
CREATE OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_program_parser()
RETURNS ARRAY
api_integration = aws_solana_api_dev
AS {% if target.database == 'SOLANA' -%}
    'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_program_parser'
{% else %}
    'https://89kf6gtxr0.execute-api.us-east-1.amazonaws.com/dev/bulk_program_parser'
{%- endif %}
{% endmacro %}
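
For the production SOLANA database this compiles to roughly:

CREATE OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_program_parser()
RETURNS ARRAY
api_integration = aws_solana_api_dev
AS 'https://pj4rqb8z96.execute-api.us-east-1.amazonaws.com/prod/bulk_program_parser';

Note that both branches share the aws_solana_api_dev integration; only the API Gateway stage URL switches between prod and dev.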

New file (decode/streamline external-table macros):
@@ -0,0 +1,144 @@
{% macro decode_logs_history(
start,
stop
) %}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 1
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]:: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND l.block_number <= (
SELECT
block_number
FROM
look_back
)
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__complete_decode_logs") }}
WHERE
(
block_number BETWEEN {{ start }}
AND {{ stop }}
)
AND block_number <= (
SELECT
block_number
FROM
look_back
)
)
{% endmacro %}
{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key,
other_cols
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }},
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND DATA :error :code IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query(
model,
partition_function,
partition_name,
unique_key,
other_cols
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ unique_key }},
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND DATA :error :code IS NULL
{% endmacro %}
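
The two streamline macros differ only in how they enumerate staged files: streamline_external_table_query reads information_schema.external_table_file_registration_history (files registered in the last 7 days, suited to the incremental path), while the _FR_ variant reads information_schema.external_table_files (every registered file, for full refreshes). The partition_function argument is expected to pull a block id out of each staged file's path; a worked example, assuming paths shaped like <bucket>/<stage>/<table>/<block_id>_<part>.json (the layout is an assumption, not confirmed by this diff):

SELECT CAST(SPLIT_PART(SPLIT_PART('mybucket/streamline/program_parser/215000000_0.json', '/', 4), '_', 1) AS INTEGER);
-- 4th '/'-segment -> '215000000_0.json'; its first '_'-segment -> 215000000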

macros/utils.sql (new file, +78):
@@ -0,0 +1,78 @@
{% macro if_data_call_function(
func,
target
) %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target,
True
) }}
{% endif %}
SELECT
{{ func }}
WHERE
EXISTS(
SELECT
1
FROM
{{ target }}
LIMIT
1
)
{% else %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: NOOP",
False
) }}
{% endif %}
SELECT
NULL
{% endif %}
{% endmacro %}
{% macro if_data_call_wait() %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% set query %}
SELECT
1
WHERE
EXISTS(
SELECT
1
FROM
{{ model.schema ~ "." ~ model.alias }}
LIMIT
1
) {% endset %}
{% if execute %}
{% set results = run_query(
query
) %}
{% if results %}
{{ log(
"Waiting...",
info = True
) }}
{% set wait_query %}
SELECT
system$wait(
{{ var(
"WAIT",
600
) }}
) {% endset %}
{% do run_query(wait_query) %}
{% else %}
SELECT
NULL;
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}
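
Together these two macros implement the invoke-and-wait pattern used by the history model below: if_data_call_function compiles a post-hook that calls the UDF only when the model actually produced rows, and if_data_call_wait then sleeps via SYSTEM$WAIT (default 600 seconds) so the asynchronous parser has time to land its output files. A sketch of the compiled post-hook (the schema name is illustrative):

SELECT some_schema.udf_bulk_program_parser()
WHERE
    EXISTS(
        SELECT 1 FROM some_schema.streamline__all_undecoded_instructions_history LIMIT 1
    )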

New file (bronze__streamline_FR_program_parser):
@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-2] + '_' + this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_id",
other_cols = "tx_id,index,program_id"
) }}

New file (bronze__streamline_program_parser):
@@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-2] + '_' + this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_id",
other_cols = "tx_id,index,program_id"
) }}
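
In both bronze models the `model` expression peels the last two '_'-separated tokens off the model's identifier to recover the source table name; worked through for the identifiers referenced by the silver models below:

-- 'bronze__streamline_program_parser'.split('_')    -> ['bronze', '', 'streamline', 'program', 'parser']
-- 'bronze__streamline_FR_program_parser'.split('_') -> ['bronze', '', 'streamline', 'FR', 'program', 'parser']
-- [-2] + '_' + [-1] -> 'program_parser' in both cases,
-- matching the program_parser table added to the bronze_streamline source in the next hunk.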

Changed file (sources config):
@@ -50,6 +50,7 @@ sources:
       - name: validators_app_list_api
       - name: stake_program_accounts
       - name: validator_vote_program_accounts
+      - name: program_parser
   - name: bronze_api
     schema: bronze_api
     tables:

Changed file (undecoded instructions model):
@@ -1,8 +1,9 @@
-{{ config(
-    materialized = 'view',
-    full_refresh = false
+{{ config (
+    materialized = "incremental",
+    unique_key = "CONCAT_WS('-', tx_id, INDEX)",
+    cluster_by = "ROUND(block_id, -3)"
 ) }}
 -- post_hook = 'call silver.sp_bulk_decode_instructions()',
 WITH idl_in_play AS (
     SELECT
@@ -21,6 +22,7 @@ instr_in_play AS (
     tx_id,
     INDEX,
     instruction,
+    block_id,
     block_timestamp
 FROM
     {{ ref('silver__events') }} A
@@ -34,6 +36,7 @@ SELECT
     p.tx_id,
     p.index,
     p.instruction,
+    p.block_id,
     p.block_timestamp
 FROM
     instr_in_play p

New file (streamline__all_undecoded_instructions_history):
@@ -0,0 +1,68 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_program_parser()",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
SELECT
0 AS block_id
{% else %}
SELECT
MAX(block_id) - 100000 AS block_id -- approx. 3 days
FROM
{{ ref("streamline__all_undecoded_instructions") }}
{% endif %}),
tbl AS (
SELECT
program_id,
tx_id,
INDEX,
instruction,
block_id,
block_timestamp
FROM
{{ ref("streamline__all_undecoded_instructions") }}
WHERE
(
block_id >= (
SELECT
block_id
FROM
last_3_days
)
)
AND block_id IS NOT NULL
AND concat_ws(
'-',
block_id,
program_id,
INDEX
) NOT IN (
SELECT
id
FROM
{{ ref("streamline__complete_decoded_instructions") }}
WHERE
block_id >= (
SELECT
block_id
FROM
last_3_days
)
AND block_id IS NOT NULL
)
)
SELECT
program_id,
tx_id,
INDEX,
instruction,
block_id,
block_timestamp
FROM
tbl
WHERE program_id = (SELECT MAX(program_id) AS program_id FROM tbl)
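
Note the final predicate: each run emits rows for only the single highest program_id present, so the 30-minute schedule presumably works through undecoded programs one at a time, bounding the batch handed to the parser on each invocation.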

New file (streamline__complete_decoded_instructions):
@@ -0,0 +1,29 @@
-- depends_on: {{ ref('bronze__streamline_program_parser') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_id, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
block_id,
CONCAT_WS('-', block_id, data[1]:program::STRING, data[0]) AS id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_program_parser') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_program_parser') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1
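
For the anti-join in the history model to work, the id built here (CONCAT_WS('-', block_id, data[1]:program::STRING, data[0])) must line up with its key, concat_ws('-', block_id, program_id, INDEX). That holds only if the parser's response array is shaped roughly as sketched below — an assumption about the response format, not confirmed by this diff:

-- assumed: DATA = [ <index>, { "program": "<program_id>", ... } ]   (hypothetical layout)
-- id = CONCAT_WS('-', block_id, data[1]:program::STRING, data[0])
--    = '<block_id>-<program_id>-<index>'  -- matches the history model's anti-join key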