整理一篇学习笔记,把看到的一些要点和自己的理解都记下来。
目录
二、缺失值检测 三、缺失值处理 四、异常值检测 五、异常值处理 六、数据质量报告 七、自动化清洗流程 八、实战案例 九、总结参考资料摘要
本文深入讲解DolphinDB工业数据清洗技术。从数据质量检测到缺失值处理,从异常值识别到数据修复,从清洗策略到自动化流程,全面介绍数据清洗的核心方法。通过丰富的代码示例,帮助读者掌握工业数据清洗的核心技能。
一、数据清洗概述
1.1 数据质量坑
数据质量坑
缺失值
数据清洗
异常值
重复值
不一致
高质量数据
1.2 工业数据特点
问题说明缺失值设备故障、网络中断异常值传感器故障、干扰重复值数据重复上报不一致格式不统一
二、缺失值检测
2.1 检测缺失值
// 创建测试数据
t = table(
1..10 as id,
[1, NULL, 3, NULL, 5, 6, NULL, 8, 9, 10] as value,
[25.0, 26.0, NULL, 28.0, 29.0, NULL, 31.0, 32.0, NULL, 34.0] as temperature
)
// 检测缺失值
select id, value, temperature,
isNull(value) as value_null,
isNull(temperature) as temp_null
from t
// 统计缺失值
select
count(*) as total,
sum(isNull(value)) as value_null_count,
sum(isNull(temperature)) as temp_null_count,
sum(isNull(value)) * 100.0 / count(*) as value_null_rate
from t
2.2 缺失值模式分析
// 分析缺失值模式
def analyzeMissingPattern(data) {
result = dict(STRING, ANY)
for (col in data.columnNames()) {
nullCount = sum(isNull(data[col]))
nullRate = nullCount * 100.0 / data.rows()
result[col] = dict(STRING, ANY, [
["nullCount", nullCount],
["nullRate", nullRate]
])
}
return result
}
// 使用
analyzeMissingPattern(t)
三、缺失值处理
3.1 删除缺失值
// 删除包含缺失值的行
cleaned = select * from t where value is not null and temperature is not null
// 删除特定列缺失的行
cleaned = select * from t where temperature is not null
3.2 填充缺失值
// 均值填充
avgTemp = avg(t.temperature)
filled = select id, value,
iif(temperature is null, avgTemp, temperature) as temperature
from t
// 中位数填充
medTemp = med(t.temperature)
filled = select id, value,
iif(temperature is null, medTemp, temperature) as temperature
from t
// 前向填充
filled = select id, value,
ffill(temperature) as temperature
from t
// 线性插值
filled = select id, value,
interpolate(temperature, "linear") as temperature
from t
3.3 分组填充
// 按设备分组填充
t = table(
take(1..3, 30) as device_id,
1..30 as id,
[25.0, NULL, 27.0, NULL, 29.0, ...] as temperature
)
// 按设备均值填充
filled = select id, device_id,
iif(temperature is null, avg(temperature), temperature) as temperature
from t
context by device_id
四、异常值检测
4.1 统计方法检测
// 创建测试数据
t = table(
1..100 as id,
concat([rand(20.0..30.0, 95), [100.0, -50.0, 200.0, -100.0, 150.0]]) as temperature
)
// 3σ原则检测
avgTemp = avg(t.temperature)
stdTemp = std(t.temperature)
select id, temperature,
abs(temperature - avgTemp) > 3 * stdTemp as is_outlier_3sigma
from t
// IQR方法检测
q1 = percentile(t.temperature, 25)
q3 = percentile(t.temperature, 75)
iqr = q3 - q1
lowerBound = q1 - 1.5 * iqr
upperBound = q3 + 1.5 * iqr
select id, temperature,
temperature < lowerBound or temperature > upperBound as is_outlier_iqr
from t
4.2 Z-Score检测
// Z-Score检测
def detectOutliersZScore(data, threshold = 3) {
meanVal = avg(data)
stdVal = std(data)
zScore = abs(data - meanVal) / stdVal
return zScore > threshold
}
// 使用
select id, temperature,
detectOutliersZScore(temperature) as is_outlier
from t
4.3 分位数检测
// 分位数检测
def detectOutliersQuantile(data, lowerQ = 0.01, upperQ = 0.99) {
lower = percentile(data, lowerQ * 100)
upper = percentile(data, upperQ * 100)
return data < lower or data > upper
}
// 使用
select id, temperature,
detectOutliersQuantile(temperature) as is_outlier
from t
五、异常值处理
5.1 删除异常值
// 删除异常值
avgTemp = avg(t.temperature)
stdTemp = std(t.temperature)
cleaned = select * from t
where abs(temperature - avgTemp) upperBound, upperBound, temperature)) as temperature
from t
5.3 插值替换
// 用相邻值替换
cleaned = select id, temperature,
iif(abs(temperature - avgTemp) > 3 * stdTemp,
ffill(temperature), temperature) as temperature_fixed
from t
六、数据质量报告
6.1 生成质量报告
// 数据质量报告函数
def generateQualityReport(data) {
report = dict(STRING, ANY)
report["totalRows"] = data.rows()
report["totalColumns"] = data.columns()
// 缺失值统计
nullStats = dict(STRING, ANY)
for (col in data.columnNames()) {
nullStats[col] = dict(STRING, ANY, [
["nullCount", sum(isNull(data[col]))],
["nullRate", sum(isNull(data[col])) * 100.0 / data.rows()]
])
}
report["nullStats"] = nullStats
// 异常值统计(数值列)
outlierStats = dict(STRING, ANY)
for (col in data.columnNames()) {
if (type(data[col]) in [INT, LONG, FLOAT, DOUBLE]) {
avgVal = avg(data[col])
stdVal = std(data[col])
outlierCount = sum(abs(data[col] - avgVal) > 3 * stdVal)
outlierStats[col] = dict(STRING, ANY, [
["outlierCount", outlierCount],
["outlierRate", outlierCount * 100.0 / data.rows()]
])
}
}
report["outlierStats"] = outlierStats
return report
}
// 使用
report = generateQualityReport(t)
print(report)
七、自动化清洗流程
7.1 清洗流水线
// 数据清洗流水线
def dataCleaningPipeline(data, config) {
result = data
// 1. 缺失值处理
if (config.handleMissing == "drop") {
result = select * from result where not hasNull(*)
} else if (config.handleMissing == "fill_mean") {
for (col in config.numericColumns) {
result[col] = iif(isNull(result[col]), avg(result[col]), result[col])
}
}
// 2. 异常值处理
if (config.handleOutlier == "drop") {
for (col in config.numericColumns) {
avgVal = avg(result[col])
stdVal = std(result[col])
result = select * from result
where abs(result[col] - avgVal) 3 * stdTemp, avgTemp, temperature) as temperature,
humidity
from cleaned
// 5. 清洗后报告
print("=== 清洗后数据质量 ===")
print("总行数: " + string(cleaned.rows()))
print("温度缺失: " + string(sum(isNull(cleaned.temperature))))
// 6. 写入分布式表
db = database("dfs://cleaned_db", VALUE, 1..10)
db.createPartitionedTable(cleaned, `cleaned_data, `device_id)
loadTable("dfs://cleaned_db", "cleaned_data").append!(cleaned)
print("数据清洗完成")
九、总结
本文详细介绍了DolphinDB工业数据清洗:
1. 数据质量问题:缺失值、异常值、重复值
2. 缺失值检测:检测方法、模式分析
3. 缺失值处理:删除、填充、插值
4. 异常值检测:3σ、IQR、Z-Score
5. 异常值处理:删除、替换、插值
6. 自动化流程:清洗流水线、质量报告
思考题:
1. 如何选择合适的缺失值处理方法?
2. 如何平衡异常值检测的准确性和误报率?
3. 如何设计自动化数据清洗流程?
参考资料
暂时整理到这里。以上都是个人理解,可能有疏漏,欢迎指正。
评论 (0)
暂无评论