MySQL → Hive incremental data extraction (MySQL到Hive数据增量抽取)

Demo 1 — incremental watermark is the greater of operate_time and create_time

package top.wsczh.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

object MysqlToODSTestDS {
  /**
   * Incrementally extracts new rows from MySQL table `user_info` (database
   * `new_shtd_store`) into Hive table `ods.user_info`.
   *
   * Watermark rule: for each row, the effective timestamp is the greater of
   * `operate_time` and `create_time`. Only rows whose effective timestamp is
   * strictly greater than the maximum effective timestamp already present in
   * `ods.user_info` are appended. Column names/types are unchanged and rows go
   * into a static partition `etldate` (String, yyyyMMdd — the day before the
   * competition day). Verify with: `show partitions ods.user_info` in hive cli.
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's noisy INFO/WARN logging.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val warehouse = "hdfs://hadoop102:9000/hive/warehouse/home"
    val conf = new SparkConf().setMaster("local[*]").setAppName("read mysql write hive")
      .set("spark.testing.memory", "471859200")
    val sparkSession = SparkSession.builder().enableHiveSupport().config(conf)
      .config("spark.sql.warehouse.dir", warehouse).getOrCreate()

    // Source MySQL connection settings.
    val MYSQLDBURL: String = "jdbc:mysql://192.168.198.13:3306/new_shtd_store?useUnicode=true&characterEncoding=utf-8" // mysql url
    val properties: Properties = new Properties()
    properties.put("user", "root") // user name
    properties.put("password", "password") // NOTE(review): hard-coded credentials; move to external config
    properties.put("driver", "com.mysql.jdbc.Driver") // driver class

    // Expose the MySQL table as a temp view so it can be filtered with SQL.
    sparkSession.read.jdbc(MYSQLDBURL, "user_info", properties)
      .createTempView("v")

    // Current watermark in Hive: max over rows of the larger of the two
    // timestamps. CASE (not GREATEST) keeps the original NULL handling:
    // a NULL operate_time falls through to create_time. max() ignores NULLs
    // and replaces the original `order by ... desc limit 1` scan.
    val maxRaw = sparkSession.sql(
      """
        |select max(case when operate_time > create_time then operate_time
        |               else create_time end) as incrementvalue
        |from ods.user_info
        |""".stripMargin).collect()(0).get(0)
    // Guard against an empty target table: max() of no rows is NULL, and the
    // original `.toString` would NPE. Fall back to the epoch so every source
    // row qualifies on the first load.
    val maxTime = Option(maxRaw).map(_.toString).getOrElse("1970-01-01 00:00:00")
    println(s"max watermark: ${maxTime}")

    // BUG FIX: the original statement was
    //   insert into ... from ( from v select * where ... )
    // i.e. a parenthesized subquery with no outer SELECT, which is not valid
    // Spark SQL. Rewritten as a plain INSERT ... SELECT with the same filter.
    sparkSession.sql(
      s"""
        |insert into table ods.user_info partition(etldate="20230319")
        |select * from v
        |where operate_time > cast('$maxTime' as TIMESTAMP)
        |   or create_time > cast('$maxTime' as TIMESTAMP)
        |""".stripMargin)

    // Quick sanity check of the loaded data.
    sparkSession.sql(
      """
        |select * from ods.user_info limit 10
        |""".stripMargin).show(false)
    sparkSession.stop()
  }
}

Demo 2 — simpler variant: incremental watermark is operate_time only

package top.wsczh.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

object MysqlToODSTestDS1 {
  /**
   * Incrementally extracts new rows from MySQL table `user_info` (database
   * `new_shtd_store`) into Hive table `ods.user_info`, using `operate_time`
   * alone as the incremental watermark.
   *
   * Only rows whose `operate_time` is strictly greater than the current
   * `max(operate_time)` in `ods.user_info` are appended. Column names/types
   * are unchanged and rows go into a static partition `etldate` (String,
   * yyyyMMdd — the day before the competition day). Verify with:
   * `show partitions ods.user_info` in hive cli.
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's noisy INFO/WARN logging.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val warehouse = "hdfs://hadoop102:9000/hive/warehouse/home"
    val conf = new SparkConf().setMaster("local[*]").setAppName("read mysql write hive")
      .set("spark.testing.memory", "471859200")
    val sparkSession = SparkSession.builder().enableHiveSupport().config(conf)
      .config("spark.sql.warehouse.dir", warehouse).getOrCreate()

    // Source MySQL connection settings.
    val MYSQLDBURL: String = "jdbc:mysql://192.168.198.13:3306/new_shtd_store?useUnicode=true&characterEncoding=utf-8" // mysql url
    val properties: Properties = new Properties()
    properties.put("user", "root") // user name
    properties.put("password", "password") // NOTE(review): hard-coded credentials; move to external config
    properties.put("driver", "com.mysql.jdbc.Driver") // driver class

    // Expose the MySQL table as a temp view so it can be filtered with SQL.
    sparkSession.read.jdbc(MYSQLDBURL, "user_info", properties)
      .createTempView("v")

    // Current watermark in Hive. (The original comment claiming the
    // incremental field is `id` was stale — the code uses operate_time.)
    // Guard against an empty target table: max() of no rows is NULL and the
    // original `.toString` would NPE; fall back to the epoch so every source
    // row qualifies on the first load.
    val maxRaw = sparkSession.sql(
      """
        |select max(operate_time) from ods.user_info
        |""".stripMargin).collect()(0).get(0)
    val maxTime: String = Option(maxRaw).map(_.toString).getOrElse("1970-01-01 00:00:00")
    println(s"max watermark: ${maxTime}")

    // BUG FIX: the original WHERE clause ended with a stray `)` after
    // `cast('$maxTime' as TIMESTAMP))`, which is a parse error. Removed.
    sparkSession.sql(
      s"""
        |insert into table ods.user_info partition(etldate="20230319")
        |select * from v where operate_time > cast('$maxTime' as TIMESTAMP)
        |""".stripMargin)

    // Quick sanity check of the freshly loaded partition.
    sparkSession.sql(
      """
        |select * from ods.user_info where etldate='20230319' limit 10
        |""".stripMargin).show()
    sparkSession.stop()
  }
}