AN-3712/Metadata livequery (#146)

* topshot metadata pull v1

* before swap from lambda

* ready for stress test / batch sizing tmrw

* define unique key in helper tbl

* typo

* add todo deploy udf_api

* slight format tweak

* named

* migrate to livequery schema

* add parameter yml

* add livequery model yml, need tests

* alter metadata job run

* del old streamline macros

* silver upd - in flight

* clean up py model

* limit comment

* revert silver topshot metadata

* new silver model

* union new lq data into nft metadata table(s)

* del block_timestamp from metadata needed view

* untag silver nft allday metadata

* set limit to 3500

* addback topshot tag

* update schedule to hourly
This commit is contained in:
Jack Forgash 2023-09-07 10:13:02 -06:00 committed by GitHub
parent 5357fa70cb
commit b1f0925a90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 568 additions and 109 deletions

View File

@ -5,7 +5,9 @@ on:
workflow_dispatch:
schedule:
# Runs 0000 UTC daily (see https://crontab.guru)
- cron: '0 0 * * *'
# - cron: '0 0 * * *'
# Runs hourly for next 30 hours, then reset to just daily
- cron: '0 * * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
@ -28,8 +30,7 @@ jobs:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: >
dbt run-operation run_bulk_get_topshot_moments_metadata;
dbt run-operation run_bulk_get_nfl_allday_moments_metadata;
dbt run -s tag:livequery
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,15 +0,0 @@
{% macro run_bulk_get_nfl_allday_moments_metadata() %}
{# Fire the bulk AllDay metadata external function, but only when the
   streamline.allday_moments_metadata_needed helper has at least one row.
   The WHERE EXISTS ... LIMIT 1 guard short-circuits the external call on
   an empty backlog. Invoked via `dbt run-operation`. #}
{% set sql %}
select streamline.udf_bulk_get_nfl_allday_moments_metadata()
where exists (
select 1
from streamline.allday_moments_metadata_needed
limit 1
)
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,8 +0,0 @@
{% macro udf_bulk_get_nfl_allday_moments_metadata() %}
{# (Re)deploys the Snowflake EXTERNAL FUNCTION fronting the AWS API
   Gateway endpoint that bulk-fetches AllDay metadata. The endpoint URL
   is selected by target database (FLOW => prod stage, else dev).
   NOTE(review): both branches use api_integration = aws_flow_api_dev —
   confirm the prod branch should not use a prod integration. #}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_nfl_allday_moments_metadata() returns text api_integration = aws_flow_api_dev AS {% if target.database == "FLOW" -%}
'https://3ltti6kisi.execute-api.us-east-1.amazonaws.com/prod/bulk_get_nfl_allday_metadata'
{% else %}
'https://wn6lmi2rs4.execute-api.us-east-1.amazonaws.com/dev/bulk_get_nfl_allday_metadata'
{%- endif %}
{% endmacro %}

View File

@ -1,15 +0,0 @@
{% macro run_bulk_get_topshot_moments_metadata() %}
{# Fire the bulk TopShot minted-moment metadata external function, but
   only when the "needed" helper view has at least one row. The
   WHERE EXISTS ... LIMIT 1 guard avoids hitting the external endpoint
   when there is nothing to request. Invoked via `dbt run-operation`. #}
{% set sql %}
select streamline.udf_bulk_get_topshot_moments_minted_metadata()
where exists (
select 1
from streamline.all_topshot_moments_minted_metadata_needed
limit 1
)
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,8 +0,0 @@
{% macro udf_bulk_get_topshot_moments_minted_metadata() %}
{# (Re)deploys the Snowflake EXTERNAL FUNCTION fronting the AWS API
   Gateway endpoint that bulk-fetches TopShot minted-moment metadata.
   The endpoint URL is selected by target database (FLOW => prod stage,
   else dev). NOTE(review): both branches use
   api_integration = aws_flow_api_dev — confirm for prod. #}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_topshot_moments_minted_metadata() returns text api_integration = aws_flow_api_dev AS {% if target.database == "FLOW" -%}
'https://3ltti6kisi.execute-api.us-east-1.amazonaws.com/prod/bulk_get_topshot_moments_minted_metadata'
{% else %}
'https://wn6lmi2rs4.execute-api.us-east-1.amazonaws.com/dev/bulk_get_topshot_moments_minted_metadata'
{%- endif %}
{% endmacro %}

View File

@ -0,0 +1,5 @@
{% docs _res_id %}
A unique identifier for a single API call response row.
{% enddocs %}

View File

@ -1,7 +1,9 @@
{{ config(
materialized = 'view',
tags = ['livequery', 'allday', 'moment_metadata'],
enabled = False
) }}
{# AllDay workflow inactive, view migrated to lq naming convention only #}
WITH mints AS (
SELECT

View File

@ -0,0 +1,319 @@
{{ config(
    materialized = 'table',
    unique_key = 'contract',
    tags = ['livequery', 'topshot', 'allday', 'moment_metadata']
) }}
{# Static two-row helper table: one row per NFT contract holding the
   public GraphQL endpoint URL and the query text used by the livequery
   request models. OK to store in plaintext — nothing sensitive here. #}
SELECT
    'A.0b2a3299cc857e29.TopShot' AS contract,
    'https://public-api.nbatopshot.com/graphql' AS base_url,
    'query getMintedMoment ($momentId: ID!) {
getMintedMoment (momentId: $momentId) {
data {
id
version
sortID
set {
id
sortID
version
flowId
flowName
flowSeriesNumber
flowLocked
setVisualId
assetPath
assets {
images {
type
url
}
}
}
play {
id
version
description
flowID
sortID
status
assets {
videos {
type
url
videoLength
}
videoLengthInMilliseconds
}
stats {
playerID
playerName
firstName
lastName
jerseyNumber
teamAtMoment
awayTeamName
awayTeamScore
homeTeamName
homeTeamScore
dateOfMoment
totalYearsExperience
teamAtMomentNbaId
height
weight
currentTeam
currentTeamId
primaryPosition
homeTeamNbaId
awayTeamNbaId
nbaSeason
draftYear
draftSelection
draftRound
birthplace
birthdate
draftTeam
draftTeamNbaId
playCategory
playType
quarter
}
statsPlayerGameScores {
blocks
points
steals
assists
minutes
rebounds
turnovers
plusMinus
flagrantFouls
personalFouls
technicalFouls
twoPointsMade
blockedAttempts
fieldGoalsMade
freeThrowsMade
threePointsMade
defensiveRebounds
offensiveRebounds
pointsOffTurnovers
twoPointsAttempted
assistTurnoverRatio
fieldGoalsAttempted
freeThrowsAttempted
twoPointsPercentage
fieldGoalsPercentage
freeThrowsPercentage
threePointsAttempted
threePointsPercentage
playerPosition
}
statsPlayerSeasonAverageScores {
minutes
blocks
points
steals
assists
rebounds
turnovers
plusMinus
flagrantFouls
personalFouls
technicalFouls
twoPointsMade
blockedAttempts
fieldGoalsMade
freeThrowsMade
threePointsMade
defensiveRebounds
offensiveRebounds
pointsOffTurnovers
twoPointsAttempted
assistTurnoverRatio
fieldGoalsAttempted
freeThrowsAttempted
twoPointsPercentage
fieldGoalsPercentage
freeThrowsPercentage
threePointsAttempted
threePointsPercentage
efficiency
true_shooting_attempts
points_in_paint_made
points_in_paint_attempted
points_in_paint
fouls_drawn
offensive_fouls
fast_break_points
fast_break_points_attempted
fast_break_points_made
second_chance_points
second_chance_points_attempted
second_chance_points_made
}
tags {
id
name
title
visible
hardcourt
level
}
}
flowId
flowSerialNumber
price
forSale
listingOrderID
owner {
dapperID
email
flowAddress
username
profileImageUrl
twitterHandle
segmentID
}
assetPathPrefix
setPlay {
ID
setID
playID
flowRetired
circulationCount
tags {
id
name
title
visible
hardcourt
level
}
}
createdAt
acquiredAt
packListingID
tags {
id
name
title
visible
hardcourt
level
}
}
}
}' AS query
-- The two rows are distinct by construction, so UNION ALL is correct and
-- skips the needless sort/dedup that plain UNION would perform.
UNION ALL
SELECT
    'A.e4cf4bdc1751c65d.AllDay' AS contract,
    'https://nflallday.com/consumer/graphql' AS base_url,
    -- Fix: the original query requested playerBirthplace twice; the
    -- duplicated field has been removed.
    'query SearchMomentNFTsV2($input: SearchMomentNFTsInputV2!) {
searchMomentNFTsV2(input: $input) {
edges {
cursor
node {
id
ownerAddress
serialNumber
flowID
distributionFlowID
packNFTFlowID
editionFlowID
owner {
id
dapperID
email
phoneNumber
username
flowAddress
profileImageUrl
isCurrentTOSSigned
}
edition {
id
flowID
playFlowID
seriesFlowID
setFlowID
maxMintSize
currentMintSize
tier
description
numMomentsOwned
numMomentsInPacks
numMomentsUnavailable
numMomentsBurned
series {
flowID
name
active
}
set {
flowID
name
}
play {
id
flowID
metadata {
state
description
league
playType
videos {
type
url
videoLength
}
images {
type
url
}
classification
week
season
playerID
playerFullName
playerFirstName
playerLastName
playerPosition
playerNumber
playerWeight
playerHeight
playerBirthdate
playerBirthplace
playerRookieYear
playerDraftTeam
playerDraftYear
playerDraftRound
playerDraftNumber
playerCollege
teamID
gameNflID
gameDate
homeTeamName
homeTeamID
homeTeamScore
awayTeamName
awayTeamID
awayTeamScore
gameTime
gameQuarter
gameDown
gameDistance
teamName
}
}
}
}
}
pageInfo {
endCursor
hasNextPage
}
totalCount
}
}' AS query

View File

@ -0,0 +1,13 @@
version: 2
# Schema entry for the static GraphQL-parameters helper table.
# `contract` is the NFT contract identifier used as the lookup key by the
# request models; `base_url` / `query` are the endpoint and query text.
models:
- name: livequery__moments_parameters
description: |-
A simple helper table to store the base_url and graphQL query for the livequery request moments model(s). Ok to store in plaintext as it is not sensitive information.
columns:
- name: contract
- name: base_url
- name: query

View File

@ -0,0 +1,35 @@
{{ config(
    materialized = 'incremental',
    unique_key = '_id',
    tags = ['livequery', 'topshot', 'moment_metadata']
) }}
{# Records livequery responses whose getMintedMoment payload came back
   NULL — one row per (moment, contract, inserted date) — so that
   persistently-null moments can be excluded from future requests. #}
SELECT
    moment_id,
    event_contract,
    _inserted_date,
    _inserted_timestamp,
    -- Fix: the original hashed the literal strings
    -- ('moment_id' || 'event_contract' || '_inserted_date'), producing the
    -- SAME constant _id for every row and defeating the incremental
    -- unique_key. Hash the column VALUES instead. (Columns are expected
    -- non-null here; a NULL operand would null out the hash — confirm.)
    MD5(
        moment_id || event_contract || _inserted_date
    ) AS _id
FROM
    {{ ref('livequery__request_topshot_metadata') }}
WHERE
    DATA :data :data :getMintedMoment :: STRING IS NULL

{% if is_incremental() %}
AND _inserted_date >= (
    SELECT
        MAX(_inserted_date)
    FROM
        {{ this }}
)
AND _inserted_timestamp > (
    SELECT
        MAX(_inserted_timestamp)
    FROM
        {{ this }}
)
{% else %}
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -0,0 +1,100 @@
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
def register_udf_construct_data():
    """
    Register (or replace) a named Snowpark UDF that packages the request
    payload for the metadata API call.

    Because the UDF is named, it can be applied as a column expression,
    so payloads for many moment_ids are built in a single pass.
    """
    return F.udf(
        lambda query, moment_id: {
            'query': query,
            'variables': {'momentId': moment_id},
        },
        name='udf_construct_data',
        input_types=[T.StringType(), T.StringType()],
        return_type=T.VariantType(),
        replace=True,
    )
def model(dbt, session):
    """
    Call the TopShot GraphQL API to request metadata for a batch of
    moment_ids, determined by an externally defined "needed" view.
    The request arguments are a GraphQL query and moment ID. The gql and
    API URL are stored in a table and retrieved in this workflow.
    Returns a Snowpark DataFrame that dbt appends per the incremental config.
    """
    dbt.config(
        materialized='incremental',
        unique_key='_RES_ID',
        packages=['snowflake-snowpark-python'],
        tags=['livequery', 'topshot', 'moment_metadata'],
        incremental_strategy='delete+insert',
        cluster_by=['_INSERTED_TIMESTAMP']
    )
    # base url and graphql query stored in table via dbt
    # (single-row result: base_url at index 0, query text at index 1)
    topshot_gql_params = dbt.ref(
        'livequery__moments_parameters').select(
        'base_url', 'query').where(
        F.col(
            'contract') == 'A.0b2a3299cc857e29.TopShot'
    ).collect()
    # define params for UDF_API
    method = 'POST'
    headers = {
        'Content-Type': 'application/json'
    }
    url = topshot_gql_params[0][0]
    # gql query passed with the post request
    data = topshot_gql_params[0][1]
    # metadata request requires moment_id, defined in a separate view
    # number of moment_ids to request set by .limit(), timeout experienced at 4000
    inputs = dbt.ref(
        'livequery__topshot_moments_metadata_needed').select(
        "EVENT_CONTRACT", "MOMENT_ID"
    ).limit(3500)
    # register the udf_construct_data function
    udf_construct_data = register_udf_construct_data()
    # use with_columns to source moment_id from the input_df and call multiple udf_api calls at once
    # columns defined in the array will be appended to the input dataframe
    response = inputs.with_columns(
        ['DATA', '_INSERTED_DATE', '_INSERTED_TIMESTAMP', '_RES_ID'],
        [
            F.call_udf(
                'flow.streamline.udf_api',
                method,
                url,
                headers,
                udf_construct_data(
                    F.lit(data),
                    F.col('MOMENT_ID')
                ),
                F.lit(None),  # USER_ID req on Flow deployment of UDF_API
                F.lit(None)   # SECRET_NAME req on Flow deployment of UDF_API
            ),
            F.sysdate().cast(T.DateType()),
            F.sysdate(),
            # NOTE(review): _RES_ID = MD5(contract || moment_id) omits any
            # timestamp, so a re-request of the same moment replaces the
            # prior row under delete+insert — confirm that is intended.
            F.md5(
                F.concat(
                    F.col('EVENT_CONTRACT'),
                    F.col('MOMENT_ID')
                )
            )
        ]
    )
    # dbt will append response to table per incremental config
    return response

View File

@ -0,0 +1,19 @@
version: 2
# Schema entry for the livequery TopShot metadata request model.
# NOTE(review): no column tests are defined here — consider adding
# unique / not_null on _RES_ID (the model's incremental unique_key).
models:
- name: livequery__request_topshot_metadata
description: |-
LiveQuery-based model to request TopShot metadata from the public graphQL endpoint.
columns:
- name: EVENT_CONTRACT
- name: MOMENT_ID
- name: DATA
- name: _INSERTED_DATE
- name: _INSERTED_TIMESTAMP
- name: _RES_ID

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view',
tags = ['livequery', 'topshot', 'moment_metadata']
) }}
WITH mints AS (
@ -35,11 +36,26 @@ all_topshots AS (
FROM
sales
),
always_null AS (
lq_always_null AS (
SELECT
moment_id,
event_contract,
COUNT(1) AS num_times_null_resp
FROM
{{ target.database }}.livequery.null_moments_metadata
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
GROUP BY
1,
2
HAVING
num_times_null_resp > 2
),
legacy_always_null AS (
SELECT
id,
contract,
COUNT(*) AS num_times_null_resp
COUNT(1) AS num_times_null_resp
FROM
{{ ref('streamline__null_moments_metadata') }}
WHERE
@ -54,17 +70,22 @@ SELECT
DISTINCT *
FROM
all_topshots
EXCEPT
(
SELECT
nft_collection AS event_contract,
nft_id AS moment_id
FROM
{{ ref('silver__nft_topshot_metadata') }}
UNION
SELECT
contract,
id
FROM
always_null
WHERE
moment_id NOT IN (
(
SELECT
nft_id AS moment_id
FROM
{{ target.database }}.silver.nft_topshot_metadata
UNION
SELECT
id AS moment_id
FROM
legacy_always_null
UNION
SELECT
moment_id
FROM
lq_always_null
)
)

View File

@ -1,7 +1,10 @@
{{ config(
materialized = 'incremental',
unique_key = ["id","contract","_inserted_date"]
unique_key = ["id","contract","_inserted_date"],
tags = ['topshot', 'moment_metadata'],
enabled = True
) }}
{# Legacy workflow - TODO deprecate soon #}
SELECT
id,
@ -27,5 +30,5 @@ AND _inserted_timestamp > (
{{ this }}
)
{% else %}
AND _inserted_date >= '2022-12-09'
AND _inserted_date >= '2022-12-09'
{% endif %}

View File

@ -2,10 +2,10 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id',
tags = ['scheduled']
unique_key = 'nft_id'
) }}
{# Note - removed schedule tag as the legacy lambda workflow is inactive.
No need to query external table #}
WITH metadata AS (
SELECT

View File

@ -3,36 +3,36 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id',
tags = ['scheduled']
tags = ['livequery'],
full_refresh = False
) }}
{# NFT Metadata from legacy process lives in external table, deleted CTE and set FR=False
to limit / avoid unnecessary table scans #}
WITH metadata AS (
WITH metadata_lq AS (
SELECT
*
_res_id,
'A.0b2a3299cc857e29.TopShot' AS contract,
moment_id,
DATA :data :data :: variant AS DATA,
_inserted_timestamp
FROM
{{ ref('bronze__moments_metadata') }}
WHERE
contract = 'A.0b2a3299cc857e29.TopShot'
{{ ref('livequery__request_topshot_metadata') }}
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY id
ORDER BY
DATA :getMintedMoment :data :acquiredAt :: TIMESTAMP
) = 1
),
FINAL AS (
lq_final AS (
SELECT
id AS nft_id,
moment_id AS nft_id,
contract AS nft_collection,
DATA :getMintedMoment :data :id :: STRING AS nbatopshot_id,
DATA :getMintedMoment :data :flowSerialNumber :: NUMBER AS serial_number,
@ -52,11 +52,15 @@ FINAL AS (
DATA :getMintedMoment :data :play :statsPlayerSeasonAverageScores :: OBJECT AS player_stats_season_to_date,
_inserted_timestamp
FROM
metadata
metadata_lq
WHERE
DATA :getMintedMoment :: STRING IS NOT NULL
)
SELECT
*
FROM
FINAL
lq_final qualify ROW_NUMBER() over (
PARTITION BY nft_id
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,17 +0,0 @@
# Setup Snowflake Api Integration & UDFS
## Setup Snowflake Api Integration
Use the [create_aws_flow_api()](../../macros/streamline/api_integrations.sql#2) macro to create the `streamline-flow` Snowflake API integration.
The following command runs the macro via `make`:
```zsh
DBT_TARGET=sbx make sl-flow-api
# This runs:
# dbt run-operation create_aws_flow_api \
# --profile flow \
# --target $(DBT_TARGET) \
# --profiles-dir ~/.dbt/
```