SL work - proposed changes

This commit is contained in:
Eric Laurello 2025-05-06 16:46:52 -04:00
parent 907aba3b22
commit f3350f9ee7
9 changed files with 580 additions and 120 deletions

View File

@ -2,21 +2,20 @@
materialized = 'view',
tags = ['streamline', 'topshot', 'moments_metadata', 'backfill'],
post_hook = fsc_utils.if_data_call_function_v2(
func = 'flow_dev.streamline.udf_bulk_rest_api_v2',
target = "streamline.flow_dev.{{this.identifier}}",
params = {
"external_table": "streamline.flow_dev.moments_minted_metadata_api",
"sql_limit": "100",
"producer_batch_size": "100",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"async_concurrent_requests": "10"
}
func = '{{this.schema}}.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table": "moments_minted_metadata_api",
"sql_limit": "100",
"producer_batch_size": "100",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"async_concurrent_requests": "10" }
)
) }}
WITH api_parameters AS (
-- Use the same parameters as for realtime
SELECT
base_url,
query
@ -25,72 +24,87 @@ WITH api_parameters AS (
WHERE
contract = 'A.0b2a3299cc857e29.TopShot'
),
-- Get the most recent date we have metadata for
last_metadata_date AS (
SELECT
MAX(_inserted_timestamp)::DATE AS last_date
MAX(_inserted_timestamp) :: DATE AS last_date
FROM
{{ ref('silver__nft_topshot_metadata') }}
),
-- Find all historical moments that need metadata
moments_to_backfill AS (
SELECT
m.event_contract,
m.moment_id
FROM
{{ ref('livequery__topshot_moments_metadata_needed') }} m
LEFT JOIN (
SELECT
moment_id,
COUNT(*) AS failure_count
FROM
{{ ref('livequery__null_moments_metadata') }}
GROUP BY
moment_id
) null_attempts
ON m.moment_id = null_attempts.moment_id
LEFT JOIN (
SELECT
event_data:momentID::STRING AS moment_id,
block_timestamp
FROM
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'
AND block_timestamp::DATE >= '2024-09-06'::DATE
) mint_data
ON m.moment_id = mint_data.moment_id
{{ ref('livequery__topshot_moments_metadata_needed') }}
m
LEFT JOIN (
SELECT
moment_id,
COUNT(*) AS failure_count
FROM
{{ ref('livequery__null_moments_metadata') }}
GROUP BY
moment_id
) null_attempts
ON m.moment_id = null_attempts.moment_id
LEFT JOIN (
SELECT
event_data :momentID :: STRING AS moment_id,
block_timestamp
FROM
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'
AND block_timestamp :: DATE >= '2024-09-06' :: DATE
) mint_data
ON m.moment_id = mint_data.moment_id
WHERE
COALESCE(null_attempts.failure_count, 0) < 3
COALESCE(
null_attempts.failure_count,
0
) < 3
ORDER BY
mint_data.block_timestamp ASC NULLS LAST,
CAST(m.moment_id AS INTEGER) ASC
LIMIT {{ var('SQL_LIMIT', 100) }}
mint_data.block_timestamp ASC nulls last,
CAST(
m.moment_id AS INTEGER
) ASC
LIMIT
{{ var(
'SQL_LIMIT', 100
) }}
)
SELECT
TO_CHAR(TO_TIMESTAMP_NTZ(SYSDATE()), 'YYYY_MM_DD') AS partition_key,
SELECT
to_char(TO_TIMESTAMP_NTZ(SYSDATE()), 'YYYY_MM_DD') AS partition_key,
m.event_contract AS contract,
m.moment_id AS id,
{{ target.database }}.live.udf_api(
'POST',
p.base_url,
OBJECT_CONSTRUCT(
'Accept', 'application/json',
'Accept-Encoding', 'gzip',
'Connection', 'keep-alive',
'Content-Type', 'application/json',
'User-Agent', 'Flipside_Flow_metadata/0.1'
),
OBJECT_CONSTRUCT(
'query', p.query,
'variables', OBJECT_CONSTRUCT('momentId', m.moment_id)
)
) AS api_response
{{ target.database }}.live.udf_api(
'POST',
p.base_url,
OBJECT_CONSTRUCT(
'Accept',
'application/json',
'Accept-Encoding',
'gzip',
'Connection',
'keep-alive',
'Content-Type',
'application/json',
'User-Agent',
'Flipside_Flow_metadata/0.1'
),
OBJECT_CONSTRUCT(
'query',
p.query,
'variables',
OBJECT_CONSTRUCT(
'momentId',
m.moment_id
)
)
) AS request
FROM
moments_to_backfill m
CROSS JOIN api_parameters p
CROSS JOIN api_parameters p

View File

@ -4,19 +4,19 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = '{{this.schema}}.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "moments_minted_metadata_api",
"sql_limit": "50",
"producer_batch_size": "100",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"async_concurrent_requests": "5"
}
)
params ={ "external_table": "moments_minted_metadata_api",
"sql_limit": "50",
"producer_batch_size": "100",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"async_concurrent_requests": "5" }
),
enabled = false
) }}
WITH api_parameters AS (
-- Use the same parameters as the LiveQuery model
SELECT
base_url,
query
@ -25,62 +25,60 @@ WITH api_parameters AS (
WHERE
contract = 'A.0b2a3299cc857e29.TopShot'
),
moments_to_fetch AS (
SELECT
m.event_contract,
m.moment_id
FROM
{{ ref('livequery__topshot_moments_metadata_needed') }} m
LEFT JOIN (
SELECT
moment_id,
COUNT(*) AS failure_count
FROM
{{ ref('livequery__null_moments_metadata') }}
GROUP BY
moment_id
) null_attempts
ON m.moment_id = null_attempts.moment_id
LEFT JOIN (
SELECT DISTINCT
nft_id AS moment_id,
block_timestamp
FROM
{{ ref('nft__ez_nft_sales') }}
WHERE
nft_collection = 'A.0b2a3299cc857e29.TopShot'
AND block_timestamp >= DATEADD(day, -7, CURRENT_DATE()) -- Filter to only fetch metadata for moments that had activity in the last 7 days
) recent_txs
ON m.moment_id = recent_txs.moment_id
WHERE
COALESCE(null_attempts.failure_count, 0) < 3
AND recent_txs.moment_id IS NOT NULL -- Only include moments with recent transactions
ORDER BY
recent_txs.block_timestamp DESC
LIMIT {{ var('SQL_LIMIT', 50) }}
)
SELECT
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
m.event_contract AS contract,
m.moment_id AS id,
{{ target.database }}.live.udf_api(
'POST',
p.base_url,
{
'Content-Type': 'application/json',
{{ ref('livequery__topshot_moments_metadata_needed') }}
m
LEFT JOIN (
SELECT
moment_id,
COUNT(*) AS failure_count
FROM
{{ ref('livequery__null_moments_metadata') }}
GROUP BY
moment_id
) null_attempts
ON m.moment_id = null_attempts.moment_id
LEFT JOIN (
    -- Moments with sales activity in the last 7 days; only these are eligible for a metadata fetch.
    -- Keep this comment on its own line: a trailing comment on the last predicate
    -- would swallow the closing parenthesis and alias if the lines are ever joined.
    SELECT
        DISTINCT nft_id AS moment_id,
        block_timestamp
    FROM
        {{ ref('nft__ez_nft_sales') }}
    WHERE
        nft_collection = 'A.0b2a3299cc857e29.TopShot'
        AND block_timestamp >= DATEADD(DAY, -7, CURRENT_DATE())
) recent_txs
ON m.moment_id = recent_txs.moment_id
WHERE
COALESCE(
null_attempts.failure_count,
0
) < 3
AND recent_txs.moment_id IS NOT NULL -- Only include moments with recent transactions
ORDER BY
recent_txs.block_timestamp DESC
LIMIT
{{ var(
'SQL_LIMIT', 50
) }}
)
SELECT
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
m.event_contract AS contract,
m.moment_id AS id,
{{ target.database }}.live.udf_api(
'POST',
p.base_url,{ 'Content-Type': 'application/json',
'Accept': 'application/json',
'Accept-Encoding': 'gzip',
'Connection': 'keep-alive',
'User-Agent': 'Flipside_Flow_metadata/0.1'
},
{
'query': p.query,
'variables': {'momentId': m.moment_id}
},
NULL
) AS request
FROM
moments_to_fetch m
CROSS JOIN api_parameters p
'User-Agent': 'Flipside_Flow_metadata/0.1' },{ 'query': p.query,
'variables':{ 'momentId': m.moment_id }},
NULL
) AS request
FROM
moments_to_fetch m
CROSS JOIN api_parameters p

View File

@ -132,6 +132,7 @@ sources:
- name: contract_abis
- name: evm_traces_v2
- name: evm_decoded_logs
- name: topshot_metadata
- name: crosschain_silver
database: crosschain

View File

@ -0,0 +1,8 @@
-- Bronze view over the topshot_metadata external table.
-- The query body is produced entirely by the streamline_external_table_query_v2 macro.
-- The partition expression takes the third '/'-separated segment of file_name,
-- keeps the text before its first underscore, and casts that to INTEGER.
{% set partition_expr = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" %}
{{ config(
    materialized = 'view',
    tags = ['streamline_non_core']
) }}
{{ streamline_external_table_query_v2(
    model = "topshot_metadata",
    partition_function = partition_expr
) }}

View File

@ -0,0 +1,8 @@
-- Bronze view over the topshot_metadata external table, built with the FR variant of the macro.
-- The query body is produced entirely by the streamline_external_table_FR_query_v2 macro.
-- The partition expression takes the third '/'-separated segment of file_name,
-- keeps the text before its first underscore, and casts that to INTEGER.
{% set partition_expr = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" %}
{{ config(
    materialized = 'view',
    tags = ['streamline_non_core']
) }}
{{ streamline_external_table_FR_query_v2(
    model = "topshot_metadata",
    partition_function = partition_expr
) }}

View File

@ -0,0 +1,68 @@
-- depends_on: {{ ref('bronze__streamline_topshot_metadata') }}
-- depends_on: {{ ref('bronze__streamline_topshot_metadata_FR') }}
{{ config (
    materialized = "incremental",
    unique_key = "topshot_metadata_complete_id",
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['streamline_complete']
) }}
-- Tracks which (event_contract, moment_id) pairs already have a recorded metadata response.
-- Output is one row per surrogate key, keeping the row with the latest _inserted_timestamp.
--
-- Incremental runs read only new rows from the bronze view.
-- Full refreshes read the FR bronze view and also seed from the
-- moments_minted_metadata_api source.
--
-- The de-duplication is applied once, over the combined row set. A QUALIFY written
-- directly after a UNION ALL filters only the last SELECT of the union, which would
-- leave duplicates of the unique key in the first branch and across branches.
WITH streamline_responses AS (
    -- NOTE(review): the two VALUE paths use different key casing (CONTRACT vs id).
    -- Snowflake path element names are case-sensitive; confirm both match the keys
    -- actually written to the external table.
    SELECT
        VALUE :CONTRACT :: STRING AS event_contract,
        VALUE :id :: STRING AS moment_id,
        DATA :errors :extensions :error_reason :: STRING AS error_reason,
        partition_key :: STRING AS partition_key,
        _inserted_timestamp
    FROM

{% if is_incremental() %}
        {{ ref('bronze__streamline_topshot_metadata') }}
    WHERE
        _inserted_timestamp >= COALESCE(
            (
                SELECT
                    MAX(_inserted_timestamp) _inserted_timestamp
                FROM
                    {{ this }}
            ),
            '1900-01-01' :: timestamp_ntz
        )
{% else %}
        {{ ref('bronze__streamline_topshot_metadata_FR') }}
{% endif %}
),
all_responses AS (
    SELECT
        event_contract,
        moment_id,
        error_reason,
        partition_key,
        _inserted_timestamp
    FROM
        streamline_responses

{% if not is_incremental() %}
    UNION ALL
    -- Full refresh only: rows from the moments_minted_metadata_api source.
    -- A row is labelled 'null data' when the length of DATA:getMintedMoment is NULL.
    SELECT
        contract AS event_contract,
        id AS moment_id,
        CASE
            WHEN len(
                DATA :getMintedMoment
            ) IS NULL THEN 'null data'
        END AS error_reason,
        to_char(TO_TIMESTAMP_NTZ(SYSDATE()), 'YYYYMMDD') AS partition_key,
        _INSERTED_DATE AS _inserted_timestamp
    FROM
        {{ source(
            'bronze_streamline',
            'moments_minted_metadata_api'
        ) }}
{% endif %}
)
SELECT
    event_contract,
    moment_id,
    error_reason,
    partition_key,
    _inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['event_contract','moment_id']
    ) }} AS topshot_metadata_complete_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    all_responses
-- Keep only the most recent row per surrogate key, across every input branch.
qualify(ROW_NUMBER() over (PARTITION BY topshot_metadata_complete_id
ORDER BY
    _inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,71 @@
-- Request view for TopShot moment metadata.
-- Each output row is one moment that is present in streamline__topshot_moments but not
-- yet in streamline__topshot_metadata_complete, paired with the API call for it.
-- The post-hook passes this view (sql_source) and the topshot_metadata external table
-- name to udf_bulk_rest_api_v2.
{{ config(
    materialized = 'view',
    tags = ['streamline', 'topshot', 'moments_metadata', 'backfill'],
    post_hook = fsc_utils.if_data_call_function_v2(
        func = '{{this.schema}}.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params = {
            "external_table": "topshot_metadata",
            "sql_limit": "100",
            "producer_batch_size": "100",
            "worker_batch_size": "100",
            "sql_source": "{{this.identifier}}",
            "async_concurrent_requests": "10"
        }
    )
) }}

WITH api_parameters AS (
    -- Endpoint and GraphQL query text for the TopShot contract
    SELECT
        base_url,
        query
    FROM
        {{ ref('streamline__topshot_parameters') }}
    WHERE
        contract = 'A.0b2a3299cc857e29.TopShot'
),
work_todo AS (
    -- Known moments minus the moments that already have a recorded response
    SELECT
        event_contract,
        moment_id
    FROM
        {{ ref('streamline__topshot_moments') }}
    EXCEPT
    SELECT
        event_contract,
        moment_id
    FROM
        {{ ref('streamline__topshot_metadata_complete') }}
)
SELECT
    -- Partition key is the current date formatted as YYYYMMDD
    TO_CHAR(TO_TIMESTAMP_NTZ(SYSDATE()), 'YYYYMMDD') AS partition_key,
    todo.event_contract AS contract,
    todo.moment_id AS id,
    {{ target.database }}.live.udf_api(
        'POST',
        api.base_url,
        -- Request headers
        OBJECT_CONSTRUCT(
            'Accept', 'application/json',
            'Accept-Encoding', 'gzip',
            'Connection', 'keep-alive',
            'Content-Type', 'application/json',
            'User-Agent', 'Flipside_Flow_metadata/0.1'
        ),
        -- Request body: the query text plus the momentId variable for this row
        OBJECT_CONSTRUCT(
            'query', api.query,
            'variables', OBJECT_CONSTRUCT('momentId', todo.moment_id)
        )
    ) AS request
FROM
    work_todo todo
    CROSS JOIN api_parameters api

View File

@ -0,0 +1,85 @@
{{ config (
    materialized = "incremental",
    unique_key = ["moment_id","event_contract"],
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['streamline_complete_evm']
) }}
-- Distinct TopShot moments seen in either mint events or sales.
-- Output is one row per (event_contract, moment_id), keeping the row with the
-- latest _inserted_timestamp.
--
-- The predicate below is shared by both input CTEs and is only emitted on
-- incremental runs. It limits each input to rows at or after the newest
-- _inserted_timestamp already stored in this model.
{% set incremental_predicate %}
        AND _inserted_timestamp >= COALESCE(
            (
                SELECT
                    MAX(_inserted_timestamp) _inserted_timestamp
                FROM
                    {{ this }}
            ),
            '1900-01-01' :: timestamp_ntz
        )
{% endset %}

WITH minted_moments AS (
    -- MomentMinted events for the TopShot contract
    SELECT
        event_contract,
        event_data :momentID :: STRING AS moment_id,
        _inserted_timestamp
    FROM
        {{ ref('silver__nft_moments_s') }}
    WHERE
        event_contract = 'A.0b2a3299cc857e29.TopShot'
        AND event_type = 'MomentMinted'

{% if is_incremental() %}
{{ incremental_predicate }}
{% endif %}
),
sold_moments AS (
    -- Sales whose collection name contains "topshot" (case-insensitive match)
    SELECT
        nft_collection AS event_contract,
        nft_id AS moment_id,
        _inserted_timestamp
    FROM
        {{ ref('silver__nft_sales_s') }}
    WHERE
        nft_collection ILIKE '%topshot%'

{% if is_incremental() %}
{{ incremental_predicate }}
{% endif %}
),
mint_and_sale_moments AS (
    SELECT
        event_contract,
        moment_id,
        _inserted_timestamp
    FROM
        minted_moments
    UNION ALL
    SELECT
        event_contract,
        moment_id,
        _inserted_timestamp
    FROM
        sold_moments
)
SELECT
    event_contract,
    moment_id,
    _inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['event_contract','moment_id']
    ) }} AS topshot_moments_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    mint_and_sale_moments
-- Keep the most recent row for each contract and moment pair
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY event_contract, moment_id
    ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,207 @@
-- Single-row parameter table for the NBA Top Shot metadata API.
-- Columns:
--   contract : the TopShot contract identifier used as the lookup key by request models
--   base_url : the GraphQL endpoint that requests are posted to
--   query    : the full getMintedMoment GraphQL query text, taking one variable, $momentId
-- NOTE(review): unique_key is set although the model is materialized as a table; confirm it is needed.
{{ config(
materialized = 'table',
unique_key = 'contract',
tags = ['livequery', 'topshot', 'allday', 'moment_metadata']
) }}
SELECT
'A.0b2a3299cc857e29.TopShot' AS contract,
'https://public-api.nbatopshot.com/graphql' as base_url,
-- The literal below is sent to the API verbatim; any edit inside it changes the request.
'query getMintedMoment($momentId: ID!) {
getMintedMoment(momentId: $momentId) {
data {
id
version
sortID
set {
id
sortID
version
flowId
flowName
flowSeriesNumber
flowLocked
setVisualId
assetPath
assets {
images {
type
url
}
}
}
play {
id
version
description
flowID
sortID
status
assets {
videos {
type
url
videoLength
}
videoLengthInMilliseconds
}
stats {
playerID
playerName
firstName
lastName
jerseyNumber
teamAtMoment
awayTeamName
awayTeamScore
homeTeamName
homeTeamScore
dateOfMoment
totalYearsExperience
teamAtMomentNbaId
height
weight
currentTeam
currentTeamId
primaryPosition
homeTeamNbaId
awayTeamNbaId
nbaSeason
draftYear
draftSelection
draftRound
birthplace
birthdate
draftTeam
draftTeamNbaId
playCategory
playType
quarter
}
statsPlayerGameScores {
blocks
points
steals
assists
minutes
rebounds
turnovers
plusMinus
flagrantFouls
personalFouls
technicalFouls
twoPointsMade
blockedAttempts
fieldGoalsMade
freeThrowsMade
threePointsMade
defensiveRebounds
offensiveRebounds
pointsOffTurnovers
twoPointsAttempted
assistTurnoverRatio
fieldGoalsAttempted
freeThrowsAttempted
twoPointsPercentage
fieldGoalsPercentage
freeThrowsPercentage
threePointsAttempted
threePointsPercentage
playerPosition
}
statsPlayerSeasonAverageScores {
minutes
blocks
points
steals
assists
rebounds
turnovers
plusMinus
flagrantFouls
personalFouls
technicalFouls
twoPointsMade
blockedAttempts
fieldGoalsMade
freeThrowsMade
threePointsMade
defensiveRebounds
offensiveRebounds
pointsOffTurnovers
twoPointsAttempted
assistTurnoverRatio
fieldGoalsAttempted
freeThrowsAttempted
twoPointsPercentage
fieldGoalsPercentage
freeThrowsPercentage
threePointsAttempted
threePointsPercentage
efficiency
true_shooting_attempts
points_in_paint_made
points_in_paint_attempted
points_in_paint
fouls_drawn
offensive_fouls
fast_break_points
fast_break_points_attempted
fast_break_points_made
second_chance_points
second_chance_points_attempted
second_chance_points_made
}
tags {
id
name
title
visible
hardcourt
level
}
}
flowId
flowSerialNumber
price
forSale
listingOrderID
owner {
dapperID
email
flowAddress
username
profileImageUrl
twitterHandle
segmentID
}
assetPathPrefix
setPlay {
id: ID
setID
playID
flowRetired
circulationCount
tags {
id
name
title
visible
hardcourt
level
}
}
createdAt
acquiredAt
packListingID
tags {
id
name
title
visible
hardcourt
level
}
}
}
}' AS query