Kafka Primer Series (1): Theoretical Foundations, Installation, and Configuration

[Introduction] Learning works best when theory meets practice, so the first article in this series starts with the theory, then walks through the installation process to see what nodes make up a Kafka deployment, giving you a coarse-grained first impression of Kafka.

Background

Message Queue (MQ)

  MQ, short for Message Queue, is a method of application-to-application communication. Applications communicate by reading and writing messages (data intended for applications) into and out of queues, without needing a dedicated connection to link them.
  Messaging means that programs communicate by sending data in messages rather than by calling each other directly; direct calls are what technologies such as remote procedure calls typically use.
  Queuing means that applications communicate through queues, which removes the requirement that the sending and receiving applications execute at the same time. Common components that can serve as an MQ (or something like one) include RabbitMQ, ActiveMQ, ZeroMQ, Redis, Kafka…

JMS (Java Message Service)

  JMS, the Java Message Service API, is a Java-platform API for message-oriented middleware (MOM), used to send messages between two applications, or within a distributed system, for asynchronous communication. Like JDBC (Java Database Connectivity), it is a platform-independent API, and the vast majority of MOM vendors support JMS.

MQ Messaging Models

Categories of MQ

Point-to-Point

  A producer sends messages to a queue, and a consumer takes messages from the queue and consumes them. Once a message has been consumed it is no longer stored in the queue, so a consumer cannot consume a message that has already been consumed. A queue supports multiple consumers, but any given message will be consumed by only one of them.

Publish/Subscribe

  A producer (publisher) publishes messages to a topic, and multiple consumers (subscribers) consume them. Unlike the point-to-point model, a message published to a topic is consumed by all of its subscribers.

Basic Concepts

Official Description

Apache Kafka® is a distributed streaming platform. What exactly does that mean? A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

In Plain Terms

  • Kafka is an open-source, distributed message system (message queue) based on the publish/subscribe model and supporting hot scale-out
  • Kafka supports both offline and real-time data processing (e.g., feeding Hadoop or Storm)
  • Kafka persists messages with O(1) time complexity and keeps access performance stable even on low-end servers handling large data volumes
  • Kafka is highly fault tolerant, allowing nodes in the cluster to fail (with a replication factor of n, up to n-1 nodes may fail)
  • Kafka handles highly concurrent request loads well, easily supporting thousands of clients reading and writing simultaneously

Core APIs

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
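
  Before writing any client code, you can get a feel for these APIs through the command-line tools shipped in the distribution's bin directory. A minimal sketch (the topic and address reuse the cluster built later in this article; the Streams API is a Java library with no console counterpart, and the connect-file configs named here are, to the best of my knowledge, the samples shipped with the 0.9 config directory):

    # Producer API counterpart: each line you type becomes a record
    ./kafka-console-producer.sh --broker-list 192.168.142.153:9092 --topic kafka-test
    # Consumer API counterpart: print the records arriving on the topic
    ./kafka-console-consumer.sh --zookeeper 192.168.142.153:2181 --topic kafka-test
    # Connector API counterpart: a standalone Connect worker with the sample file source
    ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties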

Architecture Analysis

Data Flow Overview

  • Producers: the message producers
  • Brokers: the Kafka cluster (transports and stores the messages)
  • Consumers: the message consumers

Data Flow Details

  • Topic: the subject a message belongs to
  • Consumer Group: a grouping of consumers
  • Partition: a storage shard for messages

Core Concepts

Broker

  A Kafka cluster generally contains multiple servers (a single one also works); each such server is called a broker. The word originally means middleman or agent, and that is a fair way to think of it inside a Kafka cluster: the broker acts as the intermediary for messages, handling delivery between producers and consumers (much like a parcel distribution center).

Producer

  As the name plainly suggests, this is the producer of messages. The producer decides the message format, the message topic, and the message content, and sends the message to the Kafka cluster (in practice, a broker receives and persists it). At that point the producer's job is done; how the message is stored and who comes to consume it are no longer the producer's concern.

Consumer

  Like the previous concept, this role is obvious from its name: the consumer of messages. Generally speaking, consumption is the driving force behind production, and Kafka is no different: only when there is demand to consume is it worth producing messages; otherwise, messages produced with nobody to consume them just become garbage data.

Kafka Cluster

  Since Kafka is a distributed message system, it must support running in cluster mode. Three terms commonly come up when describing architectures: standalone, cluster, and distributed. They can roughly be told apart by the following characteristics (the original illustration here came from Zhihu):

  • Standalone: all business logic runs on one server; relative to a cluster it is just a single node (distinguishable by physical structure)
  • Cluster: the same business logic deployed on multiple servers, effectively several standalone instances combined (distinguishable by physical structure)
  • Distributed: one business split into multiple sub-businesses (not necessarily deployed across multiple servers); relative to a cluster, "cluster" describes a physical form while "distributed" describes a way of working
  • In short: distributed components work "in parallel", while cluster nodes work "in series"

Topic

  Topic literally means a subject of conversation. In the Kafka world you can picture it as the everyday chat between a producer and a consumer: any two people chatting need a shared subject, and that subject is the topic. Only messages about a given topic are passed between the producers and consumers gathered around it (consumers need not care about any other messages). Reality is more complicated than the metaphor, of course, since in practice one topic can be consumed by any number of consumers, and one producer can switch between topics.

Partition

  Partition literally means a division, and in Kafka it corresponds to a physical division: when creating a topic, you can specify that it contain one or more partitions. In storage, each partition maps to a directory holding the data and index files assigned to that partition, and each partition is an ordered queue (with its own offsets).
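
  A quick sketch of how this looks in practice (the commands assume the kafka-test topic created in the verification section below, plus the log.dirs=/kafka/kafka-logs setting configured during installation):

    # Show each partition's leader, replicas and ISR across the brokers
    ./kafka-topics.sh --describe --zookeeper 192.168.142.153:2181 --topic kafka-test
    # On disk, every partition is simply a directory under log.dirs
    ls /kafka/kafka-logs/
    # kafka-test-0  kafka-test-1  kafka-test-2  ...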

Consumer Group

  Consumer groups are Kafka's mechanism for providing both broadcast (deliver to every consumer) and unicast (deliver to exactly one consumer) on a topic. A topic can have multiple consumer groups. Every message in the topic is delivered to each consumer group (think of it as being copied, though no real copy is made), but within a group, each partition's messages go to only one consumer.
  To get broadcast semantics, simply give every consumer its own consumer group (each consumer then receives the full message stream); to get unicast semantics, put all consumers in the same consumer group (they then share the full message stream). Both modes are sketched below.
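
  A sketch of both modes with the 0.9 console consumer, which reads its group.id from a properties file passed via --consumer.config (topic, addresses, and the /tmp file names are illustrative values from this article's setup):

    # Broadcast: two consumers in two different groups each receive every message
    echo "group.id=group-a" > /tmp/consumer-a.properties
    echo "group.id=group-b" > /tmp/consumer-b.properties
    ./kafka-console-consumer.sh --zookeeper 192.168.142.153:2181 --topic kafka-test --consumer.config /tmp/consumer-a.properties
    ./kafka-console-consumer.sh --zookeeper 192.168.142.153:2181 --topic kafka-test --consumer.config /tmp/consumer-b.properties

    # Unicast: run the last command twice with the SAME properties file; the
    # topic's partitions are split between the two group members, so each
    # message reaches only one of them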

Offset

  Kafka's storage files (log segments) are named after the offset of the first message they contain: the first segment of a partition is 00000000000000000000.log (with a matching .index file). Naming by offset makes lookup convenient: to find the message at offset 2049, locate the segment whose base offset is the largest value not exceeding 2049 (say, a segment starting at 2048) and read from there. When actually consuming Kafka messages, the offset can likewise be used to consume from a chosen position. The on-disk layout is sketched below.
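
  An illustrative look inside one partition directory (the directory name assumes the kafka-test topic and the log.dirs=/kafka/kafka-logs setting used later in this article):

    ls /kafka/kafka-logs/kafka-test-0/
    # 00000000000000000000.index  00000000000000000000.log
    # When the first segment fills up and rolls, the next segment is named
    # after the offset of its first message, e.g. 00000000000000002048.log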

Zookeeper

  ZooKeeper itself is a distributed coordination service for managing large fleets of hosts. Coordinating and managing services in a distributed environment is a complex undertaking; ZooKeeper solves it with a simple architecture and API, letting developers concentrate on core application logic rather than on the distributed nature of the application. Since Kafka runs as a decentralized cluster, consumers need to know which producers (from the consumer's point of view, Kafka itself is the producer) are currently available, and that is exactly where Zookeeper earns its keep: it does the distributed cluster management.
  In practice, Kafka stores its metadata in Zookeeper, but the data sent to the topics themselves never lands in Zookeeper (otherwise Zookeeper would blow up). Using Zookeeper lets Kafka scale the cluster dynamically without changing any client (producer or consumer) configuration: to the outside, the cluster is a single whole, and internal expansion is not observable externally. Brokers register in Zookeeper and keep the related metadata (topics, partition info, and so on) up to date. The sketch below, using the znode paths standard for Kafka 0.9, shows the typical layout of this metadata:
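
    # Browse Kafka's metadata with the Zookeeper CLI (paths standard for Kafka 0.9)
    /zookeeper/zookeeper-3.4.6/bin/zkCli.sh -server 192.168.142.153:2181
    ls /brokers/ids          # registered broker ids, e.g. [1, 2, 3]
    get /brokers/ids/1       # connection metadata for broker 1
    ls /brokers/topics       # topics and their partition assignments
    get /controller          # which broker currently acts as controller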

Installation Notes

  The base components a Kafka cluster requires are:

  • JDK: the base Java environment; both Zookeeper and Kafka depend on it to some degree, so the JDK is mandatory
  • Zookeeper: frequently used in big-data processing, here mainly for storing Kafka's state; a dedicated column on Zookeeper will follow later. (Official description: ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.)
  • Kafka: like many big-data components, Kafka works straight out of the unpacked archive (though the relevant config files still need editing)

Preparation

Server Preparation

  This time we will build a small 3-node cluster. Since creating virtual machines is rather time-consuming, I used docker instead. The machine allocation is shown in the table below:

Docker name   OS          hostname        IP address        Notes
cl_76_153     centos7.4   1bc72703f1e9    192.168.142.153   Kafka01, Zookeeper01
cl_76_156     centos7.4   b3e6614d0345    192.168.142.156   Kafka02, Zookeeper02
cl_76_157     centos7.4   63c3077b732d    192.168.142.157   Kafka03, Zookeeper03

[Q]: Why do the hostnames look so odd?
[A]: Because each is the random ID docker generated. I tried changing the hostname inside docker, but it reverted after a restart; a separate docker-focused topic will follow later to get to the bottom of this.

Component Package Preparation

Component   Release version   Notes
JDK         1.7.0_79          Not the latest; matches the 2016 project system being verified
Zookeeper   3.4.6             Not the latest; matches the 2016 project system being verified
Kafka       2.10-0.9.0.0      Not the latest; matches the 2016 project system being verified

Installation Workflow

  1. Download the installation sources: JDK, Zookeeper, Kafka
  2. Distribute the sources to each cluster node
  3. Install in order: JDK - Zookeeper - Kafka
  4. Configure environment variables (mainly for the JDK)
  5. Configure Zookeeper and Kafka
  6. Start the services in order (Zookeeper - Kafka)
  7. Verify the basic workflow

Installing the JDK

Manual Installation

  • Upload jdk-7u79-linux-x64.tar.gz to the /opt/tools/ directory
  • Change to /opt/tools, unpack the archive, then delete it:

    [root@centos154 ~]#cd /opt/tools
    [root@centos154 ~]#tar -zxvf ./jdk-7u79-linux-x64.tar.gz
    [root@centos154 ~]#rm ./jdk-7u79-linux-x64.tar.gz
  • Append to the environment variable configuration, then reload it immediately:

    [root@centos154 ~]#vi /etc/profile
    export JAVA_HOME=/opt/tools/jdk1.7.0_79
    export CLASSPATH=${JAVA_HOME}/lib
    export PATH=${JAVA_HOME}/bin:$PATH
    ...
    [root@centos154 ~]# source /etc/profile
  • Test whether the installation succeeded:

    [root@centos154 ~]# java -version
    java version "1.7.0_79"
    Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
    Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
    [root@centos154 ~]#

Automated Installation

  Upload the JDK package (jdk-7u79-linux-x64.tar.gz) to the server with any transfer tool, save the script below as a .sh file in the same directory as the package, grant it execute permission, and run it to complete the installation (the executing user must have root privileges, since system config files are modified; on an Ubuntu system consider sudo -i to switch to root). The script is for reference only and can be adapted to your own scenario.

#!/bin/bash

# Directory this script lives in (the JDK tarball is expected alongside it)
script_dir=$( cd "$( dirname "$0" )" && pwd )

# Install the JDK only if java is not already present
JAVA=$(which java)
if [[ -z "$JAVA" ]]; then
    echo "[INFO] Installing the JDK..."
    # Remove any existing JDK directory
    rm -rf /opt/tools/jdk1.7.0_79
    # Unpack the JDK into /opt/tools
    tar -zxvf "$script_dir/jdk-7u79-linux-x64.tar.gz" -C /opt/tools/
    # Configure environment variables
    ### -------------- remove old entries ------------- ###
    sed -i -e '/JAVA_HOME/d' -e '/JRE/d' /etc/profile
    ### -------------- append new entries ------------- ###
    sed -i '$a## --------------------- JAVA_HOME -------------------------- ##' /etc/profile
    sed -i '$aJAVA_HOME=/opt/tools/jdk1.7.0_79' /etc/profile
    sed -i '$aJRE_HOME=/opt/tools/jdk1.7.0_79/jre' /etc/profile
    sed -i '$aPATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin' /etc/profile
    sed -i '$aCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib' /etc/profile
    sed -i '$aexport JAVA_HOME JRE_HOME PATH CLASSPATH' /etc/profile
    ### ---- reload /etc/profile (affects this script's shell only) ---- ###
    source /etc/profile
    echo "JAVA_HOME=$JAVA_HOME"
    sleep 1
    echo "[SUCCESS] JDK installation complete"
fi

echo "**************************************************"
echo "[java -version]:"
java -version

[Note]: After the automated installation completes, you normally still need to run source /etc/profile by hand in your own shell, or log in to the server again.

Installing Zookeeper

Installation Steps

  • Configure host mappings

    [root@1bc72703f1e9 ~]# cat /etc/hosts
    127.0.0.1 localhost
    ::1 localhost ip6-localhost ip6-loopback
    fe00::0 ip6-localnet
    ff00::0 ip6-mcastprefix
    ff02::1 ip6-allnodes
    ff02::2 ip6-allrouters
    172.17.0.6 1bc72703f1e9

    # These mappings let the zookeeper and kafka configs address cluster hosts by hostname
    # After editing, ping each host to confirm connectivity
    192.168.142.153 1bc72703f1e9
    192.168.142.156 b3e6614d0345
    192.168.142.157 63c3077b732d

    [root@1bc72703f1e9 ~]# ping b3e6614d0345
    PING b3e6614d0345 (192.168.142.156) 56(84) bytes of data.
    64 bytes from b3e6614d0345 (192.168.142.156): icmp_seq=1 ttl=64 time=0.136 ms
    64 bytes from b3e6614d0345 (192.168.142.156): icmp_seq=2 ttl=64 time=0.072 ms
    ...
  • Create the data directory, upload and unpack the archive

    [root@1bc72703f1e9 ~]# mkdir -p /zookeeper/data
    [root@1bc72703f1e9 ~]# mv /opt/tools/zookeeper-3.4.6.tar.gz /zookeeper/
    [root@1bc72703f1e9 ~]# cd /zookeeper/
    [root@1bc72703f1e9 zookeeper]# tar -zxvf zookeeper-3.4.6.tar.gz
  • Create the config file (copied from the template in the archive)

    [root@1bc72703f1e9 zookeeper]# cd /zookeeper/zookeeper-3.4.6/conf
    [root@1bc72703f1e9 conf]# cp zoo_sample.cfg zoo.cfg
    [root@1bc72703f1e9 conf]# vi zoo.cfg
    ...
  • Edit the zoo.cfg config file

    # Existing setting; just change it
    dataDir=/zookeeper/data

    # Add this section if it is not in the original config file (strings like 1bc72703f1e9 are the cluster hostnames)
    server.1=1bc72703f1e9:2777:3777
    server.2=b3e6614d0345:2777:3777
    server.3=63c3077b732d:2777:3777
  • Complete the installation and configuration on each cluster host

    # Tip: just scp the already-configured Zookeeper directory from one host; then only myid needs changing
    [root@1bc72703f1e9 /]# scp -r zookeeper/ root@192.168.142.156:/
    root@192.168.142.156's password:
    ...
    [root@1bc72703f1e9 /]# scp -r zookeeper/ root@192.168.142.157:/
    root@192.168.142.157's password:
    ...
  • Set myid on each host
      Where does the myid value come from? Recall this line in zoo.cfg: server.1=1bc72703f1e9:2777:3777 — for the current host, the digit after "server." is that host's ID

    # Host 1
    [root@1bc72703f1e9 /]# echo "1" > /zookeeper/data/myid

    # Host 2
    [root@b3e6614d0345 /]# echo "2" > /zookeeper/data/myid

    # Host 3
    [root@63c3077b732d /]# echo "3" > /zookeeper/data/myid
  • Start the Zookeeper service on each of the 3 hosts

    # Start the service
    [root@63c3077b732d /]# cd /zookeeper/zookeeper-3.4.6/bin/
    [root@63c3077b732d bin]# ./zkServer.sh stop
    JMX enabled by default
    Using config: /zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    [root@63c3077b732d bin]# ./zkServer.sh start
    JMX enabled by default
    Using config: /zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED

    # Check the status -- across the 3 hosts, Mode should show one leader and two followers; if so, startup succeeded
    [root@63c3077b732d bin]# ./zkServer.sh status
    JMX enabled by default
    Using config: /zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Mode: follower
    [root@63c3077b732d bin]#

Special Reminders

  • The /etc/hosts file on every host must be edited, otherwise service startup reports an exception like the following:

    2018-05-31 06:28:51,175 [myid:3] - WARN  [QuorumPeer[myid=3]/0.0.0.0:2181:QuorumCnxManager@382] 
    - Cannot open channel to 1 at election address 1bc72703f1e9:2778
    java.net.UnknownHostException: 1bc72703f1e9
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
  • Root cause of the Zookeeper error "Cannot open channel to X at election address"

    # How is the IP of the local server defined on each node?
    # If you gave the public IP, the listener will fail to bind to the port.
    # You must specify 0.0.0.0 for the current node

    server.1=0.0.0.0:2777:2778
    server.2=b3e6614d0345:3777:3778
    server.3=63c3077b732d:4777:4778
  • Don't forget that we are running in Docker, so as far as port usage goes, duplicates must be avoided

  • Zookeeper's startup log can normally be found in the zookeeper.out file under the bin directory
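
  • As an optional sanity check, Zookeeper's four-letter admin commands report server state over the client port; a quick sketch, assuming nc (netcat) is installed in the containers:

    echo ruok | nc 192.168.142.153 2181   # replies "imok" if the server is alive
    echo stat | nc 192.168.142.153 2181   # shows version, Mode (leader/follower), client count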

Installing Kafka

  • Upload and unpack the archive

    [root@1bc72703f1e9 bin]# mkdir /kafka
    [root@1bc72703f1e9 bin]# cd /kafka/
    [root@1bc72703f1e9 kafka]# cp /opt/tools/kafka_2.10-0.9.0.0.tgz ./
    [root@1bc72703f1e9 kafka]# tar -zxvf kafka_2.10-0.9.0.0.tgz
    ...
  • Edit the server.properties config file

    [root@1bc72703f1e9 config]# pwd
    /kafka/kafka_2.10-0.9.0.0/config
    [root@1bc72703f1e9 config]# vi server.properties
    ...
    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=1

    ############################# Socket Server Settings #############################

    listeners=PLAINTEXT://:9092

    # The port the socket server listens on
    port=9092

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=192.168.142.153

    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured. Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=192.168.142.153

    # The default number of log partitions per topic. More partitions allow greater
    num.partitions=12

    # The number of threads handling network requests -- not recommended to change (in production it can be raised, e.g. to 16)
    num.network.threads=3

    ...

    ############################# Log Basics #############################

    # A comma seperated list of directories under which to store log files
    log.dirs=/kafka/kafka-logs

    ...
    ...

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    # Not present in the original config file; newly added
    delete.topic.enable=true
    ...
  • Sync the kafka installation directory to the other machines

    # Tip: just scp the already-configured Kafka directory from one host; then only server.properties needs changing
    [root@1bc72703f1e9 /]# scp -r kafka/ root@192.168.142.156:/
    root@192.168.142.156's password:
    ...
    [root@1bc72703f1e9 /]# scp -r kafka/ root@192.168.142.157:/
    root@192.168.142.157's password:
    ...
  • Edit the config file on each server; the items to change are broker.id, host.name, and advertised.host.name. A sketch of the edits for broker 2 follows (adjust the values for broker 3; paths and addresses follow this article's layout):
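
    cd /kafka/kafka_2.10-0.9.0.0/config
    sed -i 's/^broker.id=.*/broker.id=2/' server.properties
    sed -i 's/^host.name=.*/host.name=192.168.142.156/' server.properties
    sed -i 's/^advertised.host.name=.*/advertised.host.name=192.168.142.156/' server.properties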

  • Start the kafka service
    [root@1bc72703f1e9 config]# cd /kafka/kafka_2.10-0.9.0.0/bin
    [root@1bc72703f1e9 bin]# nohup ./kafka-server-start.sh /kafka/kafka_2.10-0.9.0.0/config/server.properties &
    [1] 1459
    [root@1bc72703f1e9 bin]# nohup: ignoring input and appending output to 'nohup.out'

    [root@1bc72703f1e9 bin]# tail -200f nohup.out
    [2018-05-31 08:06:15,576] INFO KafkaConfig values:
    request.timeout.ms = 30000
    log.roll.hours = 168
    inter.broker.protocol.version = 0.9.0.X
    log.preallocate = false
    security.inter.broker.protocol = PLAINTEXT
    controller.socket.timeout.ms = 30000
    ssl.keymanager.algorithm = SunX509
    ssl.key.password = null
    log.cleaner.enable = false
    ssl.provider = null
    num.recovery.threads.per.data.dir = 1
    background.threads = 10
    unclean.leader.election.enable = true
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    replica.lag.time.max.ms = 10000
    ssl.endpoint.identification.algorithm = null
    auto.create.topics.enable = true
    zookeeper.sync.time.ms = 2000
    ssl.client.auth = none
    ssl.keystore.password = null
    log.cleaner.io.buffer.load.factor = 0.9
    offsets.topic.compression.codec = 0
    log.retention.hours = 168
    log.dirs = /kafka/kafka-logs
    ssl.protocol = TLS
    log.index.size.max.bytes = 10485760
    sasl.kerberos.min.time.before.relogin = 60000
    log.retention.minutes = null
    connections.max.idle.ms = 600000
    ssl.trustmanager.algorithm = PKIX
    offsets.retention.minutes = 1440
    max.connections.per.ip = 2147483647
    replica.fetch.wait.max.ms = 500
    metrics.num.samples = 2
    port = 9092
    offsets.retention.check.interval.ms = 600000
    log.cleaner.dedupe.buffer.size = 524288000
    log.segment.bytes = 1073741824
    group.min.session.timeout.ms = 6000
    producer.purgatory.purge.interval.requests = 1000
    min.insync.replicas = 1
    ssl.truststore.password = null
    log.flush.scheduler.interval.ms = 9223372036854775807
    socket.receive.buffer.bytes = 102400
    leader.imbalance.per.broker.percentage = 10
    num.io.threads = 8
    zookeeper.connect = 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181
    queued.max.requests = 500
    offsets.topic.replication.factor = 3
    replica.socket.timeout.ms = 30000
    offsets.topic.segment.bytes = 104857600
    replica.high.watermark.checkpoint.interval.ms = 5000
    broker.id = 1
    ssl.keystore.location = null
    listeners = PLAINTEXT://:9092
    log.flush.interval.messages = 9223372036854775807
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    log.retention.ms = null
    offsets.commit.required.acks = -1
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    group.max.session.timeout.ms = 30000
    num.replica.fetchers = 1
    advertised.listeners = null
    replica.socket.receive.buffer.bytes = 65536
    delete.topic.enable = true
    log.index.interval.bytes = 4096
    metric.reporters = []
    compression.type = producer
    log.cleanup.policy = delete
    controlled.shutdown.max.retries = 3
    log.cleaner.threads = 1
    quota.window.size.seconds = 1
    zookeeper.connection.timeout.ms = 6000
    offsets.load.buffer.size = 5242880
    zookeeper.session.timeout.ms = 6000
    ssl.cipher.suites = null
    authorizer.class.name =
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.service.name = null
    controlled.shutdown.enable = true
    offsets.topic.num.partitions = 50
    quota.window.num = 11
    message.max.bytes = 1000012
    log.cleaner.backoff.ms = 15000
    log.roll.jitter.hours = 0
    log.retention.check.interval.ms = 300000
    replica.fetch.max.bytes = 1048576
    log.cleaner.delete.retention.ms = 86400000
    fetch.purgatory.purge.interval.requests = 1000
    log.cleaner.min.cleanable.ratio = 0.5
    offsets.commit.timeout.ms = 5000
    zookeeper.set.acl = false
    log.retention.bytes = -1
    offset.metadata.max.bytes = 4096
    leader.imbalance.check.interval.seconds = 300
    quota.consumer.default = 9223372036854775807
    log.roll.jitter.ms = null
    reserved.broker.max.id = 1000
    replica.fetch.backoff.ms = 1000
    advertised.host.name = 192.168.142.153
    quota.producer.default = 9223372036854775807
    log.cleaner.io.buffer.size = 524288
    controlled.shutdown.retry.backoff.ms = 5000
    log.dir = /tmp/kafka-logs
    log.flush.offset.checkpoint.interval.ms = 60000
    log.segment.delete.delay.ms = 60000
    num.partitions = 12
    num.network.threads = 3
    socket.request.max.bytes = 104857600
    sasl.kerberos.ticket.renew.window.factor = 0.8
    log.roll.ms = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    socket.send.buffer.bytes = 102400
    log.flush.interval.ms = null
    ssl.truststore.location = null
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    default.replication.factor = 1
    metrics.sample.window.ms = 30000
    auto.leader.rebalance.enable = true
    host.name = 192.168.142.153
    ssl.truststore.type = JKS
    advertised.port = null
    max.connections.per.ip.overrides =
    replica.fetch.min.bytes = 1
    ssl.keystore.type = JKS
    (kafka.server.KafkaConfig)
    [2018-05-31 08:06:15,665] INFO starting (kafka.server.KafkaServer)
    [2018-05-31 08:06:15,673] INFO Connecting to zookeeper on
    192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181 (kafka.server.KafkaServer)
    [2018-05-31 08:06:15,701] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2018-05-31 08:06:15,712] INFO Client environment:zookeeper.version=3.4.6-1569965,
    built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
    ......
    [2018-05-31 08:07:28,597] INFO Registered broker 2 at path /brokers/ids/2 with addresses:
    PLAINTEXT -> EndPoint(192.168.142.156,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    [2018-05-31 08:07:28,619] INFO Kafka version : 0.9.0.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2018-05-31 08:07:28,620] INFO Kafka commitId : fc7243c2af4b2b4a
    (org.apache.kafka.common.utils.AppInfoParser)
    [2018-05-31 08:07:28,621] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
    ^C
    [root@1bc72703f1e9 bin]#

Verifying the Installation

Create a topic

./kafka-topics.sh --create --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181 --replication-factor 3 --partitions 16 --topic kafka-test

List all topics

./kafka-topics.sh --list --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181

Create a producer

./kafka-console-producer.sh --broker-list 192.168.142.153:9092,192.168.142.156:9092,192.168.142.157:9092 --topic kafka-test

Create a consumer

./kafka-console-consumer.sh --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181 --topic kafka-test

Verifying the Message Send/Receive Flow

Producer: sending messages

[root@63c3077b732d bin]# ./kafka-topics.sh --create --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181 --replication-factor 3 --partitions 16 --topic kafka-test
Created topic "kafka-test".
[root@63c3077b732d bin]#
[root@63c3077b732d bin]# ./kafka-topics.sh --list --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181
kafka-test
[root@63c3077b732d bin]#
[root@63c3077b732d bin]# ./kafka-console-producer.sh --broker-list 192.168.142.153:9092,192.168.142.156:9092,192.168.142.157:9092 --topic kafka-test
Hello kafka
I am from Nanjing

Consumer: receiving messages

[root@b3e6614d0345 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.142.153:2181,192.168.142.156:2181,192.168.142.157:2181 --topic kafka-test
Hello kafka
I am from Nanjing
------2019 Lin.C ------