From 10e53f74d5b08dfce04aec44afcc152dee348699 Mon Sep 17 00:00:00 2001
From: Jack Forgash <58153492+forgxyz@users.noreply.github.com>
Date: Tue, 4 Mar 2025 18:05:29 -0700
Subject: [PATCH] restore nft metadata py model

---
 ...equery__request_nearblocks_nft_metadata.py | 104 ++++++++++++++++++
 1 file changed, 104 insertions(+)
 create mode 100644 models/streamline/external/token_metadata/nearblocks/legacy/livequery__request_nearblocks_nft_metadata.py

diff --git a/models/streamline/external/token_metadata/nearblocks/legacy/livequery__request_nearblocks_nft_metadata.py b/models/streamline/external/token_metadata/nearblocks/legacy/livequery__request_nearblocks_nft_metadata.py
new file mode 100644
index 0000000..ef65b96
--- /dev/null
+++ b/models/streamline/external/token_metadata/nearblocks/legacy/livequery__request_nearblocks_nft_metadata.py
@@ -0,0 +1,104 @@
+import json
+
+import snowflake.snowpark.types as T
+
+from datetime import datetime
+
+# Legacy model for NFT Contract Metadata.
+# Still runs fine; kept as-is pending a refactor.
+
+
+def model(dbt, session):
+
+    dbt.config(
+        materialized='incremental',
+        unique_key='_RES_ID',
+        tags=['livequery', 'nearblocks'],
+        incremental_strategy='delete+insert'
+    )
+
+    # define api parameters
+    page = 1
+    url_params = {
+        'page': page,
+        'per_page': 50,
+        'sort': 'holders',
+        'order': 'desc'
+    }
+
+    # define parameters for udf_api
+    method = 'GET'
+    headers = {}  # no headers required for this api
+    data = {}  # arguments are passed in the url via the GET request
+    base_url = 'https://api.nearblocks.io/v1/nfts'
+
+    # define the result df schema
+    schema = T.StructType([
+        T.StructField('PAGE', T.IntegerType()),
+        T.StructField('REQUEST_SQL', T.StringType()),
+        T.StructField('RESPONSE_COUNT', T.IntegerType()),
+        T.StructField('DATA', T.VariantType()),
+        T.StructField('_INSERTED_TIMESTAMP', T.TimestampType()),
+        T.StructField('_RES_ID', T.StringType())
+    ])
+
+    # start with an empty df and union each page's response into it
+    result_df = session.create_dataframe([], schema)
+
+    # upper bound on the number of pages requested per run
+    max_page = 100
+
+    while True:
+        # build the GET url from url_params
+        url = f'{base_url}?{"&".join([f"{k}={v}" for k, v in url_params.items()])}'
+
+        # call udf_api (the api returns up to per_page tokens per request)
+        call_udf_sql = f"select livequery.live.udf_api('{method}', '{url}', {headers}, {data})"
+
+        # execute the udf_api call
+        response = session.sql(call_udf_sql).collect()
+        try:
+            token_count = len(json.loads(response[0][0])['data']['tokens'])
+        except (IndexError, KeyError, TypeError, ValueError):
+            # malformed or empty response - stop paginating
+            break
+
+        # stop once the api returns no more tokens
+        if token_count == 0:
+            break
+
+        _inserted_timestamp = datetime.utcnow()
+        _res_id = f'{_inserted_timestamp.date()}-{page}'
+
+        # append this page's response to the result df
+        result_df = result_df.union(
+            session.create_dataframe(
+                [[
+                    page,
+                    call_udf_sql,
+                    token_count,
+                    json.loads(response[0][0]),
+                    _inserted_timestamp,
+                    _res_id
+                ]],
+                schema
+            )
+        )
+
+        # request the next page
+        page += 1
+        url_params['page'] = page
+
+        # stop once the page cap is reached
+        if page > max_page:
+            break
+
+    return result_df
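+
+# Usage sketch (hypothetical invocation; the exact schedule and selector are
+# assumptions, not part of this patch):
+#
+#   dbt run --select livequery__request_nearblocks_nft_metadata
+#
+# Each run appends one row per fetched page, keyed by _RES_ID (date-page), so
+# the delete+insert strategy replaces same-day pages instead of duplicating them.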