运维开发网
广告位招商联系QQ:123077622
 
广告位招商联系QQ:123077622

zookeeper与卡夫卡集群搭建

运维开发网 https://www.qedev.com 2021-04-24 12:24 出处:51CTO 作者:Jack_jason
首先这片博客没有任何理论性的东西,只是详细说明kafka与zookeeper集群的搭建过程,需要三台linux服务器。 java环境变量设置 zookeeper集群搭建 kafka集群搭建 java环境变量设置 在每台服务器上都有设置java环境变量 这里使用java源码安装的方式: 下载源码包解压

首先这片博客没有任何理论性的东西,只是详细说明kafka与zookeeper集群的搭建过程,需要三台Linux服务器。

  • java环境变量设置
  • zookeeper集群搭建
  • kafka集群搭建

java环境变量设置

在每台服务器上都有设置java环境变量

这里使用java源码安装的方式:

下载源码包解压,放入到/usr/local/文件夹下,修改名目录名字为jdk!接下就是把java的命令参数加入到Linux的环境变量中。

[[email protected]][email protected]jdk/=$PATH:/usr/local/jdk/bin:/usr/local/jdk/jre/=/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib/

若是以上均反馈正常,则说明java环境已经设置完毕!

zookeeper集群搭建

zookeeper集群被称为群组。zookeeper使用的是一致性协议,所以建议每个群组里应该包含奇数个节点,因为只有当群组中大多数节点处于可用状态,zookeeper才能处理外部请求。也就是说,如果一个包含3个节点的群组,那么它允许一个节点失效。如果包含5个节点,那么它允许2个节点失效。【不建议zookeeper群组超过7个节点,因为zookeeper使用了一致性协议,节点过多会降低群组的性能】

群组中的每个服务器需要在数据目录中创建一个myid文件,用于指明自己的ID。

这里zookeeper采用源码的方式安装:

[[email protected]]#  zxvf zookeeper-...gz -C ../         #解压/ zookeeper-.-..jar      zookeeper-.-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-..jar.asc  zookeeper-. -p /data/ zoo_sample.cfg zoo.cfg                     #修改配置文件的名字,建议把原来的配置文件做备份

zookeeper的配置文件如下:

== not use /tmp  storage, /=/data/zookeeper/=/data/zookeeper/= you need to handle ====.::=.::=.::

客户端只需要通过clientport就能连接到群组,而群组节点间的通信则需要同时用到这三个端口。

 

【上面的操作,只是修改了配置文件,创建了数据目录和日志文件目录,还有就是解压了源码包!

下面要做的就是把上面操作在另外两台机器上做一遍,也可以把上面的文件直接通过scp命令复制到其余两台机器对应的位置上。】

同MySQL集群一样,集群每台服务器都需要一个唯一的标识,zookeeper也一样,需要一个myid文件,来标识每一台服务器!

[[email protected]]# /data/zookeeper/

另外两台的myid文件如下:

[[email protected]]# /data/zookeeper/

然后启动zookeeper集群:

#三台服务器都这样启动,会在当前目录执行路径下面生成nohup.out文件,若是报错的话,可以查看文件内容!
[[email protected]]# pwd
/usr/local/zookeeper/bin
[[email protected]]# ./zkServer./usr/local/zookeeper/bin/../conf/

三台服务器全部启动完成之后,进行如下验证:

[[email protected]]# ./zkCli. --server10..-- ::, [myid:] - INFO  [main:Environment@] - Client environment:zookeeper.version=.-, built on // :-- ::, [myid:] - INFO  [main:Environment@] - Client environment:host.name=-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.version=-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.vendor=-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.home=/usr/local/jdk/jdk1..0_79/-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.io.tmpdir=/-- ::, [myid:] - INFO  [main:Environment@] - Client environment:java.compiler=

搭建卡夫卡集群

仍然采用源码的方式安装。安装过程和zookeeper差不多,解压,修改配置文件,创建对应消息目录。

[[email protected]]# tar -zxvf kafka_2.10-0.10.0.0.tgz -C ../         #解压[[email protected]]# cd ..
[[email protected]][email protected]   #修改名字
[[email protected]]# cd kafka/[[email protected]][email protected]ocs
[[email protected]][email protected]            #[[email protected]][email protected]               #kafka的配置文件有很多,但是目前我们只需要关注 server.propertiesconnect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     zookeeper.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties

配置文件如下:把其中的注释删掉了!

broker.id=3                            #在每个kafka服务器中唯一的,是一个整数port=9092host.name=10.0.102.179auto.create.topics.enable = false      delete.topic.enable = truenum.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/data/kafka/logs                 

num.partitions=1num.recovery.threads.per.data.dir=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=10.0.102.179:2181,10.0.102.204:2181,10.0.102.214:2181zookeeper.connection.timeout.ms=6000

配置文件的参数的说明:

  • broker.id: 每一个broker都需要一个标识符,使用broker.id来表示。默认是0,可以被设置为任意整数,在kafka集群中必须是唯一的。
  • port:默认端口是9092,配置kafka监听的端口。
  • zookeeper.connect:用于保存broker元数据的zookeeper地址是通过zookeeper.connect来指定的。新式就是上面那样,主机和端口号,多个地址用逗号隔开。上面的三个是一个zookeeper集群。若是当前连接的zookeeper宕机,broker可以连接到zookeeper群组中的其他服务器上。
  • log.dirs: kafka把所有消息都保存在磁盘上,存放这些日子片段的目录是通过log.dirs指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。需要注意,borker会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
  • num.recovery.threads.per.data.dir,对于如下3种情况,kafka会使用多少个线程来处理日志片段。
    •   服务器正常启动,用于打开每个分区的日志片段。
    •        服务器崩溃后重启,用于检查和截短每个分区的日志片段。
    •        服务器正常关闭,用于关闭日志片段。

          默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下很多时间。所配置的数字对应的是log.dirs指定的单个日志目录。譬如,这个值被设置为8,并且log.dirx指定了3个路径,那么久需要24个线程。

  • auto.create.topics.enable 默认情况下,kafka会在以下几种情况下创建主题。

    •   当一个生产者开始往主题写入消息时。
    •        当一个消费者开始从主题读取消息时。
    •        当任意一个客户端向主题发送元数据请求是。

    这个参数有点迷糊,尽量设置为false吧!

 

上面指定了kafa的日志目录,需要创建消息目录:

[[email protected]~]#  mkdir -p /data/kafka/logs

和zookeeper一样,把上面解压后的文件和创建的日志目录,分别拷贝到其余两台服务器上,注意修改其余两台服务器的broker.id。

然后启动kafka:

[[email protected]][email protected][email protected][email protected][email protected][email protected][email protected][email protected][email protected][email protected][email protected].[email protected][email protected][email protected].

三台服务器全部启动完毕,可以进行如下测试:

[[email protected]]# ./kafka-console-producer. --broker-list .: --

若是以上的队列消息验证成功,则说明集群搭建成功、

[[email protected]]# ./kafka-topics.sh --zookeeper 10.0.102.179:2181 --describe --topic science         #查看主题的信息
Topic:science    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: science    Partition: 0    Leader: 2    Replicas: 2    Isr: 2[[email protected]]#

 

一个异常说明:在测试的时候,刚开始总报如下错误:

[[email protected]]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi
test1
[2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

这个问题搞了好久,最好把java的版本从1.8降到了1.7之后,然后就成功了!

搭建成功的各个组件的版本信息如下:

[[email protected]~]# java -.0_79--Bit Server VM (build -b02, mixed mode)

#kafka没有类似的version命令,需要查看kafka的使用包信息
[[email protected]]# pwd
/usr/local/kafka/libs
[[email protected]]# ls kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0.jar

#2.10是scala的版本,后面的0.10.0.0是kafka的版本

#在zookeeper的安装目录下面有对应版本的一些包
[[email protected]]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[[email protected]]# 
#zookeeper的版本是3.4.6

 

扫码领视频副本.gif

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号