AN-4799/Backfill Migration (#329)

* move models to bronze

* blocks backfill logic

* streamline_load models for all 4 methods

* add back a set of views for the complete models to use

* delete dbt_run_history GHA workflow

* migrate transactions final to modified_timestamp

* update events to modified_timestamp

* set default value for LOAD_BACKFILL_VERSION in dbt_project.yml

* update curated model incremental logic
Jack Forgash 2024-06-12 12:10:27 -06:00 committed by GitHub
parent f3e6d0bfab
commit 12bf4e2e86
52 changed files with 691 additions and 207 deletions

View File

@ -1,53 +0,0 @@
name: dbt_run_history
run-name: dbt_run_history
on:
workflow_dispatch:
schedule:
# Runs every 2 hours
# - cron: "0 */2 * * *"
# Runs every hour
- cron: "0 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
dbt:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s \
2+streamline__get_transaction_results_history_mainnet_16 \
2+streamline__get_transaction_results_history_mainnet_17 \
2+streamline__get_transaction_results_history_mainnet_18 \
2+streamline__get_transaction_results_history_mainnet_19 \
2+streamline__get_transaction_results_history_mainnet_22 \
--vars '{"STREAMLINE_INVOKE_STREAMS": True}'

View File

@ -66,6 +66,7 @@ vars:
REST_API_PREFIX_PROD: quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/
REST_API_PREFIX_DEV: ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/
STREAMLINE_START_BLOCK: 55114467
LOAD_BACKFILL_VERSION: CANDIDATE_07
dispatch:
- macro_namespace: dbt

View File

@ -0,0 +1,54 @@
# Backfill
Bronze backfill models have been parametrized to load one network version at a time, as each combination of network version (NV) and method response lands in its own bucket and external table.
Run either an individual model type (blocks, collections, transactions, transaction_results) or all 4 at once with `tag:streamline_load`.
```shell
dbt run -s 1+tag:streamline_load --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": "<NV>"}'
```
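To load a single model type without the tag, one option is to select the direct children of that method's bronze history model (model names taken from this commit); this is a sketch of the selector, not necessarily how `tag:streamline_load` is wired:
```shell
# load only the collections backfill for one network version
# (the +1 suffix selects the ephemeral bronze model plus its direct consumers)
dbt run -s bronze__streamline_collections_history+1 --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": "MAINNET_22"}'
```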
## Valid Network Versions
- CANDIDATE_07
- CANDIDATE_08
- CANDIDATE_09
- MAINNET_01
- MAINNET_02
- MAINNET_03
- MAINNET_04
- MAINNET_05
- MAINNET_06
- MAINNET_07
- MAINNET_08
- MAINNET_09
- MAINNET_10
- MAINNET_11
- MAINNET_12
- MAINNET_13
- MAINNET_14
- MAINNET_15
- MAINNET_16
- MAINNET_17
- MAINNET_18
- MAINNET_19
- MAINNET_20
- MAINNET_21
- MAINNET_22
## View Types
Views with the word `complete` in the name are used by the complete history models at `models/silver/streamline/core/complete`. These use a macro to scan multiple external tables in one call and feed the streamline backfill process.
The `bronze__streamline_<method>_history` views query a single network version based on the `LOAD_BACKFILL_VERSION` variable passed at runtime; a default of `CANDIDATE_07` is set in `dbt_project.yml`, so any other version must be passed explicitly.
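To target a version other than the default, override the variable on the command line; this mirrors the command above with a concrete network version from the list:
```shell
dbt run -s 1+tag:streamline_load --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": "MAINNET_01"}'
```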
## Running Streamline Backfill
If a network version requires further backfill due to missing blocks or transactions (at present, there are 5800 missing transaction results), run the following command manually, as the `dbt_run_history` workflow has been deleted.
```shell
dbt run -s 2+streamline__get_<method>_history_<network_version> --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
```
e.g.
```shell
dbt run -s \
2+streamline__get_transaction_results_history_mainnet_22 \
--vars '{"STREAMLINE_INVOKE_STREAMS": True}'
```

View File

@ -0,0 +1,36 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "BLOCKS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "COLLECTIONS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "TRANSACTION_RESULTS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "TRANSACTIONS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id

View File

@ -0,0 +1,3 @@
# Streamline Realtime Models
Flow migrated to Streamline with Mainnet-23. These views point to the external tables from there forward.

View File

@ -1,4 +1,6 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__streamline_blocks_history') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
@ -10,7 +12,7 @@
WITH
{% if is_incremental() %}
{% if is_incremental() and not var('LOAD_BACKFILL', False) %}
tx_count_lookback AS (
-- lookback to ensure tx count is correct
@ -21,7 +23,7 @@ tx_count_lookback AS (
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
) }} -- TODO, remove AFTER backfill is complete
-- limit to 3 day lookback for performance
AND _inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
AND (
@ -47,7 +49,11 @@ streamline_blocks AS (
_partition_by_block_id,
_inserted_timestamp
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_blocks_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
@ -66,6 +72,7 @@ WHERE
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
@ -86,7 +93,14 @@ collections AS (
*
FROM
{{ ref('silver__streamline_collections') }}
{% if var('LOAD_BACKFILL', False) %}
WHERE
block_number between (
SELECT root_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}')
) AND (
SELECT end_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}')
)
{% else %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -102,6 +116,7 @@ WHERE
tx_count_lookback
)
{% endif %}
{% endif %}
),
tx_count AS (
SELECT

View File

@ -25,6 +25,12 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_collections_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_collections') }}
WHERE
@ -38,6 +44,8 @@ WHERE
{{ ref('bronze__streamline_fr_collections') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY collection_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -14,12 +14,12 @@ WITH transactions AS (
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response -- inserted timestamp will update w TR ingestion, so should flow thru to events and curated
NOT pending_result_response
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)

View File

@ -26,6 +26,12 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_transaction_results_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_transaction_results') }}
WHERE
@ -39,6 +45,8 @@ WHERE
{{ ref('bronze__streamline_fr_transaction_results') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY tx_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -30,6 +30,12 @@ SELECT
_inserted_timestamp
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_transactions_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
WHERE
@ -43,6 +49,8 @@ WHERE
{{ ref('bronze__streamline_fr_transactions') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY tx_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -17,7 +17,7 @@
block_height
FROM """ ~ this ~ """
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND (
block_timestamp IS NULL
OR pending_result_response
@ -32,15 +32,15 @@
{% if is_incremental() %}
{% set incr = """
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
MAX(modified_timestamp) modified_timestamp
FROM
""" ~ this ~ """
)
OR -- re-run record if block comes in later than tx records
(
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND
tx_id IN (
SELECT
@ -60,7 +60,7 @@
Coalesce in case there are 0 txs returned by the temp table
*/
{% if execute %}
{% set min_time = run_query("select coalesce(min(_inserted_timestamp),current_timestamp()) from silver.streamline_transactions_final_intermediate_tmp").columns [0].values() [0] %}
{% set min_time = run_query("select coalesce(min(modified_timestamp),sysdate()) from silver.streamline_transactions_final_intermediate_tmp").columns [0].values() [0] %}
{% endif %}
WITH txs AS (
@ -78,7 +78,7 @@ tx_results AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND tx_id IN (
SELECT
DISTINCT tx_id
@ -95,7 +95,7 @@ blocks AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND block_number IN (
SELECT
DISTINCT block_number

View File

@ -9,16 +9,20 @@
WITH swaps_txs AS (
SELECT
*
block_height,
tx_id,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
event_contract LIKE '%SwapPair%'
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -26,7 +30,19 @@ AND _inserted_timestamp >= (
),
swap_events AS (
SELECT
*
tx_id,
block_height,
block_timestamp,
event_id,
event_index,
events_count,
payload,
event_contract,
event_type,
event_data,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id
FROM
{{ ref('silver__streamline_events') }}
WHERE
@ -40,9 +56,9 @@ swap_events AS (
AND event_index < events_count - 3
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)

View File

@ -9,15 +9,23 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
_partition_by_block_id
FROM
{{ ref('silver__swaps_events_s') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -112,7 +120,8 @@ token_withdraws AS (
) - 1 AS unique_order,
event_contract,
event_data,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
swap_events
WHERE
@ -166,6 +175,7 @@ link_token_movement AS (
w.block_timestamp,
w.block_height,
w._inserted_timestamp,
w._partition_by_block_id,
-- set transfer index based on execution via deposit, not withdraw, event
RANK() over (
PARTITION BY w.tx_id
@ -283,6 +293,7 @@ boilerplate AS (
block_timestamp,
block_height,
_inserted_timestamp,
_partition_by_block_id,
withdraw_from AS trader
FROM
link_token_movement
@ -310,6 +321,7 @@ FINAL AS (
tokens :token1 :: STRING AS token_in_contract,
amounts :amount1 :: DOUBLE AS token_in_amount,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id', 'swap_index']
) }} AS swaps_id,

View File

@ -9,15 +9,23 @@
WITH swaps_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
_partition_by_block_id
FROM
{{ ref('silver__swaps_events_s') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -97,7 +105,8 @@ token_out_data AS (
LOWER(
event_data :from :: STRING
) AS trader_token_out,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
index_id ii
LEFT JOIN swaps_single_trade sst USING (
@ -160,6 +169,7 @@ combo AS (
td.token_1_amount,
td.token_2_amount,
tod._inserted_timestamp,
tod._partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id']
) }} AS swaps_single_trade_id,

View File

@ -14,15 +14,17 @@ WITH splt AS (
event_contract,
'.'
) AS ec_s,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -36,7 +38,9 @@ FINAL AS (
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id,
_modified_timestamp
FROM
splt
WHERE
@ -54,5 +58,5 @@ FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
_modified_timestamp DESC
) = 1

View File

@ -9,15 +9,24 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -56,7 +65,8 @@ token_withdraws AS (
block_timestamp,
event_contract,
event_index,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
pier_events
WHERE
@ -73,7 +83,8 @@ pairs AS (
ORDER BY
event_index
) AS token1_contract,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
token_withdraws
),
@ -106,6 +117,7 @@ FINAL AS (
e.vault_address,
pa.swap_contract,
C._inserted_timestamp,
C._partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id']
) }} AS labels_pools_metapier_id,

View File

@ -9,15 +9,24 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -40,7 +49,8 @@ pair_creation AS (
event_data :pairAddress :: STRING AS account_address,
event_data :token0Key :: STRING AS token0_contract,
event_data :token1Key :: STRING AS token1_contract,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
@ -61,7 +71,8 @@ FINAL AS (
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
pair_creation p
LEFT JOIN pair_labels l USING (account_address)

View File

@ -9,7 +9,17 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
@ -21,9 +31,9 @@ WITH events AS (
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -32,6 +42,7 @@ AND _inserted_timestamp >= (
org AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS edition_id,
@ -40,11 +51,18 @@ org AS (
event_data :seriesID :: STRING AS series_id,
event_data :setID :: STRING AS set_id,
event_data :tier :: STRING AS tier,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_editions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
org

View File

@ -9,7 +9,17 @@
WITH play_creation AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
@ -28,9 +38,9 @@ A.87ca73a41bb50ad5.Golazos
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -39,12 +49,14 @@ AND _inserted_timestamp >= (
play_metadata AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :id :: NUMBER AS play_id,
VALUE :key :value :: STRING AS column_header,
VALUE :value :value :: STRING AS column_value,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
play_creation,
LATERAL FLATTEN(input => TRY_PARSE_JSON(event_data :metadata))
@ -52,10 +64,12 @@ play_metadata AS (
FINAL AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
play_id,
_inserted_timestamp,
_partition_by_block_id,
OBJECT_AGG(
column_header :: variant,
column_value :: variant
@ -67,9 +81,17 @@ FINAL AS (
2,
3,
4,
5
5,
6,
7
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -9,16 +9,26 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
event_type = 'MomentMinted'
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -27,17 +37,25 @@ AND _inserted_timestamp >= (
org AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :momentID :: STRING AS moment_id,
event_data :serialNumber :: STRING AS serial_number,
event_data :seriesID :: STRING AS series_id,
event_data :setID :: STRING AS set_id,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_minted_2_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
org

View File

@ -9,16 +9,26 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
event_type = 'MomentNFTMinted'
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -27,16 +37,24 @@ AND _inserted_timestamp >= (
org AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :editionID :: STRING AS edition_id,
event_data :id :: STRING AS nft_id,
event_data :serialNumber :: STRING AS serial_number,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_minted_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
org

View File

@ -9,7 +9,17 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
@ -17,9 +27,9 @@ WITH events AS (
AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data))
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -28,15 +38,23 @@ AND _inserted_timestamp >= (
org AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS series_id,
event_data :name :: STRING AS series_name,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_series_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
org

View File

@ -9,7 +9,17 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
@ -17,9 +27,9 @@ WITH events AS (
AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data))
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -28,17 +38,25 @@ AND _inserted_timestamp >= (
org AS (
SELECT
tx_id,
event_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS set_id,
event_data :name :: STRING AS set_name,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
set_id IS NOT NULL
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moment_set_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
org

View File

@ -6,28 +6,21 @@
tags = ['nft', 'scheduled', 'streamline_scheduled', 'scheduled_non_core']
) }}
WITH events AS (
WITH moment_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
moment_events AS (
SELECT
*
FROM
events
WHERE
event_type IN (
'MomentPurchased',
@ -40,8 +33,23 @@ moment_events AS (
'MomentMinted',
'MomentNFTMinted'
)
{% if is_incremental() %}
AND
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['event_id']
) }} AS nft_moments_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
moment_events

View File

@ -6,6 +6,7 @@
) }}
{# Note - removed schedule tag as the legacy lambda workflow is inactive.
No need to query external table #}
{# Not updating incremental to modts logic due to above comment JMF 6/7/2024 #}
WITH metadata AS (
SELECT

View File

@ -9,15 +9,25 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -36,9 +46,9 @@ mapped_sales AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -54,7 +64,16 @@ duc AS (
),
duc_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
events
WHERE
@ -132,6 +151,7 @@ missing_contract AS (
block_height,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
event_contract AS currency,
event_data :amount :: DOUBLE AS amount,
event_data :from :: STRING AS forwarded_from,
@ -149,6 +169,7 @@ purchase_amt AS (
block_height,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
'A.ead892083b3e2c6c.DapperUtilityCoin' AS currency,
event_data :amount :: DOUBLE AS amount,
event_data :from :: STRING AS forwarded_from,
@ -212,6 +233,7 @@ gl_sales AS (
p.block_height,
p.tx_succeeded,
p._inserted_timestamp,
p._partition_by_block_id,
'Gigantik Primary Market' AS marketplace,
p.missing,
p.currency,
@ -266,7 +288,8 @@ giglabs_final AS (
withdraw_nft_id AS nft_id,
m.nfts,
tx_succeeded,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
gl_sales s
LEFT JOIN multi m USING (tx_id)
@ -330,6 +353,7 @@ FINAL AS (
counterparties,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id

View File

@ -15,9 +15,9 @@ WITH topshot AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -31,9 +31,9 @@ secondary_mkts AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -47,9 +47,9 @@ giglabs AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -69,6 +69,7 @@ combo AS (
currency,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','seller', 'buyer', 'nft_collection', 'nft_id']
) }} AS nft_sales_id,
@ -79,7 +80,7 @@ combo AS (
counterparties
FROM
topshot
UNION
UNION ALL
SELECT
tx_id,
block_height,
@ -93,6 +94,7 @@ combo AS (
currency,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','seller', 'buyer', 'nft_collection', 'nft_id']
) }} AS nft_sales_id,
@ -103,7 +105,7 @@ combo AS (
counterparties
FROM
secondary_mkts
UNION
UNION ALL
SELECT
tx_id,
block_height,
@ -117,6 +119,7 @@ combo AS (
currency,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','seller', 'buyer', 'nft_collection', 'nft_id']
) }} AS nft_sales_id,

View File

@ -9,7 +9,17 @@
WITH silver_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
-- WHERE
@ -17,9 +27,9 @@ WITH silver_events AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -35,7 +45,8 @@ moment_data AS (
event_data :price :: DOUBLE AS price,
event_data :seller :: STRING AS seller,
tx_succeeded,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
silver_events
WHERE
@ -86,7 +97,8 @@ combo AS (
price,
currency,
tx_succeeded,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
moment_data
LEFT JOIN currency_data USING (tx_id)
@ -143,6 +155,7 @@ FINAL AS (
currency,
tx_succeeded,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id']
) }} AS nft_topshot_sales_id,

View File

@ -9,15 +9,25 @@
WITH silver_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -42,7 +52,8 @@ sale_trigger AS (
),
TRUE
) AS is_purchased,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
silver_events
WHERE
@ -389,7 +400,8 @@ nft_sales AS (
b.nft_collection_deposit,
b.nft_id_deposit,
b.buyer_deposit,
e._inserted_timestamp
e._inserted_timestamp,
e._partition_by_block_id
FROM
sale_trigger e
LEFT JOIN token_withdraw_event w USING (tx_id)
@ -470,7 +482,8 @@ FINAL AS (
cd.step_data,
cd.counterparties,
tx_succeeded,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
nft_sales ns
LEFT JOIN counterparty_data cd USING (tx_id)

View File

@ -9,15 +9,25 @@
WITH silver_events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -27,7 +37,7 @@ WHERE
FROM
{{ this }}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND delegator IS NULL
)
{% endif %}
@ -44,7 +54,8 @@ flow_staking AS (
event_data :amount :: FLOAT AS amount,
event_data :delegatorID :: STRING AS delegator_id,
event_data :nodeID :: STRING AS node_id,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
silver_events
WHERE
@ -77,9 +88,9 @@ add_auth AS (
{% if is_incremental() %}
AND (
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -89,7 +100,7 @@ AND (
FROM
{{ this }}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
AND delegator IS NULL
)
)
@ -107,6 +118,7 @@ FINAL AS (
amount,
node_id,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id', 'event_index', 'action']
) }} AS staking_actions_id,

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_blocks_history') }}
-- depends_on: {{ ref('bronze__streamline_complete_blocks_history') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
@ -15,7 +15,7 @@ SELECT
_inserted_timestamp
FROM
{{ ref('bronze__streamline_blocks_history') }}
{{ ref('bronze__streamline_complete_blocks_history') }}
WHERE
TRUE

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_collections_history') }}
-- depends_on: {{ ref('bronze__streamline_complete_collections_history') }}
{{ config (
materialized = "incremental",
unique_key = "id",
@ -16,7 +16,7 @@ SELECT
_inserted_timestamp
FROM
{{ ref('bronze__streamline_collections_history') }}
{{ ref('bronze__streamline_complete_collections_history') }}
WHERE
TRUE
{% if is_incremental() %}

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_transaction_results_history') }}
-- depends_on: {{ ref('bronze__streamline_complete_transaction_results_history') }}
{{ config (
materialized = "incremental",
unique_key = "id",
@ -16,7 +16,7 @@ SELECT
_inserted_timestamp
FROM
{{ ref('bronze__streamline_transaction_results_history') }}
{{ ref('bronze__streamline_complete_transaction_results_history') }}
WHERE
TRUE
{% if is_incremental() %}

View File

@ -1,4 +1,4 @@
-- depends_on: {{ ref('bronze__streamline_transactions_history') }}
-- depends_on: {{ ref('bronze__streamline_complete_transactions_history') }}
{{ config (
materialized = "incremental",
unique_key = "id",
@ -16,7 +16,7 @@ SELECT
_inserted_timestamp
FROM
{{ ref('bronze__streamline_transactions_history') }}
{{ ref('bronze__streamline_complete_transactions_history') }}
WHERE
TRUE
{% if is_incremental() %}

View File

@ -9,7 +9,16 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
-- WHERE
@ -17,9 +26,9 @@ WITH events AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -33,7 +42,8 @@ teleport_events AS (
event_contract AS teleport_contract_fee,
event_data :amount :: DOUBLE AS amount_fee,
event_data :type :: NUMBER AS teleport_direction,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
@ -113,7 +123,8 @@ blocto_inbound AS (
d.to_deposits AS flow_wallet_address,
f.teleport_direction,
'blocto' AS bridge,
f._inserted_timestamp
f._inserted_timestamp,
f._partition_by_block_id
FROM
teleports_in t
LEFT JOIN deposits d USING (tx_id)
@ -217,7 +228,8 @@ blocto_outbound AS (
w.from_withdraw AS flow_wallet_address,
f.teleport_direction,
'blocto' AS bridge,
f._inserted_timestamp
f._inserted_timestamp,
f._partition_by_block_id
FROM
teleports_out t
LEFT JOIN teleports_out_withdraw w USING (tx_id)
@ -236,7 +248,8 @@ tbl_union AS (
flow_wallet_address,
teleport_direction,
bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
blocto_inbound
UNION
@ -252,7 +265,8 @@ tbl_union AS (
flow_wallet_address,
teleport_direction,
bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
blocto_outbound
),
@ -281,6 +295,7 @@ FINAL AS (
l.blockchain,
bridge,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id']
) }} AS bridge_blocto_id,

View File

@ -9,7 +9,17 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
-- WHERE
@ -17,9 +27,9 @@ WITH events AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -35,7 +45,8 @@ cbridge_txs AS (
event_contract,
event_type,
event_data,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
@ -54,7 +65,8 @@ inbound AS (
event_data :refChId :: NUMBER AS chain_id,
'inbound' AS direction,
'cbridge' AS bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
@ -82,7 +94,8 @@ outbound AS (
event_data :toChain :: NUMBER AS chain_id,
'outbound' AS direction,
'cbridge' AS bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
events
WHERE
@ -107,7 +120,8 @@ tbl_union AS (
chain_id,
direction,
bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
inbound
UNION
@ -123,7 +137,8 @@ tbl_union AS (
chain_id,
direction,
bridge,
_inserted_timestamp
_inserted_timestamp,
_partition_by_block_id
FROM
outbound
),
@ -149,6 +164,7 @@ FINAL AS (
direction,
bridge,
_inserted_timestamp,
_partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id']
) }} AS bridge_celer_id,

View File

@ -9,7 +9,17 @@
WITH events AS (
SELECT
*
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
_partition_by_block_id,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
-- WHERE
@ -17,9 +27,9 @@ WITH events AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -144,6 +154,7 @@ FINAL AS (
)
SELECT
*,
round(block_height, -5) AS _partition_by_block_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','sender', 'recipient','token_contract', 'amount']
) }} AS token_transfers_id,