Kafka入门笔记

kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等。

kafka目前支持多种客户端的语言:java、python、c++、php等。

Apache Kafka

apache kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个端点传递到另一个端点,kafka适合离线和在线消息消费。kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在zookeeper同步服务之上。它与apache和spark非常好的集成,应用于实时流式数据分析。

Kafka优点

  • 可靠性:分布式的,分区,复制和容错的。
  • 可扩展性:kafka消息传递系统轻松缩放,无需停机。
  • 耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
  • 性能:kafka对于发布和定于消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
  • kafka非常快:保证零停机和零数据丢失。

kafka应用场景

指标分析:kafka 通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息, 以产生操作的数据集中反馈;

日志聚合解决方法:kafka可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器;

流式处理:流式处理框架(spark,storm,flink)等主题中读取数据,对齐进行处理,并将处理后的数据写入新的主题,供 用户和应用程序使用,kafka的强耐久性在流处理的上下文中也非常的有用。

Kafka架构

官方架构图

  • 生产者API:允许应用程序发布记录流至一个或者多个kafka的主题(topics);
  • 消费者API:允许应用程序订阅一个或者多个主题,并处理这些主题接收到的记录流;
  • StreamsAPI:允许应用程序充当流处理器(stream processor),从一个或者多个主题获取输入流,并生产一个输出流到一个或 者多个主题,能够有效的变化输入流为输出流;
  • ConnectorAPI:允许构建和运行可重用的生产者或者消费者,能够把kafka主题连接到现有的应用程序或数据系统;

架构关系图

kafka支持消息持久化,消费端为拉模型来拉取数据,消费状态和订阅关系有客户端负责维护,消息消费完 后,不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。

整体架构图

一个典型的kafka集群中包含若干个Producer,若干个Broker,若干个Consumer,以及一个zookeeper集群; kafka通过zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行Rebalance(负载均 衡);Producer使用push模式将消息发布到Broker;Consumer使用pull模式从Broker中订阅并消费消息。

kafka术语

  1. producer:消息的生产者,主要是用于生产消息的。主要是接入一些外部的数据源,从外部获取数据,比如说我们可以从flume获取数据,还可以通过ftp传入数据等,还可以通过kafka的API生产数据,通过push的方式,主动的将数据推送到kafka的topic当中去;
  2. topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic;
  3. partition:消息的分区。为了解决数据保存的横向扩展的问题,所以将一个topic分为多个partition,每个partition保存topic当中的部分部署。为了解决partition丢失的问题,引入了副本机制,可以将一个partition复制多份出来保存;
  4. broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker;
  5. consumer:消息的消费者,主要去消费topic当中的数据的,主动会去pull拉取topic当中的消息;
  6. zookeeper:为了解决消费者消费的时候,确定一个topic当中有多少个分区,分区分别都在哪一台机器上,引入zk来保存这些数据;
  7. kafka的消费模型,在kafka当中消费有组的概念。同一时间,一个组当中,只能有一个线程去消费一个partition当中的数据。
  8. kafka消费必要的三个条件
    1. 确定哪一个topic;
    2. 必须知道zk的地址;
    3. 消息消费的offset偏移量

kafka的安装

kafka的安装必须要确保先安装jdk、zookeeper,必须要保证时钟同步

  • 第一步:下载上传解压,http://archive.apache.org/dist/kafka/

    1
    2
    cd /export/software/
    tar -zxvf kafka_2.11-1.0.0.tgz -C ../servers/
  • 第二步:修改配置文件

    第一台机器:

    1
    2
    3
    mkdir /export/servers/kafka_2.11-1.0.0/logs
    /export/servers/kafka_2.11-1.0.0/config
    vim server.properties
    1
    2
    3
    4
    5
    broker.id=0
    log.dirs=/export/servers/kafka_2.11-1.0.0/logs
    zookeeper.connect=node01:2181,node02:2181,node03:2181
    delete.topic.enable=true
    host.name=node01

    其它机器:

    1
    2
    scp -r /export/servers/kafka_2.11-1.0.0 node02:$PWD
    scp -r /export/servers/kafka_2.11-1.0.0 node03:$PWD

    然后分别将每台机器中的broker.idhost.name改成本机

  • 第三步:启动kafka集群

    注意事项:在kafka启动前,一定要让zookeeper启动起来。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ## 前台启动;node01服务器执行以下命令来启动kafka集群
    cd /export/servers/kafka_2.11-0.10.0.0
    bin/kafka-server-start.sh config/server.properties

    ## 后台启动;node01执行以下命令将kafka进程启动在后台
    cd /export/servers/kafka_2.11-0.10.0.0
    nohup bin/kafka-server-start.sh config/server.properties &
    # 或
    nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
  • 停止命令

    node01执行以下命令便可以停止kakfa进程

    1
    2
    cd /export/servers/kafka_2.11-0.10.0.0
    bin/kafka-server-stop.sh
  • 查看kafka启动进程

    通过jps命令来查看进程是否存在

    1
    jps

kafkaManager监控工具

下载kafkaManager

下载地址:https://github.com/yahoo/kafka-manager/

下载源码,然后上传解压准备编译

1
2
3
cd /export/servers/kafka-manager-1.3.3.15
unzip kafka-manager-1.3.3.15.zip -d ../servers/
./sbt clean dist

编译会比较麻烦,可以直接下载编译好的版本。

上传编译好的压缩包并解压

1
2
cd  /export/softwares
unzip kafka-manager-1.3.3.15.zip -d /export/servers/

修改配置文件

1
2
cd /export/servers/kafka-manager-1.3.3.15/
vim conf/application.conf
1
kafka-manager.zkhosts="node01:2181,node02:2181,node03:2181"

添加执行权限

1
2
cd /export/servers/kafka-manager-1.3.3.15/bin
chmod u+x ./*

启动kafkamanager进程

1
2
cd /export/servers/kafka-manager-1.3.3.15
nohup bin/kafka-manager -Dconfig.file=/export/servers/kafka-manager-1.3.3.15/conf/application.conf -Dhttp.port=8070 2>&1 &

浏览器页面访问

http://node01:8070/

kafka监控及运维

在开发工作中,消费在Kafka集群中消息,数据变化是我们关注的问题,当业务前提不复杂时,我们可以使用Kafka 命令提供带有Zookeeper客户端工具的工具,可以轻松完成我们的工作。随着业务的复杂性,增加Group和 Topic,那么我们使用Kafka提供命令工具,已经感到无能为力,那么Kafka监控系统目前尤为重要,我们需要观察 消费者应用的细节。

kafka-eagle概述

为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

环境和安装

环境要求:需要安装jdk,启动zk以及kafka的服务

下载源码包

kafka-eagle官网:http://download.kafka-eagle.org/

我们可以从官网上面直接下载最细的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

代码托管地址:https://github.com/smartloli/kafka-eagle/releases

解压

这里我们选择将kafak-eagle安装在第三台;直接将kafka-eagle安装包上传到node03服务器的/export/softwares路径下,node03服务器执行一下命令进行解压

1
2
3
4
cd /export/softwares/
tar -zxf kafka-eagle-bin-1.3.2.tar.gz -C /export/servers/
cd /export/servers/kafka-eagle-bin-1.3.2
tar -zxf kafka-eagle-web-1.3.2-bin.tar.gz

准备数据库

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node03服务器执行以下命令创建一个mysql数据库即可

进入mysql客户端

1
2
mysql -uroot -p
create database eagle;

修改kafak-eagle配置文件

node03执行以下命令修改kafak-eagle配置文件

1
2
cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf
vim system-config.properties
1
2
3
4
5
6
7
8
kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster2.zk.list=node01:2181,node02:2181,node03:2181

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456

配置环境变量

kafka-eagle必须配置环境变量,node03服务器执行以下命令来进行配置环境变量

1
vim /etc/profile
1
2
export KE_HOME=/export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH

启动kafka-eagle

node03执行以下界面启动kafka-eagle

1
2
3
cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start

主界面

访问kafka-eagle

http://node03:8048/ke/account/signin?/ke/

用户名:admin

密码:123456

-------------本文结束感谢您的阅读-------------

本文标题:Kafka入门笔记

文章作者:Mr.wj

发布时间:2020年01月15日 - 22:51

最后更新:2020年01月15日 - 22:53

原始链接:https://www.wjqixige.cn/2020/01/15/Kafka入门笔记/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Mr.wj wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!