diff --git a/Makefile b/Makefile index 2a695b8..4796b16 100644 --- a/Makefile +++ b/Makefile @@ -3,4 +3,12 @@ SHELL := /bin/bash dbt-console: docker-compose run dbt_console -.PHONY: dbt-console \ No newline at end of file +.PHONY: dbt-console + +decoder_poc: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/streamline/poc/decoder/bronze__streamline_decoded_input_events.sql \ + --profile near \ + --target dev \ + --profiles-dir ~/.dbt \ No newline at end of file diff --git a/macros/streamline_udfs.sql b/macros/streamline_udfs.sql index 913b69f..4128343 100644 --- a/macros/streamline_udfs.sql +++ b/macros/streamline_udfs.sql @@ -60,3 +60,25 @@ 'https://jfqhk99kj1.execute-api.us-east-1.amazonaws.com/stg/s3/copy_objects' {%- endif %}; {% endmacro %} + +{% macro create_udf_decode_withrawal_event() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_decode_withrawal_event( + DATA STRING + ) returns ARRAY api_integration = aws_near_api_stg_v2 AS {% if target.name == "prod" %} + '' + {% else %} + 'https://cx7cyhtcjf.execute-api.us-east-1.amazonaws.com/stg/decode_withdrawal_event' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_bulk_decode_near_events() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_near_events( + json OBJECT + ) returns ARRAY api_integration = aws_near_api_stg_v2 AS {% if target.name == "prod" %} + '' + {% else %} + 'https://cx7cyhtcjf.execute-api.us-east-1.amazonaws.com/stg/bulk_decode_near_events' + {%- endif %}; +{% endmacro %} \ No newline at end of file diff --git a/models/streamline/poc/decoder/bronze__streamline_decoded_input_events.sql b/models/streamline/poc/decoder/bronze__streamline_decoded_input_events.sql new file mode 100644 index 0000000..67e5f14 --- /dev/null +++ b/models/streamline/poc/decoder/bronze__streamline_decoded_input_events.sql @@ -0,0 +1,24 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_decode_near_events', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"DECODED_INPUT_EVENTS", + "sql_limit" :"500", + "producer_batch_size" :"50", + "worker_batch_size" :"10", + "sql_source" :"{{this.identifier}}" } + ), +) }} + +SELECT + block_id, + tx_hash, + receipt_actions:receipt:Action:actions[0]:FunctionCall:args::string as encoded_event, + '{"recipient_address": "Bytes(20)", "amount": "U128"}' as event_struct +FROM near.silver.streamline_receipts_final +WHERE block_timestamp >= sysdate() - INTERVAL '2 weeks' +AND signer_id = 'relay.aurora' +AND object_keys(receipt_actions:receipt:Action:actions[0])[0] = 'FunctionCall' +AND receipt_actions:receipt:Action:actions[0]:FunctionCall:method_name::STRING = 'withdraw' +LIMIT 100 \ No newline at end of file diff --git a/models/streamline/poc/decoder/bronze__streamline_decoded_output_events.sql b/models/streamline/poc/decoder/bronze__streamline_decoded_output_events.sql new file mode 100644 index 0000000..bb0adc8 --- /dev/null +++ b/models/streamline/poc/decoder/bronze__streamline_decoded_output_events.sql @@ -0,0 +1,24 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_decode_near_events', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"DECODED_INPUT_EVENTS", + "sql_limit" :"500", + "producer_batch_size" :"50", + "worker_batch_size" :"10", + "sql_source" :"{{this.identifier}}" } + ), +) }} + +SELECT + block_id, + tx_hash, + status_value:SuccessValue :: STRING as encoded_event, + '{"amount": "U128","recipient_id": "Bytes(20)","eth_custodian_address": "Bytes(20)"}' as event_struct +FROM near.silver.streamline_receipts_final +WHERE block_timestamp >= sysdate() - INTERVAL '2 weeks' +AND signer_id = 'relay.aurora' +AND object_keys(receipt_actions:receipt:Action:actions[0])[0] = 'FunctionCall' +AND receipt_actions:receipt:Action:actions[0]:FunctionCall:method_name::STRING = 'withdraw' +LIMIT 100 \ No newline at end of file