DolphinDB工业数据清洗:缺失值与异常值处理

整理一篇学习笔记,把看到的一些要点和自己的理解都记下来。

目录

二、缺失值检测 三、缺失值处理 四、异常值检测 五、异常值处理 六、数据质量报告 七、自动化清洗流程 八、实战案例 九、总结参考资料

摘要

本文深入讲解DolphinDB工业数据清洗技术。从数据质量检测到缺失值处理,从异常值识别到数据修复,从清洗策略到自动化流程,全面介绍数据清洗的核心方法。通过丰富的代码示例,帮助读者掌握工业数据清洗的核心技能。


一、数据清洗概述

1.1 数据质量坑

数据质量坑

缺失值

数据清洗

异常值

重复值

不一致

高质量数据

1.2 工业数据特点

问题说明缺失值设备故障、网络中断异常值传感器故障、干扰重复值数据重复上报不一致格式不统一


二、缺失值检测

2.1 检测缺失值

// 创建测试数据
t = table(
    1..10 as id,
    [1, NULL, 3, NULL, 5, 6, NULL, 8, 9, 10] as value,
    [25.0, 26.0, NULL, 28.0, 29.0, NULL, 31.0, 32.0, NULL, 34.0] as temperature
)

// 检测缺失值
select id, value, temperature,
       isNull(value) as value_null,
       isNull(temperature) as temp_null
from t

// 统计缺失值
select
    count(*) as total,
    sum(isNull(value)) as value_null_count,
    sum(isNull(temperature)) as temp_null_count,
    sum(isNull(value)) * 100.0 / count(*) as value_null_rate
from t

2.2 缺失值模式分析

// 分析缺失值模式
def analyzeMissingPattern(data) {
    result = dict(STRING, ANY)

    for (col in data.columnNames()) {
        nullCount = sum(isNull(data[col]))
        nullRate = nullCount * 100.0 / data.rows()
        result[col] = dict(STRING, ANY, [
            ["nullCount", nullCount],
            ["nullRate", nullRate]
        ])
    }

    return result
}

// 使用
analyzeMissingPattern(t)


三、缺失值处理

3.1 删除缺失值

// 删除包含缺失值的行
cleaned = select * from t where value is not null and temperature is not null

// 删除特定列缺失的行
cleaned = select * from t where temperature is not null

3.2 填充缺失值

// 均值填充
avgTemp = avg(t.temperature)
filled = select id, value,
         iif(temperature is null, avgTemp, temperature) as temperature
         from t

// 中位数填充
medTemp = med(t.temperature)
filled = select id, value,
         iif(temperature is null, medTemp, temperature) as temperature
         from t

// 前向填充
filled = select id, value,
         ffill(temperature) as temperature
         from t

// 线性插值
filled = select id, value,
         interpolate(temperature, "linear") as temperature
         from t

3.3 分组填充

// 按设备分组填充
t = table(
    take(1..3, 30) as device_id,
    1..30 as id,
    [25.0, NULL, 27.0, NULL, 29.0, ...] as temperature
)

// 按设备均值填充
filled = select id, device_id,
         iif(temperature is null, avg(temperature), temperature) as temperature
         from t
         context by device_id


四、异常值检测

4.1 统计方法检测

// 创建测试数据
t = table(
    1..100 as id,
    concat([rand(20.0..30.0, 95), [100.0, -50.0, 200.0, -100.0, 150.0]]) as temperature
)

// 3σ原则检测
avgTemp = avg(t.temperature)
stdTemp = std(t.temperature)

select id, temperature,
       abs(temperature - avgTemp) > 3 * stdTemp as is_outlier_3sigma
from t

// IQR方法检测
q1 = percentile(t.temperature, 25)
q3 = percentile(t.temperature, 75)
iqr = q3 - q1
lowerBound = q1 - 1.5 * iqr
upperBound = q3 + 1.5 * iqr

select id, temperature,
       temperature < lowerBound or temperature > upperBound as is_outlier_iqr
from t

4.2 Z-Score检测

// Z-Score检测
def detectOutliersZScore(data, threshold = 3) {
    meanVal = avg(data)
    stdVal = std(data)
    zScore = abs(data - meanVal) / stdVal
    return zScore > threshold
}

// 使用
select id, temperature,
       detectOutliersZScore(temperature) as is_outlier
from t

4.3 分位数检测

// 分位数检测
def detectOutliersQuantile(data, lowerQ = 0.01, upperQ = 0.99) {
    lower = percentile(data, lowerQ * 100)
    upper = percentile(data, upperQ * 100)
    return data < lower or data > upper
}

// 使用
select id, temperature,
       detectOutliersQuantile(temperature) as is_outlier
from t


五、异常值处理

5.1 删除异常值

// 删除异常值
avgTemp = avg(t.temperature)
stdTemp = std(t.temperature)

cleaned = select * from t
          where abs(temperature - avgTemp)  upperBound, upperBound, temperature)) as temperature
          from t

5.3 插值替换

// 用相邻值替换
cleaned = select id, temperature,
          iif(abs(temperature - avgTemp) > 3 * stdTemp,
              ffill(temperature), temperature) as temperature_fixed
          from t


六、数据质量报告

6.1 生成质量报告

// 数据质量报告函数
def generateQualityReport(data) {
    report = dict(STRING, ANY)

    report["totalRows"] = data.rows()
    report["totalColumns"] = data.columns()

    // 缺失值统计
    nullStats = dict(STRING, ANY)
    for (col in data.columnNames()) {
        nullStats[col] = dict(STRING, ANY, [
            ["nullCount", sum(isNull(data[col]))],
            ["nullRate", sum(isNull(data[col])) * 100.0 / data.rows()]
        ])
    }
    report["nullStats"] = nullStats

    // 异常值统计(数值列)
    outlierStats = dict(STRING, ANY)
    for (col in data.columnNames()) {
        if (type(data[col]) in [INT, LONG, FLOAT, DOUBLE]) {
            avgVal = avg(data[col])
            stdVal = std(data[col])
            outlierCount = sum(abs(data[col] - avgVal) > 3 * stdVal)
            outlierStats[col] = dict(STRING, ANY, [
                ["outlierCount", outlierCount],
                ["outlierRate", outlierCount * 100.0 / data.rows()]
            ])
        }
    }
    report["outlierStats"] = outlierStats

    return report
}

// 使用
report = generateQualityReport(t)
print(report)


七、自动化清洗流程

7.1 清洗流水线

// 数据清洗流水线
def dataCleaningPipeline(data, config) {
    result = data

    // 1. 缺失值处理
    if (config.handleMissing == "drop") {
        result = select * from result where not hasNull(*)
    } else if (config.handleMissing == "fill_mean") {
        for (col in config.numericColumns) {
            result[col] = iif(isNull(result[col]), avg(result[col]), result[col])
        }
    }

    // 2. 异常值处理
    if (config.handleOutlier == "drop") {
        for (col in config.numericColumns) {
            avgVal = avg(result[col])
            stdVal = std(result[col])
            result = select * from result
                     where abs(result[col] - avgVal)  3 * stdTemp, avgTemp, temperature) as temperature,
         humidity
         from cleaned

// 5. 清洗后报告
print("=== 清洗后数据质量 ===")
print("总行数: " + string(cleaned.rows()))
print("温度缺失: " + string(sum(isNull(cleaned.temperature))))

// 6. 写入分布式表
db = database("dfs://cleaned_db", VALUE, 1..10)
db.createPartitionedTable(cleaned, `cleaned_data, `device_id)
loadTable("dfs://cleaned_db", "cleaned_data").append!(cleaned)

print("数据清洗完成")


九、总结

本文详细介绍了DolphinDB工业数据清洗:

1. 数据质量问题:缺失值、异常值、重复值
2. 缺失值检测:检测方法、模式分析
3. 缺失值处理:删除、填充、插值
4. 异常值检测:3σ、IQR、Z-Score
5. 异常值处理:删除、替换、插值
6. 自动化流程:清洗流水线、质量报告

思考题

1. 如何选择合适的缺失值处理方法?
2. 如何平衡异常值检测的准确性和误报率?
3. 如何设计自动化数据清洗流程?


参考资料



暂时整理到这里。以上都是个人理解,可能有疏漏,欢迎指正。

评论 (0)

暂无评论