diff --git a/.github/workflows/dbt_run_streamline_helius_cnft_metadata.yml b/.github/workflows/dbt_run_streamline_helius_cnft_metadata.yml new file mode 100644 index 00000000..932fa99e --- /dev/null +++ b/.github/workflows/dbt_run_streamline_helius_cnft_metadata.yml @@ -0,0 +1,45 @@ +name: dbt_run_streamline_helius_cnft_metadata +run-name: dbt_run_streamline_helius_cnft_metadata + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -s streamline__complete_helius_cnft_metadata_requests + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s streamline__helius_cnft_metadata_requests \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 5d028f53..ecd7958b 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -6,4 +6,5 @@ dbt_run_decode_instructions_orchestrator,"*/5 * * * *" dbt_run_nft_compressed_mints_realtime,"*/15 * * * *" dbt_run_nft_compressed_mints_sales_realtime,"55 * * * *" dbt_run_decode_logs_orchestrator,"*/15 * * * *" -dbt_run_streamline_block_rewards,"*/15 * * * *" \ No newline at end of file +dbt_run_streamline_block_rewards,"*/15 * * * *" +dbt_run_streamline_helius_cnft_metadata,"*/10 * * * *" \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_FR_helius_cnft_metadata.sql b/models/bronze/streamline/bronze__streamline_FR_helius_cnft_metadata.sql new file mode 100644 index 00000000..dcde247c --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_helius_cnft_metadata.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "helius_nft_metadata" %} +{{ streamline_external_table_FR_query( + model, + partition_function = "concat_ws('-',split_part(split_part(file_name,'/',3),'_',1),split_part(split_part(file_name,'/',3),'_',2),split_part(split_part(file_name,'/',3),'_',3))", + partition_name = "_partition_by_created_date", + unique_key = "data:id::STRING AS mint", + other_cols="HELIUS_NFT_METADATA_REQUESTS_ID, MAX_MINT_EVENT_INSERTED_TIMESTAMP" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_helius_cnft_metadata.sql b/models/bronze/streamline/bronze__streamline_helius_cnft_metadata.sql new file mode 100644 index 00000000..feba1df3 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_helius_cnft_metadata.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "helius_nft_metadata" %} +{{ streamline_external_table_query( + model, + partition_function = "concat_ws('-',split_part(split_part(file_name,'/',3),'_',1),split_part(split_part(file_name,'/',3),'_',2),split_part(split_part(file_name,'/',3),'_',3))", + partition_name = "_partition_by_created_date", + unique_key = "data:id::STRING AS mint", + other_cols="HELIUS_NFT_METADATA_REQUESTS_ID, MAX_MINT_EVENT_INSERTED_TIMESTAMP" +) }} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index d24478ec..818e10ad 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -57,6 +57,7 @@ sources: - name: decoded_logs - name: stake_program_accounts_2 - name: block_rewards_2 + - name: helius_nft_metadata - name: bronze_api schema: bronze_api tables: diff --git a/models/streamline/nft/metadata/complete/streamline__complete_helius_cnft_metadata_requests.sql b/models/streamline/nft/metadata/complete/streamline__complete_helius_cnft_metadata_requests.sql new file mode 100644 index 00000000..39b943ae --- /dev/null +++ b/models/streamline/nft/metadata/complete/streamline__complete_helius_cnft_metadata_requests.sql @@ -0,0 +1,37 @@ +-- depends_on: {{ ref('bronze__streamline_helius_cnft_metadata') }} + +{{ + config( + materialized = 'incremental', + unique_key = 'mint', + cluster_by = ['_inserted_timestamp::date'], + post_hook = enable_search_optimization('{{this.schema}}', '{{this.identifier}}', 'ON EQUALITY(mint)') + ) +}} + +SELECT + mint, + helius_nft_metadata_requests_id, + max_mint_event_inserted_timestamp, + _partition_by_created_date, + _inserted_timestamp +FROM + {% if is_incremental() %} + {{ ref('bronze__streamline_helius_cnft_metadata') }} + {% else %} + {{ ref('bronze__streamline_FR_helius_cnft_metadata') }} + {% endif %} +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + coalesce(max(_inserted_timestamp), '1970-01-01'::DATE) max_inserted_timestamp + FROM + {{ this }} + ) +{% endif %} +QUALIFY + row_number() OVER ( + PARTITION BY mint + ORDER BY _inserted_timestamp DESC + ) = 1 \ No newline at end of file diff --git a/models/streamline/nft/metadata/requests/streamline__helius_cnft_metadata_requests.sql b/models/streamline/nft/metadata/requests/streamline__helius_cnft_metadata_requests.sql new file mode 100644 index 00000000..4c956464 --- /dev/null +++ b/models/streamline/nft/metadata/requests/streamline__helius_cnft_metadata_requests.sql @@ -0,0 +1,101 @@ +{{ + config( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "external_table": "helius_nft_metadata", + "sql_limit": "100", + "producer_batch_size": "100", + "worker_batch_size": "10", + "sql_source": "{{this.identifier}}", + "exploded_key": tojson(["result"]), + "order_by_column": "group_num", + } + ) + ) +}} + +WITH all_unknown_metadata AS ( + SELECT + mint + FROM + {{ ref('silver__nft_compressed_mints') }} + WHERE + _inserted_timestamp >= ( + SELECT + coalesce(max(max_mint_event_inserted_timestamp), '2000-01-01') + FROM + {{ ref('streamline__complete_helius_cnft_metadata_requests') }} + ) + EXCEPT + SELECT + mint + FROM + {{ ref('streamline__complete_helius_cnft_metadata_requests') }} +), +numbered AS ( + SELECT + m.*, + row_number() OVER (ORDER BY m._inserted_timestamp) AS row_num + FROM + {{ ref('silver__nft_compressed_mints') }} m + INNER JOIN + all_unknown_metadata + USING(mint) + WHERE + _inserted_timestamp >= ( + SELECT + coalesce(max(max_mint_event_inserted_timestamp), '2000-01-01') + FROM + {{ ref('streamline__complete_helius_cnft_metadata_requests') }} + ) +), +grouped AS ( + SELECT + mint, + floor((row_num - 1) / 1000) + 1 AS group_num, + _inserted_timestamp + FROM + numbered +), +list_mints AS ( + SELECT + array_agg(mint) AS list_mint, + max(_inserted_timestamp) AS max_mint_event_inserted_timestamp, + group_num + FROM + grouped + GROUP BY + group_num +) +SELECT + group_num, + concat_ws('_', current_timestamp, group_num) AS helius_nft_metadata_requests_id, + max_mint_event_inserted_timestamp::string AS max_mint_event_inserted_timestamp, + replace(current_date::string, '-', '_') AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{service}/?api-key={Authentication}', + object_construct( + 'Content-Type', + 'application/json' + ), + object_construct( + 'id', + helius_nft_metadata_requests_id, + 'jsonrpc', + '2.0', + 'method', + 'getAssetBatch', + 'params', + object_construct( + 'ids', + list_mint + ) + ), + 'Vault/prod/solana/helius/mainnet' + ) AS request +FROM + list_mints