change model tags everywhere

This commit is contained in:
Jack Forgash 2023-03-15 16:08:09 -06:00
parent 0f21812802
commit 70e53b9391
66 changed files with 389 additions and 333 deletions

View File

@ -27,7 +27,7 @@ jobs:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: >
dbt seed; dbt run -s tag:s3_helper tag:s3_load+ tag:api --exclude streamline__s3_sync
dbt seed; dbt run -s tag:helper tag:load+ tag:api --exclude streamline__s3_sync
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -41,7 +41,7 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt test -s tag:s3_load+
dbt test -s tag:load+
continue-on-error: true
- name: Log test results

View File

@ -168,10 +168,10 @@ If data needs to be re-run for some reason, partitions of data can be re-reun th
Any data refresh will need to be done in a batch due to the nature of the receipt x tx hash mapping. The view `silver__receipt_tx_hash_mapping` is a recursive AncestryTree that follows linked receipt outcome ids to map all receipts generated in a transaction back to the primary hash. Receipts can be generated many blocks after the transaction occurs, so a generous buffer is required to ensure all receipts are captured.
Models in the `streamline` folder can be run with standard incremental logic up until the 2 final receipt and transaction tables. The next step, mapping receipts to tx hash over a range, can be run with the following command:
Models in the `streamline` folder can be run with standard incremental logic up until the 2 final receipt and transaction tables (tagged as such, see below). The next step, mapping receipts to tx hash over a range, can be run with the following command:
```
dbt run -s tag:s3_manual --vars '{"range_start": 82700000, "range_end": 82750000, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev
dbt run -s tag:receipt_map tag:curated --vars '{"range_start": X, "range_end": Y, "front_buffer": 1, "end_buffer": 1}' -t manual_fix_dev
```
The target name will determine how the model operates, calling a macro `partition_load_manual()` which takes the variables input in the command to set the range.
@ -187,4 +187,29 @@ The target name will determine how the model operates, calling a macro `partitio
Actions and curated models include the conditional based on target name so the tags `s3_actions` and `s3_curated` can be included to re-run the fixed data in downstream silver models.
- if missing data is loaded in new, this is not necessary as `_load_timestamp` will be set to when the data hits snowflake and will flow through the standard incremental logic in the curated models.
#### Model Tags
To help with targeted refreshes, a number of tags have been applied to the models. These are defined below:
| Tag | Description |
| --- | --- |
| load | Runs models that load data into Snowflake from S3. The 2 `load_X` models are staging tables for data, which is then parsed and transformed up until the final txs/receipts models. |
| receipt_map | Runs the receipt-mapping models that must use a partition. This set of models cannot simply run with incremental logic due to the recursive tree used to map receipt IDs to Tx Hashes. |
| actions | Just the 3 action events models, an important set of intermediary models before curated activity. Note: These are also tagged with `s3_curated`. |
| curated | Models that are used to generate the curated tables |
Note: certain views, like `core__fact_blocks` are not tagged, so running all views in with `-s models/core/` is recommended after changes are made.
### Incremental Load Strategy
- TODO - comment this section
Include the following conditional, as targeted runs of block partitions may be required:
```
{% if target.name == 'manual_fix' or target.name == 'manual_fix_dev' %}
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ incremental_load_filter('_load_timestamp') }}
{% endif %}
```

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'view',
secure = true,
tags = ['s3_curated']
tags = ['core']
) }}
SELECT

View File

@ -8,7 +8,7 @@
}
}
},
tags = ['s3_curated']
tags = ['core']
) }}
with staking_actions as (

View File

@ -8,7 +8,7 @@
}
}
},
tags = ['s3_curated']
tags = ['core']
) }}
WITH staking_pools AS (

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'view',
secure = true,
tags = ['s3_curated']
tags = ['core']
) }}
WITH token_labels AS (

View File

@ -8,7 +8,7 @@
}
}
},
tags = ['s3_curated']
tags = ['core']
) }}
WITH dex_swaps AS (

View File

@ -8,7 +8,7 @@
}
}
},
tags = ['s3_curated']
tags = ['core']
) }}
with mints as (

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'view',
secure = true,
tags = ['s3_curated']
tags = ['core']
) }}
WITH transactions AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'view',
secure = true
secure = true,
tags = ['core']
) }}
WITH actions AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'view',
secure = true
secure = true,
tags = ['core']
) }}
WITH actions_events_function_call AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'view',
secure = true
secure = true,
tags = ['core']
) }}
WITH blocks AS (

View File

@ -8,7 +8,7 @@
}
}
},
tags = ['s3_curated']
tags = ['core']
) }}
WITH oracle_prices AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'view',
secure = true
secure = true,
tags = ['core']
) }}
WITH receipts AS (

View File

@ -6,7 +6,8 @@
'PURPOSE': 'DEFI, TOKENS'
}
}
}
},
tags = ['core']
) }}
WITH nearblocks_ft_api AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'view',
secure = true
secure = true,
tags = ['core']
) }}
WITH transactions AS (

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'view',
secure = true,
tags = ['s3_curated']
tags = ['core']
) }}
WITH transfers AS (

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['s3_curated', 'social']
tags = ['core', 'social']
) }}
SELECT

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['s3_curated', 'social']
tags = ['core', 'social']
) }}
SELECT

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['s3_curated', 'social']
tags = ['core', 'social']
) }}
SELECT

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'action_id',
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
tags = ['s3_actions']
tags = ['actions', 'curated']
) }}
WITH action_events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
unique_key = 'action_id',
tags = ['s3_actions']
tags = ['actions', 'curated']
) }}
WITH action_events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
unique_key = 'action_id',
tags = ['s3_actions']
tags = ['actions', 'curated']
) }}
WITH receipts AS (

View File

@ -3,7 +3,7 @@
unique_key = "swap_id",
incremental_strategy = "delete+insert",
cluster_by = ["block_timestamp::DATE", "_load_timestamp::DATE"],
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH base_swap_calls AS (

View File

@ -3,7 +3,7 @@
cluster_by = ["block_timestamp::DATE", "_load_timestamp::DATE"],
unique_key = "CONCAT_WS('-', action_id, nft_address)",
incremental_strategy = "delete+insert",
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
--Data pulled from action_events_function_call
WITH function_call AS (

View File

@ -3,7 +3,7 @@
unique_key = 'block_id',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH txs AS (

View File

@ -3,7 +3,7 @@
incremental = 'merge',
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH actions_events_function_call AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
incremental_strategy = 'merge',
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH txs AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'delete+insert',
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH action_events AS(

View File

@ -3,7 +3,7 @@
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_load_timestamp::DATE'],
tags = ['curated', 's3_curated']
tags = ['curated']
) }}
WITH txs AS (

View File

@ -3,7 +3,7 @@
unique_key = '_res_id',
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::date', 'token_contract'],
tags = ['curated', 'api']
tags = ['api']
) }}
WITH nearblocks_token_api AS (

View File

@ -1,7 +1,6 @@
{{ config(
materialized = 'view',
unique_key = 'token_contract',
tags = ['curated']
unique_key = 'token_contract'
) }}
WITH labels_seed AS (

View File

@ -3,7 +3,8 @@
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
unique_key = 'action_id',
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH txs AS (

View File

@ -3,8 +3,8 @@
incremental_strategy = 'delete+insert',
unique_key = 'action_id',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH action_events AS (

View File

@ -3,8 +3,8 @@
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
unique_key = 'action_id',
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH action_events AS (

View File

@ -3,8 +3,8 @@
unique_key = "swap_id",
incremental_strategy = "delete+insert",
cluster_by = ["block_timestamp::DATE", "_inserted_timestamp::DATE"],
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH base_swap_calls AS (

View File

@ -1,85 +1,95 @@
{{
config(
materialized="incremental",
cluster_by=["block_timestamp::DATE", "_inserted_timestamp::DATE"],
unique_key="action_id",
incremental_strategy="delete+insert",
tags = ['curated', 'curated_rpc']
{{ config(
materialized = "incremental",
cluster_by = ["block_timestamp::DATE", "_inserted_timestamp::DATE"],
unique_key = "action_id",
incremental_strategy = "delete+insert",
tags = ['curated_rpc'],
enabled = False
) }}
--Data pulled from action_events_function_call
WITH function_call AS (
)
}}
--Data pulled from action_events_function_call
with
function_call as (
select
action_id,
tx_hash,
block_id,
block_timestamp,
try_parse_json(args) as args_json,
method_name,
deposit / pow(10, 24) as deposit,
_inserted_timestamp,
case
when args_json:receiver_id is not null
then args_json:receiver_id::string
when args_json:receiver_ids is not null
then args_json:receiver_ids::string
end as project_name,
case
when args_json:token_series_id is not null
then try_parse_json(args_json:token_series_id)::string
when args_json:token_owner_id is not null
then try_parse_json(args_json:token_series_id)::string
end as nft_id,
try_parse_json(args_json:token_id)::string as token_id
from {{ ref("silver__actions_events_function_call") }}
where
method_name in ('nft_mint', 'nft_mint_batch')
and {{ incremental_load_filter("_inserted_timestamp") }}
),
--Data Pulled from Transaction
mint_transactions as (
select
tx_hash,
tx_signer,
tx_receiver,
transaction_fee / pow(10, 24) as network_fee,
tx_status
-- tx:actions[0]:functioncall:method_name::string as method_name
from {{ ref("silver__transactions") }}
where
tx_hash in (select distinct tx_hash from function_call)
and tx_status = 'Success'
and {{ incremental_load_filter("_inserted_timestamp") }}
),
--Data pulled from Receipts Table
receipts_data as (
select
tx_hash,
receipt_index,
receipt_object_id as receipt_id,
receipt_outcome_id::string as receipt_outcome_id,
receiver_id,
gas_burnt
from {{ ref("silver__receipts") }}
where
tx_hash in (select distinct tx_hash from function_call)
and {{ incremental_load_filter("_inserted_timestamp") }}
)
select distinct
action_id,
SELECT
action_id,
tx_hash,
block_id,
block_timestamp,
TRY_PARSE_JSON(args) AS args_json,
method_name,
deposit / pow(
10,
24
) AS deposit,
_inserted_timestamp,
CASE
WHEN args_json :receiver_id IS NOT NULL THEN args_json :receiver_id :: STRING
WHEN args_json :receiver_ids IS NOT NULL THEN args_json :receiver_ids :: STRING
END AS project_name,
CASE
WHEN args_json :token_series_id IS NOT NULL THEN TRY_PARSE_JSON(
args_json :token_series_id
) :: STRING
WHEN args_json :token_owner_id IS NOT NULL THEN TRY_PARSE_JSON(
args_json :token_series_id
) :: STRING
END AS nft_id,
TRY_PARSE_JSON(
args_json :token_id
) :: STRING AS token_id
FROM
{{ ref("silver__actions_events_function_call") }}
WHERE
method_name IN (
'nft_mint',
'nft_mint_batch'
)
AND {{ incremental_load_filter("_inserted_timestamp") }}
),
--Data Pulled from Transaction
mint_transactions AS (
SELECT
tx_hash,
tx_signer,
tx_receiver,
transaction_fee / pow(
10,
24
) AS network_fee,
tx_status -- tx:actions[0]:functioncall:method_name::string as method_name
FROM
{{ ref("silver__transactions") }}
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
function_call
)
AND tx_status = 'Success'
AND {{ incremental_load_filter("_inserted_timestamp") }}
),
--Data pulled from Receipts Table
receipts_data AS (
SELECT
tx_hash,
receipt_index,
receipt_object_id AS receipt_id,
receipt_outcome_id :: STRING AS receipt_outcome_id,
receiver_id,
gas_burnt
FROM
{{ ref("silver__receipts") }}
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
function_call
)
AND {{ incremental_load_filter("_inserted_timestamp") }}
)
SELECT
DISTINCT action_id,
function_call.tx_hash,
block_id,
block_timestamp,
@ -90,12 +100,14 @@ select distinct
project_name,
token_id,
nft_id,
receipts_data.receiver_id as nft_address,
receipts_data.receiver_id AS nft_address,
network_fee,
tx_status
from function_call
left join mint_transactions on function_call.tx_hash = mint_transactions.tx_hash
left join receipts_data on function_call.tx_hash = receipts_data.tx_hash
where tx_status is not null
FROM
function_call
LEFT JOIN mint_transactions
ON function_call.tx_hash = mint_transactions.tx_hash
LEFT JOIN receipts_data
ON function_call.tx_hash = receipts_data.tx_hash
WHERE
tx_status IS NOT NULL

View File

@ -3,9 +3,8 @@
unique_key = 'block_id',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH txs AS (

View File

@ -3,142 +3,157 @@
incremental = 'merge',
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
with actions_events_function_call as (
select
WITH actions_events_function_call AS (
SELECT
tx_hash,
method_name,
_inserted_timestamp
from {{ ref('silver__actions_events_function_call') }}
where {{ incremental_load_filter('_inserted_timestamp') }}
and method_name in (
'deposit_and_stake',
'stake',
'unstake',
'unstake_all'
)
FROM
{{ ref('silver__actions_events_function_call') }}
WHERE
{{ incremental_load_filter('_inserted_timestamp') }}
AND method_name IN (
'deposit_and_stake',
'stake',
'unstake',
'unstake_all'
)
),
base_txs as (
select
base_txs AS (
SELECT
*
from {{ ref('silver__transactions') }}
where {{ incremental_load_filter('_inserted_timestamp') }}
FROM
{{ ref('silver__transactions') }}
WHERE
{{ incremental_load_filter('_inserted_timestamp') }}
),
txs as (
select
txs AS (
SELECT
*
from base_txs
where (tx_receiver like '%.pool.near' or tx_receiver like '%.poolv1.near')
FROM
base_txs
WHERE
(
tx_receiver LIKE '%.pool.near'
OR tx_receiver LIKE '%.poolv1.near'
)
),
pool_txs as (
select
txs.tx_hash as tx_hash,
pool_txs AS (
SELECT
txs.tx_hash AS tx_hash,
block_timestamp,
tx_receiver,
tx_signer,
tx,
method_name,
txs._inserted_timestamp as _inserted_timestamp
from txs
inner join actions_events_function_call
on txs.tx_hash = actions_events_function_call.tx_hash
txs._inserted_timestamp AS _inserted_timestamp
FROM
txs
INNER JOIN actions_events_function_call
ON txs.tx_hash = actions_events_function_call.tx_hash
),
deposit_and_stake_txs as (
select
deposit_and_stake_txs AS (
SELECT
tx_hash,
block_timestamp,
tx_receiver as pool_address,
tx_receiver AS pool_address,
tx_signer,
regexp_substr(array_to_string(tx:receipt[0]:outcome:logs, ','), 'staking (\\d+)', 1, 1, 'e')::number as stake_amount,
'Stake' as action,
REGEXP_SUBSTR(ARRAY_TO_STRING(tx :receipt [0] :outcome :logs, ','), 'staking (\\d+)', 1, 1, 'e') :: NUMBER AS stake_amount,
'Stake' AS action,
_inserted_timestamp
from pool_txs
where method_name = 'deposit_and_stake'
and tx:receipt[0]:outcome:status:SuccessValue is not null
FROM
pool_txs
WHERE
method_name = 'deposit_and_stake'
AND tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
),
stake_txs as (
select
stake_txs AS (
SELECT
tx_hash,
block_timestamp,
tx_receiver as pool_address,
tx_receiver AS pool_address,
tx_signer,
regexp_substr(array_to_string(tx:receipt[0]:outcome:logs, ','), 'staking (\\d+)', 1, 1, 'e')::number as stake_amount,
'Stake' as action,
REGEXP_SUBSTR(ARRAY_TO_STRING(tx :receipt [0] :outcome :logs, ','), 'staking (\\d+)', 1, 1, 'e') :: NUMBER AS stake_amount,
'Stake' AS action,
_inserted_timestamp
from pool_txs
where method_name = 'stake'
and tx:receipt[0]:outcome:status:SuccessValue is not null
FROM
pool_txs
WHERE
method_name = 'stake'
AND tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
),
stake_all_txs as (
select
stake_all_txs AS (
SELECT
tx_hash,
block_timestamp,
tx_receiver as pool_address,
tx_receiver AS pool_address,
tx_signer,
regexp_substr(array_to_string(tx:receipt[0]:outcome:logs, ','), 'staking (\\d+)', 1, 1, 'e')::number as stake_amount,
'Stake' as action,
REGEXP_SUBSTR(ARRAY_TO_STRING(tx :receipt [0] :outcome :logs, ','), 'staking (\\d+)', 1, 1, 'e') :: NUMBER AS stake_amount,
'Stake' AS action,
_inserted_timestamp
from pool_txs
where method_name = 'stake_all'
and tx:receipt[0]:outcome:status:SuccessValue is not null
FROM
pool_txs
WHERE
method_name = 'stake_all'
AND tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
),
unstake_txs as (
select
unstake_txs AS (
SELECT
tx_hash,
block_timestamp,
tx_receiver as pool_address,
tx_receiver AS pool_address,
tx_signer,
regexp_substr(array_to_string(tx:receipt[0]:outcome:logs, ','), 'unstaking (\\d+)', 1, 1, 'e')::number as stake_amount,
'Unstake' as action,
REGEXP_SUBSTR(ARRAY_TO_STRING(tx :receipt [0] :outcome :logs, ','), 'unstaking (\\d+)', 1, 1, 'e') :: NUMBER AS stake_amount,
'Unstake' AS action,
_inserted_timestamp
from pool_txs
where method_name = 'unstake'
and tx:receipt[0]:outcome:status:SuccessValue is not null
FROM
pool_txs
WHERE
method_name = 'unstake'
AND tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
),
unstake_all_txs as (
select
unstake_all_txs AS (
SELECT
tx_hash,
block_timestamp,
tx_receiver as pool_address,
tx_receiver AS pool_address,
tx_signer,
regexp_substr(array_to_string(tx:receipt[0]:outcome:logs, ','), 'unstaking (\\d+)', 1, 1, 'e')::number as stake_amount,
'Unstake' as action,
REGEXP_SUBSTR(ARRAY_TO_STRING(tx :receipt [0] :outcome :logs, ','), 'unstaking (\\d+)', 1, 1, 'e') :: NUMBER AS stake_amount,
'Unstake' AS action,
_inserted_timestamp
from pool_txs
where method_name = 'unstake_all'
and tx:receipt[0]:outcome:status:SuccessValue is not null
FROM
pool_txs
WHERE
method_name = 'unstake_all'
AND tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
),
final as (
select
FINAL AS (
SELECT
*
from deposit_and_stake_txs
union
select
FROM
deposit_and_stake_txs
UNION
SELECT
*
from stake_all_txs
union
select
FROM
stake_all_txs
UNION
SELECT
*
from unstake_txs
union
select
FROM
unstake_txs
UNION
SELECT
*
from unstake_all_txs
FROM
unstake_all_txs
)
select
*
from final
SELECT
*
FROM
FINAL

View File

@ -1,66 +1,88 @@
{{
config(
materialized = 'incremental',
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
incremental_strategy = 'merge',
tags = ['curated', 'curated_rpc']
{{ config(
materialized = 'incremental',
cluster_by = ['block_timestamp'],
unique_key = 'tx_hash',
incremental_strategy = 'merge',
tags = ['curated_rpc'],
enabled = False
) }}
WITH txs AS (
)
}}
with txs as (
select
SELECT
tx_hash,
block_timestamp,
tx_signer,
tx_receiver,
tx,
_inserted_timestamp
from {{ ref('silver__transactions') }}
where {{ incremental_load_filter('_inserted_timestamp') }}
FROM
{{ ref('silver__transactions') }}
WHERE
{{ incremental_load_filter('_inserted_timestamp') }}
),
function_calls as (
select
function_calls AS (
SELECT
tx_hash,
args,
method_name,
_inserted_timestamp
from {{ ref('silver__actions_events_function_call') }}
where method_name in ('create_staking_pool', 'update_reward_fee_fraction')
and {{ incremental_load_filter('_inserted_timestamp') }}
FROM
{{ ref('silver__actions_events_function_call') }}
WHERE
method_name IN (
'create_staking_pool',
'update_reward_fee_fraction'
)
AND {{ incremental_load_filter('_inserted_timestamp') }}
),
pool_txs as (
select
txs.tx_hash as tx_hash,
pool_txs AS (
SELECT
txs.tx_hash AS tx_hash,
block_timestamp,
tx_signer,
tx_receiver,
args,
method_name,
tx,
txs._inserted_timestamp as _inserted_timestamp
from txs
inner join function_calls
on txs.tx_hash = function_calls.tx_hash
where tx:receipt[0]:outcome:status:SuccessValue is not null
or (method_name = 'create_staking_pool'
and tx:receipt[0]:outcome:status:SuccessReceiptId is not null
and tx:receipt[1]:outcome:status:SuccessValue is not null)
txs._inserted_timestamp AS _inserted_timestamp
FROM
txs
INNER JOIN function_calls
ON txs.tx_hash = function_calls.tx_hash
WHERE
tx :receipt [0] :outcome :status :SuccessValue IS NOT NULL
OR (
method_name = 'create_staking_pool'
AND tx :receipt [0] :outcome :status :SuccessReceiptId IS NOT NULL
AND tx :receipt [1] :outcome :status :SuccessValue IS NOT NULL
)
),
final as (
select
pool_txs.tx_hash as tx_hash,
FINAL AS (
SELECT
pool_txs.tx_hash AS tx_hash,
block_timestamp,
iff(method_name = 'create_staking_pool', args::variant::object:owner_id, tx_signer) as owner,
iff(method_name = 'create_staking_pool', tx:receipt[1]:outcome:executor_id::text, tx_receiver) as address,
args::variant::object:reward_fee_fraction as reward_fee_fraction,
iff(method_name = 'create_staking_pool', 'Create', 'Update') as tx_type,
IFF(
method_name = 'create_staking_pool',
args :: variant :: OBJECT :owner_id,
tx_signer
) AS owner,
IFF(
method_name = 'create_staking_pool',
tx :receipt [1] :outcome :executor_id :: text,
tx_receiver
) AS address,
args :: variant :: OBJECT :reward_fee_fraction AS reward_fee_fraction,
IFF(
method_name = 'create_staking_pool',
'Create',
'Update'
) AS tx_type,
_inserted_timestamp
from pool_txs
FROM
pool_txs
)
select * from final
SELECT
*
FROM
FINAL

View File

@ -3,9 +3,8 @@
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'delete+insert',
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH action_events AS(

View File

@ -3,9 +3,8 @@
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['curated', 'curated_rpc']
tags = ['curated_rpc'],
enabled = False
) }}
WITH txs AS (

View File

@ -3,7 +3,8 @@
unique_key = 'block_id',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['rpc']
tags = ['rpc'],
enabled = False
) }}
WITH base_blocks AS (

View File

@ -3,7 +3,8 @@
unique_key = 'receipt_object_id',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['rpc']
tags = ['rpc'],
enabled = False
) }}
WITH txs AS (

View File

@ -3,7 +3,8 @@
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['rpc']
tags = ['rpc'],
enabled = False
) }}
WITH base_transactions AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = 'action_id',
cluster_by = ['block_timestamp::date'],
tags = ['s3_curated', 'social']
tags = ['curated', 'social']
) }}
WITH receipts AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = 'action_id_social',
cluster_by = ['_load_timestamp::date', '_partition_by_block_number'],
tags = ['s3_curated', 'social']
tags = ['curated', 'social']
) }}
WITH all_social_receipts AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = 'action_id_profile',
cluster_by = ['block_timestamp::date', 'signer_id'],
tags = ['s3_curated', 'social']
tags = ['curated', 'social']
) }}
WITH decoded_actions AS (

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = 'receipt_object_id',
cluster_by = ['_load_timestamp::date', '_partition_by_block_number'],
tags = ['s3_curated', 'social']
tags = ['curated', 'social']
) }}
WITH all_social_receipts AS (

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['s3', 's3_helper', 's3_manual']
tags = ['helper', 'receipt_map']
) }}
WITH receipts AS (

View File

@ -1,7 +1,7 @@
{{ config(
materalized = 'view',
unique_key = 'receipt_id',
tags = ['s3', 's3_helper', 's3_manual']
tags = ['helper', 'receipt_map']
) }}
WITH recursive ancestrytree AS (

View File

@ -4,7 +4,7 @@
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'block_id',
full_refresh = False,
tags = ['s3_load']
tags = ['load']
) }}
WITH blocks_json AS (
@ -18,8 +18,9 @@ WITH blocks_json AS (
FROM
{{ ref('bronze__streamline_blocks') }}
WHERE
_partition_by_block_number = 86980000 and block_id = 86989998
{# {{ partition_batch_load(150000) }}
OR #}
OR
(
_partition_by_block_number IN (
SELECT
@ -34,10 +35,10 @@ WITH blocks_json AS (
{{ target.database }}.tests.streamline_block_gaps
)
)
OR (
_partition_by_block_number BETWEEN 86640000
AND 86690000
)
OR ( #}
{# _partition_by_block_number BETWEEN 86640000
AND 86700000 #}
{# ) #}
)
SELECT
*

View File

@ -4,7 +4,7 @@
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'shard_id',
full_refresh = False,
tags = ['s3_load']
tags = ['load']
) }}
WITH shards_json AS (
@ -23,10 +23,12 @@ WITH shards_json AS (
_partition_by_block_number
FROM
{{ ref('bronze__streamline_shards') }}
WHERE
where _partition_by_block_number = 86980000
and block_id = 86989998
{# {{ partition_batch_load(150000) }}
OR #}
(
{#
_partition_by_block_number IN (
SELECT
DISTINCT _partition_by_block_number
@ -41,8 +43,8 @@ WITH shards_json AS (
LATERAL FLATTEN(
input => blocks_to_walk
)
)
)
) #}
)
SELECT
*

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = 'block_id',
tags = ['s3_load']
tags = ['load']
) }}
WITH blocksjson AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['s3', 'receipts']
tags = ['load']
) }}
WITH chunks AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'chunk_hash',
cluster_by = ['_load_timestamp::date','height_created','height_included'],
tags = ['s3']
tags = ['load']
) }}
WITH shards AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_execution_outcome_id',
cluster_by = ['_load_timestamp::date','block_id','chunk_hash'],
tags = ['s3']
tags = ['load']
) }}
WITH shards AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'receipt_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['s3', 'receipts']
tags = ['load']
) }}
WITH receipt_execution_outcomes AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'receipt_object_id',
cluster_by = ['_load_timestamp::date', 'block_id'],
tags = ['s3_final', 's3_manual']
tags = ['receipt_map']
) }}
WITH base_receipts AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
unique_key = ['shard_id'],
tags = ['s3_load']
tags = ['load']
) }}
WITH shardsjson AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
unique_key = 'tx_hash',
cluster_by = ['_load_timestamp::date', 'block_id', 'tx_hash'],
tags = ['s3']
tags = ['load']
) }}
WITH chunks AS (

View File

@ -3,7 +3,7 @@
unique_key = 'tx_hash',
incremental_strategy = 'delete+insert',
cluster_by = ['_load_timestamp::date', 'block_timestamp::date'],
tags = ['s3_final', 's3_manual']
tags = ['receipt_map']
) }}
WITH int_txs AS (

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['streamline', 's3_copy'],
tags = ['s3_copy'],
post_hook = "select * from {{this.schema}}.{{this.identifier}}",
comment = "incrementally sync Streamline.near_dev.blocks and Streamline.near_dev.shards"
) }}

View File

@ -1,27 +0,0 @@
{{ config(
enabled = False
) }}
WITH silver_blocks AS (
SELECT
block_id,
block_timestamp,
block_hash,
prev_hash,
LAG(block_hash) over (
ORDER BY
block_timestamp ASC,
block_id ASC
) AS prior_hash
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp < CURRENT_DATE - 1
)
SELECT
*
FROM
silver_blocks
WHERE
prior_hash <> prev_hash

View File

@ -6,10 +6,10 @@ WITH block_chunks_included AS (
SELECT
block_id,
VALUE :header :chunks_included AS chunks_included,
header :chunks_included AS chunks_included,
_partition_by_block_number
FROM
{{ ref('silver__load_blocks') }}
{{ ref('silver__streamline_blocks') }}
),
chunks_per_block AS (
SELECT