diff --git a/.github/workflows/dbt_run_scheduled_reward_points_silver.yml b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml index 91605de..b702b5f 100644 --- a/.github/workflows/dbt_run_scheduled_reward_points_silver.yml +++ b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml @@ -6,6 +6,7 @@ on: schedule: # Daily at 01:00 UTC - cron: "0 1 * * *" + env: SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}" USE_VARS: "${{ vars.USE_VARS }}" diff --git a/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml new file mode 100644 index 0000000..f70662e --- /dev/null +++ b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml @@ -0,0 +1,53 @@ +name: dbt_run_scheduled_transaction_entries_realtime +run-name: dbt_run_scheduled_transaction_entries_realtime + +on: + workflow_dispatch: + schedule: + # Daily at 00:00 UTC + - cron: "0 0 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + dbt: + runs-on: ubuntu-latest + environment: + name: workflow_prod + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Jobs + run: > + dbt run -s streamline__transaction_entries_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + - name: Store logs + uses: actions/upload-artifact@v3 + with: + name: dbt-logs + path: | + logs + target diff --git a/models/silver/streamline/external/storefront/bronze_api__FR_transaction_entries.sql b/models/silver/streamline/external/storefront/bronze_api__FR_transaction_entries.sql new file mode 100644 index 0000000..b115411 --- /dev/null +++ b/models/silver/streamline/external/storefront/bronze_api__FR_transaction_entries.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core', 'rewards_points_spend'] +) }} + +{{ streamline_external_table_FR_query_v2( + model = "transaction_entries", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/silver/streamline/external/storefront/bronze_api__transaction_entries.sql b/models/silver/streamline/external/storefront/bronze_api__transaction_entries.sql new file mode 100644 index 0000000..5ab47e9 --- /dev/null +++ b/models/silver/streamline/external/storefront/bronze_api__transaction_entries.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core', 'rewards_points_spend'] +) }} + +{{ streamline_external_table_query_v2( + model = "transaction_entries", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/silver/streamline/external/storefront/silver_api__reward_points_spend.sql b/models/silver/streamline/external/storefront/silver_api__reward_points_spend.sql new file mode 100644 index 0000000..87281d3 --- /dev/null +++ b/models/silver/streamline/external/storefront/silver_api__reward_points_spend.sql @@ -0,0 +1,88 @@ +{{ config( + materialized = 'incremental', + unique_key = "entry_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE'], + post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(user_wallet_address)" ], + tags = ['streamline_non_core', 'rewards_points_spend'] +) }} + +WITH silver_responses AS ( + + SELECT + partition_key, + entry_id, + created_at, + INDEX, + DATA, + _inserted_timestamp + FROM + {{ ref('silver_api__transaction_entries') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY entry_id +ORDER BY + _inserted_timestamp DESC)) = 1 +) +SELECT + entry_id, + created_at, + + DATA :direction :: STRING AS direction, + DATA :amount :: NUMBER AS amount, + DATA :loyaltyAccountStartAmount :: NUMBER AS amount_start, + DATA :loyaltyAccountEndAmount :: NUMBER AS amount_end, + + DATA :idempotencyKey :: STRING AS idempotency_key, + DATA :organizationId :: STRING AS organization_id, + DATA :websiteId :: STRING AS website_id, + + DATA :loyaltyAccountId :: STRING AS account_id, + DATA :loyaltyAccount :user :id :: STRING AS user_id, + DATA :loyaltyAccount :user :walletAddress :: STRING AS user_wallet_address, + + DATA :loyaltyTransactionId :: STRING AS transaction_id, + DATA :loyaltyTransaction :description :: STRING AS transaction_description, + DATA :loyaltyTransaction :type :: STRING AS transaction_type, + + DATA :loyaltyTransaction :loyaltyRule :id :: STRING AS rule_id, + DATA :loyaltyTransaction :loyaltyRule :type :: STRING AS rule_type, + DATA :loyaltyTransaction :loyaltyRule :name :: STRING AS rule_name, + DATA :loyaltyTransaction :loyaltyRule :description :: STRING AS rule_description, + DATA :loyaltyTransaction :loyaltyRule :metadata :: variant AS rule_metadata, + + OBJECT_DELETE( + DATA, + 'amount', + 'createdAt', + 'direction', + 'idempotencyKey', + 'loyaltyAccount', + 'loyaltyAccountId', + 'loyaltyAccountEndAmount', + 'loyaltyAccountStartAmount', + 'loyaltyTransactionId', + 'organizationId', + 'websiteId' + ) AS DATA, + partition_key, + INDEX, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['entry_id', 'partition_key'] + ) }} AS reward_points_spend_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + silver_responses diff --git a/models/silver/streamline/external/storefront/silver_api__reward_points_spend.yml b/models/silver/streamline/external/storefront/silver_api__reward_points_spend.yml new file mode 100644 index 0000000..9f44814 --- /dev/null +++ b/models/silver/streamline/external/storefront/silver_api__reward_points_spend.yml @@ -0,0 +1,101 @@ +version: 2 + +models: + - name: silver_api__reward_points_spend + description: "Response from the Storefront Transaction Entries API" + tests: + - dbt_utils.recency: + datepart: day + field: created_at + interval: 1 + + columns: + - name: entry_id + tests: + - not_null + - unique + + - name: created_at + tests: + - not_null + + - name: direction + tests: + - not_null + + - name: amount + tests: + - not_null + + - name: amount_start + tests: + - not_null + + - name: amount_end + tests: + - not_null + + - name: idempotency_key + + - name: organization_id + tests: + - not_null + + - name: website_id + tests: + - not_null + + - name: account_id + tests: + - not_null + + - name: user_id + tests: + - not_null + + - name: user_wallet_address + tests: + - not_null + + - name: transaction_id + tests: + - not_null + - unique + + - name: transaction_description + + - name: transaction_type + tests: + - not_null + + - name: rule_id + tests: + - not_null + + - name: rule_type + tests: + - not_null + + - name: rule_name + + - name: rule_description + + - name: rule_metadata + + - name: data + + - name: partition_key + tests: + - not_null + + - name: index + + - name: _inserted_timestamp + + - name: reward_points_spend_id + + - name: inserted_timestamp + + - name: modified_timestamp + + - name: _invocation_id diff --git a/models/silver/streamline/external/storefront/silver_api__transaction_entries.sql b/models/silver/streamline/external/storefront/silver_api__transaction_entries.sql new file mode 100644 index 0000000..08c56ba --- /dev/null +++ b/models/silver/streamline/external/storefront/silver_api__transaction_entries.sql @@ -0,0 +1,63 @@ +-- depends_on: {{ ref('bronze_api__transaction_entries') }} +-- depends_on: {{ ref('bronze_api__FR_transaction_entries') }} +{{ config( + materialized = 'incremental', + unique_key = "transaction_entries_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE'], + tags = ['streamline_non_core', 'rewards_points_spend'] +) }} + +WITH bronze AS ( + + SELECT + partition_key, + DATA, + VALUE :STARTING_AFTER :: STRING AS starting_after, + VALUE :API_LIMIT :: INTEGER AS api_limit, + ARRAY_SIZE( + DATA :data :: ARRAY + ) AS entry_count, + DATA :data [0] :id :: STRING AS first_entry_id, + DATA :data [entry_count - 1] :id :: STRING AS last_entry_id, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze_api__transaction_entries') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze_api__FR_transaction_entries') }} +{% endif %} +) +SELECT + partition_key, + entry_count, + starting_after, + api_limit, + first_entry_id AS request_first_entry_id, + last_entry_id AS request_last_entry_id, + VALUE :createdAt :: timestamp_ntz AS created_at, + VALUE :id :: STRING AS entry_id, + INDEX :: INTEGER AS INDEX, + VALUE :: variant AS DATA, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['entry_id', 'partition_key'] + ) }} AS transaction_entries_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + bronze, + LATERAL FLATTEN( + input => DATA :data :: ARRAY + ) + diff --git a/models/silver/streamline/external/storefront/streamline__transaction_entries_realtime.sql b/models/silver/streamline/external/storefront/streamline__transaction_entries_realtime.sql new file mode 100644 index 0000000..3b50d3c --- /dev/null +++ b/models/silver/streamline/external/storefront/streamline__transaction_entries_realtime.sql @@ -0,0 +1,62 @@ +-- depends_on: {{ ref('silver_api__transaction_entries') }} +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = '{{this.schema}}.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table": "transaction_entries", + "sql_limit": "1", + "producer_batch_size": "1", + "worker_batch_size": "1", + "sql_source": "{{this.identifier}}" } + ) +) }} + +{% if not var( + 'STOREFRONT_INITIAL_RUN', + false + ) %} + {% if execute %} + {% set query %} + WITH target_entry_id AS ( + + SELECT + entry_id, + ROW_NUMBER() over ( + ORDER BY + partition_key DESC, + INDEX DESC + ) AS rn + FROM + {{ ref('silver_api__transaction_entries') }} + {# WHERE _inserted_timestamp >= CURRENT_DATE - 3 #} + ) + SELECT + entry_id + FROM + target_entry_id + WHERE + rn = 2 + {% endset %} + {% set starting_after = run_query(query).columns [0].values() [0] %} + {{ log( + "last_id: " ~ starting_after, + info = True + ) }} + {% endif %} +{% endif %} + +SELECT + {{ var( + 'API_LIMIT', + 1000 + ) }} AS api_limit, + '{{ starting_after }}' AS starting_after, + DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/api/loyalty/transaction_entries' || '?limit=' || api_limit{% if not var('STOREFRONT_INITIAL_RUN', false) %} || '&startingAfter=' || '{{ starting_after }}'{% endif %}, + { 'x-api-key': '{Authentication}' }, + {}, + 'Vault/prod/flow/snag-api' + ) AS request diff --git a/models/sources.yml b/models/sources.yml index 26fe39a..4e64ee5 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -126,6 +126,7 @@ sources: - name: evm_receipts - name: evm_traces - name: reward_points + - name: transaction_entries - name: points_transfers - name: crosschain_silver