An 1903/signers (#119)

* signers table wip

* merge main and remove test addresses

* flatten signers and instructions arrays

* daily signers wip

* signers wip, some test failures

* incremental model changes

* incremental changes - dumped FR date limit

* exclude chain admin programs

* updates to incremental

* updates to tests

* incremental fixes

* and condition

* changes to incremental and formatting

* updates to incremental

* remove ver2 references

* remove unintended change

* add upper bound for _inserted_timestamp for downstream ctes

* do 1 day loads

* update docs

Co-authored-by: Desmond Hui <desmond@flipsidecrypto.com>
This commit is contained in:
Jessica Huhnke 2022-10-14 19:38:20 -05:00 committed by GitHub
parent 01d954f945
commit 8083bb8902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 430 additions and 12 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,249 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', signer, b_date)",
incremental_strategy = 'delete+insert',
cluster_by = 'signer'
) }}
WITH dates_changed AS (
SELECT
DISTINCT block_timestamp :: DATE AS block_timestamp_date
FROM
{{ ref('silver__transactions') }}
{% if is_incremental() and env_var(
'DBT_IS_BATCH_LOAD',
"false"
) == "true" %}
WHERE
_inserted_timestamp :: DATE BETWEEN (
SELECT
LEAST(
DATEADD(
'day',
1,
COALESCE(MAX(_inserted_timestamp :: DATE), '2022-08-12')
),
CURRENT_DATE - 1
)
FROM
{{ this }}
)
AND (
SELECT
LEAST(
DATEADD(
'day',
1,
COALESCE(MAX(_inserted_timestamp :: DATE), '2022-08-12')
),
CURRENT_DATE - 1
)
FROM
{{ this }}
) {% elif is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
WHERE
_inserted_timestamp :: DATE = '2022-08-12'
{% endif %}
),
b AS (
SELECT
s.value :: STRING AS signer,
block_timestamp :: DATE AS b_date,
FIRST_VALUE(tx_id) over (
PARTITION BY signer,
b_date
ORDER BY
block_timestamp
) AS first_tx,
LAST_VALUE(tx_id) over (
PARTITION BY signer,
b_date
ORDER BY
block_timestamp
) AS last_tx,*
FROM
{{ ref('silver__transactions') }}
t,
TABLE(FLATTEN(signers)) s
WHERE
b_date IN (
SELECT
block_timestamp_date
FROM
dates_changed
)
{% if is_incremental() and env_var(
'DBT_IS_BATCH_LOAD',
"false"
) == "true" %}
AND _inserted_timestamp <= (
SELECT
LEAST(
DATEADD(
'day',
1,
COALESCE(MAX(_inserted_timestamp :: DATE), '2022-08-12')
),
CURRENT_DATE - 1
)
FROM
{{ this }}
)
{% else %}
AND _inserted_timestamp :: DATE = '2022-08-12'
{% endif %}
),
C AS (
SELECT
tx_id,
program_id,
INDEX,
_inserted_timestamp
FROM
{{ ref('silver__events') }}
e
WHERE
e.block_timestamp :: DATE IN (
SELECT
block_timestamp_date
FROM
dates_changed
)
{% if is_incremental() and env_var(
'DBT_IS_BATCH_LOAD',
"false"
) == "true" %}
AND e._inserted_timestamp <= (
SELECT
LEAST(
DATEADD(
'day',
1,
COALESCE(MAX(_inserted_timestamp :: DATE), '2022-08-12')
),
CURRENT_DATE - 1
)
FROM
{{ this }}
)
{% else %}
AND e._inserted_timestamp :: DATE = '2022-08-12'
{% endif %}
),
base_programs AS (
SELECT
tx_id,
ARRAY_AGG(program_id) within GROUP (
ORDER BY
INDEX
) AS program_ids,
program_ids [0] :: STRING AS first_program_id,
program_ids [array_size(program_ids)-1] :: STRING AS last_program_id
FROM
C
GROUP BY
tx_id
),
first_last_programs AS (
SELECT
b.signer,
b.b_date,
b.tx_id,
FIRST_VALUE(first_program_id) over (
PARTITION BY signer,
b_date
ORDER BY
block_timestamp
) AS first_program_id,
LAST_VALUE(last_program_id) over (
PARTITION BY signer,
b_date
ORDER BY
block_timestamp
) AS last_program_id,
LAST_VALUE(
b._inserted_timestamp
) over (
PARTITION BY signer,
b_date
ORDER BY
block_timestamp
) AS _inserted_timestamp
FROM
b
LEFT OUTER JOIN base_programs p
ON p.tx_id = b.tx_id
),
final_programs AS (
SELECT
b.signer,
b.b_date,
b.first_program_id,
b.last_program_id,
array_union_agg(
p.program_ids
) AS unique_program_ids,
b._inserted_timestamp
FROM
first_last_programs b
LEFT OUTER JOIN base_programs p
ON p.tx_id = b.tx_id
GROUP BY
b.signer,
b.b_date,
b.first_program_id,
b.last_program_id,
b._inserted_timestamp
),
final_fees AS (
SELECT
signer,
b_date,
SUM(fee) AS total_fees
FROM
b
WHERE
INDEX = 0
GROUP BY
signer,
b_date
),
final_num_txs AS (
SELECT
signer,
b_date,
first_tx,
last_tx,
COUNT(*) AS num_txs
FROM
b
GROUP BY
signer,
b_date,
first_tx,
last_tx
)
SELECT
s.*,
f.total_fees,
p.first_program_id,
p.last_program_id,
p.unique_program_ids,
p._inserted_timestamp
FROM
final_num_txs s
LEFT OUTER JOIN final_fees f
ON f.signer = s.signer
AND f.b_date = s.b_date
LEFT OUTER JOIN final_programs p
ON p.signer = s.signer
AND p.b_date = s.b_date

View File

@ -0,0 +1,45 @@
version: 2
models:
- name: silver__daily_signers
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- signer
- b_date
columns:
- name: SIGNER
description: The address of the user that initiated the transaction
tests:
- not_null
- name: B_DATE
description: The date the information is valid for
tests:
- not_null
- name: FIRST_TX
description: The transaction ID of the first daily transaction.
tests:
- not_null
- name: LAST_TX
description: The transaction ID of the last daily transaction.
tests:
- not_null
- name: TOTAL_FEES
description: The total amount of fees (in lamports) that the signer has paid on a given day. This field can be null, as only the first signer pays transaction fees.
tests:
- not_null:
enabled: FALSE
- name: FIRST_PROGRAM_ID
description: The first program the signer interacted with. This field can be null, as some transactions do not have instructions.
tests:
- not_null:
enabled: FALSE
- name: LAST_PROGRAM_ID
description: The most recent program the signer interacted with. This field can be null, as some transactions do not have instructions.
tests:
- not_null:
enabled: FALSE
- name: UNIQUE_PROGRAM_IDS
description: An array containing all program IDs a user interacted with on a given day.
tests:
- not_null

View File

@ -0,0 +1,76 @@
{{ config(
materialized = 'incremental',
unique_key = "signer",
incremental_strategy = 'delete+insert',
cluster_by = 'signer'
) }}
WITH base_min_signers AS (
SELECT
signer,
min(b_date) AS b_date
FROM
{{ ref('silver__daily_signers') }}
GROUP BY
signer
),
base_max_signers AS (
SELECT
signer,
max(b_date) as b_date
FROM
{{ ref('silver__daily_signers') }}
GROUP BY
signer
),
final_signers_agg AS (
select
signer,
count(*) AS num_days_active,
sum(num_txs) AS num_txs,
array_union_agg(unique_program_ids) AS programs_used,
sum(total_fees) AS total_fees,
max(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref('silver__daily_signers') }}
GROUP BY
signer
),
final_min_signers AS (
SELECT
ms.signer,
ms.b_date AS first_tx_date,
sd.first_program_id
FROM
base_min_signers ms
INNER JOIN {{ ref('silver__daily_signers') }} sd
ON sd.signer = ms.signer
AND sd.b_date = ms.b_date
),
final_max_signers AS (
SELECT
ms.signer,
ms.b_date AS last_tx_date,
sd.last_program_id
FROM base_max_signers ms
INNER JOIN {{ ref('silver__daily_signers') }} sd
ON sd.signer = ms.signer
AND sd.b_date = ms.b_date
)
SELECT
s_min.*,
s_max.last_tx_date,
s_max.last_program_id,
s_agg.num_days_active,
s_agg.num_txs,
s_agg.total_fees,
s_agg.programs_used,
s_agg._inserted_timestamp
FROM
final_min_signers s_min
JOIN final_max_signers s_max
ON s_max.signer = s_min.signer
JOIN final_signers_agg s_agg
ON s_agg.signer = s_min.signer

View File

@ -0,0 +1,48 @@
version: 2
models:
- name: silver__signers
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- signer
columns:
- name: SIGNER
description: The address of the user that initiated the transaction
tests:
- not_null
- name: FIRST_TX_DATE
description: The first date that the wallet performed a transaction on.
tests:
- not_null
- name: FIRST_PROGRAM_ID
description: The ID of the first program this signer interacted with, excluding chain admin programs.
tests:
- not_null:
enabled: FALSE
- name: LAST_TX_DATE
description: The date of the most recent transaction the signer has performed.
tests:
- not_null
- name: LAST_PROGRAM_ID
description: The ID of the last program this signer interacted with, excluding chain admin programs.
tests:
- not_null:
enabled: FALSE
- name: NUM_DAYS_ACTIVE
description: A count of the total number of unique days that this signer has performed a transaction.
tests:
- not_null
- name: NUM_TXS
description: The total number of distinct transactions initiated by this signer.
tests:
- not_null
- name: TOTAL_FEES
description: The total amount of fees (in lamports) that the signer has paid on a given day. This field can be null, as only the first signer pays fees in a transaction.
tests:
- not_null:
enabled: FALSE
- name: PROGRAMS_USED
description: An array containing all program IDs a user interacted with on a given day.
tests:
- not_null