manual tx load process

This commit is contained in:
Eric Laurello 2023-11-01 17:44:42 -04:00
parent c0eaaa6b90
commit 896abf17ac
11 changed files with 198 additions and 59 deletions

0
data/.gitkeep Normal file
View File

View File

@ -0,0 +1,8 @@
tx_id
4B8D42C16336FF945ED9A8AFB18B8EF7C7C8A0F81D824327578A39445C14B09F
E9041B9294E8DFB8036AE6A25CDA198A61806FE69C9DABF7847082C206187AB8
77785982E84848AEEA544CAE4C0A498653A6173729A71FE3820857FEE4F3EE90
076C22A1776B97CBD95C2E8490EF4D45BADE55778AE7CBADF686F4D9C52712AE
0864580B475CEDA344A1B6EC66A2146AFAC551FC93D517F9770E1187B641D6B7
4CF2FE69C5D7EA6FDA8EAD9687A40CEE5C387310C2FB2759B910D9E2A2A79E37
DA0B8E76A42CF06F682B882C52F0E7B95EFEB97747BDB841CAD02566F4FACA23
1 tx_id
2 4B8D42C16336FF945ED9A8AFB18B8EF7C7C8A0F81D824327578A39445C14B09F
3 E9041B9294E8DFB8036AE6A25CDA198A61806FE69C9DABF7847082C206187AB8
4 77785982E84848AEEA544CAE4C0A498653A6173729A71FE3820857FEE4F3EE90
5 076C22A1776B97CBD95C2E8490EF4D45BADE55778AE7CBADF686F4D9C52712AE
6 0864580B475CEDA344A1B6EC66A2146AFAC551FC93D517F9770E1187B641D6B7
7 4CF2FE69C5D7EA6FDA8EAD9687A40CEE5C387310C2FB2759B910D9E2A2A79E37
8 DA0B8E76A42CF06F682B882C52F0E7B95EFEB97747BDB841CAD02566F4FACA23

View File

@ -14,7 +14,7 @@ profile: "terra"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

View File

@ -8,7 +8,7 @@
FROM
(
SELECT
ethereum.streamline.udf_api(
live.udf_api(
'GET',
'https://terra-rpc.polkachu.com/abci_info',{},{}
) DATA
@ -75,7 +75,7 @@ INSERT INTO
),
results AS (
SELECT
ethereum.streamline.udf_json_rpc_call(
live.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
calls
) DATA
@ -101,7 +101,7 @@ FROM
{% endset %}
{% do run_query(load_query) %}
{% set wait %}
CALL system$wait(10);
CALL system $ wait(10);
{% endset %}
{% do run_query(wait) %}
{% endmacro %}

View File

@ -40,7 +40,7 @@ INSERT INTO
),
results AS (
SELECT
ethereum.streamline.udf_json_rpc_call(
live.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
calls
) DATA
@ -68,7 +68,7 @@ FROM
{% endset %}
{% do run_query(load_query) %}
{% set wait %}
CALL system$wait(10);
CALL system $ wait(10);
{% endset %}
{% do run_query(wait) %}
{% endmacro %}

View File

@ -8,67 +8,36 @@ as
$$
snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
try {
snowflake.execute({sqlText: `DROP DATABASE IF EXISTS ${DESTINATION_DB_NAME}`});
snowflake.execute({sqlText: `CREATE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
snowflake.execute({sqlText: `DROP SCHEMA ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
var existing_schemas = snowflake.execute({sqlText: `SELECT table_schema
FROM ${DESTINATION_DB_NAME}.INFORMATION_SCHEMA.TABLE_PRIVILEGES
WHERE grantor IS NOT NULL
GROUP BY 1;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
while (existing_schemas.next()) {
var schema = existing_schemas.getColumnValue(1)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON SCHEMA ${DESTINATION_DB_NAME}.${schema} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`})
var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`});
while (existing_tags.next()) {
var schema = existing_tags.getColumnValue(4);
var tag_name = existing_tags.getColumnValue(2)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
}
var existing_tables = snowflake.execute({sqlText: `SELECT table_schema, table_name
FROM ${DESTINATION_DB_NAME}.INFORMATION_SCHEMA.TABLE_PRIVILEGES
WHERE grantor IS NOT NULL
GROUP BY 1,2;`});
while (existing_tables.next()) {
var schema = existing_tables.getColumnValue(1)
var table_name = existing_tables.getColumnValue(2)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TABLE ${DESTINATION_DB_NAME}.${schema}.${table_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
}
var existing_functions = snowflake.execute({sqlText: `SELECT function_schema, function_name, concat('(',array_to_string(regexp_substr_all(argument_signature, 'VARCHAR|NUMBER|FLOAT|ARRAY|VARIANT|OBJECT|DOUBLE'),','),')') as argument_signature
FROM ${DESTINATION_DB_NAME}.INFORMATION_SCHEMA.FUNCTIONS;`});
while (existing_functions.next()) {
var schema = existing_functions.getColumnValue(1)
var function_name = existing_functions.getColumnValue(2)
var argument_signature = existing_functions.getColumnValue(3)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUNCTION ${DESTINATION_DB_NAME}.${schema}.${function_name}${argument_signature} to role ${ROLE_NAME} REVOKE CURRENT GRANTS;`});
}
var existing_procedures = snowflake.execute({sqlText: `SELECT procedure_schema, procedure_name, concat('(',array_to_string(regexp_substr_all(argument_signature, 'VARCHAR|NUMBER|FLOAT|ARRAY|VARIANT|OBJECT|DOUBLE'),','),')') as argument_signature
FROM ${DESTINATION_DB_NAME}.INFORMATION_SCHEMA.PROCEDURES;`});
while (existing_procedures.next()) {
var schema = existing_procedures.getColumnValue(1)
var procedure_name = existing_procedures.getColumnValue(2)
var argument_signature = existing_procedures.getColumnValue(3)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON PROCEDURE ${DESTINATION_DB_NAME}.${schema}.${procedure_name}${argument_signature} to role ${ROLE_NAME} REVOKE CURRENT GRANTS;`});
}
var existing_tasks = snowflake.execute({sqlText: `SHOW TASKS IN DATABASE ${DESTINATION_DB_NAME};`});
while (existing_tasks.next()) {
var schema = existing_tasks.getColumnValue(5)
var task_name = existing_tasks.getColumnValue(2)
snowflake.execute({sqlText: `ALTER TASK ${DESTINATION_DB_NAME}.${schema}.${task_name} SUSPEND;`})
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TASK ${DESTINATION_DB_NAME}.${schema}.${task_name} to role ${ROLE_NAME} REVOKE CURRENT GRANTS;`});
}
snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`})
snowflake.execute({sqlText: `COMMIT;`});
} catch (err) {
snowflake.execute({sqlText: `ROLLBACK;`});
throw(err);
}
return true
$$

View File

@ -0,0 +1,46 @@
{{ config(
materialized = 'incremental',
full_refresh = false
) }}
WITH calls AS (
SELECT
'https://phoenix-lcd.terra.dev/cosmos/tx/v1beta1/txs/' || tx_id calls,
tx_id
FROM
(
SELECT
tx_id
FROM
{{ ref(
'bronze__rpc_error_blocks_tx_ids'
) }}
{% if is_incremental() %}
EXCEPT
SELECT
tx_id
FROM
{{ this }}
WHERE
DATA :data :tx_response :height IS NOT NULL
{% endif %}
LIMIT
20
)
), results AS (
SELECT
tx_id,
livequery_dev.live.udf_api(
'GET',
calls,{},{}
) DATA
FROM
calls
)
SELECT
*,
SYSDATE() AS _inserted_timestamp
FROM
results

View File

@ -99,6 +99,15 @@ OR {% if var('OBSERV_FULL_TEST') %}
) IS NOT NULL
{% endif %}
{% endif %}
UNION ALL
SELECT
block_id,
block_id AS block_id_requested,
block_timestamp,
tx_id,
_inserted_timestamp
FROM
{{ ref('silver___manual_tx_lq') }}
),
b_block AS (
SELECT

View File

@ -0,0 +1,75 @@
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
incremental_strategy = 'merge'
) }}
SELECT
DATA :data :tx_response :height :: INT AS block_id,
DATA :data :tx_response :timestamp :: timestamp_ntz AS block_timestamp,
DATA :data :tx_response :codespace :: STRING AS codespace,
DATA :data :tx_response :gas_used :: INT AS gas_used,
DATA :data :tx_response :gas_wanted :: INT AS gas_wanted,
DATA :data :tx_response :txhash :: STRING AS tx_id,
NULL AS auth_type,
NULL AS authorizer_public_key,
COALESCE(
TRY_BASE64_DECODE_STRING(
DATA :data :tx_response :events [0] :attributes [0] :key
),
DATA :data :tx_response :events [0] :attributes [0] :key
) AS msg0_key,
COALESCE(
TRY_BASE64_DECODE_STRING(
DATA :data :tx_response :events [0] :attributes [0] :value
),
DATA :data :tx_response :events [0] :attributes [0] :value
) AS msg0_value,
NULL AS tx_grantee,
NULL AS tx_granter,
NULL AS tx_payer,
COALESCE(
TRY_BASE64_DECODE_STRING(
NULL
),
NULL
) AS acc_seq,
CASE
WHEN msg0_key = 'spender'
AND msg0_value IS NOT NULL THEN msg0_value
WHEN msg0_key = 'granter'
AND tx_payer IS NOT NULL THEN tx_payer
WHEN msg0_key = 'fee'
AND COALESCE(tx_grantee, SPLIT(acc_seq, '/') [0] :: STRING) IS NOT NULL THEN COALESCE(tx_grantee, SPLIT(acc_seq, '/') [0] :: STRING)
ELSE msg0_value
END AS tx_sender,
NULL AS gas_limit,
NULL AS fee_raw,
NULL AS fee_denom,
NULL AS memo,
DATA :data :tx_response AS tx,
'' AS tx_code,
NULL AS tx_succeeded,
_inserted_timestamp AS _ingested_at,
_inserted_timestamp
FROM
{{ ref(
'bronze_api__manual_tx_lq'
) }}
WHERE
block_id IS NOT NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_id, tx_id
ORDER BY
_inserted_timestamp DESC) = 1)

View File

@ -195,6 +195,38 @@ silver_txs AS (
_inserted_timestamp
FROM
bronze_txs
{% if is_incremental() %}
{% else %}
UNION ALL
SELECT
tx_id,
block_id,
block_timestamp,
NULL AS blockchain,
auth_type,
authorizer_public_key :: ARRAY AS authorizer_public_key,
msg0_key,
msg0_value,
tx_grantee,
tx_granter,
tx_payer,
acc_seq,
tx_sender,
gas_limit,
gas_used,
fee_raw,
fee_denom,
memo,
NULL AS tx_code,
tx_succeeded,
codespace,
tx,
_ingested_at,
_inserted_timestamp
FROM
{{ ref('silver___manual_tx_lq') }}
{% endif %}
)
SELECT
A.tx_id,

View File

@ -4,6 +4,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: "v1.3.0"
revision: "v1.9.3"
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]