An 5047/helius cnft metadata (#669)

* streamline bronze data models

* wip complete and requests nft metadata models

* add source

* formatting

* update columns and explode key

* fix partition key

* update requests logic

* rename to cnft

* run cnft metadata every 10 minutes

* add SO

* formatting
This commit is contained in:
desmond-hui 2024-10-15 09:21:52 -07:00 committed by GitHub
parent 0e6555d975
commit 122fa61bc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 210 additions and 1 deletions

View File

@ -0,0 +1,45 @@
name: dbt_run_streamline_helius_cnft_metadata
run-name: dbt_run_streamline_helius_cnft_metadata
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s streamline__complete_helius_cnft_metadata_requests
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s streamline__helius_cnft_metadata_requests

View File

@ -6,4 +6,5 @@ dbt_run_decode_instructions_orchestrator,"*/5 * * * *"
dbt_run_nft_compressed_mints_realtime,"*/15 * * * *"
dbt_run_nft_compressed_mints_sales_realtime,"55 * * * *"
dbt_run_decode_logs_orchestrator,"*/15 * * * *"
dbt_run_streamline_block_rewards,"*/15 * * * *"
dbt_run_streamline_block_rewards,"*/15 * * * *"
dbt_run_streamline_helius_cnft_metadata,"*/10 * * * *"
1 workflow_name workflow_schedule
6 dbt_run_nft_compressed_mints_realtime */15 * * * *
7 dbt_run_nft_compressed_mints_sales_realtime 55 * * * *
8 dbt_run_decode_logs_orchestrator */15 * * * *
9 dbt_run_streamline_block_rewards */15 * * * *
10 dbt_run_streamline_helius_cnft_metadata */10 * * * *

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "helius_nft_metadata" %}
{{ streamline_external_table_FR_query(
model,
partition_function = "concat_ws('-',split_part(split_part(file_name,'/',3),'_',1),split_part(split_part(file_name,'/',3),'_',2),split_part(split_part(file_name,'/',3),'_',3))",
partition_name = "_partition_by_created_date",
unique_key = "data:id::STRING AS mint",
other_cols="HELIUS_NFT_METADATA_REQUESTS_ID, MAX_MINT_EVENT_INSERTED_TIMESTAMP"
) }}

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view'
) }}
{% set model = "helius_nft_metadata" %}
{{ streamline_external_table_query(
model,
partition_function = "concat_ws('-',split_part(split_part(file_name,'/',3),'_',1),split_part(split_part(file_name,'/',3),'_',2),split_part(split_part(file_name,'/',3),'_',3))",
partition_name = "_partition_by_created_date",
unique_key = "data:id::STRING AS mint",
other_cols="HELIUS_NFT_METADATA_REQUESTS_ID, MAX_MINT_EVENT_INSERTED_TIMESTAMP"
) }}

View File

@ -57,6 +57,7 @@ sources:
- name: decoded_logs
- name: stake_program_accounts_2
- name: block_rewards_2
- name: helius_nft_metadata
- name: bronze_api
schema: bronze_api
tables:

View File

@ -0,0 +1,37 @@
-- depends_on: {{ ref('bronze__streamline_helius_cnft_metadata') }}
{{
config(
materialized = 'incremental',
unique_key = 'mint',
cluster_by = ['_inserted_timestamp::date'],
post_hook = enable_search_optimization('{{this.schema}}', '{{this.identifier}}', 'ON EQUALITY(mint)')
)
}}
SELECT
mint,
helius_nft_metadata_requests_id,
max_mint_event_inserted_timestamp,
_partition_by_created_date,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_helius_cnft_metadata') }}
{% else %}
{{ ref('bronze__streamline_FR_helius_cnft_metadata') }}
{% endif %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
coalesce(max(_inserted_timestamp), '1970-01-01'::DATE) max_inserted_timestamp
FROM
{{ this }}
)
{% endif %}
QUALIFY
row_number() OVER (
PARTITION BY mint
ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,101 @@
{{
config(
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "helius_nft_metadata",
"sql_limit": "100",
"producer_batch_size": "100",
"worker_batch_size": "10",
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["result"]),
"order_by_column": "group_num",
}
)
)
}}
WITH all_unknown_metadata AS (
SELECT
mint
FROM
{{ ref('silver__nft_compressed_mints') }}
WHERE
_inserted_timestamp >= (
SELECT
coalesce(max(max_mint_event_inserted_timestamp), '2000-01-01')
FROM
{{ ref('streamline__complete_helius_cnft_metadata_requests') }}
)
EXCEPT
SELECT
mint
FROM
{{ ref('streamline__complete_helius_cnft_metadata_requests') }}
),
numbered AS (
SELECT
m.*,
row_number() OVER (ORDER BY m._inserted_timestamp) AS row_num
FROM
{{ ref('silver__nft_compressed_mints') }} m
INNER JOIN
all_unknown_metadata
USING(mint)
WHERE
_inserted_timestamp >= (
SELECT
coalesce(max(max_mint_event_inserted_timestamp), '2000-01-01')
FROM
{{ ref('streamline__complete_helius_cnft_metadata_requests') }}
)
),
grouped AS (
SELECT
mint,
floor((row_num - 1) / 1000) + 1 AS group_num,
_inserted_timestamp
FROM
numbered
),
list_mints AS (
SELECT
array_agg(mint) AS list_mint,
max(_inserted_timestamp) AS max_mint_event_inserted_timestamp,
group_num
FROM
grouped
GROUP BY
group_num
)
SELECT
group_num,
concat_ws('_', current_timestamp, group_num) AS helius_nft_metadata_requests_id,
max_mint_event_inserted_timestamp::string AS max_mint_event_inserted_timestamp,
replace(current_date::string, '-', '_') AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{service}/?api-key={Authentication}',
object_construct(
'Content-Type',
'application/json'
),
object_construct(
'id',
helius_nft_metadata_requests_id,
'jsonrpc',
'2.0',
'method',
'getAssetBatch',
'params',
object_construct(
'ids',
list_mint
)
),
'Vault/prod/solana/helius/mainnet'
) AS request
FROM
list_mints