This commit is contained in:
Desmond Hui 2022-02-28 15:55:44 -08:00
parent cb1e2666fb
commit 6a2115fe3e
6 changed files with 260 additions and 0 deletions

View File

@ -0,0 +1,90 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id)",
incremental_strategy = 'delete+insert',
cluster_by = ['ingested_at::DATE'],
) }}
WITH base AS (
SELECT
block_timestamp,
block_id,
tx_id,
tx :transaction :messag :recentBlockhash AS recent_block_hash,
tx :meta :fee AS fee,
CASE
WHEN tx :meta :err IS NULL THEN TRUE
ELSE FALSE
END AS succeeded,
tx :transaction :message :accountKeys AS account_keys,
tx :meta :preBalances AS pre_balances,
tx :meta :postBalances AS post_balances,
tx :meta :preTokenBalances AS pre_token_balances,
tx :meta :postTokenBalances AS post_token_balances,
tx :transaction :message :instructions AS instructions,
tx :meta :innerInstructions AS inner_instructions,
ingested_at
FROM
{{ ref('bronze_solana__transactions') }}
t
WHERE
COALESCE(
tx :transaction :message :instructions [0] :programId :: STRING,
''
) <> 'Vote111111111111111111111111111111111111111'
{% if is_incremental() %}
AND ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}
),
signers_flattened AS (
SELECT
b.tx_id,
A.value :pubkey :: STRING AS acct,
ROW_NUMBER() over (
PARTITION BY t.tx_id
ORDER BY
A.index DESC
) AS rn
FROM
base b,
TABLE(FLATTEN(b.account_keys)) A
WHERE
A.value :signer = TRUE
),
signers_arr AS (
SELECT
tx_id,
ARRAY_AGG(acct) AS signers
FROM
signers
GROUP BY
1
)
SELECT
block_timestamp,
block_id,
tx_id,
recent_block_hash,
s.signers AS signers,
sf.acct AS requestor,
fee,
succeeded,
ccount_keys,
pre_balances,
post_balances,
pre_token_balances,
post_token_balances,
instructions,
inner_instructions,
ingested_at
FROM
base b
LEFT OUTER JOIN signers_arr s
ON b.tx_id = s.tx_id
LEFT OUTER JOIN signers_flattened sf
ON b.tx_id = sf.tx_id
AND sf.rn = 1 qualify(ROW_NUMBER() over(PARTITION BY block_id, tx_id
ORDER BY
ingested_at DESC)) = 1

View File

@ -0,0 +1,50 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id)",
incremental_strategy = 'delete+insert',
cluster_by = ['ingested_at::DATE'],
) }}
WITH base AS (
SELECT
block_timestamp,
block_id,
tx_id,
tx :transaction :messag :recentBlockhash AS recent_block_hash,
tx :meta :fee AS fee,
CASE
WHEN tx :meta :err IS NULL THEN TRUE
ELSE FALSE
END AS succeeded,
tx :transaction :message :instructions [0] :parsed :info :voteAccount :: STRING AS vote_account,
tx :transaction :message :instructions [0] :parsed :info :voteAuthority :: STRING AS vote_authority,
tx :transaction :message :instructions [0] :parsed :info :vote :voteHash :: STRING AS vote_hash,
tx :transaction :message :instructions [0] :parsed :info :vote :voteSlots :: ARRAY AS vote_slots,
ingested_at
FROM
{{ ref('bronze_solana__transactions') }}
t
WHERE
tx :transaction :message :instructions [0] :parsed :type :: STRING IS NOT NULL
AND tx :transaction :message :instructions [0] :programId :: STRING = 'Vote111111111111111111111111111111111111111'
{% if is_incremental() %}
AND ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}
)
SELECT
block_timestamp,
block_id,
tx_id,
recent_block_hash,
succeeded,
vote_account,
vote_authority,
vote_hash,
vote_slots,
ingested_at
FROM
base_table qualify(ROW_NUMBER() over(PARTITION BY block_id, tx_id
ORDER BY
ingested_at DESC)) = 1

View File

@ -0,0 +1,31 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id, mapped_event_index)",
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
) }}
SELECT
block_timestamp,
block_id,
tx_id,
e.index,
e.value :index :: NUMBER AS mapped_event_index,
e.value,
ingested_at
FROM
{{ ref('dbt_solana__transactions') }}
t,
TABLE(FLATTEN(innerInstructions)) AS e
WHERE
COALESCE(
e.value :programId :: STRING,
''
) NOT IN (
'FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH',
'DtmE9D2CSB4L5D6A15mraeEjrGMm6auWVzgaD8hK2tZM'
)
{% if is_incremental() %}
AND ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}

View File

@ -0,0 +1,31 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id, index)",
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
) }}
SELECT
block_timestamp,
block_id,
tx_id,
e.index,
e.value :parsed :type :: STRING AS event_type,
e.value,
ingested_at
FROM
{{ ref('dbt_solana__transactions') }}
t,
TABLE(FLATTEN(instructions)) AS e
WHERE
COALESCE(
e.value :programId :: STRING,
''
) NOT IN (
'FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH',
'DtmE9D2CSB4L5D6A15mraeEjrGMm6auWVzgaD8hK2tZM'
)
{% if is_incremental() %}
AND ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}

View File

@ -0,0 +1,29 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id, index)",
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
) }}
SELECT
block_timestamp,
block_id,
tx_id,
b.index,
b.value :accountIndex :: INTEGER AS account_index,
b.value :mint :: STRING AS mint,
b.value :owner :: STRING AS owner,
b.value :uiTokenAmount :amount :: INTEGER AS amount,
b.value :uiTokenAmount :decimals AS DECIMAL,
b.value :uiTokenAmount :uiAmount AS uiAmount,
b.value :uiTokenAmount :uiAmountString AS uiAmountString,
ingested_at
FROM
{{ ref('dbt_solana__transactions') }}
t,
TABLE(FLATTEN(postTokenBalances)) b
{% if is_incremental() %}
WHERE
ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}

View File

@ -0,0 +1,29 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', block_id, tx_id, index)",
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
) }}
SELECT
block_timestamp,
block_id,
tx_id,
b.index,
b.value :accountIndex :: INTEGER AS account_index,
b.value :mint :: STRING AS mint,
b.value :owner :: STRING AS owner,
b.value :uiTokenAmount :amount :: INTEGER AS amount,
b.value :uiTokenAmount :decimals AS DECIMAL,
b.value :uiTokenAmount :uiAmount AS uiAmount,
b.value :uiTokenAmount :uiAmountString AS uiAmountString,
ingested_at
FROM
{{ ref('dbt_solana__transactions') }}
t,
TABLE(FLATTEN(preTokenBalances)) b
{% if is_incremental() %}
WHERE
ingested_at >= getdate() - INTERVAL '2 days'
{% endif %}