rm topshot

This commit is contained in:
gregoriustanleyy 2025-08-20 17:09:30 +07:00
parent 4603b20591
commit a8e0e9ebbe
12 changed files with 4 additions and 559 deletions

View File

@ -6,7 +6,8 @@
unique_key = "nft_id",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(nft_id,nbatopshot_id);",
tags = ['scheduled_non_core'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} }
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, TOPSHOT' }} },
enabled = false
) }}
-- depends_on: {{ ref('bronze__streamline_topshot_metadata') }}
WITH

View File

@ -6,7 +6,8 @@
cluster_by = ['block_timestamp::date', 'modified_timestamp::date'],
unique_key = "topshot_buyback_id",
tags = ['nft', 'topshot', 'scheduled'],
meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } }
meta = { 'database_tags': { 'table': { 'PURPOSE': 'NFT, TOPSHOT' } } },
enabled = false
) }}
WITH flowty_sales AS (

View File

@ -1,35 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = '_id',
tags = ['livequery', 'topshot', 'moment_metadata']
) }}
SELECT
moment_id,
event_contract,
_inserted_date,
_inserted_timestamp,
MD5(
'moment_id' || 'event_contract' || '_inserted_date'
) AS _id
FROM
{{ ref('livequery__request_topshot_metadata') }}
WHERE
DATA :data :data :getMintedMoment :: STRING IS NULL
{% if is_incremental() %}
AND _inserted_date >= (
SELECT
MAX(_inserted_date)
FROM
{{ this }}
)
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -1,104 +0,0 @@
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
def register_udf_construct_data():
"""
Helper function to register a named UDF to construct the DATA object for the API call.
This named UDF can be used with a column expression, so multiple moment_ids can be called at the same time.
"""
udf_construct_data = (
F.udf(
lambda query, moment_id: {'query': query,
'variables': {'momentId': moment_id}},
name='udf_construct_data',
input_types=[
T.StringType(),
T.StringType()
],
return_type=T.VariantType(),
replace=True
)
)
return udf_construct_data
def model(dbt, session):
"""
This model will call the TopShot GraphQL API to request metadata for a list of moment_ids, determined by an exeternally defined view.
The request arguments are a GraphQL query and moment ID. The gql and API URL are stored in a table and retrieved in this workflow.
"""
dbt.config(
materialized='incremental',
unique_key='_RES_ID',
packages=['snowflake-snowpark-python'],
tags=['livequery', 'topshot', 'moment_metadata'],
incremental_strategy='delete+insert',
cluster_by=['_INSERTED_TIMESTAMP']
)
# base url and graphql query stored in table via dbt
topshot_gql_params = dbt.ref(
'livequery__moments_parameters').select(
'base_url', 'query').where(
F.col(
'contract') == 'A.0b2a3299cc857e29.TopShot'
).collect()
# define params for UDF_API
method = 'POST'
headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'User-Agent': 'Flipside_Flow_metadata/0.1'
}
url = topshot_gql_params[0][0]
# gql query passed with the post request
data = topshot_gql_params[0][1]
# metadata request requires moment_id, defined in a separate view
# number of moment_ids to request set by .limit(), timeout experienced at 4000
inputs = dbt.ref(
'livequery__topshot_moments_metadata_needed').select(
"EVENT_CONTRACT", "MOMENT_ID"
).limit(100)
# Note prior limit of 3500 leads to 429 error / rate limit by system
# Per Dapper team, 50 reqs per 10 seconds. If exceeded, blocked for 30s.
# register the udf_construct_data function
udf_construct_data = register_udf_construct_data()
# use with_columns to source moment_id from the input_df and call multiple udf_api calls at once
# columns defined in the array will be appended to the input dataframe
response = inputs.with_columns(
['DATA', '_INSERTED_DATE', '_INSERTED_TIMESTAMP', '_RES_ID'],
[
F.call_udf(
'flow.live.udf_api',
method,
url,
headers,
udf_construct_data(
F.lit(data),
F.col('MOMENT_ID')
)
),
F.sysdate().cast(T.DateType()),
F.sysdate(),
F.md5(
F.concat(
F.col('EVENT_CONTRACT'),
F.col('MOMENT_ID')
)
)
]
)
# dbt will append response to table per incremental config
return response

View File

@ -1,19 +0,0 @@
version: 2
models:
- name: livequery__request_topshot_metadata
description: |-
LiveQuery-based model to request TopShot metadata from the public graphQL endpoint.
columns:
- name: EVENT_CONTRACT
- name: MOMENT_ID
- name: DATA
- name: _INSERTED_DATE
- name: _INSERTED_TIMESTAMP
- name: _RES_ID

View File

@ -1,91 +0,0 @@
{{ config(
materialized = 'view',
tags = ['livequery', 'topshot', 'moment_metadata']
) }}
WITH mints AS (
SELECT
event_contract,
event_data :momentID :: STRING AS moment_id
FROM
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'
),
sales AS (
SELECT
nft_collection AS event_contract,
nft_id AS moment_id
FROM
{{ ref('silver__nft_sales_s') }}
WHERE
nft_collection ILIKE '%topshot%'
),
all_topshots AS (
SELECT
event_contract,
moment_id
FROM
mints
UNION
SELECT
event_contract,
moment_id
FROM
sales
),
lq_always_null AS (
SELECT
moment_id,
event_contract,
COUNT(1) AS num_times_null_resp
FROM
{{ target.database }}.livequery.null_moments_metadata
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
GROUP BY
1,
2
HAVING
num_times_null_resp > 2
),
legacy_always_null AS (
SELECT
id,
contract,
COUNT(1) AS num_times_null_resp
FROM
{{ ref('streamline__null_moments_metadata') }}
WHERE
contract = 'A.0b2a3299cc857e29.TopShot'
GROUP BY
1,
2
HAVING
num_times_null_resp > 2
)
SELECT
DISTINCT *
FROM
all_topshots
WHERE
moment_id NOT IN (
(
SELECT
nft_id AS moment_id
FROM
{{ target.database }}.silver.nft_topshot_metadata
UNION
SELECT
id AS moment_id
FROM
legacy_always_null
UNION
SELECT
moment_id
FROM
lq_always_null
)
)

View File

@ -1,74 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id',
tags = ['livequery', 'topshot'],
full_refresh = False,
enabled = false
) }}
{# NFT Metadata from legacy process lives in external table, deleted CTE and set FR=False
TO
LIMIT
/ avoid unnecessary TABLE scans #}
WITH metadata_lq AS (
SELECT
_res_id,
'A.0b2a3299cc857e29.TopShot' AS contract,
moment_id,
DATA :data :data :: variant AS DATA,
_inserted_timestamp
FROM
{{ ref('livequery__request_topshot_metadata') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
lq_final AS (
SELECT
moment_id :: STRING AS nft_id,
contract :: STRING AS nft_collection,
DATA :getMintedMoment :data :id :: STRING AS nbatopshot_id,
DATA :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number,
DATA :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation,
DATA :getMintedMoment :data :play :description :: VARCHAR AS moment_description,
DATA :getMintedMoment :data :play :stats :playerName :: STRING AS player,
DATA :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team,
DATA :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season,
DATA :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category,
DATA :getMintedMoment :data :play :stats :playType :: STRING AS play_type,
DATA :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date,
DATA :getMintedMoment :data :set :flowName :: STRING AS set_name,
DATA :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number,
DATA :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls,
DATA :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full,
DATA :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game,
DATA :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date,
_inserted_timestamp
FROM
metadata_lq
WHERE
DATA :getMintedMoment :: STRING IS NOT NULL
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['nft_id']
) }} AS nft_moment_metadata_topshot_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
lq_final qualify ROW_NUMBER() over (
PARTITION BY nft_id
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,73 +0,0 @@
version: 2
models:
- name: silver__nft_topshot_metadata
description: |-
Data for TopShot Moments, including player, team, stats and more.
columns:
- name: nft_id
description: "{{ doc('nft_id') }}"
- name: nft_collection
description: "{{ doc('nft_collection') }}"
- name: nbatopshot_id
description: "{{ doc('nbatopshot_id') }}"
- name: serial_number
description: "{{ doc('serial_number') }}"
- name: total_circulation
description: "{{ doc('total_circulation') }}"
- name: moment_description
description: "{{ doc('moment_description') }}"
- name: player
description: "{{ doc('player') }}"
- name: team
description: "{{ doc('team') }}"
- name: season
description: "{{ doc('season') }}"
- name: play_category
description: "{{ doc('play_category') }}"
- name: play_type
description: "{{ doc('play_type') }}"
- name: moment_date
description: "{{ doc('moment_date') }}"
- name: set_name
description: "{{ doc('set_name') }}"
- name: set_series_number
description: "{{ doc('set_series_number') }}"
- name: video_urls
description: "{{ doc('video_urls') }}"
- name: moment_stats_full
description: "{{ doc('moment_stats_full') }}"
- name: player_stats_game
description: "{{ doc('player_stats_game') }}"
- name: player_stats_season_to_date
description: "{{ doc('player_stats_season_to_date') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
- name: nft_moment_metadata_topshot_id
description: "{{ doc('pk_id') }}"
- name: inserted_timestamp
description: "{{ doc('inserted_timestamp') }}"
- name: modified_timestamp
description: "{{ doc('modified_timestamp') }}"

View File

@ -1,64 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id',
tags = ['streamline', 'topshot']
) }}
-- depends_on: {{ ref('bronze__streamline_topshot_metadata') }}
WITH metadata_from_streamline AS (
SELECT
VALUE :CONTRACT AS contract,
VALUE :ID AS moment_id,
DATA,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_topshot_metadata') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_topshot_metadata_FR') }}
{% endif %}
)
SELECT
moment_id :: STRING AS nft_id,
contract :: STRING AS nft_collection,
DATA :data :getMintedMoment :data :id :: STRING AS nbatopshot_id,
DATA :data :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number,
DATA :data :getMintedMoment :data :setPlay :circulationCount :: NUMBER AS total_circulation,
DATA :data :getMintedMoment :data :play :description :: VARCHAR AS moment_description,
DATA :data :getMintedMoment :data :play :stats :playerName :: STRING AS player,
DATA :data :getMintedMoment :data :play :stats :teamAtMoment :: STRING AS team,
DATA :data :getMintedMoment :data :play :stats :nbaSeason :: STRING AS season,
DATA :data :getMintedMoment :data :play :stats :playCategory :: STRING AS play_category,
DATA :data :getMintedMoment :data :play :stats :playType :: STRING AS play_type,
DATA :data :getMintedMoment :data :play :stats :dateOfMoment :: TIMESTAMP AS moment_date,
DATA :data :getMintedMoment :data :set :flowName :: STRING AS set_name,
DATA :data :getMintedMoment :data :set :flowSeriesNumber :: NUMBER AS set_series_number,
DATA :data :getMintedMoment :data :play :assets :videos :: ARRAY AS video_urls,
DATA :data :getMintedMoment :data :play :stats :: OBJECT AS moment_stats_full,
DATA :data :getMintedMoment :data :play :statsPlayerGameScores :: OBJECT AS player_stats_game,
DATA :data :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['nft_id']
) }} AS nft_topshot_metadata_v2_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
metadata_from_streamline
WHERE
DATA :errors IS NULL qualify ROW_NUMBER() over (
PARTITION BY nft_id
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,33 +0,0 @@
{{ config(
materialized = 'view'
) }}
SELECT
nft_id,
nft_collection,
nbatopshot_id,
serial_number,
total_circulation,
moment_description,
player,
team,
season,
play_category,
play_type,
moment_date,
set_name,
set_series_number,
video_urls,
moment_stats_full,
player_stats_game,
player_stats_season_to_date,
_INSERTED_TIMESTAMP,
nft_moment_metadata_topshot_id,
inserted_timestamp,
modified_timestamp,
_INVOCATION_ID
FROM
{{ source(
'silver',
'nft_topshot_metadata'
) }}

View File

@ -1,34 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = ["id","contract","_inserted_date"],
tags = ['topshot', 'moment_metadata'],
enabled = True
) }}
{# Legacy workflow - TODO deprecate soon #}
SELECT
id,
contract,
_inserted_date,
_inserted_timestamp
FROM
{{ ref('bronze__moments_metadata') }}
WHERE
DATA :getMintedMoment :: STRING IS NULL
{% if is_incremental() %}
AND _inserted_date >= (
SELECT
MAX(_inserted_date)
FROM
{{ this }}
)
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -1,30 +0,0 @@
WITH mint_events AS (
SELECT
MAX(block_timestamp) :: DATE AS last_mint_date
FROM
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'
),
moments AS (
SELECT
nft_collection,
nft_id,
_inserted_timestamp
FROM
{{ ref('silver__nft_topshot_metadata') }}
WHERE
_inserted_timestamp :: DATE >= (
SELECT
last_mint_date
FROM
mint_events
)
)
SELECT
IFF(COUNT(nft_id) > 0, TRUE, FALSE) AS recent
FROM
moments
HAVING
recent = FALSE