11 KiB
11 KiB
Spark SQL 语法要点
数据类型
| 类型 | 说明 | 示例 |
|---|---|---|
| STRING | 字符串 | name STRING |
| INT | 整数 | age INT |
| BIGINT | 大整数 | id BIGINT |
| DOUBLE | 浮点数 | price DOUBLE |
| DECIMAL(p,s) | 定点数 | amount DECIMAL(18,2) |
| BOOLEAN | 布尔 | active BOOLEAN |
| DATE | 日期 | birth_date DATE |
| TIMESTAMP | 时间戳 | created_at TIMESTAMP |
| ARRAY | 数组 | tags ARRAY |
| MAP<key,value> | 映射 | props MAP<STRING,STRING> |
| STRUCTfield:type,... | 结构体 | user STRUCTid:INT,name:STRING |
时间函数
-- 当前时间
current_date()
current_timestamp()
now()
-- 格式转换
date_format(date_col, 'yyyy-MM-dd')
date_format(timestamp_col, 'yyyy-MM-dd HH:mm:ss')
to_date(string_col, 'yyyy-MM-dd')
to_timestamp(string_col, 'yyyy-MM-dd HH:mm:ss')
-- 日期计算
date_add(date_col, 7) -- 加7天
date_sub(date_col, 7) -- 减7天
add_months(date_col, 3) -- 加3个月
datediff(end_date, start_date) -- 日期差(天数)
-- 日期提取
year(date_col)
month(date_col)
day(date_col)
dayofweek(date_col)
hour(timestamp_col)
minute(timestamp_col)
second(timestamp_col)
-- 季度、周
quarter(date_col) -- 季度 (1-4)
weekofyear(date_col) -- 年中第几周
-- Unix 时间戳
unix_timestamp(date_col) -- 转 Unix 时间戳
from_unixtime(timestamp) -- Unix 时间戳转时间字符串
字符串函数
-- 常用函数
concat(str1, str2, ...) -- 字符串拼接
concat_ws('-', str1, str2, ...) -- 用分隔符拼接
lower(str) -- 转小写
upper(str) -- 转大写
trim(str) -- 去两端空格
ltrim(str) -- 去左空格
rtrim(str) -- 去右空格
length(str) -- 字符串长度
substring(str, pos, len) -- 截取字符串
left(str, len) -- 取左边len个字符
right(str, len) -- 取右边len个字符
reverse(str) -- 反转字符串
repeat(str, n) -- 重复n次
space(n) -- 生成n个空格
-- 查找与替换
instr(str, substr) -- 查找子串位置
locate(substr, str, pos) -- 从pos位置查找
replace(str, old, new) -- 替换
regexp_extract(str, pattern, idx) -- 正则提取
regexp_replace(str, pattern, replacement) -- 正则替换
-- 分割
split(str, delimiter) -- 分割成数组
split_part(str, delimiter, idx) -- 取分割后的第idx部分
-- 其他
initcap(str) -- 首字母大写
lpad(str, len, pad) -- 左填充
rpad(str, len, pad) -- 右填充
levenshtein(str1, str2) -- 编辑距离
条件表达式
-- CASE WHEN
CASE
WHEN condition1 THEN result1
WHEN condition2 THEN result2
ELSE default_result
END
-- CASE 字段匹配
CASE field
WHEN value1 THEN result1
WHEN value2 THEN result2
ELSE default_result
END
-- COALESCE(取第一个非空值)
COALESCE(col1, col2, default_value)
-- NULLIF(相等返回NULL)
NULLIF(col1, col2)
-- IF(简单条件)
IF(condition, true_value, false_value)
-- NVL(空值替换)
NVL(col, default_value)
聚合函数
-- 基础聚合
COUNT(*) -- 计数(含NULL行)
COUNT(col) -- 计数(不含NULL)
COUNT(DISTINCT col) -- 去重计数
SUM(col) -- 求和
AVG(col) -- 平均值
MIN(col) -- 最小值
MAX(col) -- 最大值
-- 集合聚合
collect_list(col) -- 返回数组(不去重)
collect_set(col) -- 返回数组(去重)
-- 统计函数
variance(col) -- 方差
var_pop(col) -- 总体方差
var_samp(col) -- 样本方差
stddev(col) -- 标准差
stddev_pop(col) -- 总体标准差
stddev_samp(col) -- 样本标准差
-- 近似函数
approx_count_distinct(col) -- 近似去重计数(大数据量优化)
-- 其他
first(col) -- 第一个值
last(col) -- 最后一个值
数学函数
-- 基础运算
abs(col) -- 绝对值
round(col, digits) -- 四舍五入
ceil(col) -- 向上取整
floor(col) -- 向下取整
sign(col) -- 符号 (-1, 0, 1)
-- 指数与对数
exp(col) -- e的指数
log(col) -- 自然对数
log10(col) -- 10为底对数
log2(col) -- 2为底对数
pow(col, n) -- 幂运算
sqrt(col) -- 平方根
-- 三角函数
sin(col), cos(col), tan(col)
asin(col), acos(col), atan(col)
-- 随机数
rand() -- 随机数 (0-1)
rand(seed) -- 指定种子随机数
-- 其他
cbrt(col) -- 立方根
hex(col) -- 转16进制
unhex(col) -- 16进制转字符串
数组函数
-- 创建数组
array(val1, val2, ...) -- 创建数组
-- 访问
array_contains(arr, val) -- 判断是否包含
element_at(arr, idx) -- 取元素(idx从1开始)
arr[idx] -- 取元素(idx从0开始)
-- 操作
size(arr) -- 数组长度
array_join(arr, delimiter) -- 数组转字符串
concat(arr1, arr2) -- 数组拼接
-- 展开
explode(arr) -- 展开数组为多行
posexplode(arr) -- 展开数组(带位置索引)
-- 排序与去重
sort_array(arr) -- 排序
array_distinct(arr) -- 去重
array_remove(arr, val) -- 移除元素
array_union(arr1, arr2) -- 并集
array_intersect(arr1, arr2) -- 交集
array_except(arr1, arr2) -- 差集
Map 函数
-- 创建 Map
map(key1, val1, key2, val2, ...) -- 创建 Map
str_to_map(str, delim1, delim2) -- 字符串转 Map
-- 访问
map_contains(map, key) -- 判断是否包含key
element_at(map, key) -- 取值
map[key] -- 取值
map_keys(map) -- 取所有key(返回数组)
map_values(map) -- 取所有value(返回数组)
-- 操作
size(map) -- Map大小
map_concat(map1, map2) -- Map合并
JSON 函数
-- 解析
get_json_object(json_str, path) -- 提取JSON字段
json_tuple(json_str, field1, ...) -- 提取多个字段
-- 转换
from_json(json_str, schema) -- JSON转结构体
to_json(struct_col) -- 结构体转JSON
-- Schema 定义示例
from_json('{"name":"张三","age":25}', 'name STRING, age INT')
分区表操作
-- 创建分区表
CREATE TABLE target_table (
id BIGINT,
name STRING,
amount DECIMAL(18,2)
)
PARTITIONED BY (day_id STRING)
STORED AS PARQUET;
-- 写入指定分区
INSERT OVERWRITE TABLE target_table
PARTITION (day_id = '${day_id}')
SELECT id, name, amount
FROM source_table
WHERE ...
-- 动态分区写入
INSERT OVERWRITE TABLE target_table
PARTITION (day_id)
SELECT id, name, amount, day_id
FROM source_table;
-- 查看分区
SHOW PARTITIONS target_table;
临时表与视图
-- 创建临时表
CREATE TEMPORARY TABLE tmp_table AS
SELECT ...
-- 创建临时视图
CREATE TEMPORARY VIEW tmp_view AS
SELECT ...
-- 全局临时视图(跨Session)
CREATE GLOBAL TEMPORARY VIEW global_view AS
SELECT ...
-- 删除
DROP TABLE IF EXISTS tmp_table;
DROP VIEW IF EXISTS tmp_view;
MERGE INTO(更新插入)
-- MERGE INTO 语法
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name, t.amount = s.amount
WHEN NOT MATCHED THEN INSERT (id, name, amount) VALUES (s.id, s.name, s.amount)
-- 仅更新
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
-- 仅插入
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN NOT MATCHED THEN INSERT *
Spark SQL 不支持的特性
| PostgreSQL 特性 | Spark SQL | 替代方案 |
|---|---|---|
| CREATE INDEX | ❌ 不支持 | 依赖存储格式优化(Parquet/ORC) |
| CREATE TRIGGER | ❌ 不支持 | 使用程序逻辑处理 |
| FOREIGN KEY 约束 | ❌ 不强制 | 数据关联靠 JOIN 保证 |
| CHECK 约束 | ❌ 不支持 | 使用过滤条件 |
| ON CONFLICT (UPSERT) | 使用 MERGE INTO | - |
| WITH RECURSIVE | Spark 3.x+ 支持 | 或用程序迭代 |
| 物化视图 | ❌ 不支持 | 使用缓存或临时表 |
| 存储过程 | ❌ 不支持 | 使用外部程序 |
| FOR UPDATE 锁 | ❌ 不支持 | 无行级锁概念 |
SQL 生成规则
通用规则(所有引擎统一)
- 禁止使用 CTE (WITH 子句),每个主要逻辑步骤必须物化为临时表
- 先 DROP 再 CREATE:
DROP TABLE IF EXISTS ...; CREATE TABLE ... AS SELECT ...; - 禁止
SELECT *,必须明确列出所有字段 - 多表查询时所有表必须使用简短别名
- 每个步骤前添加注释说明
- 谓词下推:过滤条件前置,JOIN 时在 WHERE 中一并添加过滤
- 临时表命名:
${db_tmp_env}.tmp_{业务简称}_{步骤序号} - 目标表命名:
${db_eda_env}.{目标表名}
Spark 特有规则
- 使用
INSERT OVERWRITE TABLE写入目标表 - 分区表必须指定分区:
PARTITION (day_id = '${day_id}') - 最后一步写入目标表,中间步骤物化临时表
- 日期函数:
date_format(),to_date(),date_add(),add_months() - 时间范围筛选:
-- 日账期过滤 WHERE day_id = '${day_id}' -- 最近N个月(月份格式 yyyyMM) WHERE month_id >= date_format(add_months(to_date('${month_id}', 'yyyyMM'), -N), 'yyyyMM') AND month_id < '${month_id}'
SQL 脚本结构
-- =====================================================================
-- @SqlName: spark-D-SQL-{表名}
-- @Engine: spark
-- ...(头注释)
-- =====================================================================
-- Step01: {步骤描述}
DROP TABLE IF EXISTS ${db_tmp_env}.tmp_xxx_01;
CREATE TABLE ${db_tmp_env}.tmp_xxx_01 AS
SELECT ...;
-- Step02: {步骤描述}
DROP TABLE IF EXISTS ${db_tmp_env}.tmp_xxx_02;
CREATE TABLE ${db_tmp_env}.tmp_xxx_02 AS
SELECT ...;
-- 最后一步:写入目标表
INSERT OVERWRITE TABLE ${db_eda_env}.target_table
PARTITION (day_id = '${day_id}')
SELECT ...;