AN-6514 Streamline 2.0 Upgrade (#465)
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / notify-failure (push) Has been cancelled

* rm topshot

* upd yml

* deploy udf

* upd blocks_realtime

* sl func

* bronze upd

* v1 namespace

* bronze v2

* prod endpoint

* define api integrations for v2

---------

Co-authored-by: Jack Forgash <58153492+forgxyz@users.noreply.github.com>
This commit is contained in:
stanz 2025-10-24 01:15:13 +07:00 committed by GitHub
parent 5c8c2d9afa
commit 27e3414619
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 211 additions and 663 deletions

View File

@ -7,7 +7,7 @@
) }}
{{ create_udf_get_chainhead() }}
{{ create_udf_get_chainhead_testnet() }}
{{ create_udf_bulk_grpc() }}
{{ create_udf_bulk_grpc_v2() }}
{{ run_create_udf_array_disjunctive_union() }}
{{ run_create_address_array_adj() }}

View File

@ -2,9 +2,20 @@
{% macro create_aws_flow_api() %}
{% if target.name == "prod" %}
{% set sql %}
-- Likely deprecated endpoint
CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-prod-rolesnowflakeudfsAF733095-FNY67ODG1RFG' api_allowed_prefixes = (
'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% set sql %}
-- New v2 deployment for prod
CREATE api integration IF NOT EXISTS aws_flow_api_prod_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::924682671219:role/flow-api-prod-rolesnowflakeudfsAF733095-RmrgKIWbzoFL' api_allowed_prefixes = (
'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
@ -26,6 +37,14 @@
{% elif target.name == "dev" %}
{{ log("Generating api integration for target:" ~ target.name, info=True) }}
-- New v2 deployment for dev
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_api_stg_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/flow-api-stg-rolesnowflakeudfsAF733095-ybejBONVMTd4' api_allowed_prefixes = (
'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-i1JsMNTpSzX0' api_allowed_prefixes = (
'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/'

View File

@ -43,6 +43,21 @@
{%- endif %};
{% endmacro %}
{# Registers the v2 bulk gRPC external function (streamline.udf_bulk_grpc_v2),
   routing to the v2 API Gateway deployment for the active dbt target
   (prod / dev / sbx).
   NOTE(review): there is no else-branch — for any other target the rendered
   SQL ends at "api_integration =" and will fail to compile; confirm the
   target list is exhaustive. #}
{% macro create_udf_bulk_grpc_v2() %}
{{ log("Creating udf udf_bulk_grpc_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{# Single VARIANT payload in, VARIANT out; the integration/endpoint pair is
   selected per target below. #}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc_v2(json variant) returns variant api_integration =
{% if target.name == "prod" %}
aws_flow_api_prod_v2 AS 'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_grpc'
{% elif target.name == "dev" %}
aws_flow_api_stg_v2 AS 'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_grpc'
{% elif target.name == "sbx" %}
{# sbx intentionally shares the stg integration and endpoint with dev #}
{{ log("Creating sbx udf_bulk_grpc_v2", info=True) }}
aws_flow_api_stg_v2 AS 'https://2hcu4hei27.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_grpc'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_grpc_us_east_2() %}
{{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}

View File

@ -2,34 +2,7 @@
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "blocks") }}')
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline","blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
{{ streamline_external_table_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -2,10 +2,7 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_query_v2(
model = "collections_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -1,34 +1,8 @@
{{ config (
materialized = 'view'
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "blocks") }}'
)
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline","blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id
{{ streamline_external_table_FR_query_v2(
model = 'blocks_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -2,10 +2,7 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_FR_query_v2(
model = 'collections_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -2,12 +2,9 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_FR_query_v2(
model = 'transaction_results_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -2,10 +2,7 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_FR_query_v2(
model = 'transactions_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -2,10 +2,7 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_query_v2(
model = 'transaction_results_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -2,10 +2,7 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
{{ streamline_external_table_query_v2(
model = 'transactions_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@ -0,0 +1,34 @@
-- Bronze realtime view over the raw `blocks` external table.
-- Joins each external-table row to its file's registration metadata (last 3
-- days only) so every row carries its file load timestamp and the block-id
-- partition parsed from the file path.
{{ config (
    materialized = 'view'
) }}
-- meta: one row per file registered in the last 3 days; the partition id is
-- the first "_"-separated token of the 3rd "/"-segment of the file path.
WITH meta AS (
    SELECT
        last_modified AS _inserted_timestamp,
        file_name,
        CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
    FROM
        TABLE(
            information_schema.external_table_file_registration_history(
                start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
                table_name => '{{ source( "bronze_streamline", "blocks") }}')
        ) A
)
SELECT
    block_number,
    DATA,
    _inserted_timestamp,
    -- surrogate id: MD5 of the block number (empty string when NULL)
    MD5(
        CAST(
            COALESCE(CAST(block_number AS text), '' :: STRING) AS text
        )
    ) AS _fsc_id,
    s._partition_by_block_id,
    s.value AS VALUE
FROM
    {{ source("bronze_streamline","blocks") }} s
    JOIN meta b
    ON b.file_name = metadata$filename
    AND b._partition_by_block_id = s._partition_by_block_id
WHERE
    -- repeats the join predicate; presumably to force partition pruning on
    -- the external table scan — confirm before removing
    b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,11 @@
-- Bronze view over the legacy (v1) `collections` external table, built by the
-- shared streamline external-table macro.
-- Fix: removed the unused `{% set model = ... %}` — the macro is called with
-- the literal "collections", so the variable was dead code.
{{ config (
    materialized = 'view'
) }}
{{ streamline_external_table_query(
    model = "collections",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
    partition_name = "_partition_by_block_id",
    unique_key = "id"
) }}

View File

@ -0,0 +1,34 @@
-- Bronze full-refresh (FR) view over the raw `blocks` external table.
-- Unlike the realtime view, this scans the registration metadata for ALL
-- files (external_table_files, no time window) so a full refresh can rebuild
-- from the complete history.
{{ config (
    materialized = 'view'
) }}
-- meta: every registered file, with the block-id partition parsed from the
-- file path (first "_"-separated token of the 3rd "/"-segment).
WITH meta AS (
    SELECT
        registered_on AS _inserted_timestamp,
        file_name,
        CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source( "bronze_streamline", "blocks") }}'
            )
        ) A
)
SELECT
    block_number,
    DATA,
    _inserted_timestamp,
    -- surrogate id: MD5 of the block number (empty string when NULL)
    MD5(
        CAST(
            COALESCE(CAST(block_number AS text), '' :: STRING) AS text
        )
    ) AS _fsc_id,
    s._partition_by_block_id,
    s.value AS VALUE
FROM
    {{ source("bronze_streamline","blocks") }} s
    JOIN meta b
    ON b.file_name = metadata$filename
    AND b._partition_by_block_id = s._partition_by_block_id
WHERE
    -- repeats the join predicate; presumably to force partition pruning on
    -- the external table scan — confirm before removing
    b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = 'collections',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}

View File

@ -0,0 +1,13 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_FR_query(
model = 'transaction_results',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_query(
model = 'transaction_results',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}

View File

@ -6,7 +6,8 @@
unique_key = "nft_id",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(nft_id,nbatopshot_id);",
tags = ['scheduled_non_core'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} }
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} },
enabled = false
) }}
-- depends_on: {{ ref('bronze__streamline_topshot_metadata') }}
WITH

View File

@ -6,7 +6,8 @@
cluster_by = ['block_timestamp::date', 'modified_timestamp::date'],
unique_key = "topshot_buyback_id",
tags = ['nft', 'topshot', 'scheduled'],
meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } }
meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } },
enabled = false
) }}
WITH flowty_sales AS (

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: silver__swaps_factory
- name: silver__increment_swaps
description: |-
This table records asset swaps on the Flow blockchain parsed from Swap events emitted by Increment SwapPair contracts (via the SwapFactory contract).
tests:

View File

@ -1,35 +0,0 @@
-- Tracks TopShot moments whose LiveQuery metadata response came back NULL so
-- the request workflow can retry or exclude them.
{{ config(
    materialized = 'incremental',
    unique_key = '_id',
    tags = ['livequery', 'topshot', 'moment_metadata']
) }}
SELECT
    moment_id,
    event_contract,
    _inserted_date,
    _inserted_timestamp,
    -- Fix: hash the column VALUES. The original hashed the string literals
    -- 'moment_id' || 'event_contract' || '_inserted_date', producing the same
    -- constant _id for every row, which collapses the table under
    -- unique_key = '_id'.
    MD5(
        moment_id || event_contract || _inserted_date
    ) AS _id
FROM
    {{ ref('livequery__request_topshot_metadata') }}
WHERE
    -- keep only rows where the API returned no minted-moment payload
    DATA :data :data :getMintedMoment :: STRING IS NULL

{% if is_incremental() %}
AND _inserted_date >= (
    SELECT
        MAX(_inserted_date)
    FROM
        {{ this }}
)
AND _inserted_timestamp > (
    SELECT
        MAX(_inserted_timestamp)
    FROM
        {{ this }}
)
{% else %}
-- initial load floor: first date the request workflow ran
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -1,104 +0,0 @@
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def register_udf_construct_data():
    """
    Register a named Snowpark UDF that builds the DATA payload for the API call.

    Because the UDF is named, it can be used in a column expression, so
    multiple moment_ids can be requested in one dataframe operation.

    Returns:
        The registered UDF handle (callable on column expressions).
    """
    udf_construct_data = (
        F.udf(
            # payload shape: {'query': <gql string>, 'variables': {'momentId': <id>}}
            lambda query, moment_id: {'query': query,
                                      'variables': {'momentId': moment_id}},
            name='udf_construct_data',
            input_types=[
                T.StringType(),
                T.StringType()
            ],
            return_type=T.VariantType(),
            # replace=True so re-runs of the model overwrite the prior UDF
            replace=True
        )
    )
    return udf_construct_data


def model(dbt, session):
    """
    Call the TopShot GraphQL API to request metadata for a list of moment_ids,
    determined by an externally defined view.

    The request arguments are a GraphQL query and moment ID. The gql and API
    URL are stored in a table and retrieved in this workflow.

    Returns:
        A Snowpark dataframe of inputs plus DATA / _INSERTED_DATE /
        _INSERTED_TIMESTAMP / _RES_ID columns; dbt appends it per the
        incremental config below.
    """
    dbt.config(
        materialized='incremental',
        unique_key='_RES_ID',
        packages=['snowflake-snowpark-python'],
        tags=['livequery', 'topshot', 'moment_metadata'],
        incremental_strategy='delete+insert',
        cluster_by=['_INSERTED_TIMESTAMP']
    )
    # base url and graphql query stored in table via dbt
    topshot_gql_params = dbt.ref(
        'livequery__moments_parameters').select(
        'base_url', 'query').where(
        F.col(
            'contract') == 'A.0b2a3299cc857e29.TopShot'
    ).collect()
    # define params for UDF_API
    method = 'POST'
    headers = {
        'Accept': 'application/json',
        'Accept-Encoding': 'gzip',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'User-Agent': 'Flipside_Flow_metadata/0.1'
    }
    url = topshot_gql_params[0][0]
    # gql query passed with the post request
    data = topshot_gql_params[0][1]
    # metadata request requires moment_id, defined in a separate view
    # number of moment_ids to request set by .limit(), timeout experienced at 4000
    inputs = dbt.ref(
        'livequery__topshot_moments_metadata_needed').select(
        "EVENT_CONTRACT", "MOMENT_ID"
    ).limit(100)
    # Note prior limit of 3500 leads to 429 error / rate limit by system
    # Per Dapper team, 50 reqs per 10 seconds. If exceeded, blocked for 30s.
    # register the udf_construct_data function
    udf_construct_data = register_udf_construct_data()
    # use with_columns to source moment_id from the input_df and call multiple udf_api calls at once
    # columns defined in the array will be appended to the input dataframe
    response = inputs.with_columns(
        ['DATA', '_INSERTED_DATE', '_INSERTED_TIMESTAMP', '_RES_ID'],
        [
            F.call_udf(
                'flow.live.udf_api',
                method,
                url,
                headers,
                udf_construct_data(
                    F.lit(data),
                    F.col('MOMENT_ID')
                )
            ),
            F.sysdate().cast(T.DateType()),
            F.sysdate(),
            # _RES_ID: stable key for the (contract, moment) request pair
            F.md5(
                F.concat(
                    F.col('EVENT_CONTRACT'),
                    F.col('MOMENT_ID')
                )
            )
        ]
    )
    # dbt will append response to table per incremental config
    return response

View File

@ -1,19 +0,0 @@
version: 2
models:
- name: livequery__request_topshot_metadata
description: |-
LiveQuery-based model to request TopShot metadata from the public graphQL endpoint.
columns:
- name: EVENT_CONTRACT
- name: MOMENT_ID
- name: DATA
- name: _INSERTED_DATE
- name: _INSERTED_TIMESTAMP
- name: _RES_ID

View File

@ -1,91 +0,0 @@
-- View of TopShot moment_ids that still need metadata: every minted or sold
-- TopShot moment, minus those already in silver metadata and minus those that
-- have repeatedly (> 2 times) returned a NULL response from the API.
{{ config(
    materialized = 'view',
    tags = ['livequery', 'topshot', 'moment_metadata']
) }}
-- mints: moment_ids from TopShot MomentMinted events
WITH mints AS (
    SELECT
        event_contract,
        event_data :momentID :: STRING AS moment_id
    FROM
        {{ ref('silver__nft_moments_s') }}
    WHERE
        event_contract = 'A.0b2a3299cc857e29.TopShot'
        AND event_type = 'MomentMinted'
),
-- sales: moment_ids seen in TopShot NFT sales
sales AS (
    SELECT
        nft_collection AS event_contract,
        nft_id AS moment_id
    FROM
        {{ ref('silver__nft_sales_s') }}
    WHERE
        nft_collection ILIKE '%topshot%'
),
-- all_topshots: mints and sales combined (UNION deduplicates)
all_topshots AS (
    SELECT
        event_contract,
        moment_id
    FROM
        mints
    UNION
    SELECT
        event_contract,
        moment_id
    FROM
        sales
),
-- lq_always_null: moments whose LiveQuery response was NULL more than twice
lq_always_null AS (
    SELECT
        moment_id,
        event_contract,
        COUNT(1) AS num_times_null_resp
    FROM
        {{ target.database }}.livequery.null_moments_metadata
    WHERE
        event_contract = 'A.0b2a3299cc857e29.TopShot'
    GROUP BY
        1,
        2
    HAVING
        num_times_null_resp > 2
),
-- legacy_always_null: same > 2 NULL-response rule for the legacy workflow
legacy_always_null AS (
    SELECT
        id,
        contract,
        COUNT(1) AS num_times_null_resp
    FROM
        {{ ref('streamline__null_moments_metadata') }}
    WHERE
        contract = 'A.0b2a3299cc857e29.TopShot'
    GROUP BY
        1,
        2
    HAVING
        num_times_null_resp > 2
)
SELECT
    DISTINCT *
FROM
    all_topshots
WHERE
    -- NOTE(review): NOT IN yields no rows if the subquery returns any NULL
    -- moment_id — confirm the metadata tables cannot contain NULL ids
    moment_id NOT IN (
        (
            SELECT
                nft_id AS moment_id
            FROM
                {{ target.database }}.silver.nft_topshot_metadata
            UNION
            SELECT
                id AS moment_id
            FROM
                legacy_always_null
            UNION
            SELECT
                moment_id
            FROM
                lq_always_null
        )
    )

View File

@ -1,74 +0,0 @@
-- Parses TopShot moment metadata from LiveQuery API responses into typed
-- columns. Incremental on nft_id; model is disabled (legacy path).
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    cluster_by = ['_inserted_timestamp::DATE'],
    unique_key = 'nft_id',
    tags = ['livequery', 'topshot'],
    full_refresh = False,
    enabled = false
) }}
{# NFT metadata from the legacy process lives in an external table; that CTE
   was deleted and full_refresh set to False to limit / avoid unnecessary
   table scans. #}
-- metadata_lq: raw LiveQuery responses, payload unwrapped one level
WITH metadata_lq AS (
    SELECT
        _res_id,
        'A.0b2a3299cc857e29.TopShot' AS contract,
        moment_id,
        DATA :data :data :: variant AS DATA,
        _inserted_timestamp
    FROM
        {{ ref('livequery__request_topshot_metadata') }}

{% if is_incremental() %}
WHERE
    _inserted_timestamp >= (
        SELECT
            MAX(_inserted_timestamp)
        FROM
            {{ this }}
    )
{% endif %}
),
-- lq_final: flatten the getMintedMoment payload into typed columns
lq_final AS (
    SELECT
        moment_id :: STRING AS nft_id,
        contract :: STRING AS nft_collection,
        DATA :getMintedMoment :data :id :: STRING AS nbatopshot_id,
        DATA :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number,
        DATA :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation,
        DATA :getMintedMoment :data :play :description :: VARCHAR AS moment_description,
        DATA :getMintedMoment :data :play :stats :playerName :: STRING AS player,
        DATA :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team,
        DATA :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season,
        DATA :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category,
        DATA :getMintedMoment :data :play :stats :playType :: STRING AS play_type,
        DATA :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date,
        DATA :getMintedMoment :data :set :flowName :: STRING AS set_name,
        DATA :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number,
        DATA :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls,
        DATA :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full,
        DATA :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game,
        DATA :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date,
        _inserted_timestamp
    FROM
        metadata_lq
    WHERE
        -- drop rows where the API returned no moment payload
        DATA :getMintedMoment :: STRING IS NOT NULL
)
SELECT
    *,
    {{ dbt_utils.generate_surrogate_key(
        ['nft_id']
    ) }} AS nft_moment_metadata_topshot_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    -- keep only the latest response per nft_id
    lq_final qualify ROW_NUMBER() over (
        PARTITION BY nft_id
        ORDER BY
            _inserted_timestamp DESC
    ) = 1

View File

@ -1,73 +0,0 @@
version: 2
models:
- name: silver__nft_topshot_metadata
description: |-
Data for TopShot Moments, including player, team, stats and more.
columns:
- name: nft_id
description: "{{ doc('nft_id') }}"
- name: nft_collection
description: "{{ doc('nft_collection') }}"
- name: nbatopshot_id
description: "{{ doc('nbatopshot_id') }}"
- name: serial_number
description: "{{ doc('serial_number') }}"
- name: total_circulation
description: "{{ doc('total_circulation') }}"
- name: moment_description
description: "{{ doc('moment_description') }}"
- name: player
description: "{{ doc('player') }}"
- name: team
description: "{{ doc('team') }}"
- name: season
description: "{{ doc('season') }}"
- name: play_category
description: "{{ doc('play_category') }}"
- name: play_type
description: "{{ doc('play_type') }}"
- name: moment_date
description: "{{ doc('moment_date') }}"
- name: set_name
description: "{{ doc('set_name') }}"
- name: set_series_number
description: "{{ doc('set_series_number') }}"
- name: video_urls
description: "{{ doc('video_urls') }}"
- name: moment_stats_full
description: "{{ doc('moment_stats_full') }}"
- name: player_stats_game
description: "{{ doc('player_stats_game') }}"
- name: player_stats_season_to_date
description: "{{ doc('player_stats_season_to_date') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
- name: nft_moment_metadata_topshot_id
description: "{{ doc('pk_id') }}"
- name: inserted_timestamp
description: "{{ doc('inserted_timestamp') }}"
- name: modified_timestamp
description: "{{ doc('modified_timestamp') }}"

View File

@ -1,64 +0,0 @@
-- Streamline-sourced TopShot moment metadata (v2): parses API responses from
-- the bronze streamline tables into typed columns. Incremental on nft_id.
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    cluster_by = ['_inserted_timestamp::DATE'],
    unique_key = 'nft_id',
    tags = ['streamline', 'topshot']
) }}
-- depends_on: {{ ref('bronze__streamline_topshot_metadata') }}
-- metadata_from_streamline: incremental runs read the recent-registration
-- bronze view; full refresh reads the FR (all files) view
WITH metadata_from_streamline AS (
    SELECT
        VALUE :CONTRACT AS contract,
        VALUE :ID AS moment_id,
        DATA,
        _inserted_timestamp
    FROM

{% if is_incremental() %}
{{ ref('bronze__streamline_topshot_metadata') }}
WHERE
    _inserted_timestamp >= (
        SELECT
            MAX(_inserted_timestamp) _inserted_timestamp
        FROM
            {{ this }}
    )
{% else %}
{{ ref('bronze__streamline_topshot_metadata_FR') }}
{% endif %}
)
SELECT
    moment_id :: STRING AS nft_id,
    contract :: STRING AS nft_collection,
    DATA :data :getMintedMoment :data :id :: STRING AS nbatopshot_id,
    DATA :data :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number,
    DATA :data :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation,
    DATA :data :getMintedMoment :data :play :description :: VARCHAR AS moment_description,
    DATA :data :getMintedMoment :data :play :stats :playerName :: STRING AS player,
    DATA :data :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team,
    DATA :data :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season,
    DATA :data :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category,
    DATA :data :getMintedMoment :data :play :stats :playType :: STRING AS play_type,
    DATA :data :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date,
    DATA :data :getMintedMoment :data :set :flowName :: STRING AS set_name,
    DATA :data :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number,
    DATA :data :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls,
    DATA :data :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full,
    DATA :data :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game,
    DATA :data :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date,
    _inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['nft_id']
    ) }} AS nft_topshot_metadata_v2_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    metadata_from_streamline
WHERE
    -- drop API error responses; keep only the latest response per nft_id
    DATA :errors IS NULL qualify ROW_NUMBER() over (
        PARTITION BY nft_id
        ORDER BY
            _inserted_timestamp DESC
    ) = 1

View File

@ -1,33 +0,0 @@
-- Thin passthrough view exposing silver.nft_topshot_metadata with an explicit
-- column list (no transformation).
{{ config(
    materialized = 'view'
) }}
SELECT
    nft_id,
    nft_collection,
    nbatopshot_id,
    serial_number,
    total_circulation,
    moment_description,
    player,
    team,
    season,
    play_category,
    play_type,
    moment_date,
    set_name,
    set_series_number,
    video_urls,
    moment_stats_full,
    player_stats_game,
    player_stats_season_to_date,
    _INSERTED_TIMESTAMP,
    nft_moment_metadata_topshot_id,
    inserted_timestamp,
    modified_timestamp,
    _INVOCATION_ID
FROM
    {{ source(
        'silver',
        'nft_topshot_metadata'
    ) }}

View File

@ -1,34 +0,0 @@
-- Legacy workflow: tracks moments whose metadata response came back NULL so
-- they can be retried or excluded downstream.
{{ config(
    materialized = 'incremental',
    unique_key = ["id","contract","_inserted_date"],
    tags = ['topshot', 'moment_metadata'],
    enabled = True
) }}
{# Legacy workflow - TODO deprecate soon #}
SELECT
    id,
    contract,
    _inserted_date,
    _inserted_timestamp
FROM
    {{ ref('bronze__moments_metadata') }}
WHERE
    -- rows where the API returned no minted-moment payload
    DATA :getMintedMoment :: STRING IS NULL

{% if is_incremental() %}
AND _inserted_date >= (
    SELECT
        MAX(_inserted_date)
    FROM
        {{ this }}
)
AND _inserted_timestamp > (
    SELECT
        MAX(_inserted_timestamp)
    FROM
        {{ this }}
)
{% else %}
-- initial load floor: first date the request workflow ran
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -8,9 +8,13 @@ sources:
tables:
- name: moments_minted_metadata_api
- name: blocks
- name: blocks_v2
- name: collections
- name: collections_v2
- name: transactions
- name: transactions_v2
- name: transaction_results
- name: transaction_results_v2
- name: BLOCKS_CANDIDATE_07
- name: BLOCKS_CANDIDATE_08
- name: BLOCKS_CANDIDATE_09

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions_v2', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']

View File

@ -1,30 +0,0 @@
-- dbt test: fails (returns a row) when no TopShot metadata has been inserted
-- on or after the date of the most recent MomentMinted event — i.e. the
-- metadata pipeline has fallen behind minting.
-- mint_events: date of the latest TopShot mint
WITH mint_events AS (
    SELECT
        MAX(block_timestamp) :: DATE AS last_mint_date
    FROM
        {{ ref('silver__nft_moments_s') }}
    WHERE
        event_contract = 'A.0b2a3299cc857e29.TopShot'
        AND event_type = 'MomentMinted'
),
-- moments: metadata rows inserted on/after the latest mint date
moments AS (
    SELECT
        nft_collection,
        nft_id,
        _inserted_timestamp
    FROM
        {{ ref('silver__nft_topshot_metadata') }}
    WHERE
        _inserted_timestamp :: DATE >= (
            SELECT
                last_mint_date
            FROM
                mint_events
        )
)
SELECT
    -- recent = TRUE when any metadata row is as fresh as the latest mint;
    -- HAVING keeps the row only on failure (recent = FALSE)
    IFF(COUNT(nft_id) > 0, TRUE, FALSE) AS recent
FROM
    moments
HAVING
    recent = FALSE