Merge branch 'main' into STREAM-375/aptos_streamline

This commit is contained in:
Ryan-Loofy 2023-11-14 12:37:02 -05:00
commit 30df8e4413
14 changed files with 328 additions and 7 deletions

View File

@ -0,0 +1,52 @@
name: dbt_run_backfill_generic
run-name: dbt_run_backfill_generic
on:
workflow_dispatch:
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
dbt run -m models/bronze/bronze_api/bronze_api__aptoslabs_aptos_names.sql;
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,2 +1,3 @@
workflow_name,workflow_schedule
dbt_run_streamline_blocks_tx_realtime, "0 * * * *"
dbt_run_backfill_generic,"17 * * * *"

1 workflow_name workflow_schedule
2 dbt_run_streamline_blocks_tx_realtime 0 * * * *
3 dbt_run_backfill_generic 17 * * * *

19
macros/dbt_helper.sql Normal file
View File

@ -0,0 +1,19 @@
{% macro get_last_transaction_version_created_coin_info() %}
{% if execute %}
{% set last_version = run_query("SELECT MAX(transaction_version_created) FROM bronze_api.aptoslabs_coin_info").columns [0] [0] %}
{% else %}
{% set last_version = -1 %}
{% endif %}
{% do return(last_version) %}
{% endmacro %}
{% macro get_last_transaction_version_aptos_names() %}
{% if execute %}
{% set last_version = run_query("SELECT MAX(last_transaction_version) FROM bronze_api.aptoslabs_aptos_names").columns [0] [0] %}
{% else %}
{% set last_version = -1 %}
{% endif %}
{% do return(last_version) %}
{% endmacro %}

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'incremental',
full_refresh = false,
tags = ['noncore']
) }}
{% if is_incremental() %}
{% set last_version = get_last_transaction_version_aptos_names() %}
{% endif %}
WITH params AS (
SELECT
'query MyQuery { current_aptos_names( limit: 100 order_by: {last_transaction_version: asc}' ||
{% if is_incremental() %}
'where: {last_transaction_version: {_gte: "' || {{ last_version }} || '"}} ' ||
{% endif %}
' ) { domain domain_with_suffix expiration_timestamp is_active is_primary last_transaction_version owner_address registered_address subdomain token_name token_standard } } ' AS query
),
res AS (
SELECT
live.udf_api(
'post',
'https://indexer.mainnet.aptoslabs.com/v1/graphql',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'query',
query,
'variables',{}
)
) AS res,
query,
SYSDATE() AS _inserted_timestamp
FROM
params
)
SELECT
{# res, #}
query,
C.value :domain :: STRING AS domain,
C.value :domain_with_suffix :: STRING AS domain_with_suffix,
C.value :expiration_timestamp :: datetime AS expiration_timestamp,
C.value :is_active :: BOOLEAN AS is_active,
C.value :is_primary :: BOOLEAN AS is_primary,
C.value :last_transaction_version :: INT AS last_transaction_version,
C.value :owner_address :: STRING AS owner_address,
C.value :registered_address :: STRING AS registered_address,
C.value :subdomain :: STRING AS subdomain,
C.value :token_name :: STRING AS token_name,
C.value :token_standard :: STRING AS token_standard,
_inserted_timestamp
FROM
res,
LATERAL FLATTEN(
input => res :data :data :current_aptos_names
) C

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
full_refresh = false,
tags = ['noncore']
) }}
{% if is_incremental() %}
{% set last_version = get_last_transaction_version_created_coin_info() %}
{% endif %}
WITH params AS (
SELECT
'query MyQuery { coin_infos( limit: 100 order_by: {transaction_version_created: asc}' ||
{% if is_incremental() %}
'where: {transaction_version_created: {_gte: "' || {{ last_version }} || '"}} ' ||
{% endif %}
') { coin_type coin_type_hash creator_address decimals name symbol transaction_created_timestamp transaction_version_created }} ' AS query
),
res AS (
SELECT
live.udf_api(
'post',
'https://indexer.mainnet.aptoslabs.com/v1/graphql',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'query',
query,
'variables',{}
)
) AS res,
query,
SYSDATE() AS _inserted_timestamp
FROM
params
)
SELECT
{# res, #}
query,
C.value :coin_type :: STRING AS coin_type,
C.value :coin_type_hash :: STRING AS coin_type_hash,
C.value :creator_address :: STRING AS creator_address,
C.value :decimals :: INT AS decimals,
C.value :name :: STRING AS NAME,
C.value :symbol :: STRING AS symbol,
C.value :transaction_created_timestamp :: datetime AS transaction_created_timestamp,
C.value :transaction_version_created :: INT AS transaction_version_created,
_inserted_timestamp
FROM
res,
LATERAL FLATTEN(
input => res :data :data :coin_infos
) C

View File

@ -0,0 +1,6 @@
{{ config(
materialized = 'view',
tags = ['gha_tasks']
) }}
{{ fsc_utils.gha_task_current_status_view() }}

View File

@ -0,0 +1,17 @@
version: 2
models:
- name: github_actions__current_task_status
columns:
- name: PIPELINE_ACTIVE
tests:
- dbt_expectations.expect_column_values_to_be_in_set:
value_set:
- TRUE
- name: SUCCESSES
tests:
- dbt_expectations.expect_column_values_to_be_in_set:
value_set:
- 204
config:
severity: warn
warn_if: ">0"

View File

@ -0,0 +1,5 @@
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_history_view() }}

View File

@ -0,0 +1,5 @@
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_performance_view() }}

View File

@ -0,0 +1,5 @@
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_task_schedule_view() }}

View File

@ -0,0 +1,5 @@
{{ config(
materialized = 'view'
) }}
{{ fsc_utils.gha_tasks_view() }}

View File

@ -0,0 +1,49 @@
{{ config(
materialized = 'incremental',
unique_key = "aptos_names_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['core']
) }}
SELECT
domain,
domain_with_suffix,
creator_address,
expiration_timestamp,
is_active,
is_primary,
last_transaction_version,
owner_address,
registered_address,
subdomain,
token_name,
token_standard,
{{ dbt_utils.generate_surrogate_key(
['token_name']
) }} AS aptos_names_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref(
'bronze_api__aptoslabs_aptos_names'
) }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY token_name
ORDER BY
last_transaction_version DESC, _inserted_timestamp DESC)) = 1

View File

@ -35,16 +35,9 @@ models:
tests:
- not_null
- name: INSERTED_TIMESTAMP
tests:
- not_null
- name: MODIFIED_TIMESTAMP
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- name: not_null_silver__blocks_INSERTED_TIMESTAMP_
test_name: not_null
- name: _INVOCATION_ID
tests:
- name: not_null_silver__blocks_INVOCATION_ID
test_name: not_null

View File

@ -0,0 +1,45 @@
{{ config(
materialized = 'incremental',
unique_key = "coin_type",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['core']
) }}
SELECT
coin_type,
coin_type_hash,
creator_address,
decimals,
NAME,
symbol,
transaction_created_timestamp,
transaction_version_created,
{{ dbt_utils.generate_surrogate_key(
['coin_type']
) }} AS coin_info_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref(
'bronze_api__aptoslabs_coin_info'
) }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY coin_type
ORDER BY
_inserted_timestamp DESC)) = 1