[STREAM-1055] add demo plotcard app to show latest block number with the udtf

This commit is contained in:
Jensen Yap 2024-10-12 01:07:27 +09:00 committed by Julius Remigio
parent 7fe5cad7d7
commit 99412f94c6
9 changed files with 346 additions and 2 deletions

6
.gitignore vendored
View File

@@ -18,4 +18,8 @@ dbt-env/
.env
.*
# KEEP
!.github/
!.github/
# Ignore Python bytecode files
*.pyc
__pycache__/

45
apps/README.md Normal file
View File

@@ -0,0 +1,45 @@
## Development
1. Change into the `apps` directory
```bash
cd apps
```
2. Setup a virtual environment
```bash
conda create -n "livequery-models" python=3.9
```
3. Activate the virtual environment
```bash
conda activate livequery-models
```
4. Create a `.env` file from `env.sample` and load it into your shell (a quick sanity check is sketched after this list):
```bash
set -a; source .env; set +a
```
5. Install the Python dependencies
```bash
pip install -r ../requirements.txt
```
6. Set `PYTHONPATH` so the app can import the local modules
```bash
export PYTHONPATH=.
```
7. Run the app
```bash
wave run app.py
```
8. Visit the demo at [http://localhost:10101/demo](http://localhost:10101/demo)
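Before starting the app, it can help to confirm that the Snowflake variables from `.env` are actually visible to Python. Below is a minimal, hypothetical check (not part of this commit); it assumes the variable names from `apps/env.sample`, plus `SNOWFLAKE_SCHEMA`, which the dashboard code also reads:
```python
# check_env.py -- hypothetical helper, not included in this commit
import os

REQUIRED = [
    "SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ROLE", "SNOWFLAKE_DATABASE", "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_SCHEMA",  # read by dashboards/realtime_monitoring.py, not listed in env.sample
]

# Report any variables that are unset or empty before launching the Wave app.
missing = [name for name in REQUIRED if not os.getenv(name)]
print("missing:", missing or "none")
```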

0
apps/__init__.py Normal file
View File

111
apps/app.py Normal file
View File

@@ -0,0 +1,111 @@
import time
from h2o_wave import app, data, ui, Q, main
from dashboards.realtime_monitoring import execute_query
import logging
logging.basicConfig(level=logging.INFO)
light_theme_colors = "$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine".split() # noqa: E501
dark_theme_colors = "$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine".split()
_color_index = -1
colors = dark_theme_colors
def next_color():
global _color_index
_color_index += 1
return colors[_color_index % len(colors)]
_curve_index = -1
curves = "linear smooth step step-after step-before".split()
def next_curve():
global _curve_index
_curve_index += 1
return curves[_curve_index % len(curves)]
async def update_realtime_dataset(q):
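    # Poll the streaming UDTF results and keep the stat card in sync with the newest block.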
async for data_table in execute_query():
try:
latest_block_number = max(
row["BLOCK_NUMBER"] for row in data_table.to_pylist()
)
q.page["block_number_card"].value = str(latest_block_number)
except Exception as e:
print(e)
# try:
# import pyarrow as pa
# import pyarrow.compute as pc
# # Extract block_timestamp column
# block_timestamps = data_table.column("BLOCK_TIMESTAMP")
# # Convert timestamps to minutes
# block_timestamps_minute = pc.strftime(block_timestamps, format="%Y-%m-%d %H:%M")
# # Group by minute and count
# table = pa.table([block_timestamps_minute], names=["minute"])
# grouped_data = table.group_by("minute").aggregate([("minute", "count")])
# # Convert grouped data to list and assign it back
# existing_data = list(q.page["line_plot"].data) # Convert Ref to list
# new_data = existing_data + [[row["minute"], row["minute_count"]] for row in grouped_data.to_pylist()] # Append new entries
# q.page["line_plot"].data = new_data # Assign back to Ref
# except Exception as e:
# print(e)
# try:
# rows = [
# ui.table_row(
# name=str(i),
# cells=[current_time, str(len(row))]
# )
# for i, row in enumerate(data_table.to_pylist())
# ]
# q.page["data_table"].rows = rows
# except Exception as e:
# print(e)
await q.page.save()
@app("/demo", mode='broadcast')
async def create_dashboard(q: Q):
q.page.drop()
line_plot = ui.plot_card(
box="1 3 4 4",
title="Token Transfers Data Size Over Time",
data=data("time length", -100),
plot=ui.plot(
[ui.mark(type="line", x="=time", y="=length", curve="smooth")]
),
)
block_number_card = ui.small_stat_card(box="1 1 2 2", title="Latest Block Number", value="0")
# data_table = ui.table(
# name='data_table',
# columns=[
# ui.table_column(name='time', label='Time', sortable=True),
# ui.table_column(name='length', label='Data Length', sortable=True),
# ],
# rows=[],
# pagination=ui.table_pagination(total_rows=100, rows_per_page=100)
# )
# q.page["line_plot"] = line_plot
q.page["block_number_card"] = block_number_card
# q.page["data_table"] = data_table
await q.page.save()
try:
await update_realtime_dataset(q)
finally:
await q.close()
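The line plot and table cards above are stubbed out. If the `q.page["line_plot"] = line_plot` assignment were re-enabled, one plausible way to stream points into it is the Wave cyclic-buffer pattern implied by `data("time length", -100)`; the sketch below reuses the names from app.py but is not part of the commit:
```python
import time

async def push_plot_point(q, data_table):
    # Append one (time, row-count) point; the -100 buffer keeps only the last 100 points.
    q.page["line_plot"].data[-1] = [time.strftime("%H:%M:%S"), data_table.num_rows]
    await q.page.save()
```
Calling something like this from `update_realtime_dataset` after each batch would give the "Token Transfers Data Size Over Time" plot something to draw.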

0
apps/dashboards/__init__.py Normal file
View File

139
apps/dashboards/realtime_monitoring.py Normal file
View File

@@ -0,0 +1,139 @@
from h2o_wave import ui, data, Q
import pyarrow as pa
from pyarrow import compute as pc
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import snowflake.connector
from snowflake.connector import ProgrammingError
import os
import yaml
from typing import AsyncIterator
async def pull_data_from_snowflake(query: str):
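    # Open a Snowflake session from environment variables, submit the query asynchronously,
    # poll until it finishes, then collect the Arrow result batches.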
with snowflake.connector.connect(
user=os.getenv("SNOWFLAKE_USER"),
password=os.getenv("SNOWFLAKE_PASSWORD"),
account=os.getenv("SNOWFLAKE_ACCOUNT"),
database=os.getenv("SNOWFLAKE_DATABASE"),
schema=os.getenv("SNOWFLAKE_SCHEMA"),
warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
role=os.getenv("SNOWFLAKE_ROLE"),
session_parameters={
"QUERY_TAG": "realtime_monitoring",
},
) as conn:
cur = conn.cursor()
try:
cur.execute_async(query)
query_id = cur.sfqid
try:
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
results = []
cur.get_results_from_sfqid(query_id)
#TODO: work with PyArrow and yield in batches to avoid OOM
for row in cur.fetch_arrow_batches():
results.extend(row.to_pylist())
return results
finally:
cur.close()
QUERY = """
SELECT *
FROM TABLE({SCHEMA}.{TABLE_FUNCTION_NAME}(
{table_function_args}
));
"""
def load_table_list_from_yaml(yaml_file):
with open(yaml_file, 'r') as file:
data = yaml.safe_load(file)
return data
# table_lists = load_table_list_from_yaml('apps/table_list.yaml')
async def fetch_data_for_schema_table(schema, table, args):
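    # Render the UDTF query for one schema/table pair and return (latest block height, Arrow table).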
latest_block_height = 0
query = QUERY.format(
DATABASE=os.getenv("SNOWFLAKE_DATABASE"),
SCHEMA=schema,
TABLE_FUNCTION_NAME=table,
table_function_args=args,
)
results = await pull_data_from_snowflake(query)
if results:
results_table = pa.Table.from_pylist(results)
latest_block_height = pc.max(results_table.column('BLOCK_NUMBER')).as_py()
return (latest_block_height, results_table)
# async def gather_tasks_and_parse_args(q: Q):
# """
# Gather tasks and parse args for each table and network.
# """
# table_lists = load_table_list_from_yaml('apps/table_list.yaml')
# tables = table_lists['tables']
# loop = asyncio.get_event_loop()
# with ThreadPoolExecutor() as executor:
# tasks = []
# for table in tables:
# blockchain = table['blockchain']
# for network in table['networks']:
# for network_name, network_data in network.items():
# schema = f"{blockchain}_{network_name}"
# for table_data in network_data['tables']:
# table_name = table_data['name']
# args = {**network_data['default_args'], **table_data.get('extra_args', {})}
# formatted_args = ', '.join(f"{key}={arg['value']}::{arg['type']}" for key, arg in args.items())
# tasks.append(
# loop.run_in_executor(
# executor, fetch_data_for_schema_table, q, schema, table_name, formatted_args
# )
# )
# await asyncio.gather(*tasks)
async def execute_query() -> AsyncIterator[pa.Table]:
    # Hard-coded single-table loop: re-query from the latest seen block and yield each Arrow batch.
table_lists = {
"schema": "ethereum_mainnet",
"table": "tf_ez_token_transfers",
"args": [
{
"block_height": "NULL::INTEGER",
"to_latest": "TRUE::BOOLEAN",
"ez_token_transfers_id": "'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::STRING"
}
]
}
initial_args = ', '.join(f"{value}" for arg in table_lists['args'] for _, value in arg.items())
latest_block_height, data_table = await fetch_data_for_schema_table(table_lists['schema'], table_lists['table'], initial_args)
# Forever loop to fetch new data
while True:
table_lists['args'][0]['block_height'] = f"{latest_block_height}::INTEGER"
updated_args = ', '.join(f"{value}" for arg in table_lists['args'] for _, value in arg.items())
latest_block_height, data_table = await fetch_data_for_schema_table(table_lists['schema'], table_lists['table'], updated_args)
yield data_table
if __name__ == "__main__":
# data = asyncio.run(execute_query())
import logging
logging.basicConfig(level=logging.INFO)
async def log_data():
async for data_table in execute_query():
data_array = data_table.to_pylist()
logging.info(data_array)
asyncio.run(log_data())
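For reference, this is roughly the UDTF call that `QUERY.format(...)` produces with the hard-coded arguments in `execute_query()` (illustration only, not part of the commit):
```python
from dashboards.realtime_monitoring import QUERY  # the template defined above

print(QUERY.format(
    SCHEMA="ethereum_mainnet",
    TABLE_FUNCTION_NAME="tf_ez_token_transfers",
    table_function_args=(
        "NULL::INTEGER, TRUE::BOOLEAN, "
        "'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::STRING"
    ),
))
# SELECT *
# FROM TABLE(ethereum_mainnet.tf_ez_token_transfers(
#     NULL::INTEGER, TRUE::BOOLEAN, '0xddf252ad...'::STRING
# ));
```
On later loop iterations the `block_height` argument is replaced with `{latest_block_height}::INTEGER`, so only blocks newer than the last batch are pulled.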

7
apps/env.sample Normal file
View File

@@ -0,0 +1,7 @@
SNOWFLAKE_ACCOUNT='vna27887.us-east-1'
SNOWFLAKE_ROLE='ACCOUNTADMIN'
SNOWFLAKE_USER='<>'
SNOWFLAKE_PASSWORD='<>'
SNOWFLAKE_REGION='us-east-1'
SNOWFLAKE_DATABASE='LIVEQUERY_DEV'
SNOWFLAKE_WAREHOUSE='INTERNAL_DEV'

35
apps/table_list.yaml Normal file
View File

@@ -0,0 +1,35 @@
tables:
- blockchain: ethereum
networks:
- mainnet:
tables:
- name: tf_fact_blocks
- name: tf_fact_logs
- name: tf_ez_token_transfers
extra_args:
ez_token_transfers_id:
value: '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
type: STRING
default_args:
block_height:
value: NULL
type: INTEGER
to_latest:
value: TRUE
type: BOOLEAN
- sepolia:
tables:
- name: tf_fact_blocks
- name: tf_fact_logs
- name: tf_ez_token_transfers
extra_args:
ez_token_transfers_id:
value: '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
type: STRING
default_args:
block_height:
value: NULL
type: INTEGER
to_latest:
value: TRUE
type: BOOLEAN
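The YAML above feeds the (currently commented-out) `gather_tasks_and_parse_args`, which merges `default_args` with any per-table `extra_args` and flattens them into a `key=value::type` string. A rough illustration of what it would produce for the mainnet `tf_ez_token_transfers` entry (assuming the nesting shown above; not part of the commit):
```python
import yaml

with open("apps/table_list.yaml") as fh:
    cfg = yaml.safe_load(fh)

# Pick the ethereum mainnet entry and its token-transfers table function.
mainnet = cfg["tables"][0]["networks"][0]["mainnet"]
transfers = next(t for t in mainnet["tables"] if t["name"] == "tf_ez_token_transfers")
args = {**mainnet["default_args"], **transfers.get("extra_args", {})}
print(", ".join(f"{key}={arg['value']}::{arg['type']}" for key, arg in args.items()))
# block_height=None::INTEGER, to_latest=True::BOOLEAN, ez_token_transfers_id=0xddf252ad...::STRING
```
Note that `yaml.safe_load` turns `NULL` and `TRUE` into Python `None` and `True`, so those values would need to be mapped back to SQL literals (and strings re-quoted) before being spliced into the query.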

5
requirements.txt
View File

@@ -1 +1,4 @@
dbt-snowflake~=1.5.0
dbt-snowflake~=1.5.0
snowflake-connector-python~=3.12.2
pyarrow~=17.0.0
h2o-wave~=1.5.1