Ephemeral deploy LT UDTF Success \0/ | WIP: Bronze override

This commit is contained in:
shah 2025-04-09 16:10:34 -07:00
parent 9374bf5ddb
commit 1b5e1519d0
8 changed files with 321 additions and 9 deletions

View File

@ -11,4 +11,20 @@ decoder_poc:
-m 1+models/streamline/poc/decoder/streamline__decoded_input_events.sql \
--profile near \
--target dev \
--profiles-dir ~/.dbt
--profiles-dir ~/.dbt
rm_logs:
@if [ -d logs ]; then \
rm -r logs 2>/dev/null || echo "Warning: Could not remove logs directory"; \
else \
echo "Logs directory does not exist"; \
fi
# deploy live table udtf
deploy_near_mainnet_lt: rm_logs
dbt run \
-s near_models.deploy.near.near__mainnet \
--vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE_QUERY: true}' \
--profiles-dir ~/.dbt \
--profile near \
--target dev

View File

@ -46,6 +46,8 @@ models:
+on_schema_change: "append_new_columns"
near_models:
+pre-hook: '{{ fsc_utils.set_query_tag() }}'
deploy:
+materialized: ephemeral
livequery_models:
deploy:
core:
@ -81,6 +83,10 @@ vars:
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] }}'
API_AWS_ROLE_ARN: '{{ var("config")[target.name]["API_AWS_ROLE_ARN"] if var("config")[target.name] else var("config")["dev"]["API_AWS_ROLE_ARN"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'
# Livetable config
DROP_UDFS_AND_SPS: false
config:
# The keys correspond to dbt profiles and are case sensitive
dev:

View File

@ -0,0 +1,29 @@
{% macro config_near_high_level_abstractions(blockchain, network) -%}
{#
This macro is used to generate the high level abstractions for the Near
blockchain.
#}
{% set schema = blockchain ~ "_" ~ network %}
- name: {{ schema }}.udf_get_latest_block_height
signature: []
return_type: INTEGER
sql: |
{{ near_live_table_latest_block_height() | indent(4) -}}
- name: {{ schema -}}.tf_fact_blocks
signature:
- [_block_height, INTEGER, The start block height to get the blocks from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(block_id NUMBER, block_timestamp TIMESTAMP_NTZ, block_hash STRING, block_author STRING, header OBJECT, block_challenges_result ARRAY, block_challenges_root STRING, chunk_headers_root STRING, chunk_tx_root STRING, chunk_mask ARRAY, chunk_receipts_root STRING, chunks ARRAY, chunks_included NUMBER, epoch_id STRING, epoch_sync_data_hash STRING, gas_price FLOAT, last_ds_final_block STRING, last_final_block STRING, latest_protocol_version INT, next_bp_hash STRING, next_epoch_id STRING, outcome_root STRING, prev_hash STRING, prev_height NUMBER, prev_state_root STRING, random_value STRING, rent_paid FLOAT, signature STRING, total_supply FLOAT, validator_proposals ARRAY, validator_reward FLOAT, fact_blocks_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns the block data for a given block height. If to_latest is true, it will continue fetching blocks until the latest block. Otherwise, it will fetch blocks until the block_id height is reached.$$
sql: |
{{ near_live_table_fact_blocks(schema, blockchain, network) | indent(4) -}}
{%- endmacro -%}

View File

@ -0,0 +1,160 @@
-- Get Near Chain Head
{% macro near_live_table_latest_block_height() %}
WITH rpc_call AS (
SELECT
live.udf_api(
'https://rpc.mainnet.near.org',
utils.udf_json_rpc_call('block', {'finality' : 'final'})
):data::object AS rpc_result
FROM dual
ORDER BY 1
LIMIT 1
)
SELECT
rpc_result:result:header:height::INTEGER AS latest_block_height
FROM
rpc_call
{% endmacro %}
{% macro near_live_table_min_max_block_height(start_block, block_count) %}
SELECT
{{ start_block }} AS min_height,
min_height + {{ block_count }} AS max_height,
FROM
dual
{% endmacro %}
-- Get Near Block Data
{% macro near_live_table_target_blocks(start_block, block_count) %}
WITH heights AS (
SELECT
min_height,
max_height,
FROM (
{{- near_live_table_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}}
)
),
block_spine AS (
SELECT
ROW_NUMBER() OVER (
ORDER BY
NULL
) - 1 + h.min_height::integer AS block_number,
FROM
heights h,
TABLE(generator(ROWCOUNT => {{ block_count }} ))
qualify block_number BETWEEN h.min_height AND h.max_height
)
SELECT
block_number as block_height
FROM block_spine
{% endmacro %}
{% macro near_live_table_get_spine(table_name) %}
SELECT
block_height,
ROW_NUMBER() OVER (ORDER BY block_height) - 1 as partition_num
FROM
(
SELECT
row_number() over (order by seq4()) - 1 + COALESCE(block_id, 0)::integer as block_height,
min_height,
max_height
FROM
TABLE(generator(ROWCOUNT => IFF(
COALESCE(to_latest, false),
latest_block_height - min_height + 1,
1
))),
{{ table_name }}
qualify block_height BETWEEN min_height AND max_height
)
{% endmacro %}
{% macro near_live_table_get_raw_block_data(spine) %}
SELECT
block_height,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp,
_live.lt_near_tx_udf_api(
'POST',
'{Service}',
{'Content-Type' : 'application/json'},
{
'jsonrpc' : '2.0',
'method' : 'block',
'id' : 'Flipside/getBlock/' || request_timestamp || '/' || block_height :: STRING,
'params':{'block_id': block_height}
},
_utils.UDF_WHOAMI(),
'Vault/prod/near/quicknode/mainnet'
):data.result AS rpc_data_result
from
{{spine}}
{% endmacro %}
{% macro near_live_table_extract_raw_block_data(raw_blocks) %}
SELECT
block_data:header:height::string as block_id,
TO_TIMESTAMP_NTZ(
block_data :header :timestamp :: STRING
) AS block_timestamp,
block_data:header:hash::STRING as block_hash,
ARRAY_SIZE(block_data:chunks)::NUMBER as tx_count,
block_data:header as header,
block_data:header:challenges_result::ARRAY as block_challenges_result,
block_data:header:challenges_root::STRING as block_challenges_root,
block_data:header:chunk_headers_root::STRING as chunk_headers_root,
block_data:header:chunk_tx_root::STRING as chunk_tx_root,
block_data:header:chunk_mask::ARRAY as chunk_mask,
block_data:header:chunk_receipts_root::STRING as chunk_receipts_root,
block_data:chunks as chunks,
block_data:header:chunks_included::NUMBER as chunks_included,
block_data:header:epoch_id::STRING as epoch_id,
block_data:header:epoch_sync_data_hash::STRING as epoch_sync_data_hash,
block_data:events as events,
block_data:header:gas_price::NUMBER as gas_price,
block_data:header:last_ds_final_block::STRING as last_ds_final_block,
block_data:header:last_final_block::STRING as last_final_block,
block_data:header:latest_protocol_version::NUMBER as latest_protocol_version,
block_data:header:next_bp_hash::STRING as next_bp_hash,
block_data:header:next_epoch_id::STRING as next_epoch_id,
block_data:header:outcome_root::STRING as outcome_root,
block_data:header:prev_hash::STRING as prev_hash,
block_data:header:prev_height::NUMBER as prev_height,
block_data:header:prev_state_root::STRING as prev_state_root,
block_data:header:random_value::STRING as random_value,
block_data:header:rent_paid::FLOAT as rent_paid,
block_data:header:signature::STRING as signature,
block_data:header:total_supply::NUMBER as total_supply,
block_data:header:validator_proposals as validator_proposals,
block_data:header:validator_reward::NUMBER as validator_reward,
MD5(block_data:header:height::STRING) as fact_blocks_id,
SYSDATE() as inserted_timestamp,
SYSDATE() as modified_timestamp
FROM {{raw_blocks}}
{% endmacro %}
{% macro near_live_table_fact_blocks(schema, blockchain, network) %}
{%- set near_live_table_fact_blocks = get_rendered_model('near_models', 'core__fact_blocks', schema, blockchain, network) -%}
{{ near_live_table_fact_blocks }}
{% endmacro %}
{% macro near_live_table_fact_transactions(schema, blockchain, network) %}
{%- set near_live_table_fact_transactions = get_rendered_model('livequery_models', 'near_fact_transactions', schema, blockchain, network) -%}
{{ near_live_table_fact_transactions }}
{% endmacro %}
{% macro near_live_table_fact_receipts(schema, blockchain, network) %}
{%- set near_live_table_fact_receipts = get_rendered_model('livequery_models', 'near_fact_receipts', schema, blockchain, network) -%}
{{ near_live_table_fact_receipts }}
{% endmacro %}
{% macro near_live_table_ez_actions(schema, blockchain, network) %}
{%- set near_live_table_ez_actions = get_rendered_model('livequery_models', 'near_ez_actions', schema, blockchain, network) -%}
{{ near_live_table_ez_actions }}
{% endmacro %}

View File

@ -0,0 +1,67 @@
{% macro get_rendered_model(package_name, model_name, schema, blockchain, network) %}
{#
This macro retrieves and renders a specified model from the graph.
Args:
package_name (str): The name of the package containing the model.
model_name (str): The name of the model to be rendered.
schema (str): The schema to be used.
blockchain (str): The blockchain to be used.
network (str): The network to be used.
Returns:
str: The rendered SQL of the specified model.
#}
{% if execute %}
{{ log("=== Starting get_rendered_model ===", info=True) }}
{# Use a list to store the node to avoid scope issues #}
{%- set nodes = [] -%}
{{ log("Looking for node: " ~ package_name ~ "." ~ model_name, info=True) }}
{%- for node in graph.nodes.values() -%}
{%- if node.package_name == package_name and node.name == model_name -%}
{{ log("Found target node: " ~ node.unique_id, info=True) }}
{%- do nodes.append(node) -%}
{%- endif -%}
{%- endfor -%}
{%- if nodes | length == 0 -%}
{{ log("No target node found!", info=True) }}
{{ return('') }}
{%- endif -%}
{%- set target_node = nodes[0] -%}
{{ log("Processing node: " ~ target_node.unique_id, info=True) }}
{{ log("Dependencies:\n\t\t" ~ (target_node.depends_on.nodes | pprint).replace("\n", "\n\t\t"), info=True) }}
{# First render all dependency CTEs #}
{%- set ctes = [] -%}
{%- for dep_id in target_node.depends_on.nodes -%}
{{ log("Processing dependency: " ~ dep_id, info=True) }}
{%- set dep_node = graph.nodes[dep_id] -%}
{%- set rendered_sql = render(dep_node.raw_code) | trim -%}
{%- if rendered_sql -%}
{%- set cte_sql -%}
__dbt__cte__{{ dep_node.name }} AS (
{{ rendered_sql }}
)
{%- endset -%}
{%- do ctes.append(cte_sql) -%}
{%- endif -%}
{%- endfor -%}
{{ log("Number of CTEs generated: " ~ ctes | length, info=True) }}
{# Combine CTEs with main query #}
{%- set final_sql -%}
WITH {{ ctes | join(',\n\n') }}
{{ render(target_node.raw_code) }}
{%- endset -%}
{{ log("=== End get_rendered_model ===\n\n" , info=True) }}
{{ return(final_sql) }}
{% endif %}
{% endmacro %}

View File

@ -3,7 +3,15 @@
tags = ['streamline_helper']
) }}
{{ streamline_external_table_FR_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
{% if var('ENABLE_LIVE_TABLE_QUERY', false) %}
-- LIVE LOGIC: Call RPCs to populate live table
SELECT 13
{% else %}
-- BATCH LOGIC: Default
{{ streamline_external_table_FR_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )")
}}
{% endif %}

View File

@ -4,7 +4,14 @@
) }}
{{ streamline_external_table_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
{% if var('ENABLE_LIVE_TABLE_QUERY', false) %}
-- LIVE LOGIC: Call RPCs to populate live table
SELECT 1
{% else %}
-- BATCH LOGIC: Default
{{ streamline_external_table_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
{% endif %}

View File

@ -0,0 +1,19 @@
-- depends_on: {{ ref('near_models','core__fact_blocks') }}
-- depends_on: {{ ref('near_models','silver__blocks_final') }}
-- depends_on: {{ ref('near_models','silver__blocks_v2') }}
-- depends_on: {{ ref('near_models', 'bronze__blocks') }}
-- depends_on: {{ ref('near_models', 'bronze__FR_blocks') }}
-- depends_on: {{ ref('near_models', 'core__fact_transactions') }}
-- depends_on: {{ ref('near_models', 'silver__transactions_final') }}
-- depends_on: {{ ref('near_models', 'silver__transactions_v2') }}
-- depends_on: {{ ref('near_models', 'bronze__transactions') }}
-- depends_on: {{ ref('near_models', 'bronze__FR_transactions') }}
-- depends_on: {{ ref('near_models','core__fact_receipts') }}
-- depends_on: {{ ref('near_models','silver__receipts_final') }}
-- depends_on: {{ ref('near_models', 'core__ez_actions') }}
{%- set configs = [
config_near_high_level_abstractions
] -%}
{{- livequery_base.ephemeral_deploy(configs) -}}