-- ===================================================================== -- @SparkSqlName: PAIMONA-D-SQL-{表名}-ETL -- @Version: 1.0 -- @Desc: ETL 数据处理模板(临时表链式处理) -- @TargetTables: ${db_eda_env}.{目标表名} -- @SourceTables: {源表列表} -- @TargetDatabase: Paimon -- @SourceDatabase: Paimon -- @任务调度频度: {日/周/月} -- @修改记录: -- 版本号 更新时间 更新人员 更新内容 -- V1.0 {日期} {人员} 创建脚本 -- @数据处理步骤: -- Step01: {步骤描述} -- Step02: {步骤描述} -- Step03: {步骤描述} -- 参数说明 -- 账期参数: -- ${day_id} 日账期,格式:20250101 -- 环境变量: -- 变量名 测试环境值 生产环境值 -- ${db_tmp_env} {库名} {库名} -- ${db_eda_env} {库名} {库名} -- ===================================================================== -- ============================================================================ -- Step01: 基础清洗与过滤 -- ============================================================================ -- 说明:从源表读取数据,进行基础过滤和清洗 -- 输入:{源表名} -- 输出:${db_tmp_env}.tmp_{表名}_01 DROP TABLE IF EXISTS ${db_tmp_env}.tmp_xxx_01; CREATE TABLE ${db_tmp_env}.tmp_xxx_01 AS SELECT id, name, amount, status, created_at, day_id FROM source_table WHERE day_id = '${day_id}' -- 分区过滤(必须) AND status IN ('active', 'valid') -- 业务过滤 AND amount > 0 -- 数据质量过滤 AND id IS NOT NULL -- NULL过滤; -- ============================================================================ -- Step02: 多表关联与维度补全 -- ============================================================================ -- 说明:关联维度表,补全业务属性字段 -- 输入:${db_tmp_env}.tmp_xxx_01, {维度表1}, {维度表2} -- 输出:${db_tmp_env}.tmp_xxx_02 DROP TABLE IF EXISTS ${db_tmp_env}.tmp_xxx_02; CREATE TABLE ${db_tmp_env}.tmp_xxx_02 AS SELECT a.id, a.name, a.amount, a.status, b.category_name, -- 维度补全:类别名称 c.department_name, -- 维度补全:部门名称 a.created_at, a.day_id FROM ${db_tmp_env}.tmp_xxx_01 a LEFT JOIN dim_category b ON a.category_id = b.id AND b.day_id = '${day_id}' -- 维度表分区过滤 LEFT JOIN dim_department c ON a.department_id = c.id AND c.day_id = '${day_id}'; -- 维度表分区过滤 -- ============================================================================ -- Step03: 聚合计算与指标生成 -- ============================================================================ -- 说明:按业务维度聚合,计算统计指标 -- 输入:${db_tmp_env}.tmp_xxx_02 -- 输出:${db_tmp_env}.tmp_xxx_03 DROP TABLE IF EXISTS ${db_tmp_env}.tmp_xxx_03; CREATE TABLE ${db_tmp_env}.tmp_xxx_03 AS SELECT day_id, category_name, department_name, COUNT(*) AS record_count, -- 记录数 COUNT(DISTINCT id) AS unique_count, -- 唯一计数 SUM(amount) AS total_amount, -- 总金额 AVG(amount) AS avg_amount, -- 平均金额 MAX(amount) AS max_amount, -- 最大金额 MIN(amount) AS min_amount -- 最小金额 FROM ${db_tmp_env}.tmp_xxx_02 GROUP BY day_id, category_name, department_name; -- ============================================================================ -- Step04: 最终输出写入目标表 -- ============================================================================ -- 说明:补全目标表标准字段,写入结果表 -- 输入:${db_tmp_env}.tmp_xxx_03 -- 输出:${db_eda_env}.{目标表名} INSERT OVERWRITE TABLE ${db_eda_env}.target_table PARTITION (day_id = '${day_id}') SELECT -- 业务字段 category_name, department_name, record_count, unique_count, total_amount, avg_amount, max_amount, min_amount, -- 技术字段 current_timestamp() AS etl_time, -- 数据加工时间 '${day_id}' AS stat_date -- 统计日期; -- ============================================================================ -- 关键规则说明 -- ============================================================================ /* 1. 禁止使用 CTE (WITH 子句) - 每个步骤必须物化为临时表 - 原因:避免内存溢出,便于调试和断点续跑 2. 先 DROP 再 CREATE - 每个临时表创建前必须先 DROP - 原因:防止表已存在导致失败 3. 分区过滤必须前置 - 所有源表和维度表查询必须带 day_id 过滤 - 原因:避免全表扫描,提升性能 4. JOIN 条件下推 - 维度表关联时带上分区过滤条件 - 原因:减少关联数据量 5. 临时表命名规范 - 格式:tmp_{业务简称}_{步骤序号} - 示例:tmp_order_stats_01, tmp_order_stats_02 6. 目标表写入规范 - 使用 INSERT OVERWRITE(覆盖写入) - 明确指定分区 - 补全技术字段(etl_time 等) */