This commit is contained in:
Austin 2025-03-03 10:26:42 -05:00 committed by GitHub
parent 2a4846b972
commit ae2a23d66c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1329 additions and 0 deletions

View File

@ -0,0 +1,26 @@
{{ config(
materialized = 'view',
tags = ['bronze_labels']
) }}
SELECT
system_created_at,
insert_date,
blockchain,
address,
creator,
label_type,
label_subtype,
address_name,
project_name,
_is_deleted,
modified_timestamp,
labels_combined_id
FROM
{{ source(
'crosschain_silver',
'labels_combined'
) }}
WHERE
blockchain = 'flow_evm'
AND address LIKE '0x%'

View File

@ -0,0 +1,33 @@
{{ config(
materialized = 'incremental',
unique_key = ['address', 'blockchain'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = 'modified_timestamp::DATE',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address, label_type, label_subtype, address_name, label), SUBSTRING(address, label_type, label_subtype, address_name, label); DELETE FROM {{ this }} WHERE address in (SELECT address FROM {{ ref('silver__labels') }} WHERE _is_deleted = TRUE);",
tags = ['gold_labels']
) }}
SELECT
blockchain,
creator,
address,
address_name,
label_type,
label_subtype,
project_name AS label,
{{ dbt_utils.generate_surrogate_key(['labels_id']) }} AS dim_labels_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver_evm__labels') }} s
{% if is_incremental() %}
WHERE
s.modified_timestamp > (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,26 @@
version: 2
models:
- name: core_evm__dim_labels
description: '{{ doc("evm_table_dim_labels") }}'
columns:
- name: BLOCKCHAIN
description: '{{ doc("evm_label_blockchain") }}'
- name: CREATOR
description: '{{ doc("evm_label_creator") }}'
- name: ADDRESS
description: '{{ doc("evm_label_address") }}'
- name: ADDRESS_NAME
description: '{{ doc("evm_label_address_name") }}'
- name: LABEL_TYPE
description: '{{ doc("evm_label_type") }}'
- name: LABEL_SUBTYPE
description: '{{ doc("evm_label_subtype") }}'
- name: LABEL
description: '{{ doc("evm_label") }}'
- name: DIM_LABELS_ID
description: '{{ doc("evm_pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("evm_inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("evm_modified_timestamp") }}'

View File

@ -0,0 +1,470 @@
-- depends_on: {{ ref('scores__target_days') }}
{% set blockchain = 'flow_evm' %}
{% set full_reload_mode = var('SCORES_FULL_RELOAD_MODE', false) %}
{{ config (
materialized = "incremental",
unique_key = "block_date",
incremental_strategy = "delete+insert",
cluster_by = "block_date",
full_refresh = false,
tags = ['scores']
) }}
{% if is_incremental() %}
{% set max_block_date_query %}
WITH target_days AS (
SELECT block_date
FROM {{ ref('scores__target_days') }}
{% if not full_reload_mode %}
WHERE block_date > dateadd('day', -120, sysdate())
{% endif %}
),
processed_days AS (
SELECT DISTINCT block_date
FROM {{ this }}
{% if not full_reload_mode %}
WHERE block_date > dateadd('day', -120, sysdate())
{% endif %}
),
unprocessed_days AS (
SELECT block_date
FROM target_days
EXCEPT
SELECT block_date
FROM processed_days
)
SELECT block_date
FROM unprocessed_days
{% endset %}
{% set results = run_query(max_block_date_query) %}
{% if execute %}
{% set block_dates = results.columns[0].values() %}
{% if block_dates|length > 0 %}
{{ log("==========================================", info=True) }}
{{ log("Loading action data for blockchain: " ~ blockchain, info=True) }}
{{ log("For block dates: " ~ block_dates|join(', '), info=True) }}
{{ log("==========================================", info=True) }}
{% else %}
{{ log("==========================================", info=True) }}
{{ log("No new action data to process for blockchain: " ~ blockchain, info=True) }}
{{ log("==========================================", info=True) }}
{% endif %}
{% else %}
{% set block_dates = [] %}
{% endif %}
{% endif %}
{% if (is_incremental() and block_dates|length > 0) or (not is_incremental()) %}
WITH txs AS (
SELECT
block_timestamp :: DATE AS block_date,
from_address,
to_address,
input_data <> '0x'
AND LEFT(input_data, 10) <> '0xa9059cbb' AS complex_tx,
input_data <> '0x' AS to_address_is_contract,
block_timestamp,
tx_hash,
block_number
FROM
{{ ref('core_evm__fact_transactions') }} t
{% if is_incremental() %}
INNER JOIN (
{% if block_dates|length == 1 %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% else %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% for date in block_dates[1:] %}
UNION ALL SELECT CAST('{{ date }}' AS DATE)
{% endfor %}
{% endif %}
) b ON t.block_timestamp::date = b.block_date
{% endif %}
WHERE
tx_succeeded
{% if is_incremental() %}
AND 1=1
{% else %}
AND block_timestamp :: DATE < (SELECT MAX(block_timestamp)::DATE FROM {{ ref('core_evm__fact_transactions') }})
{% endif %}
),
raw_logs AS (
SELECT
block_timestamp :: DATE AS block_date,
origin_from_address,
origin_to_address,
event_index,
contract_address,
topics[0] :: STRING AS event_sig,
topics,
data,
block_timestamp,
tx_hash,
block_number
FROM
{{ ref('core_evm__fact_event_logs') }} l
JOIN txs USING (block_number, tx_hash)
{% if is_incremental() %}
INNER JOIN (
{% if block_dates|length == 1 %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% else %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% for date in block_dates[1:] %}
UNION ALL SELECT CAST('{{ date }}' AS DATE)
{% endfor %}
{% endif %}
) b ON l.block_timestamp::date = b.block_date
{% endif %}
WHERE
{% if is_incremental() %}
1=1
{% else %}
l.block_timestamp :: DATE < (SELECT MAX(block_timestamp)::DATE FROM {{ ref('core_evm__fact_transactions') }})
{% endif %}
),
decoded_event_logs AS (
SELECT
event_index,
contract_address,
event_name,
block_timestamp,
tx_hash,
block_number
FROM
{{ ref('core_evm__ez_decoded_event_logs') }} dl
JOIN txs USING (block_number, tx_hash)
{% if is_incremental() %}
INNER JOIN (
{% if block_dates|length == 1 %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% else %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% for date in block_dates[1:] %}
UNION ALL SELECT CAST('{{ date }}' AS DATE)
{% endfor %}
{% endif %}
) b ON dl.block_timestamp::date = b.block_date
{% endif %}
WHERE
{% if is_incremental() %}
1=1
{% else %}
dl.block_timestamp :: DATE < (SELECT MAX(block_timestamp)::DATE FROM {{ ref('core_evm__fact_transactions') }})
{% endif %}
),
native_transfers AS (
SELECT
from_address,
to_address,
value,
block_timestamp,
tx_hash,
block_number,
trace_index
FROM
{{ ref('core_evm__fact_traces') }} tr
JOIN txs USING (block_number, tx_hash)
{% if is_incremental() %}
INNER JOIN (
{% if block_dates|length == 1 %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% else %}
SELECT CAST('{{ block_dates[0] }}' AS DATE) AS block_date
{% for date in block_dates[1:] %}
UNION ALL SELECT CAST('{{ date }}' AS DATE)
{% endfor %}
{% endif %}
) b ON tr.block_timestamp::date = b.block_date
{% endif %}
WHERE
{% if is_incremental() %}
1=1
{% else %}
tr.block_timestamp :: DATE < (SELECT MAX(block_timestamp)::DATE FROM {{ ref('core_evm__fact_transactions') }})
{% endif %}
AND value > 0
AND trace_succeeded
),
event_names AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
contract_address,
event_sig,
COALESCE(d.event_name, e.event_name) AS event_name,
event_index,
topics,
data,
block_timestamp,
tx_hash,
block_number
FROM
raw_logs
LEFT JOIN decoded_event_logs d USING (block_number, tx_hash, event_index)
LEFT JOIN {{ ref('scores__event_sigs') }} e USING (event_sig)
),
all_transfers AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
contract_address,
event_index,
block_timestamp,
block_number,
tx_hash,
CASE
WHEN topics[0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND data = '0x'
AND topics[3] IS NOT NULL THEN 'erc721_transfer'
WHEN topics[0] :: STRING = '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62' THEN 'erc1155_transfer'
WHEN topics[0] :: STRING = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb' THEN 'erc1155_transfer_batch'
WHEN topics[0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND SUBSTR(data, 3, 64) IS NOT NULL
AND SUBSTR(topics[1], 27, 40) IS NOT NULL
AND SUBSTR(topics[2], 27, 40) IS NOT NULL THEN 'erc20_transfer'
ELSE NULL
END AS transfer_type,
IFF(
transfer_type = 'erc20_transfer',
TRY_TO_NUMBER(utils.udf_hex_to_int(SUBSTR(data, 3, 64))) / POW(10, COALESCE(c.decimals, 18)),
NULL
) AS value,
CASE
WHEN transfer_type IN ('erc20_transfer', 'erc721_transfer') THEN CONCAT('0x', SUBSTR(topics[1], 27, 40)) :: STRING
WHEN transfer_type IN ('erc1155_transfer', 'erc1155_transfer_batch') THEN CONCAT('0x', SUBSTR(topics[2] :: STRING, 27, 40))
ELSE NULL
END AS token_from_address,
CASE
WHEN transfer_type IN ('erc20_transfer', 'erc721_transfer') THEN CONCAT('0x', SUBSTR(topics[2], 27, 40)) :: STRING
WHEN transfer_type IN ('erc1155_transfer', 'erc1155_transfer_batch') THEN CONCAT('0x', SUBSTR(topics[3] :: STRING, 27, 40))
ELSE NULL
END AS token_to_address,
token_from_address = '0x0000000000000000000000000000000000000000' AS is_mint
FROM
event_names e
LEFT JOIN {{ ref('core_evm__dim_contracts') }} c ON e.contract_address = c.address
WHERE
transfer_type IS NOT NULL
UNION ALL
SELECT
block_timestamp :: DATE AS block_date,
txs.from_address AS origin_from_address,
txs.to_address AS origin_to_address,
NULL AS contract_address,
trace_index AS event_index,
block_timestamp,
block_number,
tx_hash,
'native_transfer' AS transfer_type,
value,
from_address AS token_from_address,
to_address AS token_to_address,
FALSE AS is_mint
FROM
native_transfers
LEFT JOIN txs USING (block_number, tx_hash)
),
labeled_transfers AS (
SELECT
t.*,
l.label_type,
CASE
WHEN is_mint AND transfer_type = 'erc721_transfer' THEN 'n_nft_mint'
WHEN is_mint AND transfer_type = 'erc1155_transfer' THEN 'n_nft_mint'
WHEN is_mint AND transfer_type = 'erc1155_transfer_batch' THEN 'n_nft_mint'
WHEN l.label_type = 'bridge' and l.label_subtype <> 'token_contract' THEN 'n_bridge_in'
WHEN l.label_type = 'cex' THEN 'n_cex_withdrawals'
ELSE NULL
END AS label_metric_name,
metric_rank
FROM
all_transfers t
LEFT JOIN {{ ref('core_evm__dim_labels') }} l ON t.token_from_address = l.address
LEFT JOIN {{ ref('scores__scoring_activity_categories') }} a ON a.metric = label_metric_name
),
eligible_events AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
contract_address,
e.event_sig,
e.event_name,
event_index,
block_timestamp,
tx_hash,
block_number,
l.label_type,
s.metric AS sig_metric_name,
n.metric AS name_metric_name,
CASE
WHEN l.label_type = 'bridge' THEN 'n_bridge_in'
WHEN l.label_type = 'cex' THEN 'n_cex_withdrawals'
WHEN l.label_type = 'dex' THEN 'n_swap_tx'
WHEN l.label_type = 'defi' THEN 'n_other_defi'
ELSE NULL
END AS label_metric_name,
COALESCE(sig_metric_name, label_metric_name, name_metric_name) AS metric_name_0,
IFF(
wrapped_asset_address IS NOT NULL
AND e.event_sig = '0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c',
'n_swap_tx',
metric_name_0
) AS metric_name,
metric_rank
FROM
event_names e
LEFT JOIN {{ ref('core_evm__dim_labels') }} l ON contract_address = l.address
LEFT JOIN {{ ref('scores__known_event_sigs') }} s ON s.event_sig = e.event_sig
LEFT JOIN {{ ref('scores__known_event_names') }} n ON e.event_name ILIKE '%' || n.event_name || '%'
LEFT JOIN {{ ref('scores__wrapped_assets') }} w ON e.contract_address = w.wrapped_asset_address AND w.blockchain = '{{ blockchain }}'
LEFT JOIN {{ ref('scores__scoring_activity_categories') }} a ON a.metric = metric_name
WHERE
e.event_sig NOT IN (
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', -- transfers
'0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925' -- approvals
)
),
prioritized_eligible_events AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
contract_address,
event_sig,
event_name,
event_index,
block_timestamp,
tx_hash,
block_number,
label_type,
sig_metric_name,
name_metric_name,
label_metric_name,
metric_name_0,
metric_name,
metric_rank
FROM
eligible_events
QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_hash, event_index ORDER BY metric_rank ASC) = 1
),
create_action_details AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
event_index AS index,
block_timestamp,
block_number,
tx_hash,
transfer_type AS action_type,
OBJECT_CONSTRUCT(
'value', value,
'token_from_address', token_from_address,
'token_to_address', token_to_address,
'label_type', label_type,
'contract_address', contract_address
) AS action_details,
label_metric_name AS metric_name,
metric_rank
FROM
labeled_transfers
UNION ALL
SELECT
block_date,
origin_from_address,
origin_to_address,
event_index AS index,
block_timestamp,
block_number,
tx_hash,
'contract_interaction' AS action_type,
OBJECT_CONSTRUCT(
'event_name', event_name,
'event_sig', event_sig,
'label_type', label_type,
'contract_address', contract_address
) AS action_details,
metric_name,
metric_rank
FROM
prioritized_eligible_events
UNION ALL
SELECT
block_date,
from_address AS origin_from_address,
to_address AS origin_to_address,
-1 AS index,
block_timestamp,
block_number,
tx_hash,
'tx' AS action_type,
OBJECT_CONSTRUCT(
'complex_tx', complex_tx,
'to_address_is_contract', to_address_is_contract,
'label_type', NULL
) AS action_details,
NULL AS metric_name,
NULL AS metric_rank
FROM
txs
)
SELECT
block_date,
origin_from_address,
origin_to_address,
index,
block_timestamp,
block_number,
tx_hash,
action_type,
action_details,
metric_name,
metric_rank,
'{{ blockchain }}' AS blockchain,
{{ dbt_utils.generate_surrogate_key(['tx_hash', 'index', 'action_type', "'" ~ blockchain ~ "'"]) }} AS actions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
create_action_details
{% else %}
-- Return an empty result set with the correct schema
SELECT
CAST(NULL AS DATE) AS block_date,
CAST(NULL AS STRING) AS origin_from_address,
CAST(NULL AS STRING) AS origin_to_address,
CAST(NULL AS INTEGER) AS index,
CAST(NULL AS TIMESTAMP) AS block_timestamp,
CAST(NULL AS INTEGER) AS block_number,
CAST(NULL AS STRING) AS tx_hash,
CAST(NULL AS STRING) AS action_type,
CAST(NULL AS OBJECT) AS action_details,
CAST(NULL AS STRING) AS metric_name,
CAST(NULL AS INTEGER) AS metric_rank,
CAST(NULL AS STRING) AS blockchain,
CAST(NULL AS STRING) AS actions_id,
CAST(NULL AS TIMESTAMP) AS inserted_timestamp,
CAST(NULL AS TIMESTAMP) AS modified_timestamp,
CAST(NULL AS STRING) AS _invocation_id
WHERE 1 = 0
{% endif %}

View File

@ -0,0 +1,171 @@
-- depends_on: {{ ref('scores__actions_daily') }}
-- depends_on: {{ ref('core_evm__dim_labels') }}
-- depends_on: {{ ref('core_evm__dim_contracts') }}
{% set blockchain = 'flow_evm' %}
{% set full_reload_mode = var('SCORES_FULL_RELOAD_MODE', false) %}
{% set score_date_limit = var('SCORES_DATE_LIMIT', 30) %}
{{ config (
materialized = "incremental",
unique_key = "score_date",
incremental_strategy = "delete+insert",
cluster_by = "score_date",
version = 1,
full_refresh = false,
tags = ['scores']
) }}
{% set score_dates_query %}
SELECT block_date as score_date
FROM {{ ref('scores__target_days') }}
{% if not full_reload_mode %}
WHERE score_date > dateadd('day', -120, sysdate())
{% endif %}
{% if is_incremental() %}
EXCEPT
SELECT DISTINCT score_date
FROM {{ this }}
{% if not full_reload_mode %}
WHERE score_date > dateadd('day', -120, sysdate())
{% endif %}
{% endif %}
ORDER BY score_date ASC
{% if score_date_limit %}
LIMIT {{ score_date_limit }}
{% endif %}
{% endset %}
{% set score_dates = run_query(score_dates_query) %}
{% if execute %}
{% set score_dates_list = score_dates.columns[0].values() %}
{% else %}
{% set score_dates_list = [] %}
{% endif %}
{% if execute %}
{% if score_dates_list|length > 0 %}
{{ log("==========================================", info=True) }}
{% if score_dates_list|length == 1 %}
{{ log("Calculating action totals for blockchain: " ~ blockchain, info=True) }}
{{ log("For score date: " ~ score_dates_list[0], info=True) }}
{% else %}
{{ log("Calculating action totals for blockchain: " ~ blockchain, info=True) }}
{{ log("For score dates: " ~ score_dates_list|join(', '), info=True) }}
{% endif %}
{{ log("==========================================", info=True) }}
{% else %}
{{ log("==========================================", info=True) }}
{{ log("No action totals to calculate for blockchain: " ~ blockchain, info=True) }}
{{ log("==========================================", info=True) }}
{% endif %}
{% endif %}
{% if score_dates_list|length > 0 %}
WITH combined_results AS (
{% for score_date in score_dates_list %}
SELECT
user_address,
n_complex_txn,
n_contracts,
n_days_active,
n_txn,
n_bridge_in,
n_cex_withdrawals,
net_token_accumulate,
n_other_defi,
n_lp_adds,
n_swap_tx,
n_nft_collections,
n_nft_mints,
n_nft_trades,
n_gov_votes,
n_stake_tx,
n_restakes,
n_validators,
CURRENT_TIMESTAMP AS calculation_time,
CAST('{{ score_date }}' AS DATE) AS score_date,
'{{ blockchain }}' AS blockchain,
{{ dbt_utils.generate_surrogate_key(['user_address', "'" ~ blockchain ~ "'", "'" ~ score_date ~ "'"]) }} AS actions_agg_id,
'{{ model.config.version }}' AS score_version,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
(
SELECT
user_address,
SUM(n_bridge_in) AS n_bridge_in,
SUM(n_cex_withdrawals) AS n_cex_withdrawals,
SUM(n_other_defi) AS n_other_defi,
SUM(n_lp_adds) AS n_lp_adds,
SUM(n_swap_tx) AS n_swap_tx,
SUM(n_nft_mints) AS n_nft_mints,
SUM(n_nft_trades) AS n_nft_trades,
SUM(n_gov_votes) AS n_gov_votes,
SUM(n_stake_tx) AS n_stake_tx,
SUM(n_restakes) AS n_restakes,
SUM(net_token_accumulate) AS net_token_accumulate,
SUM(n_txn) AS n_txn,
SUM(IFF(active_day, 1, 0)) AS n_days_active,
SUM(complex_tx) AS n_complex_txn,
ARRAY_SIZE(ARRAY_COMPACT(ARRAY_DISTINCT(ARRAY_UNION_AGG(validator_addresses)))) AS n_validators,
ARRAY_SIZE(ARRAY_COMPACT(ARRAY_DISTINCT(ARRAY_UNION_AGG(contract_addresses)))) AS n_contracts,
ARRAY_SIZE(ARRAY_COMPACT(ARRAY_DISTINCT(ARRAY_UNION_AGG(nft_collection_addresses)))) AS n_nft_collections
FROM
{{ ref('scores__actions_daily') }} a
LEFT JOIN {{ ref('core_evm__dim_labels') }} b
ON a.user_address = b.address
LEFT JOIN {{ ref('core_evm__dim_contracts') }} c
ON a.user_address = c.address
WHERE
b.address IS NULL
AND c.address IS NULL
AND a.block_date BETWEEN DATEADD('day', -90, '{{ score_date }}') AND '{{ score_date }}' :: DATE
GROUP BY
user_address
)
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
)
SELECT * FROM combined_results
{% else %}
-- Return an empty result set with the correct schema
SELECT
CAST(NULL AS STRING) AS user_address,
CAST(NULL AS INTEGER) AS n_complex_txn,
CAST(NULL AS INTEGER) AS n_contracts,
CAST(NULL AS INTEGER) AS n_days_active,
CAST(NULL AS INTEGER) AS n_txn,
CAST(NULL AS INTEGER) AS n_bridge_in,
CAST(NULL AS INTEGER) AS n_cex_withdrawals,
CAST(NULL AS FLOAT) AS net_token_accumulate,
CAST(NULL AS INTEGER) AS n_other_defi,
CAST(NULL AS INTEGER) AS n_lp_adds,
CAST(NULL AS INTEGER) AS n_swap_tx,
CAST(NULL AS INTEGER) AS n_nft_collections,
CAST(NULL AS INTEGER) AS n_nft_mints,
CAST(NULL AS INTEGER) AS n_nft_trades,
CAST(NULL AS INTEGER) AS n_gov_votes,
CAST(NULL AS INTEGER) AS n_stake_tx,
CAST(NULL AS INTEGER) AS n_restakes,
CAST(NULL AS INTEGER) AS n_validators,
CAST(NULL AS TIMESTAMP) AS calculation_time,
CAST(NULL AS DATE) AS score_date,
CAST(NULL AS STRING) AS blockchain,
CAST(NULL AS STRING) AS actions_agg_id,
CAST(NULL AS STRING) AS score_version,
CAST(NULL AS TIMESTAMP) AS inserted_timestamp,
CAST(NULL AS TIMESTAMP) AS modified_timestamp,
CAST(NULL AS STRING) AS _invocation_id
WHERE 1 = 0
{% endif %}

View File

@ -0,0 +1,405 @@
{% set blockchain = 'flow_evm' %}
{% set full_reload_mode = var('SCORES_FULL_RELOAD_MODE', false) %}
{{ config (
materialized = "incremental",
unique_key = "block_date",
incremental_strategy = "delete+insert",
cluster_by = "block_date",
version = 1,
tags = ['scores']
) }}
{% if is_incremental() %}
{% set max_modified_timestamp_query %}
SELECT MAX(modified_timestamp) as max_modified_timestamp
FROM {{ this }}
{% endset %}
{% set results = run_query(max_modified_timestamp_query) %}
{% if execute %}
{% set max_modified_timestamp = results.columns[0].values()[0] %}
{% else %}
{% set max_modified_timestamp = none %}
{% endif %}
{% endif %}
{% if execute %}
{{ log("==========================================", info=True) }}
{% if is_incremental() %}
{% set new_data_check_query %}
SELECT COUNT(1) as cnt
FROM {{ ref('scores__actions') }}
WHERE modified_timestamp > '{{ max_modified_timestamp }}'
limit 1
{% endset %}
{% set new_data_results = run_query(new_data_check_query) %}
{% set new_data_count = new_data_results.columns[0].values()[0] %}
{% if new_data_count > 0 %}
{{ log("Processing action data for blockchain: " ~ blockchain ~ " modified after: " ~ max_modified_timestamp, info=True) }}
{% else %}
{{ log("No new action data to aggregate daily for blockchain: " ~ blockchain, info=True) }}
{% endif %}
{% else %}
{{ log("Aggregating daily action data for blockchain: " ~ blockchain, info=True) }}
{% endif %}
{{ log("==========================================", info=True) }}
{% endif %}
WITH actions AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
INDEX,
block_timestamp,
block_number,
tx_hash,
action_type,
action_details,
metric_name,
metric_rank
FROM
{{ ref('scores__actions') }}
WHERE
1=1
{% if is_incremental() %}
AND modified_timestamp > '{{ max_modified_timestamp }}'
{% endif %}
),
priorititized_txs AS (
SELECT
block_date,
origin_from_address,
origin_to_address,
INDEX,
block_timestamp,
block_number,
tx_hash,
action_type,
action_details,
metric_name
FROM
actions
WHERE
action_type <> 'tx' qualify ROW_NUMBER() over (
PARTITION BY tx_hash
ORDER BY
metric_rank ASC
) = 1
),
simple_aggs AS (
SELECT
block_date,
CASE
WHEN action_type = 'contract_interaction' THEN origin_from_address
ELSE action_details :token_from_address :: STRING
END AS user_address,
SUM(IFF(metric_name = 'n_bridge_in', 1, 0)) AS n_bridge_in,
SUM(IFF(metric_name = 'n_cex_withdrawals', 1, 0)) AS n_cex_withdrawals,
SUM(IFF(metric_name = 'n_other_defi', 1, 0)) AS n_other_defi,
SUM(IFF(metric_name = 'n_lp_adds', 1, 0)) AS n_lp_adds,
SUM(IFF(metric_name = 'n_swap_tx', 1, 0)) AS n_swap_tx,
SUM(IFF(metric_name = 'n_nft_mints', 1, 0)) AS n_nft_mints,
SUM(IFF(metric_name = 'n_nft_trades', 1, 0)) AS n_nft_trades,
SUM(IFF(metric_name = 'n_gov_votes', 1, 0)) AS n_gov_votes,
SUM(IFF(metric_name = 'n_stake_tx', 1, 0)) AS n_stake_tx,
SUM(IFF(metric_name = 'n_restakes', 1, 0)) AS n_restakes
FROM
priorititized_txs
GROUP BY
ALL
),
xfer_in AS (
SELECT
block_date,
action_details :token_to_address :: STRING AS user_address,
COUNT(1) AS n_xfer_in
FROM
actions
WHERE
action_type IN (
'erc20_transfer',
'native_transfer'
)
GROUP BY
ALL
),
xfer_out AS (
SELECT
block_date,
action_details :token_from_address :: STRING AS user_address,
COUNT(1) AS n_xfer_out
FROM
actions
WHERE
action_type IN (
'erc20_transfer',
'native_transfer'
)
GROUP BY
ALL
),
net_token_accumulate AS (
SELECT
COALESCE(
A.block_date,
b.block_date
) AS block_date,
COALESCE(
A.user_address,
b.user_address
) AS user_address,
COALESCE(n_xfer_in / (ifnull(n_xfer_in,0) + ifnull(n_xfer_out,0)),0) AS net_token_accumulate
FROM
xfer_in A full
OUTER JOIN xfer_out b
ON A.user_address = b.user_address
AND A.block_date = b.block_date
),
nft_collections AS (
SELECT
block_date,
user_address,
ARRAY_AGG(nft_address) AS nft_collection_addresses
FROM
(
SELECT
DISTINCT block_date,
action_details :token_from_address :: STRING AS user_address,
action_details: contract_address :: STRING AS nft_address
FROM
actions
WHERE
action_type IN (
'erc721_transfer',
'erc1155_transfer',
'erc1155_transfer_batch'
)
UNION
SELECT
DISTINCT block_date,
action_details :token_to_address :: STRING AS user_address,
action_details: contract_address :: STRING AS nft_address
FROM
actions
WHERE
action_type IN (
'erc721_transfer',
'erc1155_transfer',
'erc1155_transfer_batch'
)
qualify row_number() over (partition by user_address, block_date order by block_date asc) <= 1000
)
GROUP BY
ALL
),
staking_validators AS (
SELECT
block_date,
origin_from_address AS user_address,
ARRAY_AGG(
action_details :contract_address :: STRING
) AS validator_addresses
FROM
(
select *
from actions
where action_type = 'contract_interaction'
and metric_name = 'n_stake_tx'
qualify row_number() over (partition by origin_from_address, block_date order by block_timestamp asc) <= 1000
)
GROUP BY
ALL
),
complex_txns AS (
SELECT
block_date,
user_address,
SUM(complex_tx) AS complex_tx
FROM
(
SELECT
block_date,
origin_from_address AS user_address,
IFF(
action_details :complex_tx :: BOOLEAN,
1,
0
) AS complex_tx,
tx_hash
FROM
actions
WHERE
action_type = 'tx'
UNION ALL
SELECT
block_date,
COALESCE(
action_details: token_to_address :: STRING,
origin_from_address :: STRING
) AS user_address,
1 AS complex_tx,
tx_hash
FROM
actions
WHERE
metric_name = 'n_bridge_in'
)
GROUP BY
ALL
),
contract_interactions AS (
SELECT
block_date,
origin_from_address AS user_address,
ARRAY_AGG(
origin_to_address
) AS contract_addresses
FROM
(
select *
from actions
where action_type = 'tx'
and action_details: to_address_is_contract :: BOOLEAN
qualify row_number() over (partition by origin_from_address, block_date order by block_timestamp asc) <= 1000
)
GROUP BY
ALL
),
active_day AS (
SELECT
block_date,
user_address,
MAX(active_day) = 1 AS active_day,
SUM(active_day) AS n_txn
FROM
(
SELECT
block_date,
origin_from_address AS user_address,
1 AS active_day
FROM
actions
WHERE
action_type = 'tx'
UNION ALL
SELECT
block_date,
COALESCE(
action_details: token_to_address :: STRING,
origin_from_address :: STRING
) AS user_address,
1 AS active_day
FROM
actions
WHERE
metric_name = 'n_bridge_in'
UNION ALL
SELECT
block_date,
COALESCE(
action_details: token_to_address :: STRING,
origin_from_address :: STRING
) AS user_address,
1 AS active_day
FROM
actions
WHERE
metric_name = 'n_cex_withdrawals'
)
GROUP BY
ALL
)
SELECT
ad.block_date,
ad.user_address,
IFNULL(
n_bridge_in,
0
) AS n_bridge_in,
IFNULL(
n_cex_withdrawals,
0
) AS n_cex_withdrawals,
IFNULL(
n_other_defi,
0
) AS n_other_defi,
IFNULL(
n_lp_adds,
0
) AS n_lp_adds,
IFNULL(
n_swap_tx,
0
) AS n_swap_tx,
IFNULL(
n_nft_mints,
0
) AS n_nft_mints,
IFNULL(
n_nft_trades,
0
) AS n_nft_trades,
IFNULL(
n_gov_votes,
0
) AS n_gov_votes,
IFNULL(
n_stake_tx,
0
) AS n_stake_tx,
IFNULL(
n_restakes,
0
) AS n_restakes,
IFNULL(
net_token_accumulate,
0
) AS net_token_accumulate,
IFNULL(nft_collection_addresses, ARRAY_CONSTRUCT()) AS nft_collection_addresses,
IFNULL(validator_addresses, ARRAY_CONSTRUCT()) AS validator_addresses,
IFNULL(
complex_tx,
0
) AS complex_tx,
IFNULL(contract_addresses, ARRAY_CONSTRUCT()) AS contract_addresses,
IFNULL(
active_day,
FALSE
) AS active_day,
IFNULL(
n_txn,
0
) AS n_txn,
'{{ blockchain }}' AS blockchain,
{{ dbt_utils.generate_surrogate_key(
['ad.block_date','ad.user_address', "'" ~ blockchain ~ "'"]
) }} AS actions_daily_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
active_day ad
LEFT JOIN simple_aggs sa
ON ad.user_address = sa.user_address
AND ad.block_date = sa.block_date
LEFT JOIN net_token_accumulate nta
ON ad.user_address = nta.user_address
AND ad.block_date = nta.block_date
LEFT JOIN nft_collections nc
ON ad.user_address = nc.user_address
AND ad.block_date = nc.block_date
LEFT JOIN staking_validators sv
ON ad.user_address = sv.user_address
AND ad.block_date = sv.block_date
LEFT JOIN complex_txns ct
ON ad.user_address = ct.user_address
AND ad.block_date = ct.block_date
LEFT JOIN contract_interactions ci
ON ad.user_address = ci.user_address
AND ad.block_date = ci.block_date

View File

@ -0,0 +1,6 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select * from {{ source('data_science_silver', 'dates') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__dates
description: "Dates table for scores"

View File

@ -0,0 +1,6 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select * from {{ source('data_science_silver', 'evm_event_sigs') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__event_sigs
description: "This table contains a de-duplicated list of event signatures for EVM chains from the complete_event_abis model."

View File

@ -0,0 +1,6 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select * from {{ source('data_science_silver', 'evm_known_event_names') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__known_event_names
description: "This table contains a de-duplicated list of event names mapped to their scoring category. Sourced from curated models."

View File

@ -0,0 +1,6 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select * from {{ source('data_science_silver', 'evm_known_event_sigs') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__known_event_sigs
description: "This table contains a de-duplicated list of event signatures mapped to their scoring category. Sourced from curated models."

View File

@ -0,0 +1,6 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select * from {{ source('data_science_silver', 'scoring_activity_categories') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__scoring_activity_categories
description: "This table contains the scoring activity categories for the scores package."

View File

@ -0,0 +1,77 @@
{% set full_reload_mode = var('SCORES_FULL_RELOAD_MODE', false) %}
{{ config (
materialized = "view",
tags = ['scores']
) }}
{% if execute %}
{{ log("==========================================", info=True) }}
{{ log("Generating date spine for blockchain: " ~ blockchain, info=True) }}
{{ log("Backfill mode: " ~ full_reload_mode, info=True) }}
{{ log("==========================================", info=True) }}
{% endif %}
WITH chain_dates AS (
SELECT
block_timestamp :: DATE AS block_date,
count(distinct date_trunc('hour', block_timestamp)) AS n_hours
FROM {{ ref('core_evm__fact_blocks') }}
{% if not full_reload_mode %}
WHERE block_timestamp :: DATE > DATEADD('day', -120, SYSDATE() :: DATE)
{% endif %}
GROUP BY ALL
),
date_spine AS (
{% if full_reload_mode %}
SELECT
date_day
FROM
{{ ref('scores__dates') }}
WHERE
day_of_week = 1
AND date_day < DATEADD('day', -90, SYSDATE() :: DATE) -- every sunday, excluding last 90 days
AND date_day <= '2024-07-01'
UNION
{% endif %}
SELECT
date_day
FROM
{{ ref('scores__dates') }}
WHERE
date_day >= '2024-07-01'
and date_day <= (SELECT MAX(block_date) FROM chain_dates where n_hours = 24)
),
day_of_chain AS (
SELECT
block_date,
ROW_NUMBER() over (ORDER BY block_date ASC) AS chain_day
FROM
chain_dates
),
exclude_first_90_days AS (
SELECT
block_date
FROM
day_of_chain
{% if full_reload_mode %}
WHERE chain_day >= 90
{% endif %}
),
eligible_dates AS (
SELECT
block_date
FROM
exclude_first_90_days
JOIN date_spine ON date_day = block_date
)
SELECT
block_date
FROM
eligible_dates
ORDER BY block_date ASC

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__target_days
description: "This table contains the target days for the scores package. It is used to score events for the target days."

View File

@ -0,0 +1,7 @@
{{ config (
materialized = "view",
tags = ['scores']
) }}
select *
from {{ source('data_science_silver', 'evm_wrapped_assets') }}

View File

@ -0,0 +1,5 @@
version: 2
models:
- name: scores__wrapped_assets
description: "This table contains a the wrapped assets for EVM chains. It is needed to correctly score deposits and withdrawals for wrapped assets."

View File

@ -0,0 +1,38 @@
{{ config(
materialized = 'incremental',
unique_key = ['address', 'blockchain'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = 'modified_timestamp::DATE',
tags = ['silver_labels']
) }}
SELECT
system_created_at,
insert_date,
blockchain,
address,
creator,
label_type,
label_subtype,
address_name,
project_name,
_is_deleted,
{{ dbt_utils.generate_surrogate_key(['labels_combined_id']) }} AS labels_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('bronze_evm__labels') }} b
{% if is_incremental() %}
WHERE
b.modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -174,3 +174,14 @@ sources:
schema: bronze_public
tables:
- name: user_abis
- name: data_science_silver
database: datascience
schema: silver
tables:
- name: evm_wrapped_assets
- name: scoring_activity_categories
- name: evm_known_event_sigs
- name: evm_known_event_names
- name: evm_event_sigs
- name: dates