Apache·Kafka学习


官方文档:
搜索关键字:
相关/参考链接:

Kafka安装配置[简单版]

参照:http://kafka.apache.org/documentation.html#quickstart

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.(Kafka使用了ZooKeeper,所以你在使用Kafka之前需要先启动一个ZooKeeper,如果你没有的话,可以使用和Kafka一起打包的一个脚本方便的进行启动测试)

先进入对应的Kafka目录,然后执行:
> bin/zookeeper-server-start.sh config/zookeeper.properties  #启动ZooKeeper

> bin/kafka-server-start.sh config/server.properties  #然后启动Kafka
启动了之后再执行创建topic、producer发送消息、consumer消费消息的简单测试:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  #创建一个叫做“test”的topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181  #查看已有的topic列表
test
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test    #启动producer(由控制台输入消息)
This is a message    #自己输入的
This is another message  #自己输入的

{

之前在:> bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test    报错如下:

SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. 

SLF4J: Defaulting to no-operation (NOP) logger implementation 

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

根据上面的链接查找解决方法,以为是Java的版本、CLASSPATH的设置有问题,后来设置成了Oracle的Java,修改了CLASSPATH但是也没有解决;在中文中一搜就找到了解决办法:

需要下载 slftj-nop-1.5.jar {http://www.slf4j.org/download.html},并将其拷贝至Kafka的libs目录下

}

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning    #启动consumer(从头开始接收消息)
This is a message
This is another message

So far we have been running against a single broker, but that’s no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let’s expand our cluster to three nodes (still all on our local machine).(以上我们只是进行了简单的——单broker的Kafka测试,但这显然是不够的,所以,下面我们会在单机上启动3个broker实例,感觉一下Kafka的cluster效果)


First we make a config file for each of the brokers(首先分别为每一个broker在默认配置的基础上创建一个config文件):

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each others data.(主要修改3点信息:每个broker的broker.id在cluster中必须是唯一的;修改port和log.dir的原因在于——我们是在同一台机器上进行的多broker的测试,为了避免冲突和错误)

We already have Zookeeper and our single node started, so we just need to start the two new nodes(因为之前我们已经启动了ZooKeeper和一个Kafka实例,所以接下来只需要再开启2个Kafka实例即可):

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

Now create a new topic with a replication factor of three(现在创建一个3复制因子的新topic):

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay but now that we have a cluster how can we know which broker is doing what? To see that run the “describe topics” command(创建了之后我们如何知道每一个broker分别在做些什么呢?这时可以通过describe选项进行查看):

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic      PartitionCount:1     ReplicationFactor:3       Configs:
    Topic: my-replicated-topic     Partition: 0     Leader: 1 Replicas: 1,2,0       Isr: 1,2,0

Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line(对以上输出内容的解释:第一行是所有分区的一个总结;每多一行就是对一个partion的详细说明,这里因为只有一个partition,所以也就只有一行).

“leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.(leader是随机选出的)

“replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.(replicas是在该partition中要复制log信息的节点列表,不考虑该节点是否为leader或是alive)

“isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.(isr是”in-sync”的副本,是当前存活/正在工作的节点列表)

Let’s publish a few messages to our new topic(通过producer发布一些新的message到指定topic中):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1   #自己从console输入的
my test message 2   #自己从console输入的
^C

Now let’s consume these messages(然后consumer消费这些信息):

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let’s test out fault-tolerance. Broker 1 was acting as the leader so let’s kill it(接下来测试一下cluster的容错性,这里先测试将leader节点kill掉之后的效果):

> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...

> kill -9 7564

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set(通过describe选项可以查看到leader的id已经从之前的1号节点切换到了之前的2号从节点,并且1号节点已经不再isr集合中了):

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic      PartitionCount:1     ReplicationFactor:3       Configs:
    Topic: my-replicated-topic     Partition: 0     Leader: 2 Replicas: 1,2,0       Isr: 2,0

But the messages are still be available for consumption even though the leader that took the writes originally is down(即使是在leader被kill掉了之后,message依然可用):

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

然后,简单总结下Kafka系统运行的大体流程/顺序:

  1. 启动ZooKeeper的server
  2. 启动Kafka的server
  3. 创建topic(topic就类似于一个队列)
  4. Producer如果生产了数据,会先通过ZooKeeper找到broker,然后将数据放进broker
  5. Consumer如果要消费数据,会先通过ZooKeeper找对应的broker,然后消费

kafka_brief

,

《“Apache·Kafka学习”》 有 10 条评论

  1. Kafka深度解析
    http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
    http://www.jasongj.com/categories/Kafka/

    Kafka设计解析(一) Kafka背景及架构介绍
    http://www.jasongj.com/2015/03/10/KafkaColumn1/

    Kafka设计解析(二)- Kafka High Availability (上)
    Kafka设计解析(三)- Kafka High Availability (下)
    Kafka设计解析(四)- Kafka Consumer设计解析
    Kafka设计解析(五)- Kafka性能测试方法及Benchmark报告
    Kafka设计解析(六)- Kafka高性能架构之道
    Kafka设计解析(七)- Kafka Stream

  2. 说说 MQ 之 Kafka(一)
    http://www.importnew.com/30288.html
    `
    Topic,指消息的类别,每个消息都必须有;
    Producer,指消息的产生者,或者,消息的写端;
    Consumer,指消息的消费者,或者,消息的读端;
    Producer Group,指产生者组,组内的生产者产生同一类消息;
    Consumer Group,指消费者组,组内的消费者消费同一类消息;
    Broker,指消息服务器,Producer 产生的消息都是写到这里,Consumer 读消息也是从这里读;
    Zookeeper,是 Kafka 的注册中心,Broker 和 Consumer 之间的协调器,包含状态信息、配置信息和一些 Topic 的信息;
    Partition,指消息的水平分区,一个 Topic 可以有多个分区;
    Replica,指消息的副本,为了提高可用性,将消息副本保存在其他 Broker 上。
    `
    说说 MQ 之 Kafka(二)
    http://www.importnew.com/30301.html
    说说 MQ 之 Kafka(三)
    http://www.importnew.com/30304.html

  3. Kafka系列4-基本概念及消费者组(Consumer Group)的理解
    https://blog.csdn.net/kuluzs/article/details/71171537
    `
    Topic,是Kafka下消息的类别,类似于RabbitMQ中的Exchange的概念。这是逻辑上的概念,用来区分、隔离不同的消息数据,屏蔽了底层复杂的存储方式。对于大多数人来说,在开发的时候只需要关注数据写入到了哪个topic、从哪个topic取出数据。 

    Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上,比如下图所示的topic就有4个partition,分散在两台机器上。这种方式在大多数分布式存储中都可以见到,比如MongoDB、Elasticsearch的分片技术,其优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。

    Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。
    `

  4. Apache Kafka 从 0.7 到 1.0:那些年我们踩过的坑
    https://www.infoq.cn/article/MLMyoWNxqs*MzQX7lvzO
    https://zhuanlan.zhihu.com/p/48972039
    `
    Kafka 的小历史:
    0.7 版本,进入 Apache 孵化器
    0.8 版本,高可用性
    0.9 版本,配额和安全性
    0.10 版本,更细粒度的时间戳
    1.0 & 1.1 版本,Exactly-Once 与运维性提升

    经验教训:
    构建可进化的系统(你不能把系统 Shut down 来进行更新,只能一边用一边更新。)
    只有能被度量的问题才能得到最终解决(只有对 Kafka 进行全方位的观测和监测,才能知道哪里出现了问题,需要在哪里进行优化,需要在哪里进行改进。)
    API 保持不变(API向后兼容,避免产生没有考虑到的问题)
    服务需要把关(配额:流量、CPU、……)
    生态系统是关键(生态系统对一个开源社区来说是非常重要的。)
    `

  5. APACHE KAFKA QUICKSTART
    https://kafka.apache.org/quickstart
    `
    STEP 1: GET KAFKA
    STEP 2: START THE KAFKA ENVIRONMENT
    STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS
    STEP 4: WRITE SOME EVENTS INTO THE TOPIC #从命令行往特定的Kafka topic发送消息,用于验证连通性和兼容性
    STEP 5: READ THE EVENTS
    STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT
    STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS
    STEP 8: TERMINATE THE KAFKA ENVIRONMENT
    `

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注