生产者
发送消息:
Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new
KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
var record =
new ProducerRecord<>("test", "Precision Products",
"France");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println(metadata);
}
});
}
producer.close();
配置
- acks
- 定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
- acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应 当 broker 故障时有可能 丢失数据
- acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 如果在 follower同步成功之前 leader 故障,那么将会丢失数据
- acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成 数据重复
- buffer.memory
- 设置生产者内存缓冲区的大小
- compression.type
- 设置消息压缩算法
- retries
- 决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误
- batch.size
- 指定了一个批次可以使用的内存大小
- linger.ms
- KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去
- client.id
- max.in.flight.requests.per.connection
- 指定了生产者在收到服务器响应之前可以发送多少个消息
- timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
- max.block.ms
- 调用send时最长的阻塞时间
- max.request.siz
- receive.buffer.bytes 和 send.buffer.bytes
- 分别指定了 TCP socket 接收和发送数据包的缓冲区大小
顺序保证
- 将max.in.flight.requests.per.connection设置为1
保证顺序的方法就是:
- 每个主题只分为一个区
- 每次发送的消息发送到同一个分区
序列化器
- 自定义序列化器:实现
Serializer
接口- 不推荐使用
- 其他序列化
- avro:一种将shcema嵌入在数据里的序列化方式
分区策略
分区的原因:
- 方便扩展
- 提高并发
分区原则:
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
- 否则就是随机取一个值 然后再这个值的基础上进行轮询
自定义分区器:
实现Partitioner
接口
数据可靠性保证
- Exactly Once
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义
At Least Once + 幂等性 = Exactly Once