diff --git a/.gitignore b/.gitignore
index 5094746..b127376 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,8 @@ dbt-env/
 .env
 .*
 # KEEP
-!.github/
\ No newline at end of file
+!.github/
+
+# Ignore Python bytecode files
+*.pyc
+__pycache__/
diff --git a/apps/README.md b/apps/README.md
new file mode 100644
index 0000000..e7b1599
--- /dev/null
+++ b/apps/README.md
@@ -0,0 +1,52 @@
+## Development
+
+1. Change to the `apps` directory
+
+   ```bash
+   cd apps
+   ```
+
+2. Set up a virtual environment
+
+   ```bash
+   conda create -n "livequery-models" python=3.9
+   ```
+
+3. Activate the virtual environment
+
+   ```bash
+   conda activate livequery-models
+   ```
+
+4. Create a `.env` file from `env.sample`, then load it:
+
+   ```bash
+   set -a; source .env; set +a
+   ```
+
+5. Install the dependencies
+
+   ```bash
+   pip install -r ../requirements.txt
+   ```
+
+6. Add the current directory to `PYTHONPATH`
+
+   ```bash
+   export PYTHONPATH=.
+   ```
+
+7. Run the app
+
+   ```bash
+   wave run app.py
+   ```
+
+8. Visit the demo at [http://localhost:10101/demo](http://localhost:10101/demo)
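+
+If the app cannot reach Snowflake, a quick sanity check is to confirm that the
+variables from step 4 are exported in the current shell:
+
+```bash
+env | grep SNOWFLAKE_
+```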
title="Token Transfers Data Size Over Time", + data=data("time length", -100), + plot=ui.plot( + [ui.mark(type="line", x="=time", y="=length", curve="smooth")] + ), + ) + + block_number_card = ui.small_stat_card(box="1 1 2 2", title="Latest Block Number", value="0") + + # data_table = ui.table( + # name='data_table', + # columns=[ + # ui.table_column(name='time', label='Time', sortable=True), + # ui.table_column(name='length', label='Data Length', sortable=True), + # ], + # rows=[], + # pagination=ui.table_pagination(total_rows=100, rows_per_page=100) + # ) + + # q.page["line_plot"] = line_plot + q.page["block_number_card"] = block_number_card + # q.page["data_table"] = data_table + + await q.page.save() + + try: + await update_realtime_dataset(q) + finally: + await q.close() diff --git a/apps/dashboards/__init__.py b/apps/dashboards/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/dashboards/realtime_monitoring.py b/apps/dashboards/realtime_monitoring.py new file mode 100644 index 0000000..2aba2a5 --- /dev/null +++ b/apps/dashboards/realtime_monitoring.py @@ -0,0 +1,139 @@ +from h2o_wave import ui, data, Q +import pyarrow as pa +from pyarrow import compute as pc +import asyncio +import time +from concurrent.futures import ThreadPoolExecutor +import snowflake.connector +from snowflake.connector import ProgrammingError +import os +import yaml + +async def pull_data_from_snowflake(query: str): + with snowflake.connector.connect( + user=os.getenv("SNOWFLAKE_USER"), + password=os.getenv("SNOWFLAKE_PASSWORD"), + account=os.getenv("SNOWFLAKE_ACCOUNT"), + database=os.getenv("SNOWFLAKE_DATABASE"), + schema=os.getenv("SNOWFLAKE_SCHEMA"), + warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"), + role=os.getenv("SNOWFLAKE_ROLE"), + session_parameters={ + "QUERY_TAG": "realtime_monitoring", + }, + ) as conn: + cur = conn.cursor() + try: + cur.execute_async(query) + query_id = cur.sfqid + + try: + while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)): + time.sleep(1) + except ProgrammingError as err: + print('Programming Error: {0}'.format(err)) + + results = [] + cur.get_results_from_sfqid(query_id) + + #TODO: work with PyArrow and yield in batches to avoid OOM + for row in cur.fetch_arrow_batches(): + results.extend(row.to_pylist()) + + return results + finally: + cur.close() + + + +QUERY = """ + SELECT * + FROM TABLE({SCHEMA}.{TABLE_FUNCTION_NAME}( + {table_function_args} + )); +""" + +def load_table_list_from_yaml(yaml_file): + with open(yaml_file, 'r') as file: + data = yaml.safe_load(file) + return data + +# table_lists = load_table_list_from_yaml('apps/table_list.yaml') + +async def fetch_data_for_schema_table(schema, table, args): + latest_block_height = 0 + query = QUERY.format( + DATABASE=os.getenv("SNOWFLAKE_DATABASE"), + SCHEMA=schema, + TABLE_FUNCTION_NAME=table, + table_function_args=args, + ) + results = await pull_data_from_snowflake(query) + if results: + results_table = pa.Table.from_pylist(results) + latest_block_height = pc.max(results_table.column('BLOCK_NUMBER')).as_py() + + return (latest_block_height, results_table) + +# async def gather_tasks_and_parse_args(q: Q): +# """ +# Gather tasks and parse args for each table and network. 
+
+
+QUERY = """
+    SELECT *
+    FROM TABLE({SCHEMA}.{TABLE_FUNCTION_NAME}(
+        {table_function_args}
+    ));
+"""
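+
+# For reference, with the arguments used by execute_query() below, the
+# template renders to roughly the following (illustrative only):
+#
+#   SELECT *
+#   FROM TABLE(ethereum_mainnet.tf_ez_token_transfers(
+#       NULL::INTEGER, TRUE::BOOLEAN,
+#       '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::STRING
+#   ));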
+
+
+def load_table_list_from_yaml(yaml_file):
+    with open(yaml_file, 'r') as file:
+        return yaml.safe_load(file)
+
+
+# table_lists = load_table_list_from_yaml('apps/table_list.yaml')
+
+
+async def fetch_data_for_schema_table(schema, table, args):
+    latest_block_height = 0
+    results_table = None  # Stays None when the query returns no rows
+    query = QUERY.format(
+        SCHEMA=schema,
+        TABLE_FUNCTION_NAME=table,
+        table_function_args=args,
+    )
+    results = await pull_data_from_snowflake(query)
+    if results:
+        results_table = pa.Table.from_pylist(results)
+        latest_block_height = pc.max(results_table.column('BLOCK_NUMBER')).as_py()
+
+    return (latest_block_height, results_table)
+
+
+# async def gather_tasks_and_parse_args(q: Q):
+#     """
+#     Gather tasks and parse args for each table and network.
+#     """
+#     table_lists = load_table_list_from_yaml('apps/table_list.yaml')
+#     tables = table_lists['tables']
+#
+#     loop = asyncio.get_event_loop()
+#     with ThreadPoolExecutor() as executor:
+#         tasks = []
+#         for table in tables:
+#             blockchain = table['blockchain']
+#             for network in table['networks']:
+#                 for network_name, network_data in network.items():
+#                     schema = f"{blockchain}_{network_name}"
+#                     for table_data in network_data['tables']:
+#                         table_name = table_data['name']
+#                         args = {**network_data['default_args'], **table_data.get('extra_args', {})}
+#                         formatted_args = ', '.join(f"{key}={arg['value']}::{arg['type']}" for key, arg in args.items())
+#                         tasks.append(
+#                             loop.run_in_executor(
+#                                 executor, fetch_data_for_schema_table, q, schema, table_name, formatted_args
+#                             )
+#                         )
+#         await asyncio.gather(*tasks)
+
+
+async def execute_query() -> AsyncIterator[pa.Table]:
+    table_lists = {
+        "schema": "ethereum_mainnet",
+        "table": "tf_ez_token_transfers",
+        "args": [
+            {
+                "block_height": "NULL::INTEGER",
+                "to_latest": "TRUE::BOOLEAN",
+                "ez_token_transfers_id": "'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::STRING"
+            }
+        ]
+    }
+
+    initial_args = ', '.join(f"{value}" for arg in table_lists['args'] for _, value in arg.items())
+    latest_block_height, data_table = await fetch_data_for_schema_table(
+        table_lists['schema'], table_lists['table'], initial_args
+    )
+    if data_table is not None:
+        # Emit the initial snapshot before polling for new blocks
+        yield data_table
+
+    # Poll forever, requesting only blocks newer than the last one seen
+    while True:
+        table_lists['args'][0]['block_height'] = f"{latest_block_height}::INTEGER"
+        updated_args = ', '.join(f"{value}" for arg in table_lists['args'] for _, value in arg.items())
+        new_block_height, data_table = await fetch_data_for_schema_table(
+            table_lists['schema'], table_lists['table'], updated_args
+        )
+        if data_table is not None:
+            # Only advance the cursor when the fetch actually returned rows
+            latest_block_height = new_block_height
+            yield data_table
+
+
+if __name__ == "__main__":
+    import logging
+
+    logging.basicConfig(level=logging.INFO)
+
+    async def log_data():
+        async for data_table in execute_query():
+            data_array = data_table.to_pylist()
+            logging.info(data_array)
+
+    asyncio.run(log_data())
diff --git a/apps/env.sample b/apps/env.sample
new file mode 100644
index 0000000..c7ba6fd
--- /dev/null
+++ b/apps/env.sample
@@ -0,0 +1,8 @@
+SNOWFLAKE_ACCOUNT='vna27887.us-east-1'
+SNOWFLAKE_ROLE='ACCOUNTADMIN'
+SNOWFLAKE_USER='<>'
+SNOWFLAKE_PASSWORD='<>'
+SNOWFLAKE_REGION='us-east-1'
+SNOWFLAKE_DATABASE='LIVEQUERY_DEV'
+SNOWFLAKE_SCHEMA='<>'
+SNOWFLAKE_WAREHOUSE='INTERNAL_DEV'
diff --git a/apps/table_list.yaml b/apps/table_list.yaml
new file mode 100644
index 0000000..4adb06b
--- /dev/null
+++ b/apps/table_list.yaml
@@ -0,0 +1,35 @@
+tables:
+  - blockchain: ethereum
+    networks:
+      - mainnet:
+          tables:
+            - name: tf_fact_blocks
+            - name: tf_fact_logs
+            - name: tf_ez_token_transfers
+              extra_args:
+                ez_token_transfers_id:
+                  value: '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
+                  type: STRING
+          default_args:
+            block_height:
+              value: NULL
+              type: INTEGER
+            to_latest:
+              value: TRUE
+              type: BOOLEAN
+      - sepolia:
+          tables:
+            - name: tf_fact_blocks
+            - name: tf_fact_logs
+            - name: tf_ez_token_transfers
+              extra_args:
+                ez_token_transfers_id:
+                  value: '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
+                  type: STRING
+          default_args:
+            block_height:
+              value: NULL
+              type: INTEGER
+            to_latest:
+              value: TRUE
+              type: BOOLEAN
diff --git a/requirements.txt b/requirements.txt
index ff8e7ca..29a3fa6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,4 @@
-dbt-snowflake~=1.5.0
\ No newline at end of file
+dbt-snowflake~=1.5.0
+snowflake-connector-python~=3.12.2
+pyarrow~=17.0.0
+h2o-wave~=1.5.1