diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 2644b0f..5480601 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -7,7 +7,7 @@ ) }} {{ create_udf_get_chainhead() }} {{ create_udf_get_chainhead_testnet() }} - {{ create_udf_bulk_grpc() }} + {{ create_udf_bulk_grpc_v2() }} {{ run_create_udf_array_disjunctive_union() }} {{ run_create_address_array_adj() }} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index d9dc5f4..e7f44fc 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -2,9 +2,20 @@ {% macro create_aws_flow_api() %} {% if target.name == "prod" %} {% set sql %} + -- Likely deprecated endpoint CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-prod-rolesnowflakeudfsAF733095-FNY67ODG1RFG' api_allowed_prefixes = ( 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/' ) enabled = TRUE; + + {% endset %} + {% do run_query(sql) %} + + {% set sql %} + -- New v2 deployment for prod + CREATE api integration IF NOT EXISTS aws_flow_api_prod_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::924682671219:role/flow-api-prod-rolesnowflakeudfsAF733095-RmrgKIWbzoFL' api_allowed_prefixes = ( + 'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; + {% endset %} {% do run_query(sql) %} @@ -26,6 +37,14 @@ {% elif target.name == "dev" %} {{ log("Generating api integration for target:" ~ target.name, info=True) }} + -- New v2 deployment for dev + {% set sql %} + CREATE api integration IF NOT EXISTS aws_flow_api_stg_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/flow-api-stg-rolesnowflakeudfsAF733095-ybejBONVMTd4' api_allowed_prefixes = ( + 'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/' + ) enabled = TRUE; + {% endset %} + {% do run_query(sql) %} + {% set sql %} CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-i1JsMNTpSzX0' api_allowed_prefixes = ( 'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/' diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index bea1a25..a7a2687 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -43,6 +43,21 @@ {%- endif %}; {% endmacro %} +{% macro create_udf_bulk_grpc_v2() %} + {{ log("Creating udf udf_bulk_grpc_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc_v2(json variant) returns variant api_integration = + {% if target.name == "prod" %} + aws_flow_api_prod_v2 AS 'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_grpc' + {% elif target.name == "dev" %} + aws_flow_api_stg_v2 AS 'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_grpc' + {% elif target.name == "sbx" %} + {{ log("Creating sbx udf_bulk_grpc_v2", info=True) }} + aws_flow_api_stg_v2 AS 'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_grpc' + {%- endif %}; +{% endmacro %} + {% macro create_udf_bulk_grpc_us_east_2() %} {{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }} {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_blocks.sql b/models/bronze/streamline/realtime/bronze__streamline_blocks.sql index 4f0fbd0..0cd2937 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_blocks.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_blocks.sql @@ -2,34 +2,7 @@ materialized = 'view' ) }} -WITH meta AS ( - SELECT - last_modified AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_file_registration_history( - start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), - table_name => '{{ source( "bronze_streamline", "blocks") }}') - ) A - ) - SELECT - block_number, - DATA, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS _fsc_id, - s._partition_by_block_id, - s.value AS VALUE - FROM - {{ source("bronze_streamline","blocks") }} s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id - WHERE - b._partition_by_block_id = s._partition_by_block_id - +{{ streamline_external_table_query_v2( + model = "blocks_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/realtime/bronze__streamline_collections.sql b/models/bronze/streamline/realtime/bronze__streamline_collections.sql index d2d2395..ae8934d 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_collections.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_collections.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_query_v2( + model = "collections_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql index b1af0e3..56d6923 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_fr_blocks.sql @@ -1,34 +1,8 @@ {{ config ( -materialized = 'view' + materialized = 'view' ) }} -WITH meta AS ( - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "blocks") }}' - ) - ) A -) -SELECT - block_number, - DATA, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS _fsc_id, - s._partition_by_block_id, - s.value AS VALUE -FROM - {{ source("bronze_streamline","blocks") }} s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id -WHERE - b._partition_by_block_id = s._partition_by_block_id +{{ streamline_external_table_FR_query_v2( + model = 'blocks_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql index f552fc8..7882774 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_fr_collections.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'collections_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql index 2f72332..42a973a 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_fr_transaction_results.sql @@ -2,12 +2,9 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'transaction_results_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql b/models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql index f552fc8..e7dc629 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_fr_transactions.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'transactions_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql b/models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql index 545031e..ca3f4bf 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_transaction_results.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_query_v2( + model = 'transaction_results_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/bronze__streamline_transactions.sql b/models/bronze/streamline/realtime/bronze__streamline_transactions.sql index d2d2395..106175f 100644 --- a/models/bronze/streamline/realtime/bronze__streamline_transactions.sql +++ b/models/bronze/streamline/realtime/bronze__streamline_transactions.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_") [-1] %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_query_v2( + model = 'transactions_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_blocks_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_blocks_v1.sql new file mode 100644 index 0000000..fffa5d5 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_blocks_v1.sql @@ -0,0 +1,34 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + SELECT + last_modified AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "blocks") }}') + ) A + ) + SELECT + block_number, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE + FROM + {{ source("bronze_streamline","blocks") }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id + WHERE + b._partition_by_block_id = s._partition_by_block_id \ No newline at end of file diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_collections_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_collections_v1.sql new file mode 100644 index 0000000..a2614fa --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_collections_v1.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model = "collections", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_fr_blocks_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_blocks_v1.sql new file mode 100644 index 0000000..dc349d9 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_blocks_v1.sql @@ -0,0 +1,34 @@ +{{ config ( +materialized = 'view' +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "blocks") }}' + ) + ) A +) +SELECT + block_number, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline","blocks") }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id \ No newline at end of file diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_fr_collections_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_collections_v1.sql new file mode 100644 index 0000000..ec83dce --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_collections_v1.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model = 'collections', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transaction_results_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transaction_results_v1.sql new file mode 100644 index 0000000..230bc83 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transaction_results_v1.sql @@ -0,0 +1,13 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_FR_query( + model = 'transaction_results', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} + + diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transactions_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transactions_v1.sql new file mode 100644 index 0000000..c8c73c8 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_fr_transactions_v1.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model = 'transactions', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_transaction_results_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_transaction_results_v1.sql new file mode 100644 index 0000000..08b2434 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_transaction_results_v1.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_query( + model = 'transaction_results', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/realtime/v1/bronze__streamline_transactions_v1.sql b/models/bronze/streamline/realtime/v1/bronze__streamline_transactions_v1.sql new file mode 100644 index 0000000..8a3fef0 --- /dev/null +++ b/models/bronze/streamline/realtime/v1/bronze__streamline_transactions_v1.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model = 'transactions', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/gold/nft/nft__dim_topshot_metadata.sql b/models/gold/nft/nft__dim_topshot_metadata.sql index 232ce88..343436c 100644 --- a/models/gold/nft/nft__dim_topshot_metadata.sql +++ b/models/gold/nft/nft__dim_topshot_metadata.sql @@ -6,7 +6,8 @@ unique_key = "nft_id", post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(nft_id,nbatopshot_id);", tags = ['scheduled_non_core'], - meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} } + meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} }, + enabled = false ) }} -- depends_on: {{ ref('bronze__streamline_topshot_metadata') }} WITH diff --git a/models/gold/nft/nft__fact_topshot_buybacks.sql b/models/gold/nft/nft__fact_topshot_buybacks.sql index d6d6b9b..63e1897 100644 --- a/models/gold/nft/nft__fact_topshot_buybacks.sql +++ b/models/gold/nft/nft__fact_topshot_buybacks.sql @@ -6,7 +6,8 @@ cluster_by = ['block_timestamp::date', 'modified_timestamp::date'], unique_key = "topshot_buyback_id", tags = ['nft', 'topshot', 'scheduled'], - meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } } + meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } }, + enabled = false ) }} WITH flowty_sales AS ( diff --git a/models/silver/defi/dex/increment/silver__increment_swaps.yml b/models/silver/defi/dex/increment/silver__increment_swaps.yml index f46309d..ba2beb4 100644 --- a/models/silver/defi/dex/increment/silver__increment_swaps.yml +++ b/models/silver/defi/dex/increment/silver__increment_swaps.yml @@ -1,7 +1,7 @@ version: 2 models: - - name: silver__swaps_factory + - name: silver__increment_swaps description: |- This table records asset swaps on the Flow blockchain parsed from Swap events emitted by Increment SwapPair contracts (via the SwapFactory contract). tests: diff --git a/models/silver/nft/metadata/livequery/topshot/livequery__null_moments_metadata.sql b/models/silver/nft/metadata/livequery/topshot/livequery__null_moments_metadata.sql deleted file mode 100644 index 5ab1e40..0000000 --- a/models/silver/nft/metadata/livequery/topshot/livequery__null_moments_metadata.sql +++ /dev/null @@ -1,35 +0,0 @@ -{{ config( - materialized = 'incremental', - unique_key = '_id', - tags = ['livequery', 'topshot', 'moment_metadata'] -) }} - -SELECT - moment_id, - event_contract, - _inserted_date, - _inserted_timestamp, - MD5( - 'moment_id' || 'event_contract' || '_inserted_date' - ) AS _id -FROM - {{ ref('livequery__request_topshot_metadata') }} -WHERE - DATA :data :data :getMintedMoment :: STRING IS NULL - -{% if is_incremental() %} -AND _inserted_date >= ( - SELECT - MAX(_inserted_date) - FROM - {{ this }} -) -AND _inserted_timestamp > ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% else %} - AND _inserted_date >= '2022-12-09' -{% endif %} diff --git a/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.py b/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.py deleted file mode 100644 index 847e0f4..0000000 --- a/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.py +++ /dev/null @@ -1,104 +0,0 @@ -import snowflake.snowpark.types as T -import snowflake.snowpark.functions as F - - -def register_udf_construct_data(): - """ - Helper function to register a named UDF to construct the DATA object for the API call. - This named UDF can be used with a column expression, so multiple moment_ids can be called at the same time. - """ - - udf_construct_data = ( - F.udf( - lambda query, moment_id: {'query': query, - 'variables': {'momentId': moment_id}}, - name='udf_construct_data', - input_types=[ - T.StringType(), - T.StringType() - ], - return_type=T.VariantType(), - replace=True - ) - ) - - return udf_construct_data - - -def model(dbt, session): - """ - This model will call the TopShot GraphQL API to request metadata for a list of moment_ids, determined by an exeternally defined view. - The request arguments are a GraphQL query and moment ID. The gql and API URL are stored in a table and retrieved in this workflow. - """ - - dbt.config( - materialized='incremental', - unique_key='_RES_ID', - packages=['snowflake-snowpark-python'], - tags=['livequery', 'topshot', 'moment_metadata'], - incremental_strategy='delete+insert', - cluster_by=['_INSERTED_TIMESTAMP'] - ) - - # base url and graphql query stored in table via dbt - topshot_gql_params = dbt.ref( - 'livequery__moments_parameters').select( - 'base_url', 'query').where( - F.col( - 'contract') == 'A.0b2a3299cc857e29.TopShot' - ).collect() - - # define params for UDF_API - method = 'POST' - headers = { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip', - 'Connection': 'keep-alive', - 'Content-Type': 'application/json', - 'User-Agent': 'Flipside_Flow_metadata/0.1' - } - url = topshot_gql_params[0][0] - - # gql query passed with the post request - data = topshot_gql_params[0][1] - - # metadata request requires moment_id, defined in a separate view - # number of moment_ids to request set by .limit(), timeout experienced at 4000 - inputs = dbt.ref( - 'livequery__topshot_moments_metadata_needed').select( - "EVENT_CONTRACT", "MOMENT_ID" - ).limit(100) - # Note prior limit of 3500 leads to 429 error / rate limit by system - # Per Dapper team, 50 reqs per 10 seconds. If exceeded, blocked for 30s. - - # register the udf_construct_data function - udf_construct_data = register_udf_construct_data() - - # use with_columns to source moment_id from the input_df and call multiple udf_api calls at once - # columns defined in the array will be appended to the input dataframe - response = inputs.with_columns( - ['DATA', '_INSERTED_DATE', '_INSERTED_TIMESTAMP', '_RES_ID'], - [ - F.call_udf( - 'flow.live.udf_api', - method, - url, - headers, - udf_construct_data( - F.lit(data), - F.col('MOMENT_ID') - ) - ), - F.sysdate().cast(T.DateType()), - F.sysdate(), - F.md5( - F.concat( - F.col('EVENT_CONTRACT'), - F.col('MOMENT_ID') - ) - ) - ] - ) - - # dbt will append response to table per incremental config - return response diff --git a/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.yml b/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.yml deleted file mode 100644 index 372ba49..0000000 --- a/models/silver/nft/metadata/livequery/topshot/livequery__request_topshot_metadata.yml +++ /dev/null @@ -1,19 +0,0 @@ -version: 2 - -models: - - name: livequery__request_topshot_metadata - description: |- - LiveQuery-based model to request TopShot metadata from the public graphQL endpoint. - - columns: - - name: EVENT_CONTRACT - - - name: MOMENT_ID - - - name: DATA - - - name: _INSERTED_DATE - - - name: _INSERTED_TIMESTAMP - - - name: _RES_ID diff --git a/models/silver/nft/metadata/livequery/topshot/livequery__topshot_moments_metadata_needed.sql b/models/silver/nft/metadata/livequery/topshot/livequery__topshot_moments_metadata_needed.sql deleted file mode 100644 index cae9d64..0000000 --- a/models/silver/nft/metadata/livequery/topshot/livequery__topshot_moments_metadata_needed.sql +++ /dev/null @@ -1,91 +0,0 @@ -{{ config( - materialized = 'view', - tags = ['livequery', 'topshot', 'moment_metadata'] -) }} - -WITH mints AS ( - - SELECT - event_contract, - event_data :momentID :: STRING AS moment_id - FROM - {{ ref('silver__nft_moments_s') }} - WHERE - event_contract = 'A.0b2a3299cc857e29.TopShot' - AND event_type = 'MomentMinted' -), -sales AS ( - SELECT - nft_collection AS event_contract, - nft_id AS moment_id - FROM - {{ ref('silver__nft_sales_s') }} - WHERE - nft_collection ILIKE '%topshot%' -), -all_topshots AS ( - SELECT - event_contract, - moment_id - FROM - mints - UNION - SELECT - event_contract, - moment_id - FROM - sales -), -lq_always_null AS ( - SELECT - moment_id, - event_contract, - COUNT(1) AS num_times_null_resp - FROM - {{ target.database }}.livequery.null_moments_metadata - WHERE - event_contract = 'A.0b2a3299cc857e29.TopShot' - GROUP BY - 1, - 2 - HAVING - num_times_null_resp > 2 -), -legacy_always_null AS ( - SELECT - id, - contract, - COUNT(1) AS num_times_null_resp - FROM - {{ ref('streamline__null_moments_metadata') }} - WHERE - contract = 'A.0b2a3299cc857e29.TopShot' - GROUP BY - 1, - 2 - HAVING - num_times_null_resp > 2 -) -SELECT - DISTINCT * -FROM - all_topshots -WHERE - moment_id NOT IN ( - ( - SELECT - nft_id AS moment_id - FROM - {{ target.database }}.silver.nft_topshot_metadata - UNION - SELECT - id AS moment_id - FROM - legacy_always_null - UNION - SELECT - moment_id - FROM - lq_always_null - ) - ) diff --git a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.sql b/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.sql deleted file mode 100644 index af5a8e1..0000000 --- a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.sql +++ /dev/null @@ -1,74 +0,0 @@ -{{ config( - materialized = 'incremental', - incremental_strategy = 'delete+insert', - cluster_by = ['_inserted_timestamp::DATE'], - unique_key = 'nft_id', - tags = ['livequery', 'topshot'], - full_refresh = False, - enabled = false -) }} -{# NFT Metadata from legacy process lives in external table, deleted CTE and set FR=False -TO - -LIMIT - / avoid unnecessary TABLE scans #} - WITH metadata_lq AS ( - SELECT - _res_id, - 'A.0b2a3299cc857e29.TopShot' AS contract, - moment_id, - DATA :data :data :: variant AS DATA, - _inserted_timestamp - FROM - {{ ref('livequery__request_topshot_metadata') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} - ) -{% endif %} -), -lq_final AS ( - SELECT - moment_id :: STRING AS nft_id, - contract :: STRING AS nft_collection, - DATA :getMintedMoment :data :id :: STRING AS nbatopshot_id, - DATA :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number, - DATA :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation, - DATA :getMintedMoment :data :play :description :: VARCHAR AS moment_description, - DATA :getMintedMoment :data :play :stats :playerName :: STRING AS player, - DATA :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team, - DATA :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season, - DATA :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category, - DATA :getMintedMoment :data :play :stats :playType :: STRING AS play_type, - DATA :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date, - DATA :getMintedMoment :data :set :flowName :: STRING AS set_name, - DATA :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number, - DATA :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls, - DATA :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full, - DATA :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game, - DATA :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date, - _inserted_timestamp - FROM - metadata_lq - WHERE - DATA :getMintedMoment :: STRING IS NOT NULL -) -SELECT - *, - {{ dbt_utils.generate_surrogate_key( - ['nft_id'] - ) }} AS nft_moment_metadata_topshot_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - lq_final qualify ROW_NUMBER() over ( - PARTITION BY nft_id - ORDER BY - _inserted_timestamp DESC - ) = 1 diff --git a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.yml b/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.yml deleted file mode 100644 index 1c613bf..0000000 --- a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata.yml +++ /dev/null @@ -1,73 +0,0 @@ -version: 2 - -models: - - name: silver__nft_topshot_metadata - description: |- - Data for TopShot Moments, including player, team, stats and more. - - columns: - - name: nft_id - description: "{{ doc('nft_id') }}" - - - name: nft_collection - description: "{{ doc('nft_collection') }}" - - - name: nbatopshot_id - description: "{{ doc('nbatopshot_id') }}" - - - name: serial_number - description: "{{ doc('serial_number') }}" - - - name: total_circulation - description: "{{ doc('total_circulation') }}" - - - name: moment_description - description: "{{ doc('moment_description') }}" - - - name: player - description: "{{ doc('player') }}" - - - name: team - description: "{{ doc('team') }}" - - - name: season - description: "{{ doc('season') }}" - - - name: play_category - description: "{{ doc('play_category') }}" - - - name: play_type - description: "{{ doc('play_type') }}" - - - name: moment_date - description: "{{ doc('moment_date') }}" - - - name: set_name - description: "{{ doc('set_name') }}" - - - name: set_series_number - description: "{{ doc('set_series_number') }}" - - - name: video_urls - description: "{{ doc('video_urls') }}" - - - name: moment_stats_full - description: "{{ doc('moment_stats_full') }}" - - - name: player_stats_game - description: "{{ doc('player_stats_game') }}" - - - name: player_stats_season_to_date - description: "{{ doc('player_stats_season_to_date') }}" - - - name: _inserted_timestamp - description: "{{ doc('_inserted_timestamp') }}" - - - name: nft_moment_metadata_topshot_id - description: "{{ doc('pk_id') }}" - - - name: inserted_timestamp - description: "{{ doc('inserted_timestamp') }}" - - - name: modified_timestamp - description: "{{ doc('modified_timestamp') }}" diff --git a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_v2.sql b/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_v2.sql deleted file mode 100644 index 184b3be..0000000 --- a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_v2.sql +++ /dev/null @@ -1,64 +0,0 @@ -{{ config( - materialized = 'incremental', - incremental_strategy = 'delete+insert', - cluster_by = ['_inserted_timestamp::DATE'], - unique_key = 'nft_id', - tags = ['streamline', 'topshot'] -) }} --- depends_on: {{ ref('bronze__streamline_topshot_metadata') }} -WITH metadata_from_streamline AS ( - - SELECT - VALUE :CONTRACT AS contract, - VALUE :ID AS moment_id, - DATA, - _inserted_timestamp - FROM - -{% if is_incremental() %} -{{ ref('bronze__streamline_topshot_metadata') }} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) -{% else %} - {{ ref('bronze__streamline_topshot_metadata_FR') }} -{% endif %} -) -SELECT - moment_id :: STRING AS nft_id, - contract :: STRING AS nft_collection, - DATA :data :getMintedMoment :data :id :: STRING AS nbatopshot_id, - DATA :data :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number, - DATA :data :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation, - DATA :data :getMintedMoment :data :play :description :: VARCHAR AS moment_description, - DATA :data :getMintedMoment :data :play :stats :playerName :: STRING AS player, - DATA :data :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team, - DATA :data :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season, - DATA :data :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category, - DATA :data :getMintedMoment :data :play :stats :playType :: STRING AS play_type, - DATA :data :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date, - DATA :data :getMintedMoment :data :set :flowName :: STRING AS set_name, - DATA :data :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number, - DATA :data :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls, - DATA :data :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full, - DATA :data :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game, - DATA :data :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date, - _inserted_timestamp, - {{ dbt_utils.generate_surrogate_key( - ['nft_id'] - ) }} AS nft_topshot_metadata_v2_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - metadata_from_streamline -WHERE - DATA :errors IS NULL qualify ROW_NUMBER() over ( - PARTITION BY nft_id - ORDER BY - _inserted_timestamp DESC - ) = 1 diff --git a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_view.sql b/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_view.sql deleted file mode 100644 index 9ff1a38..0000000 --- a/models/silver/nft/metadata/livequery/topshot/silver__nft_topshot_metadata_view.sql +++ /dev/null @@ -1,33 +0,0 @@ -{{ config( - materialized = 'view' -) }} - -SELECT - nft_id, - nft_collection, - nbatopshot_id, - serial_number, - total_circulation, - moment_description, - player, - team, - season, - play_category, - play_type, - moment_date, - set_name, - set_series_number, - video_urls, - moment_stats_full, - player_stats_game, - player_stats_season_to_date, - _INSERTED_TIMESTAMP, - nft_moment_metadata_topshot_id, - inserted_timestamp, - modified_timestamp, - _INVOCATION_ID -FROM - {{ source( - 'silver', - 'nft_topshot_metadata' - ) }} diff --git a/models/silver/nft/metadata/livequery/topshot/streamline__null_moments_metadata.sql b/models/silver/nft/metadata/livequery/topshot/streamline__null_moments_metadata.sql deleted file mode 100644 index 9b94306..0000000 --- a/models/silver/nft/metadata/livequery/topshot/streamline__null_moments_metadata.sql +++ /dev/null @@ -1,34 +0,0 @@ -{{ config( - materialized = 'incremental', - unique_key = ["id","contract","_inserted_date"], - tags = ['topshot', 'moment_metadata'], - enabled = True -) }} -{# Legacy workflow - TODO deprecate soon #} - -SELECT - id, - contract, - _inserted_date, - _inserted_timestamp -FROM - {{ ref('bronze__moments_metadata') }} -WHERE - DATA :getMintedMoment :: STRING IS NULL - -{% if is_incremental() %} -AND _inserted_date >= ( - SELECT - MAX(_inserted_date) - FROM - {{ this }} -) -AND _inserted_timestamp > ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% else %} - AND _inserted_date >= '2022-12-09' -{% endif %} diff --git a/models/sources.yml b/models/sources.yml index de97511..248f557 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -8,9 +8,13 @@ sources: tables: - name: moments_minted_metadata_api - name: blocks + - name: blocks_v2 - name: collections + - name: collections_v2 - name: transactions + - name: transactions_v2 - name: transaction_results + - name: transaction_results_v2 - name: BLOCKS_CANDIDATE_07 - name: BLOCKS_CANDIDATE_08 - name: BLOCKS_CANDIDATE_09 diff --git a/models/streamline/core/realtime/streamline__get_blocks_realtime.sql b/models/streamline/core/realtime/streamline__get_blocks_realtime.sql index 7fea8fc..354bd01 100644 --- a/models/streamline/core/realtime/streamline__get_blocks_realtime.sql +++ b/models/streamline/core/realtime/streamline__get_blocks_realtime.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime'] diff --git a/models/streamline/core/realtime/streamline__get_collections_realtime.sql b/models/streamline/core/realtime/streamline__get_collections_realtime.sql index 5b036fa..67bf0d5 100644 --- a/models/streamline/core/realtime/streamline__get_collections_realtime.sql +++ b/models/streamline/core/realtime/streamline__get_collections_realtime.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime'] diff --git a/models/streamline/core/realtime/streamline__get_transaction_results_realtime.sql b/models/streamline/core/realtime/streamline__get_transaction_results_realtime.sql index 285e130..700baf7 100644 --- a/models/streamline/core/realtime/streamline__get_transaction_results_realtime.sql +++ b/models/streamline/core/realtime/streamline__get_transaction_results_realtime.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime'] diff --git a/models/streamline/core/realtime/streamline__get_transactions_realtime.sql b/models/streamline/core/realtime/streamline__get_transactions_realtime.sql index a96e0a0..0529ec7 100644 --- a/models/streamline/core/realtime/streamline__get_transactions_realtime.sql +++ b/models/streamline/core/realtime/streamline__get_transactions_realtime.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime'] diff --git a/tests/tests__moment_metadata_topshot.sql b/tests/tests__moment_metadata_topshot.sql deleted file mode 100644 index 11a5532..0000000 --- a/tests/tests__moment_metadata_topshot.sql +++ /dev/null @@ -1,30 +0,0 @@ -WITH mint_events AS ( - SELECT - MAX(block_timestamp) :: DATE AS last_mint_date - FROM - {{ ref('silver__nft_moments_s') }} - WHERE - event_contract = 'A.0b2a3299cc857e29.TopShot' - AND event_type = 'MomentMinted' -), -moments AS ( - SELECT - nft_collection, - nft_id, - _inserted_timestamp - FROM - {{ ref('silver__nft_topshot_metadata') }} - WHERE - _inserted_timestamp :: DATE >= ( - SELECT - last_mint_date - FROM - mint_events - ) -) -SELECT - IFF(COUNT(nft_id) > 0, TRUE, FALSE) AS recent -FROM - moments -HAVING - recent = FALSE