封面是在凌晨三点时,第一次使用kafka采集到的maxwell数据

记忆

当kafka为sink时

1
2
3
4
a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.server=master:9092
a1.sinks.k1.kafka.topic=order
a1.sinks.k1.kafka.producer.acks=1

当HDFS为sink时

1
2
3
4
a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://master:8020/user/test/flumebackup
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.whiteFormat=Text

创建主题

1
kafka-topics.sh --bootstrap-server master:9092 --create --partitions 1 --replication-factor 3 --topic order

查看kafka的数据

1
kafka-console-consumer.sh --bootstrap-server master:9092 --topic order --from-beginning --max-messages 2

学习

任务一:实时数据采集

1、 在Master节点使用Flume采集实时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图粘贴至对应报告中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.source.r1.port=10050

a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.server=master:9092
a1.sinks.k1.kafka.topic=order
a1.sinks.k1.kafka.producer.acks=1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transaction=100

a1.sinks.k1.channels=c1
a1.sources.r1.channels=c1

2.从指定端口采集数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#描述和配置source,这里的a表示agent的名字
#第一步:配置source
a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.88.100
a1.sources.r1.port=44444
#配置logger
a1.sinks.k1.type=logger
#配置channel
a1.channels.c1.type=memory
#三者串联
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3.使用Flume采集26001端口的socket数据,将数据存入到Kafka的Topic中(topic名称为order,分区数为2),将Flume配置粘贴在报告中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
a1.sources = r1 
a1.sinks = k1
a1.channels = c1
#
#描述/配置源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 26001
#
# # 描述接收器
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.topic = order
a1.sinks.k1.kafka.producer.acks = 1

# 使用一个通道缓冲内存中的事件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# source和sink绑定到通道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.Flume接收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将备份结果截图粘贴至对应报告中。

注意:先启动flume,然后启动脚本 结束时先关闭脚本然后关闭flume

第一步:先去看环境是否存在order这个主题

如果不存在需要自行创建

创建主题

1
kafka-topics.sh --bootstrap-server master:9092 --create --partitions 1 --replication-factor 3 --topic order

查看kafka的数据

1
kafka-console-consumer.sh --bootstrap-server master:9092 --topic order --from-beginning --max-messages 2

创建Flume任务

3.1 创建flume任务

socket-flume-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 命名这个代理上的组件
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type=netcat
a1.sources.r1.bind=master
a1.sources.r1.port=26001

# 描述接收器
a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=master:9092
a1.sinks.k1.kafka.topic=order
a1.sinks.k1.kafka.producer.acks-=1

# 使用一个通道缓冲内存中的事件
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapcity=100

# source和sink绑定到通道
a1.sources.r1.channels=c1
a1.sinks.k1.channels=c1

a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://master:8020/user/test/flumebackup
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.whiteFormat=Text

a1.channels.c2.type=memory
a1.channels.c2.capcity=1000
a1.channels.c2.transactionCapacity=100

a1.sources.r1.channels=c2
a1.sink.k1.channels=c2