封面是在凌晨三点时,第一次使用kafka采集到的maxwell数据
记忆
当kafka为sink时
1 2 3 4
| a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.server=master:9092 a1.sinks.k1.kafka.topic=order a1.sinks.k1.kafka.producer.acks=1
|
当HDFS为sink时
1 2 3 4
| a1.sinks.k2.type=hdfs a1.sinks.k2.hdfs.path=hdfs://master:8020/user/test/flumebackup a1.sinks.k2.hdfs.fileType=DataStream a1.sinks.k2.hdfs.whiteFormat=Text
|
创建主题
1
| kafka-topics.sh --bootstrap-server master:9092 --create --partitions 1 --replication-factor 3 --topic order
|
查看kafka的数据
1
| kafka-console-consumer.sh --bootstrap-server master:9092 --topic order --from-beginning --max-messages 2
|
学习
任务一:实时数据采集
1、 在Master节点使用Flume采集实时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图粘贴至对应报告中;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| a1.sources=r1 a1.sinks=k1 a1.channels=c1
a1.sources.r1.type=netcat a1.sources.r1.bind=master a1.source.r1.port=10050
a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.server=master:9092 a1.sinks.k1.kafka.topic=order a1.sinks.k1.kafka.producer.acks=1
a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transaction=100
a1.sinks.k1.channels=c1 a1.sources.r1.channels=c1
|
2.从指定端口采集数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| a1.sources=r1 a1.sinks=k1 a1.channels=c1 #描述和配置source,这里的a表示agent的名字 #第一步:配置source a1.sources.r1.type=netcat a1.sources.r1.bind=192.168.88.100 a1.sources.r1.port=44444 #配置logger a1.sinks.k1.type=logger #配置channel a1.channels.c1.type=memory #三者串联 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
|
3.使用Flume采集26001端口的socket数据,将数据存入到Kafka的Topic中(topic名称为order,分区数为2),将Flume配置粘贴在报告中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| a1.sources = r1 a1.sinks = k1 a1.channels = c1 # a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 26001 # a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = master:9092 a1.sinks.k1.kafka.topic = order a1.sinks.k1.kafka.producer.acks = 1
# 使用一个通道缓冲内存中的事件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 将source和sink绑定到通道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
|
4.Flume接收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将备份结果截图粘贴至对应报告中。
注意:先启动flume,然后启动脚本 结束时先关闭脚本然后关闭flume
第一步:先去看环境是否存在order这个主题
如果不存在需要自行创建
创建主题
1
| kafka-topics.sh --bootstrap-server master:9092 --create --partitions 1 --replication-factor 3 --topic order
|
查看kafka的数据
1
| kafka-console-consumer.sh --bootstrap-server master:9092 --topic order --from-beginning --max-messages 2
|
创建Flume任务
3.1 创建flume任务
socket-flume-kafka.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| # 命名这个代理上的组件 a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2
a1.sources.r1.type=netcat a1.sources.r1.bind=master a1.sources.r1.port=26001
# 描述接收器 a1.sinks.k1.type=org.apache.flume.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers=master:9092 a1.sinks.k1.kafka.topic=order a1.sinks.k1.kafka.producer.acks-=1
# 使用一个通道缓冲内存中的事件 a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapcity=100
# 将source和sink绑定到通道 a1.sources.r1.channels=c1 a1.sinks.k1.channels=c1
a1.sinks.k2.type=hdfs a1.sinks.k2.hdfs.path=hdfs://master:8020/user/test/flumebackup a1.sinks.k2.hdfs.fileType=DataStream a1.sinks.k2.hdfs.whiteFormat=Text
a1.channels.c2.type=memory a1.channels.c2.capcity=1000 a1.channels.c2.transactionCapacity=100
a1.sources.r1.channels=c2 a1.sink.k1.channels=c2
|