add streamline bulk_decode udf poc

This commit is contained in:
shah 2024-09-18 21:39:43 -07:00
parent d532f6bcc7
commit e968601363
4 changed files with 79 additions and 1 deletions

View File

@ -3,4 +3,12 @@ SHELL := /bin/bash
dbt-console:
docker-compose run dbt_console
.PHONY: dbt-console
.PHONY: dbt-console
decoder_poc:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/streamline/poc/decoder/bronze__streamline_decoded_input_events.sql \
--profile near \
--target dev \
--profiles-dir ~/.dbt

View File

@ -60,3 +60,25 @@
'https://jfqhk99kj1.execute-api.us-east-1.amazonaws.com/stg/s3/copy_objects'
{%- endif %};
{% endmacro %}
{% macro create_udf_decode_withrawal_event() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_decode_withrawal_event(
DATA STRING
) returns ARRAY api_integration = aws_near_api_stg_v2 AS {% if target.name == "prod" %}
''
{% else %}
'https://cx7cyhtcjf.execute-api.us-east-1.amazonaws.com/stg/decode_withdrawal_event'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_decode_near_events() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_near_events(
json OBJECT
) returns ARRAY api_integration = aws_near_api_stg_v2 AS {% if target.name == "prod" %}
''
{% else %}
'https://cx7cyhtcjf.execute-api.us-east-1.amazonaws.com/stg/bulk_decode_near_events'
{%- endif %};
{% endmacro %}

View File

@ -0,0 +1,24 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_near_events',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"DECODED_INPUT_EVENTS",
"sql_limit" :"500",
"producer_batch_size" :"50",
"worker_batch_size" :"10",
"sql_source" :"{{this.identifier}}" }
),
) }}
SELECT
block_id,
tx_hash,
receipt_actions:receipt:Action:actions[0]:FunctionCall:args::string as encoded_event,
'{"recipient_address": "Bytes(20)", "amount": "U128"}' as event_struct
FROM near.silver.streamline_receipts_final
WHERE block_timestamp >= sysdate() - INTERVAL '2 weeks'
AND signer_id = 'relay.aurora'
AND object_keys(receipt_actions:receipt:Action:actions[0])[0] = 'FunctionCall'
AND receipt_actions:receipt:Action:actions[0]:FunctionCall:method_name::STRING = 'withdraw'
LIMIT 100

View File

@ -0,0 +1,24 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_near_events',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"DECODED_INPUT_EVENTS",
"sql_limit" :"500",
"producer_batch_size" :"50",
"worker_batch_size" :"10",
"sql_source" :"{{this.identifier}}" }
),
) }}
SELECT
block_id,
tx_hash,
status_value:SuccessValue :: STRING as encoded_event,
'{"amount": "U128","recipient_id": "Bytes(20)","eth_custodian_address": "Bytes(20)"}' as event_struct
FROM near.silver.streamline_receipts_final
WHERE block_timestamp >= sysdate() - INTERVAL '2 weeks'
AND signer_id = 'relay.aurora'
AND object_keys(receipt_actions:receipt:Action:actions[0])[0] = 'FunctionCall'
AND receipt_actions:receipt:Action:actions[0]:FunctionCall:method_name::STRING = 'withdraw'
LIMIT 100