update dnv source (#32)

This commit is contained in:
Austin 2023-07-19 17:24:54 -04:00 committed by GitHub
parent 92350da2b7
commit 2f4587e718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 99 additions and 317 deletions

View File

@ -1,84 +0,0 @@
{{ config(
    materialized = 'incremental',
    unique_key = 'api_url',
    full_refresh = false,
    enabled = false
) }}
-- Fetches DeepNFTValue historical-valuation responses for up to 30 pending
-- request URLs per run, issuing the HTTP calls two at a time via udf_api.
WITH requests AS (
    -- The 30 most recent request URLs not yet loaded into this table.
    SELECT
        api_url,
        date_day
    FROM
        {{ ref('silver__dnv_historical_requests') }} r
    {% if is_incremental() %}
    WHERE
        -- NOT EXISTS instead of NOT IN: immune to NULL api_url rows in the
        -- target table (NOT IN returns zero rows if the subquery yields NULL).
        NOT EXISTS (
            SELECT
                1
            FROM
                {{ this }} t
            WHERE
                t.api_url = r.api_url
        )
    {% endif %}
    ORDER BY
        date_day DESC
    LIMIT
        30
), api_key AS (
    -- Builds the literal Python-style header string expected by udf_api.
    SELECT
        CONCAT(
            '{\'Authorization\': \'Token ',
            api_key,
            '\', \'accept\': \'application/json\'}'
        ) AS header
    FROM
        {{ source(
            'crosschain_silver',
            'apis_keys'
        ) }}
    WHERE
        api_name = 'deepnftvalue'
),
row_nos AS (
    -- Number the requests and pair them up: FLOOR(row_no / 2) puts two URLs
    -- per batch. batch_no referencing row_no in the same SELECT relies on
    -- Snowflake lateral column aliasing.
    SELECT
        api_url,
        ROW_NUMBER () over (
            ORDER BY
                api_url
        ) AS row_no,
        FLOOR(
            row_no / 2
        ) AS batch_no,
        header
    FROM
        requests
        JOIN api_key
        ON 1 = 1
),
-- One UNION ALL branch per batch. With 30 rows, batch_no spans 0..15, so the
-- loop must cover 16 batches: the previous range(10) dropped batches 10-15,
-- silently skipping the last 11 URLs of every run.
batched AS ({% for item in range(16) %}
    SELECT
        -- NOTE(review): the HTTP method is passed as ' GET ' with padding;
        -- presumably udf_api trims it -- confirm before normalizing.
        ethereum.streamline.udf_api(' GET ', api_url, PARSE_JSON(header),{}) AS resp, api_url, SYSDATE() _inserted_timestamp
    FROM
        row_nos rn
    WHERE
        batch_no = {{ item }}
        -- Skip empty batches entirely so no spurious UDF branches run.
        AND EXISTS (
            SELECT
                1
            FROM
                row_nos
            WHERE
                batch_no = {{ item }}
            LIMIT
                1) {% if not loop.last %}
    UNION ALL
    {% endif %}
{% endfor %})
SELECT
    resp,
    _inserted_timestamp,
    api_url
FROM
    batched

View File

@ -1,85 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = '_id',
full_refresh = false,
enabled = false
) }}
-- Fetches DeepNFTValue "latest valuation" responses for every pending request
-- URL, issuing the HTTP calls two at a time via udf_api.
WITH requests AS (
-- All request URLs whose _id has not yet been loaded into this table.
-- NOTE(review): _id NOT IN (...) returns no rows if any _id in the target is
-- NULL; _id is the unique_key so presumably never NULL -- confirm.
SELECT
api_url,
collection_slug,
_id
FROM
{{ ref('silver__dnv_latest_valuation_requests') }}
{% if is_incremental() %}
WHERE
_id NOT IN (
SELECT
_id
FROM
{{ this }}
)
{% endif %}
),
api_key AS (
-- Builds the literal Python-style header string expected by udf_api.
SELECT
CONCAT(
'{\'Authorization\': \'Token ',
api_key,
'\', \'accept\': \'application/json\'}'
) AS header
FROM
{{ source(
'crosschain_silver',
'apis_keys'
) }}
WHERE
api_name = 'deepnftvalue'
),
row_nos AS (
-- Pair the requests up: FLOOR(row_no / 2) + 1 yields batches 1, 2, 2, 3, 3...
-- batch_no referencing row_no in the same SELECT relies on Snowflake lateral
-- column aliasing.
SELECT
api_url,
collection_slug,
_id,
ROW_NUMBER() over (
ORDER BY
api_url
) AS row_no,
FLOOR(
row_no / 2
) + 1 AS batch_no,
header
FROM
requests
JOIN api_key
ON 1 = 1
),
-- One UNION ALL branch per batch number 0..10. Batch 0 never matches (batch_no
-- starts at 1), so that branch is always empty. NOTE(review): with more than
-- 19 pending requests, batch_no exceeds 10 and the excess rows are silently
-- deferred to a later run -- confirm this backlog behavior is intended.
batched AS ({% for item in range(11) %}
SELECT
ethereum.streamline.udf_api(' GET ', api_url, PARSE_JSON(header),{}) AS resp, _id, SYSDATE() _inserted_timestamp, collection_slug
FROM
row_nos rn
WHERE
batch_no = {{ item }}
AND EXISTS (
SELECT
1
FROM
row_nos
WHERE
batch_no = {{ item }}
LIMIT
1) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %})
SELECT
resp,
_inserted_timestamp,
collection_slug,
_id
FROM
batched

View File

@ -0,0 +1,39 @@
{{ config (
materialized = 'view'
) }}
-- Bronze view over the DNV valuations external table, reading every file
-- currently registered on the table (presumably the full-refresh companion to
-- the view that only reads recent registration history -- confirm naming).
WITH meta AS (
-- One row per parquet file registered on the external table. DATE_PART turns
-- the file's month prefix into a month-start DATE used as the partition key.
-- NOTE(review): assumes path segment 4 of file_name begins 'YYYY-MM_' --
-- confirm against the stage layout.
SELECT
registered_on AS _inserted_timestamp,
file_name,
CONCAT(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1), '-01') :: DATE AS DATE_PART
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "valuations_parquet") }}'
)
) A
)
-- Join each parquet row back to its file's metadata to carry the load
-- timestamp and month partition alongside the row.
SELECT
s.date_part,
_inserted_timestamp,
collection_address,
collection_name,
token_id,
price,
valuation_date,
currency,
collection_slug,
metadata$filename AS _filename
FROM
{{ source(
"bronze_streamline",
"valuations_parquet"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.date_part = s.date_part
-- NOTE(review): this WHERE repeats the second join predicate; redundant but
-- harmless (possibly intended as a partition-pruning hint).
WHERE
b.date_part = s.date_part

View File

@ -2,16 +2,38 @@
materialized = 'view'
) }}
SELECT
collection_address,
collection_name,
token_id,
price,
valuation_date,
currency,
CURRENT_TIMESTAMP :: timestamp_ntz AS _inserted_timestamp
FROM
{{ source(
'bronze_streamline',
'valuations'
) }}
-- Incremental bronze view: only files registered on the external table within
-- the last 7 days are read, so downstream incremental models scan just the
-- recently loaded parquet.
WITH meta AS (
-- One row per recently registered parquet file. DATE_PART turns the file's
-- month prefix into a month-start DATE used as the partition key.
-- NOTE(review): assumes path segment 4 of file_name begins 'YYYY-MM_' --
-- confirm against the stage layout.
SELECT
last_modified AS _inserted_timestamp,
file_name,
CONCAT(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1), '-01') :: DATE AS DATE_PART
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "valuations_parquet") }}')
) A
)
-- Join each parquet row back to its file's metadata to carry the load
-- timestamp and month partition alongside the row.
SELECT
s.date_part,
_inserted_timestamp,
collection_address,
collection_name,
token_id,
price,
valuation_date,
currency,
collection_slug,
metadata$filename AS _filename
FROM
{{ source(
"bronze_streamline",
"valuations_parquet"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.date_part = s.date_part
-- NOTE(review): repeats the second join predicate; redundant but harmless.
WHERE
b.date_part = s.date_part

View File

@ -1,66 +0,0 @@
{{ config(
materialized = 'table',
enabled = false
) }}
-- Builds one DeepNFTValue historical-valuations API URL per collection, per
-- day since the collection was created, per 5000-token page.
WITH slugs AS (
SELECT
collection_slug,
total_supply,
created_at,
5000 AS limiter,
-- Pages needed to cover the collection at `limiter` tokens per request.
total_supply / limiter AS total_pages
FROM
{{ ref('bronze__dnv_collection_slugs') }}
),
date_series AS (
SELECT
date_day
FROM
{{ ref('silver__dates') }}
),
generate_sequence AS (
-- Page indexes 0..99. NOTE(review): caps pagination at 100 pages (500k
-- tokens) per collection/day -- confirm no collection exceeds that.
SELECT
SEQ4() AS seq
FROM
TABLE(GENERATOR(rowcount => 100))
),
all_tokens AS (
-- Cross product: every day from the collection's creation up to (but not
-- including) today, times every page the collection needs.
SELECT
*,
seq * limiter AS offset
FROM
date_series
JOIN slugs
ON created_at <= date_day
AND date_day < SYSDATE() :: DATE
JOIN generate_sequence
ON seq <= CEIL(total_pages) - 1
)
-- Assemble the URL; api_url1/api_url2 referenced later in the same SELECT
-- rely on Snowflake lateral column aliasing.
SELECT
collection_slug,
date_day,
CONCAT(
'https://api.deepnftvalue.com/v1/valuations/hist/',
collection_slug,
'?limit=',
limiter,
'&token_ids=all&start=',
date_day,
'&end=',
date_day
) AS api_url1,
-- The offset query parameter is omitted for the first page.
CASE
WHEN offset = 0 THEN ''
ELSE CONCAT(
'&offset=',
offset
)
END AS api_url2,
CONCAT(
api_url1,
api_url2
) AS api_url
FROM
all_tokens

View File

@ -1,64 +0,0 @@
{{ config(
    materialized = 'table',
    enabled = false
) }}
-- Builds one DeepNFTValue "latest valuations" API URL per (collection, page),
-- paging through each collection's token supply 2500 tokens at a time, plus a
-- per-run _id so each page is fetched at most once per day.
WITH slugs AS (
    SELECT
        collection_slug,
        total_supply
    FROM
        {{ ref('bronze__dnv_collection_slugs') }}
),
offsets AS (
    -- Page size for the valuations endpoint.
    SELECT
        2500 AS limiter
),
generate_sequence AS (
    -- Page indexes 0..99999 -- far more pages than any collection needs.
    SELECT
        SEQ4() AS seq
    FROM
        TABLE(GENERATOR(rowcount => 100000))
),
limits AS (
    -- Keep only pages whose starting offset falls inside the collection's
    -- token supply.
    SELECT
        *,
        seq * limiter AS offset
    FROM
        generate_sequence
        JOIN offsets
        ON 1 = 1
        JOIN slugs
        ON seq * limiter < total_supply
)
SELECT
    *,
    -- The offset query parameter is omitted for the first page.
    CASE
        seq
        WHEN 0 THEN CONCAT(
            'https://api.deepnftvalue.com/v1/valuations/',
            collection_slug,
            '?limit=',
            limiter,
            '&token_ids=all'
        )
        -- https here (was http): paged requests must use the same scheme as
        -- page 0 or risk redirects / plaintext calls to the API.
        ELSE CONCAT(
            'https://api.deepnftvalue.com/v1/valuations/',
            collection_slug,
            '?limit=',
            limiter,
            '&offset=',
            offset,
            '&token_ids=all'
        )
    END AS api_url,
    -- Dedup key: one row per collection page per calendar day.
    CONCAT(
        collection_slug,
        '-',
        offset,
        '-',
        SYSDATE() :: DATE
    ) AS _id
FROM
    limits

View File

@ -1,5 +1,9 @@
-- depends_on: {{ ref('bronze__streamline_valuations') }}
{{ config(
materialized = 'table'
materialized = 'incremental',
unique_key = "_id",
incremental_strategy = "delete+insert",
cluster_by = ['valuation_date::DATE']
) }}
WITH base AS (
@ -11,9 +15,24 @@ WITH base AS (
price,
valuation_date,
currency,
_inserted_timestamp
_inserted_timestamp,
collection_slug
FROM
{{ ref('bronze__streamline_valuations') }}
{% if is_incremental() %}
{{ ref('bronze__streamline_valuations') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) :: DATE
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_valuations') }}
{% endif %}
)
SELECT
valuation_date,
@ -22,9 +41,10 @@ SELECT
token_id,
currency,
price,
collection_slug,
_inserted_timestamp,
CONCAT(
collection_name,
collection_slug,
'-',
token_id,
'-',

View File

@ -46,5 +46,5 @@ sources:
database: streamline
schema: external
tables:
- name: valuations
- name: valuations_parquet