有一条字串
“a,b,c,d”

通过.split(",")指定分隔符转化为数组,转化为.length查看数组长度

这是数据清洗的一个思路,如果要求是丢失数据为大于3,就清洗该数据,那么可以使用原数组长度和新数组长度比较 原来的长度 - 新的长度 > 3 这个数据就不要了

怎么生成新数组 使用.replice("old",""new)

1
val newS=s.replace(",,,,,",",").replice(",,,,",",,,").replice(",,,",",,").replice(",,",",")

再用新字符串切割

1
newS.splict(",")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 初始化sc对象
val conf = new SparkConf().setMaster("local").setAppName("过滤缺失值超过三个字段的记录")
val sc = new SparkContext(conf)

// 读取文件
val datas = sc.textFile("path")
// 获取新文件,因为老文件有个SEQ开头的东西,需要去掉他
// 用字符串的filter方法
val lines=datas.filter{line => !line.startsWith("SEQ")}
//lines.saveAsTextFile("path/output/")
// 这个时候SEQ开头的就没了

// 过滤缺失值超过3个的记录条数
// 记录原始记录数
val c1=lines.count()
lines.filter(x => x.split(",").length) //原来条数
// 获取新条数的方法看上面那个代码块
// 保留 原数据-新数据<3的数据
// 也是用filter方法
val filterRDD=lines.filter(x => x.split(',').length-s.replace(",,,,,",",").replice(",,,,",",,,").replice(",,,",",,").replice(",,",",").split(",").length<3)

val c2 = filterRDD.count() //清洗后的条目数

print(c1-c2)

//可以保存出来
filterRDD.saveAsTextFile("path/out/put")
sc.stop()

上方的代码初步完成需求,就是可以过滤缺失值超过3个的记录条数。

第二个问题是,针对原始数据,某一个字段为空,就删除该条数据

最后需要知道删除的记录数

前面都是一样的

定义conf sc(SparkContext(conf))

读取文件,去除第一列

filter {x = > ???}

主要是这个方法

1
2
3
4
lines.filter{lines => 
val arr = lines.split(",")
!(arr(6) == "" || arr(10)=="" || arr(11)=="")
}

这里的arr(6),是因为arr是被切割成的数组,6是数组的值需要判断的条件,这里是判断是否为空