[1] Environment Setup

Docker

This section only sets up a basic working environment on the local machine; it is not material you need to master for the competition.

Installing Docker on CentOS

  1. Docker requires a CentOS kernel version higher than 3.10.

    Check the kernel version with uname -r.

  2. Make sure the yum packages are up to date:

    sudo yum update

  3. Install the required packages

    (yum-utils provides the yum-config-manager utility; the other two are dependencies of the devicemapper storage driver):

    yum install -y yum-utils device-mapper-persistent-data lvm2

Install automatically with the official install script

The install command is:

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

Start Docker and enable it at boot:

systemctl start docker
systemctl enable docker

Docker registry mirror

Pulling images from Docker Hub inside China can be slow, in which case you can configure a registry mirror. Docker itself and many domestic cloud providers offer mirror services, for example:

Write the following into /etc/docker/daemon.json (create the file if it does not exist):

{"registry-mirrors":["https://hub-mirror.c.163.com/"]}

Then reload and restart the service:

sudo systemctl daemon-reload
sudo systemctl restart docker

Check whether the mirror took effect:

$ docker info
Registry Mirrors:
https://reg-mirror.qiniu.com

Why is it still slow after I switched X﹏X

Running CentOS in Docker

  1. Pull the specified CentOS image version

    docker pull centos:centos7

  2. List local images

    Use the following command to confirm that the centos7 image is present:

    docker images

  3. Run the container; you can later enter it with the exec command

    docker run -itd --name centos-test centos:centos7

  4. Verify

    Use docker ps to view the running container's information.

Working with Docker images

List images

Use docker images.

Column descriptions:

  • REPOSITORY: the image's repository
  • TAG: the image's tag
  • IMAGE ID: the image ID
  • CREATED: when the image was created
  • SIZE: the image size

Delete an image with docker rmi.

Hello World

docker run centos:centos7 /bin/echo "Hello World"

Entering a container

The exec command:

docker exec -it name /bin/bash

Creating a container cluster

Create the master node

(not worked out yet; see the sketch after the parameter list below)

Parameter descriptions

  • -h sets the container's hostname
  • --name sets the container's name
  • -d runs the container in the background
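A minimal sketch of what the missing command might look like, assuming the centos:centos7 image pulled earlier and a container simply named master (the hostname, container name and image are placeholders, not from the original notes):

# Hypothetical example: start a detached "master" container with a hostname and a name;
# repeat with slave1/slave2 for the other cluster members
docker run -itd -h master --name master centos:centos7 /bin/bash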

Shell scripts

The scripts in this section are only conveniences for setting things up; you do not need to master them.

xsync

#!/bin/bash

# 1. Check the number of arguments
if [ $# -lt 1 ]
then
    echo "Not Enough Arguments!"
    exit
fi

# 2. Loop over every machine in the cluster
for host in master slave1 slave2
do
    echo ========== $host ==========
    # 3. Loop over every file/directory and send them one by one
    for file in $@
    do
        # 4. Check that the file exists
        if [ -e $file ]
        then
            # 5. Get the parent directory
            pdir=$(cd -P $(dirname $file); pwd)

            # 6. Get the file name
            fname=$(basename $file)
            # Create the parent directory on the remote host, then sync
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo "$file does not exist!"
        fi
    done
done
echo " ╭︿︿︿╮"
echo " { @ @ }"
echo " ( (oo) ) "
echo " ︶︶︶"
echo "( ̄▽ ̄)V Pig: It's OK!"

jpsall

#!/bin/bash
for host in master slave1 slave2
do
    echo =============== $host ===============
    ssh $host jps
done

Hadoop

JDK deployment
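The notes leave this subsection empty. A minimal sketch of the usual steps, assuming the JDK tarball sits in /opt/software and is installed to /opt/module/jdk to match the paths used below (the archive and unpacked directory names are placeholders):

# Unpack the JDK and rename it to the path referenced by the Hadoop/Spark configs
tar -zxvf /opt/software/jdk-8u212-linux-x64.tar.gz -C /opt/module/
mv /opt/module/jdk1.8.0_212 /opt/module/jdk

Then append the following to /etc/profile.d/my_env.sh:

# JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=${JAVA_HOME}/bin:$PATH

and run source /etc/profile followed by java -version to verify.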

Hadoop deployment

  1. Unpack hadoop from /opt/software into /opt/module

    tar -zxvf /opt/software/hadoop-3.1.3.tar.gz -C /opt/module/

  2. Rename the directory and configure the Hadoop environment variables

    [root@master module]# mv hadoop-3.1.3 hadoop
    [root@master module]# ls
    hadoop jdk
    [root@master module]# vim /etc/profile.d/my_env.sh

    Configuration contents:

    # HADOOP_HOME
    export HADOOP_HOME=/opt/module/hadoop
    export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

    source the profile and check the Hadoop version:

    [root@master module]# source /etc/profile
    [root@master module]# hadoop version
    Hadoop 3.1.3
  3. Edit the Hadoop configuration files

    Core configuration file core-site.xml:

    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://master:8020</value>
        </property>
        <property>
            <name>hadoop.tmp.dir</name>
            <value>/opt/module/hadoop/data</value>
        </property>
    </configuration>

    HDFS configuration file hdfs-site.xml:

    <configuration>
        <!-- NameNode web UI address -->
        <property>
            <name>dfs.namenode.http-address</name>
            <value>master:9870</value>
        </property>
        <property>
            <name>dfs.replication</name>
            <value>1</value>
        </property>
    </configuration>

    YARN configuration file yarn-site.xml:

    <configuration>
        <!-- MapReduce uses the shuffle auxiliary service -->
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>
        <!-- ResourceManager address -->
        <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>master</value>
        </property>
    </configuration>

    MapReduce configuration file mapred-site.xml:

    <configuration>
        <!-- Run MapReduce jobs on YARN -->
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
        <!-- JobHistory server address -->
        <property>
            <name>mapreduce.jobhistory.address</name>
            <value>master:10020</value>
        </property>
        <!-- JobHistory web UI address -->
        <property>
            <name>mapreduce.jobhistory.webapp.address</name>
            <value>master:19888</value>
        </property>
    </configuration>

    The workers file:

    master
    slave1
    slave2

    Finally, add JAVA_HOME to each of hadoop-env.sh, mapred-env.sh and yarn-env.sh:

    export JAVA_HOME=/opt/module/jdk
  4. Copy Hadoop to the slave1 and slave2 nodes

    [root@master hadoop]# scp -r /opt/module/hadoop/ slave1:/opt/module/hadoop/
    [root@master hadoop]# scp -r /opt/module/hadoop/ slave2:/opt/module/hadoop/

  5. Format the NameNode

    hdfs namenode -format

  6. Configure the HDFS and YARN users in /etc/profile

    Configuration contents:

    export HDFS_NAMENODE_USER=root
    export HDFS_DATANODE_USER=root
    export HDFS_SECONDARYNAMENODE_USER=root
    export YARN_RESOURCEMANAGER_USER=root
    export YARN_NODEMANAGER_USER=root

    Run source /etc/profile to apply the configuration.
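With the users configured, the cluster can be started from the master node and checked with the jpsall script from earlier; a sketch of the usual commands (the notes do not spell this step out; run the scripts from ${HADOOP_HOME}/sbin if they are not on the PATH):

# Start HDFS and YARN
start-dfs.sh
start-yarn.sh
# Optionally start the JobHistory server configured in mapred-site.xml
mapred --daemon start historyserver
# Check the Java processes on all three nodes
jpsall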

Hive

MySQL deployment

MySQL database deployment

  1. Remove mariadb.

  2. Install the MySQL rpm packages in order with the rpm command.

  3. Start MySQL:

    service mysqld start

  4. Look up the temporary password and log in to MySQL with it:

    grep 'password' /var/log/mysqld.log

  5. Change the password in MySQL and enable remote access:

    set global validate_password_policy=0;
    set global validate_password_length=1;

    set password=password('123456');

    grant all privileges on *.* to 'root'@'%' identified by '123456';
    flush privileges;

    After exiting, log in to MySQL again with the new password to test it.
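For example, remote access and the new password can be checked from any node; a small test assuming the host master and the password set above:

# -h forces a TCP connection, so this also exercises the '%' grant
mysql -h master -uroot -p123456 -e "show databases;"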

Hive deployment

  1. Unpack Hive, rename the directory to hive, and add Hive to the environment variables.

  2. Environment variable configuration:

    # HIVE_HOME
    export HIVE_HOME=/opt/module/hive
    export PATH=${HIVE_HOME}/bin:$PATH

  3. Copy the MySQL JDBC driver into Hive's lib directory.

  4. Create a new hive-site.xml file in Hive's conf directory

    with the following contents:

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <!-- MySQL connection settings for the metastore; hostnames resolve via /etc/hosts -->
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://master:3306/hive?useSSL=false</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>root</value>
        </property>
        <!-- Host that HiveServer2 binds to -->
        <property>
            <name>hive.server2.thrift.bind.host</name>
            <value>master</value>
        </property>
        <!-- Metastore address for remote-mode deployment -->
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://master:9083</value>
        </property>
        <property>
            <name>hive.metastore.event.db.notification.api.auth</name>
            <value>false</value>
        </property>
    </configuration>
  5. Resolve the guava jar conflict: delete the older guava in Hive and copy over the newer guava from Hadoop.

    [root@master hive]# cd lib/
    [root@master lib]# rm -rf guava-19.0.jar
    [root@master lib]# cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/hive/lib/
    [root@master lib]#

  6. Initialize the metastore database:

    Log in to MySQL and create Hive's metastore database:

    mysql> create database hive;
    Query OK, 1 row affected (0.03 sec)

    Initialize the metastore schema:

    schematool -initSchema -dbType mysql -verbose

    On success the console ends with the following output, and 74 tables are created in the hive database in MySQL:

    beeline>
    beeline> Initialization script completed
    schemaTool completed
Start the Hive client and look at the databases:

[root@master lib]# hive
which: no hbase in (/opt/module/hive/bin:/opt/module/hadoop/sbin:/opt/module/hadoop/bin:/opt/module/jdk/bin:/opt/module/hadoop/sbin:/opt/module/hadoop/bin:/opt/module/jdk/bin:/opt/module/hadoop/sbin:/opt/module/hadoop/bin:/opt/module/jdk/bin:/opt/module/hadoop/bin:/opt/module/jdk/bin:/opt/module/hadoop/bin:/opt/module/jdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hive/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 59e91f91-ac2b-4bba-bd58-2a7c838a5b89

Logging initialized using configuration in jar:file:/opt/module/hive/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 1a054352-400a-4f00-9af5-290e0b70640d
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> show databases;
OK
default
Time taken: 1.278 seconds, Fetched: 1 row(s)
hive>
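Because hive-site.xml points at a remote metastore (thrift://master:9083) and binds HiveServer2 to master, those services are normally started as well before connecting through beeline; a sketch assuming the default HiveServer2 port 10000 (this step is not in the original notes):

# Start the metastore and HiveServer2 in the background
nohup hive --service metastore &
nohup hive --service hiveserver2 &
# Once HiveServer2 is up (it can take a minute), connect with beeline
beeline -u jdbc:hive2://master:10000 -n root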

Spark

  1. Unpack the Spark archive, rename the directory to spark, and configure the environment variables.

    Environment variable configuration:

    # SPARK_HOME
    export SPARK_HOME=/opt/module/spark
    export PATH=${SPARK_HOME}/bin:$PATH

  2. source the profile to apply the environment variables:

    source /etc/profile

  3. Rename the workers.template file to workers and configure the worker nodes:

    Configuration contents:

    master
    slave1
    slave2

  4. Rename spark-env.sh.template to spark-env.sh and add the following:

    export JAVA_HOME=/opt/module/jdk
    export HADOOP_HOME=/opt/module/hadoop
    export SPARK_MASTER_IP=master
    export SPARK_MASTER_PORT=7077
    export SPARK_DIST_CLASSPATH=$(/opt/module/hadoop/bin/hadoop classpath)
    export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
    export SPARK_YARN_USER_ENV="CLASSPATH=/opt/module/hadoop/etc/hadoop"
    export YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
  5. Copy Spark to the other two nodes.

  6. Start Spark and check the processes:

    [root@master sbin]# 
    [root@master sbin]# pwd
    /opt/module/spark/sbin
    [root@master sbin]# ./start-all.sh
    starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-master.out
    slave1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave1.out
    slave2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slave2.out
    master: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-master.out
    [root@master sbin]# jpsall
    =============== master ===============
    29361 SecondaryNameNode
    29156 DataNode
    33748 Worker
    28981 NameNode
    33813 Jps
    29784 NodeManager
    33676 Master
    29631 ResourceManager
    =============== slave1 ===============
    16469 Worker
    16518 Jps
    15623 DataNode
    15736 NodeManager
    =============== slave2 ===============
    14536 DataNode
    15368 Jps
    14649 NodeManager
    15309 Worker
    [root@master sbin]#

    Run the SparkPi example to test Spark:

    [root@master spark]# spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 examples/jars/spark-examples_2.12-3.1.3.jar
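Since spark-env.sh also points at the Hadoop/YARN configuration, the same example can be submitted in YARN mode; a sketch (YARN mode is not shown in the original notes and needs HDFS and YARN running):

# Submit SparkPi to YARN instead of the standalone master
spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn --deploy-mode client \
  /opt/module/spark/examples/jars/spark-examples_2.12-3.1.3.jar 10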

Flink

Unpack Flink, rename the directory to flink, and configure the environment variables.

Configuration contents:

# FLINK_HOME
export FLINK_HOME=/opt/module/flink
export PATH=${FLINK_HOME}/bin:$PATH

Remember to run source /etc/profile.

Edit the flink-conf.yaml configuration file.

Change:

jobmanager.rpc.address: master

Edit the workers configuration file:

master
slave1
slave2

Distribute Flink to slave1 and slave2.

Start Flink:

start-cluster.sh

Submit a jar to test Flink:

flink run -m master:8081 /opt/module/flink/examples/batch/WordCount.jar

Note: stop the Spark standalone cluster before running Flink.

Flink web UI (http://192.168.88.100:8081/#/overview)

Flume

Unpack Flume, rename the directory, and configure the environment variables.

Environment variable configuration:

# FLUME_HOME
export FLUME_HOME=/opt/module/flume
export PATH=${FLUME_HOME}/bin:$PATH

If you later sink data to HDFS, there is a guava version conflict.

Delete the jar in Flume that conflicts with Hadoop:

rm -rf /opt/module/flume/lib/guava-11.0.2.jar

Edit the log4j configuration file:

vim /opt/module/flume/conf/log4j.properties

Change the following entry:

flume.log.dir=/opt/module/flume/logs
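A quick way to confirm the installation and the environment variable (not part of the original notes):

# Prints the Flume version if flume-ng is on the PATH
flume-ng version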

ZooKeeper

Unpack ZooKeeper, rename the directory, and configure the environment variables.

Environment variable configuration:

# ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH="$ZOOKEEPER_HOME/bin:$PATH"

Run source /etc/profile to apply the environment variables.

Configure the server id:

Create a zkData directory under the zookeeper directory.

Create a new file named myid in zkData with vim.

Contents of myid:

0

Rename zoo_sample.cfg to zoo.cfg and modify its contents:

dataDir=/opt/module/zookeeper/zkData
# Internal communication and election ports of the ZooKeeper servers
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888

Distribute ZooKeeper to slave1 and slave2.

On those two machines, change the myid contents under zkData to 1 and 2 respectively.
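The cluster then has to be started on each of the three nodes; a sketch of the usual start and status commands (the notes do not spell this out):

# Run on master, slave1 and slave2 (for example over ssh, like the jpsall script)
zkServer.sh start
# One node should report Mode: leader and the other two Mode: follower
zkServer.sh status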

Kafka

Unpack and rename Kafka, then configure the environment variables:

# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

Edit Kafka's server.properties configuration file:

# Must be unique per broker
broker.id=0
# Created automatically when Kafka starts
log.dirs=/opt/module/kafka/logs
# ZooKeeper connection
zookeeper.connect=master:2181,slave1:2181,slave2:2181/kafka

Copy Kafka to the slave1 and slave2 nodes and change the broker.id value on slave1 and slave2.

Start Kafka on all three machines:

kafka-server-start.sh -daemon kafka/config/server.properties
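To confirm the brokers are up, a throwaway topic can be created and listed; a small check (the topic name is only an example, not from the original notes):

# Create a test topic and list the topics the cluster knows about
kafka-topics.sh --bootstrap-server master:9092 --create --topic test --partitions 1 --replication-factor 1
kafka-topics.sh --bootstrap-server master:9092 --list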

Maxwell

Unpack and rename Maxwell, then configure the environment variables:

# MAXWELL_HOME
export MAXWELL_HOME=/opt/module/maxwell
export PATH=${MAXWELL_HOME}/bin:$PATH

Adjust the MySQL configuration:

Enable the MySQL binlog; it must be on before data can be synchronized.

Edit the MySQL configuration file /etc/my.cnf:

# Server id
server-id = 1
# Enable the binlog; this value becomes the binlog file name prefix
log-bin=mysql-bin
# Binlog type; Maxwell requires row format
binlog_format=row
# Database for which the binlog is enabled; adjust as needed
binlog-do-db=ds_pub

Create the ds_pub database in MySQL.

Restart the MySQL service:

systemctl restart mysqld
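Before starting Maxwell it is worth checking that the binlog really is on; a small check assuming the root password set earlier:

# log_bin should report ON and binlog_format ROW
mysql -uroot -p123456 -e "show variables like 'log_bin'; show variables like 'binlog_format';"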

Redis

Redis deployment and installation

Copy the archive to the server, unpack it, and rename the directory.

Use gcc --version to check whether the server has a gcc toolchain; if not, install it with yum:

yum install gcc

Enter the redis directory and run the following to compile and install:

cd redis
make
make install

Two ways to start Redis

1. Foreground start
Start from the installation directory:

redis/bin/redis-server

In the bin directory of the Redis installation, run ./redis-server.
By default this is foreground mode, listening on port 6379.
Drawback: pressing Ctrl+C stops the foreground process and shuts Redis down (not recommended).

2. Background start (daemon)
Go back to the Redis installation directory and copy redis.conf so it is easier to manage:

cp redis.conf /etc/redis.conf

Edit the configuration file: vim /etc/redis.conf
In command mode search for /daemonize and change it to yes (run the service in the background).
Go back to the installation directory and start the service, passing the configuration file:

redis-server /etc/redis.conf
ps -ef | grep redis   # find the redis process; it is running on port 6379

Benefit: Redis keeps running after the terminal window is closed, so this mode is recommended.

redis-cli   # connect with the client
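A quick round trip to confirm the server responds (not part of the original notes):

# PONG means the server is reachable; set/get exercises a simple key
redis-cli ping
redis-cli set foo bar
redis-cli get foo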

Shutting Redis down

shutdown
exit

Sqoop

SQL-to-Hadoop

Installing Sqoop requires a working Java and Hadoop environment.

1) Download and unpack

2) Edit the configuration files

  1. First rename the sqoop-env-template.sh file:

    mv sqoop-env-template.sh sqoop-env.sh

  2. Add the paths of the other components to sqoop-env.sh:

    export HADOOP_COMMON_HOME=/opt/module/hadoop-2.7.2
    export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
    export HIVE_HOME=/opt/module/hive
    export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
    export ZOOCFGDIR=/opt/module/zookeeper-3.4.10
    export HBASE_HOME=/opt/module/hbase

  3. Copy the MySQL driver jar into Sqoop's lib directory:

    cp ./mysql-connector-java-5.1.47.jar /opt/modules/sqoop/lib/

  4. Verify that Sqoop was installed successfully:

    bin/sqoop help

    Sqoop import and export

    Test the database connection:

    bin/sqoop list-databases --connect jdbc:mysql://hadoop100:3306/ --username root --password root
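The heading above mentions import and export, but the notes stop at the connection test. A sketch of a typical table import into HDFS (the database, table and target directory are placeholders):

# Import one MySQL table into HDFS with a single map task
bin/sqoop import \
  --connect jdbc:mysql://hadoop100:3306/ds_pub \
  --username root --password root \
  --table user_info \
  --target-dir /origin_data/ds_pub/user_info \
  --delete-target-dir \
  -m 1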

HBase

HBase installation and deployment

First make sure that

the ZooKeeper cluster is deployed correctly and running, and

the Hadoop cluster is deployed correctly and running.

Deployment

Unpacking HBase

  1. Transfer and unpack the archive, configure the environment variables, and distribute them to the cluster:

    # HBASE_HOME
    export HBASE_HOME=/opt/module/hbase
    export PATH=$PATH:$HBASE_HOME/bin

Edit the configuration files

hbase-env.sh

Find the following lines (near the bottom):

# Tell HBase whether it should manage it's own instance of ZooKeeper or not.
# export HBASE_MANAGES_ZK=true

Change them to:

# Tell HBase whether it should manage it's own instance of ZooKeeper or not.
export HBASE_MANAGES_ZK=false

Configure hbase-site.xml

Change hbase.cluster.distributed from false to true and delete the other properties, leaving:

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>

Then add:

<property>
    <name>hbase.rootdir</name>
    <value>hdfs://master:8020/hbase</value>
</property>
<property>
    <name>hbase.master.info.port</name>
    <value>16010</value>
</property>
<property>
    <name>hbase.wal.provider</name>
    <value>filesystem</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>master,slave1,slave2</value>
</property>
<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/module/hbase/zookeeper</value>
</property>

Edit regionservers:

master
slave1
slave2

Starting the HBase services

1) Start daemons individually

bin/hbase-daemon.sh start master
bin/hbase-daemon.sh start regionserver

2) Start the whole cluster

bin/start-hbase.sh

3) Stop the cluster

bin/stop-hbase.sh

View the HBase web UI

http://master:16010/
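After start-hbase.sh, a quick smoke test can be run in the HBase shell; a sketch (the table and column family names are only examples, not from the original notes):

# Create a table, write one cell and read it back
hbase shell <<'EOF'
create 'test', 'cf'
put 'test', 'row1', 'cf:a', 'value1'
scan 'test'
EOF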

[2] Data Extraction

Real-time data extraction

Maxwell data extraction

Edit the Maxwell configuration file config.properties:

log_level=info
producer=kafka
kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092
kafka_topic=maxwell
# mysql login info
host=master
user=root
password=123456
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

Start Maxwell:

[root@master maxwell]# /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
Redirecting STDOUT to /opt/module/maxwell/bin/../logs/MaxwellDaemon.out
Using kafka version: 1.0.0
[root@master maxwell]#

Create the topic in Kafka:

[root@master maxwell]# kafka-topics.sh --bootstrap-server master:9092 --partitions 1 --replication-factor 1 --create --topic maxwell
Created topic maxwell.
[root@master maxwell]#

Start kafka-console-consumer to consume the data:

kafka-console-consumer.sh --bootstrap-server master:9092 --topic maxwell

In MySQL, run the source command to load data into the database:

mysql> create database ds_pub;
mysql> use ds_pub;
mysql> source /opt/data/ds_pub.mysql

Result: the kafka console consumer receives the rows inserted into MySQL:

{"database":"ds_pub","table":"user_info","type":"insert","ts":1679249454,"xid":5038,"commit":true,"data":{"id":4568,"login_name":"caph84pl6cmo","nick_name":"竹霭","passwd":null,"name":"钟离婷姣","phone_num":"13115495645","email":"caph84pl6cmo@0355.net","head_img":null,"user_level":"2","birthday":"1985-04-26","gender":"F","create_time":"2020-04-26 18:57:55","operate_time":"2020-04-26 06:47:39"}}
{"database":"ds_pub","table":"user_info","type":"insert","ts":1679249454,"xid":5039,"commit":true,"data":{"id":4579,"login_name":"n54zvb14","nick_name":"林有","passwd":null,"name":"曹克","phone_num":"13855733659","email":"n54zvb14@yahoo.com.cn","head_img":null,"user_level":"2","birthday":"1977-04-26","gender":"M","create_time":"2020-04-26 18:57:55","operate_time":"2020-04-26 23:27:15"}}

Port log data collection

Write the Flume configuration file for the port to be collected:

# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = nc master 26001

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.topic = order
a1.sinks.k1.kafka.producer.acks = 1

# Use a channel that buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume to collect the port logs:

[root@master conf]# flume-ng agent --name a1 --conf /opt/module/flume/conf/ --conf-file /opt/module/flume/conf/read_socket_write_kafka.conf -Dflume.root.logger=INFO,console

Start the kafka console consumer to consume the data from the command line:

[root@master ~]# kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --from-beginning --topic order

Start the log-port script so that data is generated on the port in real time:

[root@master ~]# cd /opt/data/
[root@master data]# ls
ds_pub.sql order.sh order.txt
[root@master data]# chmod -R 777 /opt/data/
[root@master data]# sh order.sh | nc -lk 26001

Result: the console consumer receives the order records generated by the port script.




## Offline data extraction

Reference tutorial

- [SparkSQL - bilibili](https://www.bilibili.com/video/BV1WA411273z?p=1&vd_source=182abcf60a2ab4e639ece11d9932680a)

Configure the project's pom file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.sparksql</groupId>
    <artifactId>sparksql-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.12</scala.version>
        <mysqlconnect.version>5.1.47</mysqlconnect.version>
        <spark.version>3.1.1</spark.version>
        <hive.version>2.3.6</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysqlconnect.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.7</version>
        </dependency>
    </dependencies>

</project>