From 64a009f0ab5f449dc9fe72047934b7e566d2d563 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Fri, 11 Oct 2024 14:16:25 -0600 Subject: [PATCH] AN-5252/points api integration (#368) * basic query * external points api request * rn REWARD_POINTS and add supplementary models * upd vault * add gha workflows * add headers incl user-agent * upd header * rm content type header * addback secret * use target * rn _realtime * upd silver and add FR view --- .../dbt_run_scheduled_reward_points_api.yml | 53 ++++++++++++++++ ...dbt_run_scheduled_reward_points_silver.yml | 62 +++++++++++++++++++ .../external/bronze_api__FR_reward_points.sql | 9 +++ .../external/bronze_api__reward_points.sql | 9 +++ .../external/silver_api__reward_points.sql | 44 +++++++++++++ .../external/silver_api__reward_points.yml | 55 ++++++++++++++++ .../external/streamline__evm_addresses.sql | 40 ++++++++++++ .../streamline__reward_points_realtime.sql | 39 ++++++++++++ models/sources.yml | 1 + 9 files changed, 312 insertions(+) create mode 100644 .github/workflows/dbt_run_scheduled_reward_points_api.yml create mode 100644 .github/workflows/dbt_run_scheduled_reward_points_silver.yml create mode 100644 models/silver/streamline/external/bronze_api__FR_reward_points.sql create mode 100644 models/silver/streamline/external/bronze_api__reward_points.sql create mode 100644 models/silver/streamline/external/silver_api__reward_points.sql create mode 100644 models/silver/streamline/external/silver_api__reward_points.yml create mode 100644 models/silver/streamline/external/streamline__evm_addresses.sql create mode 100644 models/silver/streamline/external/streamline__reward_points_realtime.sql diff --git a/.github/workflows/dbt_run_scheduled_reward_points_api.yml b/.github/workflows/dbt_run_scheduled_reward_points_api.yml new file mode 100644 index 0000000..7406175 --- /dev/null +++ b/.github/workflows/dbt_run_scheduled_reward_points_api.yml @@ -0,0 +1,53 @@ +name: dbt_run_scheduled_reward_points_api +run-name: dbt_run_scheduled_reward_points_api + +on: + workflow_dispatch: + schedule: + # Sunday at 00:00 UTC + - cron: "0 0 * * 0" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + dbt: + runs-on: ubuntu-latest + environment: + name: workflow_prod + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Jobs + run: > + dbt run -s tag:streamline_non_core --vars '{"STREAMLINE_INVOKE_STREAMS": True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' + + - name: Store logs + uses: actions/upload-artifact@v3 + with: + name: dbt-logs + path: | + logs + target diff --git a/.github/workflows/dbt_run_scheduled_reward_points_silver.yml b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml new file mode 100644 index 0000000..617fc17 --- /dev/null +++ b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml @@ -0,0 +1,62 @@ +name: dbt_run_scheduled_reward_points_silver +run-name: dbt_run_scheduled_reward_points_silver + +on: + workflow_dispatch: + schedule: + # Sunday at 01:00 UTC + - cron: "0 1 * * 0" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + dbt: + runs-on: ubuntu-latest + environment: + name: workflow_prod + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Jobs + run: > + dbt run -s silver_api__reward_points + + - name: Test DBT Models + run: > + dbt test -s silver_api__reward_points + continue-on-error: true + + - name: Log test results + run: | + python python/dbt_test_alert.py + + - name: Store logs + uses: actions/upload-artifact@v3 + with: + name: dbt-logs + path: | + logs + target diff --git a/models/silver/streamline/external/bronze_api__FR_reward_points.sql b/models/silver/streamline/external/bronze_api__FR_reward_points.sql new file mode 100644 index 0000000..209c778 --- /dev/null +++ b/models/silver/streamline/external/bronze_api__FR_reward_points.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core'] +) }} + +{{ streamline_external_table_FR_query_v2( + model = "reward_points", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/silver/streamline/external/bronze_api__reward_points.sql b/models/silver/streamline/external/bronze_api__reward_points.sql new file mode 100644 index 0000000..3484cea --- /dev/null +++ b/models/silver/streamline/external/bronze_api__reward_points.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core'] +) }} + +{{ streamline_external_table_query_v2( + model = "reward_points", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/silver/streamline/external/silver_api__reward_points.sql b/models/silver/streamline/external/silver_api__reward_points.sql new file mode 100644 index 0000000..15a8541 --- /dev/null +++ b/models/silver/streamline/external/silver_api__reward_points.sql @@ -0,0 +1,44 @@ +-- depends_on: {{ ref('bronze_api__reward_points') }} +-- depends_on: {{ ref('bronze_api__FR_reward_points') }} +{{ config( + materialized = 'incremental', + unique_key = "reward_points_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE', 'address'] +) }} + +SELECT + partition_key, + VALUE :ADDRESS :: STRING as address, + to_timestamp(partition_key) :: DATE AS request_date, + DATA :boxes :: NUMBER as boxes, + DATA :boxes_opened :: NUMBER as boxes_opened, + DATA :eth_address :: STRING as eth_address, + DATA :keys :: NUMBER as keys, + DATA :points :: NUMBER as points, + {{ dbt_utils.generate_surrogate_key( + ['address', 'partition_key'] + ) }} AS reward_points_id, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze_api__reward_points') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze_api__FR_reward_points') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY reward_points_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/external/silver_api__reward_points.yml b/models/silver/streamline/external/silver_api__reward_points.yml new file mode 100644 index 0000000..316a205 --- /dev/null +++ b/models/silver/streamline/external/silver_api__reward_points.yml @@ -0,0 +1,55 @@ +version: 2 + +models: + - name: silver_api__reward_points + description: "Response from the Reward Points API" + tests: + - dbt_utils.recency: + datepart: day + field: request_date + interval: 1 + + columns: + - name: partition_key + + - name: address + + - name: request_date + + - name: boxes + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: boxes_opened + + - name: eth_address + + - name: keys + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: points + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: reward_points_id + tests: + - not_null + - unique + + - name: _inserted_timestamp + + - name: inserted_timestamp + + - name: modified_timestamp + + - name: _invocation_id diff --git a/models/silver/streamline/external/streamline__evm_addresses.sql b/models/silver/streamline/external/streamline__evm_addresses.sql new file mode 100644 index 0000000..f0147ae --- /dev/null +++ b/models/silver/streamline/external/streamline__evm_addresses.sql @@ -0,0 +1,40 @@ +{{ config( + materialized = 'incremental', + unique_key = 'evm_addresses_id', + incremental_strategy = 'merge', + merge_exclude_columns = ['inserted_timestamp'], + cluster_by = ['evm_addresses_id'], + tags = ['streamline_non_core'] +) }} + +WITH evm_transactions AS ( + + SELECT + DISTINCT from_address AS address + FROM + {{ ref('silver_evm__transactions') }} + +{% if is_incremental() %} +WHERE + modified_timestamp > ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} +) +SELECT + address, + SYSDATE() AS modified_timestamp, + SYSDATE() AS inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['address'] + ) }} AS evm_addresses_id, + '{{ invocation_id }}' AS _invocation_id +FROM + evm_transactions + +qualify(ROW_NUMBER() over (PARTITION BY address +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/external/streamline__reward_points_realtime.sql b/models/silver/streamline/external/streamline__reward_points_realtime.sql new file mode 100644 index 0000000..aa3f761 --- /dev/null +++ b/models/silver/streamline/external/streamline__reward_points_realtime.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = '{{this.schema}}.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "external_table": "reward_points", + "sql_limit": "1000", + "producer_batch_size": "1000", + "worker_batch_size": "1000", + "sql_source": "{{this.identifier}}" + } + ), + tags = ['streamline_non_core'] +) }} + +WITH evm_addresses AS ( + + SELECT + DISTINCT address AS address + FROM + {{ ref('streamline__evm_addresses') }} +) +SELECT + DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, + address, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/points/ethereum/' || address, + { + 'User-Agent': 'Flipside/0.1', + 'Accept': 'application/json', + 'Connection': 'keep-alive' + }, + {}, + 'Vault/prod/flow/points-api/prod' + ) AS request +FROM + evm_addresses diff --git a/models/sources.yml b/models/sources.yml index ae43f71..e487fb9 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -125,6 +125,7 @@ sources: - name: evm_blocks - name: evm_receipts - name: evm_traces + - name: reward_points - name: crosschain_silver database: crosschain