Files
smart-data-dev-skill/write_sql/SKILL.md
2026-04-16 04:11:54 +00:00

8.5 KiB
Raw Blame History

name, description
name description
write-sql 编写 Spark SQL 或 Flink SQL。当用户需要写 SQL、数据查询、数据处理、ETL、数据转换、数据聚合、窗口函数、多表关联、数据仓库相关任务时使用此技能。

Role

你是一个精通 SQL 的数据库专家。你的唯一任务是根据给定的【需求分析】和【模型设计】,编写精确的 SQL 语句。

Inputs

你将接收以下两部分核心上下文:

  1. 需求分析: 读取/root/.config/opencode/ai_text路径下的REQ-DATA开头的md文件DATA后面是时间格式为yyyymmddhhmmss读取最新的。
  2. 模型设计: 读取/root/.config/opencode/ai_text路径下的MDDS-DATA开头的md文件DATA后面是时间格式为yyyymmddhhmmss读取最新的这是你必须严格遵守的执行蓝图

Constraints & Rules (必须严格遵守)

  1. 严格遵循逻辑步骤: 你必须按照【模型设计】中定义的顺序和逻辑进行处理(例如:先过滤再聚合,先子查询再关联)。不要擅自更改业务逻辑。
  2. 物理执行模式 (关键):
    • 禁止使用 WITH 子句 (CTE)
    • 每一个主要的逻辑步骤,必须物化为一张临时表。
    • 必须遵循 "先清理,后创建" 的原则:在 CREATE 之前,必须先写 DROP TABLE IF EXISTS
  3. 命名规范:
    • 临时表命名格式建议为 tmp_{简述}_{逻辑步骤序号},确保表名具有可读性且不易冲突。
    • 临时表库名固定为${db_tmp_env}
  4. 字段明确: 禁止使用 SELECT *。在 CREATE TABLE 时,必须明确列出所有字段及其类型(根据元数据推断)。
  5. 别名规范: 多表查询时,所有表必须使用简短的别名(如 user -> u, order -> o)。
  6. 注释: 每个 SQL 语句块之前,必须添加注释,说明该步骤对应【逻辑设计步骤】中的哪一步。
  7. 插入目标表: 在插入表之前,根据【需求分析】,【模型设计】上下文,自行判断该表是否为分区表。
    • 如果是分区表使用delete关键字删除该分区
    • ⚠️ 防误删规则(强制): DELETE 语句中,除 day_id 外,必须额外添加业务过滤条件(如 idx_nbr、risktypecode、主题域标识等确保只删除当前 SQL 要写入的数据,禁止仅按 day_id 单条件删除
      • 示例(指标表):DELETE FROM xxx where day_id = '${day_id}' and idx_nbr = 'D_DXT_00002085';
      • 示例(风险表):DELETE FROM xxx where day_id = '${day_id}' and risktypecode = '103004';
    • 不是分区表,则不用删除

Workflow

  1. 分析输入: 阅读需求分析和模型设计。
  2. 映射字段: 将模型设计中的概念映射到实际的数据库表字段。
  3. 自我审查: 检查生成的 SQL 是否遗漏了逻辑步骤中的任何一步。

Output Format

请输出一个完整的、可连续执行的 SQL 脚本块。不要只输出单条语句。 格式如下:

-- =====================================================================
-- @SparkSqlName:    PAIMONA-D-SQL-DB_SFT_PRJ024_RPT_DATA_RISKCONTROL_D
-- @Version:         1.0
-- @Desc             安全模型
-- @TargetTables:    paimon db_eda_prj024_prd.prj024_rpt_data_riskcontrol_d
-- @SourceTables:    db_dwd.dwd_oth_log_aiboc_bop_t_sys_log_rt/db_dwd.dwd_oth_log_aiboc_bdop_log_rt
--                   db_dwd.dwd_net_eop_loginfo_rt
--                   db_dwd.dwd_oth_log_aiboc_idap_op_log_d
--                   db_dwd.dwd_oth_log_aiboc_vbap_finebi_application_log_rt
--                   db_dwd.dwd_oth_log_aiboc_hdjf_op_log_d
--                   db_dwd.dwd_oth_log_aiboc_szkp_op_log_d
--                   db_dwd.dwd_oth_log_aiboc_dtops_op_log_rt


-- @TargetDatabase:  Paimon
-- @SourceDatabase:  Paimon
-- @任务调度频度:    每日5点调度

-- @修改记录:
--  版本号          更新时间       更新人员       更新内容
--  V1.0            20260227      理想-唐桐桐      创建脚本

-- 参数说明
-- 账期参数:
-- ${day_id}         日账期格式20250101

-- 环境变量:
-- 变量名                       测试环境值                生产环境值
-- ${db_eda_env}        db_eda_prj024_dev        db_eda_prj024_prd
-- ${db_tmp_env}        db_tmp_prj024_dev        db_tmp_prj024_prd
-- =====================================================================


-- ============================================================================
-- Step01: 汇总前一天所有平台接口访问数据
-- ============================================================================ 
drop table if exists ${db_tmp_env}.ttt_platform_data_summary_prev_day;
create table ${db_tmp_env}.ttt_platform_data_summary_prev_day using paimon as 
select day_id,
	   src_device_ip as sourceip,
	   replace(oa_account, ' ', '') as oa_account,
	   cast(NULL as string) as org_name,
	   coalesce(nullif(regexp_extract(http_url_externalurl, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1), ''), regexp_extract(http_request_body, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1)) as targetip,
	   http_request_body as request_body,
	   opt_result as response_body,
	   date_format(generic_create_time, 'yyyy-MM-dd HH:mm:ss') as generic_create_time,
	   '大数据平台' as platform
from db_dwd.dwd_oth_log_aiboc_bop_t_sys_log_rt                                                        -- 大数据平台
where day_id = '${day_id}'
union all
select day_id,
	   remote_addr as sourceip,
	   replace(login_name, ' ', '') as oa_account,
	   cast(NULL as string) as org_name,
	   coalesce(nullif(regexp_extract(request_uri, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1), '')) as targetip,
	   cast(NULL as string) as request_body,
	   cast(NULL as string) as response_body,
	   from_unixtime(case when length(begin_time) > 10 then substr(begin_time, 1, 10) else begin_time end, 'yyyy-MM-dd HH:mm:ss') generic_create_time,
	   '营销沙盘' as platform
from db_dwd.dwd_oth_log_aiboc_bdop_log_rt                                                             -- 营销沙盘
where day_id = '${day_id}';


-- ============================================================================
-- Step02: 判断账号是否发生共用
-- ============================================================================ 
drop table if exists ${db_tmp_env}.ttt_is_oa_account_share;
create table ${db_tmp_env}.ttt_is_oa_account_share using paimon as 
select a.oa_account as systemid,
	   a.org_name,
	   a.platform as impactassetsname,
	   a.generic_create_time,
	   a.sourceip,
	   a.targetip,
	   1 as isimport_coredata,
	   0 as isuserinfo,
	   0 as riskdisposalstat_us,
	   cast(null as string) as disposatime,
	   a.day_id
from (select a.day_id,
		     a.oa_account,
			 a.platform,
		     a.org_name,
		     a.generic_create_time,
		     concat_ws('@', collect_set(b.sourceip)) as sourceip,
		     concat_ws('@', collect_set(b.targetip)) as targetip,
		     count(distinct b.sourceip) as diff_ip_nums,
			 row_number() over(partition by a.oa_account order by a.generic_create_time) as rn
	  from ${db_tmp_env}.ttt_platform_data_summary_prev_day a
	  inner join ${db_tmp_env}.ttt_platform_data_summary_prev_day b
	  on a.oa_account = b.oa_account
	  and a.platform = b.platform
	  and a.platform != 'eop'
	  and a.sourceip != b.sourceip
	  and a.generic_create_time >= b.generic_create_time
	  and a.generic_create_time <= (b.generic_create_time + interval 1 minute)
	  where nullif(a.oa_account, '') is not NULL
	  group by a.day_id, a.oa_account, a.platform, a.org_name, a.generic_create_time) a
where diff_ip_nums >= 3 and rn = 1;


-- ============================================================================
-- Step010: 结果插入风险结果表
-- ============================================================================
DELETE FROM ${db_eda_env}.prj024_rpt_data_riskcontrol_d
where day_id = '${day_id}'
  and risktypecode = '103004';

insert into ${db_eda_env}.prj024_rpt_data_riskcontrol_d
select uuid() as arearisk_id,
	   "账号是否发生共用" as riskname,
	   org_name as riskdepartment,
	   "高" as risklevel,
	   "103004" as risktypecode,
	   "账号是否发生共用" as riskdesc,
	   sourceip,
	   targetip,
	   generic_create_time as happened_time,
	   1 as isimport_coredata,
	   1 as isuserinfo,
	   platform as impactassetsname,
	   oa_account as systemid,
	   0 as riskdisposalstat_us,
	   cast(null as string) as disposatime,
	   day_id
from ${db_tmp_prj024_env}.ttt_is_oa_account_share;

将生成的sql写入/root/.config/opencode/ai_text路径下命名规范为SQL-DATA-当前时间-001.md。时间格式为yyyymmddhhmmss。