kafka理论知识

kafka是高吞吐量的开源流处理平台,可以作为一种分布式的基于发布与订阅的消息队列,但消息队列的部分功能需要自己编写,如失败重试动作,同步异步任务等,与rabbitmq有区别。

异步处理

用户请求服务后需等待服务完成再得到结果信息,这是同步处理。对于异步处理,用户提交请求后,请求放入消息队列后,无需等待执行完成,用户即可结束进程。异步处理可以实现解耦,异步通信,缓冲缓解服务器压力,提高灵活性和峰值处理能力。

消息队列的两种模式

1点对点模式

一对一,消费者主动拉取数据,消息收到后清楚,即一个消息对应一个消费者。

2发布订阅模式

此模式又分为消费者主动拉取和队列主动推送,推送速度有不同的角色决定。

生产者发布消息到topic中,一个消息可以对应多个消息订阅者。

kafka是基于发布订阅模式的消费者拉取。

消费者主动拉取模式

消费者需要主动询问是否有我的消息,执行了长轮询,消耗资源。

队列主动推送模式

消费者性能有差别,队列推送要满足不同的速度。

基本概念

Producer

消息生产者,向kafka broker发消息的客户端

Consumer

消息消费者,向Kafka broker取消息的客户端

Consumer Group

消费者组,由多个consumer组成。消费者组内每个消费者负责不同分区的数据,一个分区只能由一个组内消费者消费。一个消费者组的多个消费者是并行处理同一份数据的,它们整体视为一个订阅者。Topic虽然有多个分区,其也作为整体被producer选择发布,被consumer订阅。consumer组内如何分布,由kafka决定。

Broker

一台kafka服务器就是一个broker,一个broker可以有多个topic的子分区。

Topic

是producer发布和consumer订阅的基本单位。

Partition

将topic进行partition,分布到多个broker上,每个partion是topic数据的一部分,其保持内部有序。topic可以设定replication数量。

Replica

副本,一个topic每个partition有若干个副本,一个leader和若干个follower。

Leader与Follower

Leader是发布和订阅的对象,Follower实时从Leader同步数据,当Leader故障时,挑选一个Follower为Leader。

Producer Consumer工作原理

Producer

main线程将数据存储到RecordAccumulator,sender不断读取RecordAccumulator并发送到broker。sender根据两个参数发送数据,batch.size,linger.ms,当超过时间或达到了batchsize会进行发送。

ProduceRecord

根据partion分区,根据key的哈希分区,随机分区,当随机分区触发一次send,下次再随机挑选一个。

自定义分区ProduceRecord

1实现Partitioner接口

2重写partition方法

3设定使用自定义partitioner:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui gu.kafka.producer.MyPartitioner");

生产经验

生产者的吞吐量,数据的可靠性(boker能否保证接收到所有数据),数据精确消费(通过生产者幂等性和生产者事务实现不丢,不重复的精确消费),数据在broker的分区内有序。

Producer提高吞吐量

结合producer的理论部分不难这些关键参数:batch.size,linger.ms,compression.type,RecordAccumulator。涉及到双端队列的大小,何时发送数据,数据是否压缩。

 //batch size
 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  // linger.ms:等待时间,默认5-100ms
  properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
  properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,

数据可靠性

acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

 //设置ACK类型
 properties.put(ProducerConfig.ACKS_CONFIG, "all");
  // 重试次数 retries,默认是 int 最大值,2147483647
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);

min.insync.replicas 表示IST in sync replica同步副本数目,即leader+follower保持同步的数量。

replica.lag.time.max.ms表示 同步时间阈值,超过此时间不响应的replica follower剔除出ISR。

数据去重

至少一次,至多一次,精确一次。

当ACK设置为0时,就是至多一次。不会重发

当ACK级别为-1,副本数大于等于2,ISR应答副本数大于等于2,就是至少一次。可能重发

当使用幂等性和事务时,并满足至少一次的相同条件时(ack=-1,ISR>2,replica>2),保持精确一次。

幂等性原理

使用<PID,Partition,SeqNumber>确定唯一消息,不可持久化重复的数据。即其保证不重启情况下的单个分区数据幂等性。

PID:producerID,每次kafka重启分配一个新的PID

Partition,表示分区

Sequence Number表示在producer端的编号。

enable.idempotence 默认为true,开启。

生产者事务

开启事务必须开启幂等性。

 // 1 初始化事务
 void initTransactions();
 ​
 // 2 开启事务
 void beginTransaction() throws ProducerFencedException;
 ​
  // 3 在事务内提交已经消费的偏移量(主要用于消费者)
 void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException;
 ​
 // 4 提交事务
 void commitTransaction() throws ProducerFencedException;
 ​
 // 5 放弃事务(类似于回滚事务的操作)
 void abortTransaction() throws ProducerFencedException;
 ​
 // 设置事务 id(必须),事务 id 任意起名
  properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
 "transaction_id_0");
 // 初始化事务
  kafkaProducer.initTransactions();
  // 开启事务
  kafkaProducer.beginTransaction();
  try{
  kafkaProducer.send("","")
  kafkaProducer.commitTransaction();
  }catch(Exception e){
  kafkaProducer.abortTransaction();
  }finally{
  kafkaProducer.close();
  }

数据有序

1未开幂等性启时,max.in.flight.requests.per.connection需要设置为1。

2开启幂等性时,max.in.flight.requests.per.connection需要设置小于等于5。

相关链接

尚硅谷技术之Kafka2022版:尚硅谷官网

 
目前共有0条评论
  • 暂无Trackback
你目前的身份是游客,评论请输入昵称和电邮!