kafka场景
摘要:
1. kafka具备吞吐量大无限扩容的特点,相比起同类,它更适合需要无限扩容, 吞吐量要大(并发量很大) 的场景,比如日志大数据等。
2. 本来也是个消息系统,所以可以做: 解耦,异步处理,流量削峰,消息队列
场景:
(1)消息系统。Kafka作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模 消息处理提供了一种很好的解决方案。
(2)应用监控。利用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPS、QPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用 Kafka与ELK(ElasticSearch、Logstash和Kibana)整合构建应用服务监控系统。
(3)网站用户行为追踪。为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作 轨迹、内容等信息发送到Kafka集群上,通过Hadoop、Spark或Strom等进行数据分析处理,生成相应的统计报告, 为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
(4)流处理。需要将已收集的流数据提供给其他流式计算框架进行处理,用Kafka收集流数据是一个不错的选择, 而且当前版本的Kafka提供了Kafka Streams支持对流数据的处理。
(5)持久性日志。Kafka可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份, Kafka为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka很方便与HDFS和Flume进行整合,这样就方便 将Kafka采集的数据持久化到其他外部系统。 更多的场景主要是用来做日志分析系统,除了日志,网站的一些浏览数据应该也适用。(只要原始数据不需要直接存 DB的都可以) 使用kafka的核心理由: 分布式,高吞吐量,速度快(kafka是直接通过磁盘存储,线性读写,速度快)
2 kafka集群介绍
Producer:Producer即生产者,消息的产生 者,是消息的入口。 kafka cluster: Broker:Broker是kafka实例,每个服务器上有一个或多个 kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图 中的broker-0、broker-1等…… Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。 在每个broker上都可以创建多个topic。 Partition:Topic的分区,每个topic可以有多个分区,分区的作用 是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹! Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候 会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于 Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自 己)。 Message:每一条发送的消息主体。 Consumer:消费者,即消息的消费方,是消息的出口。 Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费 者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高 kafka的吞吐量! Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
工作流程分析
上面介绍了kafka的基础架构及基本概念,不知道大家看完有没有对kafka有个大致印象,如果对还比较懵也没关 系!我们接下来再结合上面的结构图分析kafka的工作流程,最后再回来整个梳理一遍我相信你会更有收获! 2.2.1 发送数据 我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据 的时候永远的找leader,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下 图:
那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
1、 方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据 量。
2、 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。 熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量 分发到不同的服务器,
那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个 partition呢?kafka中有几个原则:
1、 partition在写入的时候可以指定需要写入的partition,如果有指定,则 写入对应的partition。
2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个 partition。
3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。 保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢 失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参 数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。 0代表producer往集群发送数据不需要 等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。 1代表producer往集群发送数据只要leader 应答就可以发送下一条,只确保leader发送成功。 all代表producer往集群发送数据需要所有的follower都完成 从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。 最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量 根据默认配置都是1。
保存数据 Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的 认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数 据(效率比随机写入高)。 Partition 结构 前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition 就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多 组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
Message结构 上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的 message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offffset、压缩类型……等 等!我们重点需要知道的是下面三个: 1、 offffset:offffset是一个占8byte的有序id号,它可以唯一确定每条消息 在parition内的位置! 2、 消息大小:消息大小占用4byte,用于描述消息的大小。 3、 消息体:消息体存 放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。 存储策略 无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢? 1、 基于时 间,默认配置是168小时(7天)。 2、 基于大小,默认配置是1073741824。 需要注意的是,kafka读取特 定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
消费数据 消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找 leader去拉取。 多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费 者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据
安装
需要装的docker容器有6个,3个zookeeper容器和3个kafka容器,这里我们装在一台机子上。实际场景中是6台机 子。节约的话也至少是3台(每台上1个zookeeper和1个kafka)。
一 zookeeper 集群安装
这里用zookeeper干啥 我们这里用zookeeper来做元数据/配置信息管理,具体包括:存储消费偏移量,topic话题信息,partition信息) 这 些部分组成。 当然zookeeper的功能肯定不止这么多,但我们这里只用到它这个功能!
Docker 配置
docker-compose-zookeeper-cluster.yml
version: '3.5' networks: docker_net: external: true services: zoo1: image: zookeeper restart: unless-stopped hostname: zoo10 container_name: zoo11 ports: - 2182:2181 environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181 volumes: - ./zookeeper/zoo1/data:/data - ./zookeeper/zoo1/datalog:/datalog networks: - docker_net zoo2: image: zookeeper restart: unless-stopped hostname: zoo20 container_name: zoo22 ports: - 2183:2181 environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181 volumes: - ./zookeeper/zoo2/data:/data - ./zookeeper/zoo2/datalog:/datalog networks: - docker_net zoo3: image: zookeeper restart: unless-stopped hostname: zoo30 container_name: zoo33 ports: - 2184:2181 environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181 volumes: - ./zookeeper/zoo3/data:/data - ./zookeeper/zoo3/datalog:/datalog networks: - docker_net
启动zookeeper集群
docker-compose -f docker-compose-zookeeper-cluster.yml up -d
查看集群
在 ZAB 算法中,存在 Leader、Follower、Observer 三种角色,举例查看zoo1的
docker exec -it zoo1 /bin/sh #查看角色 zkServer.sh status
kafka安装
docker-compose-kafka-cluster.yml
version: '3.5' networks: docker_net: external: true services: kafka1: image: wurstmeister/kafka restart: unless-stopped container_name: kafka1 ports: - "9093:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9093 ## 绑定发布订阅的端口。修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka1/docker.sock:/var/run/docker.sock" - "./kafka/kafka1/data/:/kafka" networks: - docker_net kafka2: image: wurstmeister/kafka restart: unless-stopped container_name: kafka2 ports: - "9094:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9094 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka2/docker.sock:/var/run/docker.sock" - "./kafka/kafka2/data/:/kafka" networks: - docker_net kafka3: image: wurstmeister/kafka restart: unless-stopped container_name: kafka3 ports: - "9095:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_HOST_NAME: 192.168.232.204 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.232.204:9095 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka3/docker.sock:/var/run/docker.sock" - "./kafka/kafka3/data/:/kafka" networks: - docker_net kafka-manager: image: sheepkiller/kafka-manager:latest restart: unless-stopped container_name: kafka-manager hostname: kafka-manager ports: - "9000:9000" links: # 连接本compose文件创建的container - kafka1 - kafka2 - kafka3 external_links: # 连接本compose文件以外的container - zoo1 - zoo2 - zoo3 environment: ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主机IP TZ: CST-8 networks: - docker_net
执行以下命令启动
docker-compose -f docker-compose-kafka-cluster.yml up -d
可以看到 kafka 集群已经启动成功。 这样,我们就成功安装了由3个kafka和3个zookeeper组成的kafka集群。 并且还安装了kafka集群可视化管理工具: kafka-manag
访问192.168.232.204:9000,按图示添加相关配置
现在,我们进入集群,创建主题:
主题创建成功。 这样,我们就可以使用这个集群投入生产了!
命令行生产与消费
1.命令创建主题如果在可视化工具里创建了就不用创建了
[root@204 ~]# docker exec -it kafka1 bash # 进入容器 bash-4.4# cd /opt/kafka/ # 进入安装目录 bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主 题列表 __consumer_offsets bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 -- replication-factor 2 --partitions 3 --topic test # 新建主题 Created topic test.
2.生产消息
bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
3.消费消息
bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
php整合kafka
#新建目录 [root@201 kafka]# mkdir kafka #进入目录 [root@201 kafka]# cd kafka #下载组件包: [root@201 kafka1]# composer require nmred/kafka-php
生产端代码:
[root@201 kafka]# vim producer.php <?php require './vendor/autoload.php'; date_default_timezone_set('PRC'); $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('192.168.232.204:9093'); $config->setBrokerVersion('1.0.0'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(); for($i = 0; $i < 2; $i++) { $result = $producer->send([ [ 'topic' => 'test', 'value' => 'test1....message.', 'key' => '', ], ]); var_dump($result); }
消费端:
[root@201 kafka]# vim consumer.php require './vendor/autoload.php'; date_default_timezone_set('PRC'); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('192.168.232.204:9093'); $config->setGroupId('test'); $config->setBrokerVersion('1.0.0'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); #开启消费 $consumer->start(function ($topic, $part, $message) { var_dump($message); //打印出获取的消息 });
本文由:xiasohu168.com 作者:xiaoshu发表,转载请注明来源!