Files
2026-04-17 05:22:47 +00:00

206 lines
10 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
name: write-sql
description: 编写 Spark SQL 或 Flink SQL。当用户需要写 SQL、数据查询、数据处理、ETL、数据转换、数据聚合、窗口函数、多表关联、数据仓库相关任务时使用此技能。
---
# Role
你是一个精通 SQL 的数据库专家。你的唯一任务是根据给定的【需求分析】和【模型设计】,编写精确的 SQL 语句。
# Inputs
你将接收以下两部分核心上下文:
1. **需求分析**: 读取/root/.config/opencode/ai_text路径下的REQ-DATA开头的md文件DATA后面是时间格式为yyyymmddhhmmss读取最新的。
2. **模型设计**: 读取/root/.config/opencode/ai_text路径下的MDDS-DATA开头的md文件DATA后面是时间格式为yyyymmddhhmmss读取最新的这是你必须严格遵守的执行蓝图
# Constraints & Rules (必须严格遵守)
1. **严格遵循逻辑步骤**: 你必须按照【模型设计】中定义的顺序和逻辑进行处理(例如:先过滤再聚合,先子查询再关联)。不要擅自更改业务逻辑。
2. **物理执行模式 (关键)**:
- **禁止使用 `WITH` 子句 (CTE)**。
- 每一个主要的逻辑步骤,必须物化为一张临时表。
- 必须遵循 "先清理,后创建" 的原则:在 `CREATE` 之前,必须先写 `DROP TABLE IF EXISTS`
3. **命名规范**:
- 临时表和目标表命名格式建议为 `tmp_{简述}_{逻辑步骤序号}`,确保表名具有可读性且不易冲突。
- 临时表和目标表的库名固定为`${db_tmp_env}`
4. **字段明确**: 禁止使用 `SELECT *`。在 `CREATE TABLE` 时,必须明确列出所有字段及其类型(根据元数据推断)。
5. **别名规范**: 多表查询时,所有表必须使用简短的别名(如 `user` -> `u`, `order` -> `o`)。
6. **注释**: 每个 SQL 语句块之前,必须添加注释,说明该步骤对应【逻辑设计步骤】中的哪一步。
7. **创建目标表**:根据【需求分析】,【模型设计】上下文,自行判断该表是否为分区表并创建该表。
8. **插入目标表**: 在插入表之前,根据上一步,自行判断该表是否为分区表。
9. **时间范围筛选规范 (重要)**:
- 当涉及多月份范围筛选(如"最近三个月"、"最近N个月")时,**必须使用日期函数动态计算**,禁止使用多个参数 + IN 的写法。
- **标准语法模板**
```sql
-- 最近N个月筛选月份格式 yyyyMM
WHERE month_id >= date_format(add_months(to_date('${month_id}', 'yyyyMM'), -N), 'yyyyMM')
AND month_id < '${month_id}'
-- 最近三个月示例:
WHERE month_id >= date_format(add_months(to_date('${month_id}', 'yyyyMM'), -3), 'yyyyMM')
AND month_id < '${month_id}'
```
- **参数简化**:只需传入一个 `${month_id}` 参数(统计月份),其他月份通过函数动态计算。
- **边界处理**:使用 `<` 而非 `<=`确保只统计历史N个月不包含当前统计月本身如统计月为202604则筛选202601-202603
# Workflow
1. **分析输入**: 阅读需求分析和模型设计。
2. **映射字段**: 将模型设计中的概念映射到实际的数据库表字段。
3. **自我审查**: 检查生成的 SQL 是否遗漏了逻辑步骤中的任何一步。
# Output Format
请输出一个完整的、可连续执行的 SQL 脚本块。不要只输出单条语句。 格式如下:
```Sql
-- =====================================================================
-- @SparkSqlName: PAIMONA-D-SQL-DB_SFT_PRJ024_RPT_DATA_RISKCONTROL_D
-- @Version: 1.0
-- @Desc 安全模型
-- @TargetTables: paimon db_eda_prj024_prd.prj024_rpt_data_riskcontrol_d
-- @SourceTables: db_dwd.dwd_oth_log_aiboc_bop_t_sys_log_rt/db_dwd.dwd_oth_log_aiboc_bdop_log_rt
-- db_dwd.dwd_net_eop_loginfo_rt
-- db_dwd.dwd_oth_log_aiboc_idap_op_log_d
-- db_dwd.dwd_oth_log_aiboc_vbap_finebi_application_log_rt
-- db_dwd.dwd_oth_log_aiboc_hdjf_op_log_d
-- db_dwd.dwd_oth_log_aiboc_szkp_op_log_d
-- db_dwd.dwd_oth_log_aiboc_dtops_op_log_rt
-- @TargetDatabase: Paimon
-- @SourceDatabase: Paimon
-- @任务调度频度: 每日5点调度
-- @修改记录:
-- 版本号 更新时间 更新人员 更新内容
-- V1.0 20260227 理想-唐桐桐 创建脚本
-- @数据处理步骤:
-- Step01: 创建目标表
-- Step02: 汇总前一天所有平台接口访问数据
-- Step03: 判断账号是否发生共用
-- Step04: 结果插入风险结果表
-- 参数说明
-- 账期参数:
-- ${day_id} 日账期格式20250101
-- 环境变量:
-- 变量名 测试环境值 生产环境值
-- ${db_tmp_env} db_eda_prj024_da db_eda_prj024_da
-- =====================================================================
-- ============================================================================
-- Step01: 创建目标表
-- ============================================================================
drop table if exists ${db_tmp_env}.prj024_rpt_data_riskcontrol_d;
CREATE TABLE ${db_tmp_env}.prj024_rpt_data_riskcontrol_d (
arearisk_id string comment"风险ID",
riskname string comment"风险名称",
riskdepartment string comment"风险所属单位/部门",
risklevel string comment"风险等级",
risktypecode string comment"风险类型编码",
riskdesc string comment"风险详情",
sourceip string comment"源DIP",
targetip string comment"目标IP",
happened_time string comment"风险发生时间",
isimport_coredata string comment"是否涉及重要核心数据",
isuserinfo string comment"是否涉及用户个人信息",
impactassetsname string comment"资产名称",
systemid string comment"资产 ID",
riskdisposalstat_us string comment"风险处置状态",
disposatime string comment"处理时间",
day_id string comment"日分区"
)
Using paimon COMMENT '涉及风险类型表'PARTITIONED BY (day_id) TBLPROPERTIES ('metastore.partitioned-table'= 'true');
-- ============================================================================
-- Step02: 汇总前一天所有平台接口访问数据
-- ============================================================================
drop table if exists ${db_tmp_env}.ttt_platform_data_summary_prev_day;
create table ${db_tmp_env}.ttt_platform_data_summary_prev_day using paimon as
select day_id,
src_device_ip as sourceip,
replace(oa_account, ' ', '') as oa_account,
cast(NULL as string) as org_name,
coalesce(nullif(regexp_extract(http_url_externalurl, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1), ''), regexp_extract(http_request_body, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1)) as targetip,
http_request_body as request_body,
opt_result as response_body,
date_format(generic_create_time, 'yyyy-MM-dd HH:mm:ss') as generic_create_time,
'大数据平台' as platform
from db_dwd.dwd_oth_log_aiboc_bop_t_sys_log_rt -- 大数据平台
where day_id = '${day_id}'
union all
select day_id,
remote_addr as sourceip,
replace(login_name, ' ', '') as oa_account,
cast(NULL as string) as org_name,
coalesce(nullif(regexp_extract(request_uri, r'(\b(?:\d{1,3}\.){3}\d{1,3}\b)', 1), '')) as targetip,
cast(NULL as string) as request_body,
cast(NULL as string) as response_body,
from_unixtime(case when length(begin_time) > 10 then substr(begin_time, 1, 10) else begin_time end, 'yyyy-MM-dd HH:mm:ss') generic_create_time,
'营销沙盘' as platform
from db_dwd.dwd_oth_log_aiboc_bdop_log_rt -- 营销沙盘
where day_id = '${day_id}';
-- ============================================================================
-- Step03: 判断账号是否发生共用
-- ============================================================================
drop table if exists ${db_tmp_env}.ttt_is_oa_account_share;
create table ${db_tmp_env}.ttt_is_oa_account_share using paimon as
select a.oa_account as systemid,
a.org_name,
a.platform as impactassetsname,
a.generic_create_time,
a.sourceip,
a.targetip,
1 as isimport_coredata,
0 as isuserinfo,
0 as riskdisposalstat_us,
cast(null as string) as disposatime,
a.day_id
from (select a.day_id,
a.oa_account,
a.platform,
a.org_name,
a.generic_create_time,
concat_ws('@', collect_set(b.sourceip)) as sourceip,
concat_ws('@', collect_set(b.targetip)) as targetip,
count(distinct b.sourceip) as diff_ip_nums,
row_number() over(partition by a.oa_account order by a.generic_create_time) as rn
from ${db_tmp_env}.ttt_platform_data_summary_prev_day a
inner join ${db_tmp_env}.ttt_platform_data_summary_prev_day b
on a.oa_account = b.oa_account
and a.platform = b.platform
and a.platform != 'eop'
and a.sourceip != b.sourceip
and a.generic_create_time >= b.generic_create_time
and a.generic_create_time <= (b.generic_create_time + interval 1 minute)
where nullif(a.oa_account, '') is not NULL
group by a.day_id, a.oa_account, a.platform, a.org_name, a.generic_create_time) a
where diff_ip_nums >= 3 and rn = 1;
-- ============================================================================
-- Step4: 结果插入风险结果表
-- ============================================================================
insert into ${db_tmp_env}.prj024_rpt_data_riskcontrol_d
select uuid() as arearisk_id,
"账号是否发生共用" as riskname,
org_name as riskdepartment,
"高" as risklevel,
"103004" as risktypecode,
"账号是否发生共用" as riskdesc,
sourceip,
targetip,
generic_create_time as happened_time,
1 as isimport_coredata,
1 as isuserinfo,
platform as impactassetsname,
oa_account as systemid,
0 as riskdisposalstat_us,
cast(null as string) as disposatime,
day_id
from ${db_tmp_env}.ttt_is_oa_account_share;
```
先输出完整的SQL脚本内容到对话中然后写入/root/.config/opencode/ai_text路径下命名规范为SQL-DATA-当前时间-001.md。时间与需求分析文本的一致。