I previously posted an article, a translation of the official Apache Kafka documentation, to help readers understand Kafka better. It's here:
http://www.lanxinbase.com/?p=2314
Apache Kafka® is a distributed streaming platform, well suited to handling highly concurrent data. I have been using Kafka for over two years and it has been very stable.
1. Download Kafka
My development and test machine runs Windows, so that's the setup I'll use here; I'll assume the JDK is already installed. The version under test: kafka_2.12-1.0.1.
Download: http://kafka.apache.org/downloads
Kafka relies on ZooKeeper at runtime, so before starting it we also need to install ZooKeeper. Version: zookeeper-3.4.11.
Download: https://zookeeper.apache.org/releases.html
2. Configure Kafka
If you try to run it straight from the download without any configuration, you will hit all sorts of problems.
\kafka_2.12-1.0.1\config\server.properties
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
port=9092
host.name=127.0.0.1
…
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=D:\\kafka\\logs
The excerpt above is trimmed, because in fact these are the only settings I changed:
- port=9092
- host.name=127.0.0.1
- log.dirs=D:\\kafka\\logs
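A side note: port and host.name are legacy properties kept for backward compatibility; newer brokers prefer the listeners form shown commented out in the file, which here would be equivalent to:
listeners=PLAINTEXT://127.0.0.1:9092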
\zookeeper-3.4.11\conf\zoo.cfg
dataDir=D:\\kafka\\logs
\kafka_2.12-1.0.1\config\zookeeper.properties
dataDir=D:\\kafka\\logs
That's basically enough to run. First, start the ZooKeeper server:
zkServer //simple and direct, if ZooKeeper's bin directory is on your PATH
./zookeeper-3.4.11/bin/zkServer.cmd //or perhaps like this
./zkServer.sh start //on Linux, probably like this
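If you would rather not install ZooKeeper separately, the Kafka download also bundles its own ZooKeeper scripts; pointing one at the zookeeper.properties we edited above should work just as well:
.\kafka_2.12-1.0.1\bin\windows\zookeeper-server-start.bat .\kafka_2.12-1.0.1\config\zookeeper.properties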
Then start the Kafka server:
.\kafka_2.12-1.0.1\bin\windows\kafka-server-start.bat .\kafka_2.12-1.0.1\config\server.properties
./kafka-server-start.sh ../config/server.properties //Linux, foreground
./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 & //Linux, background
nohup ./kafka-server-start.sh ../config/server.properties & //Linux, background, survives logout
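Before touching any Java, a quick smoke test with the CLI tools that ship with Kafka confirms the broker works (Windows scripts shown; the .sh equivalents live in bin/). This assumes the broker is at localhost:9092 and ZooKeeper at localhost:2181:
.\kafka_2.12-1.0.1\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lan_topic
.\kafka_2.12-1.0.1\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic lan_topic
.\kafka_2.12-1.0.1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic lan_topic --from-beginning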
Both servers are now running; let's write the code and wire up Kafka.
3. Integrate Kafka
Maven:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
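One caveat on versions: spring-kafka 1.3.0.RELEASE was built against an older kafka-clients line (0.11.x, if I remember the dependency tree correctly). Brokers are backward compatible with older clients, so this works against a 1.0.1 broker, but you can also pin the client to the broker version explicitly:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>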
Producer configuration
private final Map<String, Object> producerArg = new HashMap<>();
private DefaultKafkaProducerFactory kafkaProducerFactory;
private KafkaTemplate kafkaTemplate;
private void initArg() {
producerArg.put("bootstrap.servers", "127.0.0.1:9092");
producerArg.put("group.id", "100");
producerArg.put("compression.type", "gzip");
producerArg.put("reconnect.backoff.ms ", 20000);
producerArg.put("retry.backoff.ms", 20000);
producerArg.put("retries", 30);
producerArg.put("batch.size", "16384");
producerArg.put("linger.ms", "50");
producerArg.put("acks", "all");
producerArg.put("buffer.memory", "33554432");
producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
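These raw string keys are easy to get wrong; a stray character in a key silently becomes an unknown config that Kafka only warns about. kafka-clients ships ProducerConfig constants for all of them, so a typo-proof sketch of the same settings (the method name here is mine, not from the original service) looks like:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

private Map<String, Object> buildProducerArgs() {
    Map<String, Object> args = new HashMap<>();
    args.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    args.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    args.put(ProducerConfig.RETRIES_CONFIG, 30);
    args.put(ProducerConfig.ACKS_CONFIG, "all");
    args.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    args.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return args;
}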
/**
* Create a producer factory.
* @return DefaultKafkaProducerFactory
*/
public DefaultKafkaProducerFactory getKafkaProducerFactory() {
if (kafkaProducerFactory == null) {
kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
}
return kafkaProducerFactory;
}
/**
* Create a message template.
* @param topic the default topic for this template
* @param listener producer listener; pass null if none is needed
* @return KafkaTemplate
*/
@Override
public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
if (kafkaTemplate == null) {
kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
kafkaTemplate.setDefaultTopic(topic);
kafkaTemplate.setProducerListener(listener);
}
return kafkaTemplate;
}
/**
* Publish a message.
* @param topic topic name
* @param message message body, usually a JSON string
* @param isUsePartition whether to pick a partition explicitly
* @param partitionNum total number of partitions
* @param role prefix used to build the message key
* @return true if the send was acknowledged
*/
@Override
public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
if (role == null) {
role = ROLE_DEFAULT;
}
String key = role + "_" + message.hashCode();
ListenableFuture<SendResult<String, Object>> result;
if (isUsePartition) {
int index = getPartitionIndex(key, partitionNum);
result = kafkaTemplate.send(topic, index, key, message);
} else {
result = kafkaTemplate.send(topic, key, message);
}
return checkResult(result);
}
Consumer configuration
private final Map<String, Object> consumerArg = new HashMap<>();
private DefaultKafkaConsumerFactory kafkaConsumerFactory;
/**
* Initialize parameters.
*/
private void initArg() {
String groupId = "20190504";
consumerArg.put("bootstrap.servers", "127.0.0.1:9092");
consumerArg.put("group.id", groupId);//消费群组,如果需要所有消费者都能接收到消息,则为每个消费者设置不同的群组Id
consumerArg.put("enable.auto.commit", "false");
consumerArg.put("auto.commit.interval.ms", "1000");
consumerArg.put("auto.offset.reset", "latest");
consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
/**
* Create a consumer factory.
* @return DefaultKafkaConsumerFactory
*/
public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
if (kafkaConsumerFactory == null) {
kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
}
return kafkaConsumerFactory;
}
/**
* Add a consumer listener.
* @param listener the listener
* @param groupId consumer group id; give each consumer a different group id if all of them should receive every message
* @param topic topic name(s) to listen on
* @return the KafkaMessageListenerContainer, which can be stopped or restarted later
*/
@Override
public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
ContainerProperties properties = new ContainerProperties(topic);
properties.setMessageListener(listener);
properties.setGroupId(groupId);
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
container.start();
return container;
}
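Since enable.auto.commit is false above, offset commits are handled by the listener container according to its AckMode; as far as I recall, the spring-kafka 1.x default is BATCH (commit after each batch of polled records), and auto.commit.interval.ms is then ignored. To make the choice explicit, a sketch:
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

ContainerProperties properties = new ContainerProperties("lan_topic");
// commit offsets after each batch of records returned by poll()
properties.setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);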
Quite simple. The configuration is now complete; here is the test code:
@RestController
@RequestMapping(value = "/test")
public class TestKafkaMQController {
private static final Logger logger = Logger.getLogger("kafkaMQ>");
@Autowired
private IKafkaMQService kafkaMQService;
private final Map<String, KafkaMessageListenerContainer> listenerContainerMap = new ConcurrentHashMap<>();
/**
* http://localhost:8180/test/kafka/pub
*
* @param request
* @return
*/
@RequestMapping(value = "/kafka/pub")
public ResultResp<Void> kafkaPub(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String msg = "test kafka " + DateTimeUtils.getTime();
try {
kafkaMQService.send(KafkaMQService.TOPIC_DEFAULT, msg);
resp.setInfo(msg);
} catch (Exception e) {
e.printStackTrace();
resp.setInfo(e.getMessage());
}
return resp;
}
/**
* http://localhost:8180/test/kafka/sub?group=20190504&id=100
* http://localhost:8180/test/kafka/sub?group=20190504&id=101
* http://localhost:8180/test/kafka/sub?group=20190503&id=102
*
* @param request
* @return
*/
@RequestMapping(value = "/kafka/sub")
public ResultResp<Void> kafkaSub(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String id = request.getParameter("id");
String group = request.getParameter("group");
try {
KafkaMessageListenerContainer container = kafkaMQService.addListener(new MessageListener<String, Object>() {
@Override
public void onMessage(ConsumerRecord<String, Object> record) {
String log = "%s#{topic:%s, key:%s, value:%s, offset:%s, partition:%s, timestamp:%s }";
logger.info(String.format(log, id, record.topic(), record.key(), record.value(), record.offset(), record.partition(), record.timestamp()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, group, KafkaMQService.TOPIC_DEFAULT);
listenerContainerMap.put(id, container);
resp.setInfo(id);
} catch (Exception e) {
e.printStackTrace();
resp.setInfo(e.getMessage());
}
return resp;
}
/**
* http://localhost:8180/test/kafka/cancel?id=100
*
* @param request
* @return
*/
@RequestMapping(value = "/kafka/cancel")
public ResultResp<Void> kafkaCancel(HttpServletRequest request) {
ResultResp<Void> resp = new ResultResp<>();
String id = request.getParameter("id");
if (listenerContainerMap.containsKey(id)) {
listenerContainerMap.get(id).stop();
listenerContainerMap.remove(id);
}
return resp;
}
}
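To drive the endpoints, hit the URLs from a browser or any shell with curl (assuming the app listens on port 8180, as in the comments above):
curl "http://localhost:8180/test/kafka/sub?group=20190504&id=100"
curl "http://localhost:8180/test/kafka/pub"
curl "http://localhost:8180/test/kafka/cancel?id=100"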
The complete Kafka service classes
The IKafkaMQService interface:
package com.lanxinbase.system.service.resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.ProducerListener;
import java.util.Map;
/**
* Created by alan on 2019/5/4.
*/
public interface IKafkaMQService {
Map<String, Object> getProducerFactoryArg();
KafkaTemplate getKafkaTemplate();
KafkaTemplate getKafkaTemplate(String topic);
KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener);
KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener);
boolean send(String topic, String message);
boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum);
boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role);
int getPartitionIndex(String hashCode, int partitionNum);
Map<String, Object> getConsumerFactoryArg();
Map<String, Object> setConsumerFactoryArg(String key, Object val);
KafkaMessageListenerContainer addListener(MessageListener listener, String topic);
KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic);
}
The KafkaMQService implementation:
package com.lanxinbase.system.service;
import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IKafkaMQService;
import com.lanxinbase.system.utils.NumberUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Created by alan on 2019/5/4.
*
* 0. Download Kafka; the version used here is kafka_2.12-1.0.1.
* 1. Configure Kafka (mainly the log path, plus the Socket Server Settings: {
* port=9092
* host.name=127.0.0.1
* })
* 2. Start ZooKeeper: zkServer
* 3. Start Kafka: .\bin\windows\kafka-server-start.bat .\config\server.properties
*
* Topic test:
*
* Publish a message:
* http://localhost:8180/test/kafka/pub
*
* Subscribe with id=100 & 101:
* http://localhost:8180/test/kafka/sub?group=20190504&id=100
* http://localhost:8180/test/kafka/sub?group=20190504&id=101
*
* Test log:
* 04-May-2019 16:13:00.647 .onMessage 100#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
*
* Subscribe with id=102:
* http://localhost:8180/test/kafka/sub?group=20190503&id=102
*
* Test log:
* 04-May-2019 16:13:06.892 .onMessage 102#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
* Note: 102 listens on the same topic as id=100 but under a different group.id, so 102 also receives the earlier message; compare the timestamps to verify.
*
* ------------------------------------------------------------------------------------------------------------------
* Publish a message:
* http://localhost:8180/test/kafka/pub
*
* Test log:
* 04-May-2019 16:13:11.292 .onMessage 102#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
* 04-May-2019 16:13:11.293 .onMessage 100#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
* Note: 100 & 102 are in different groups, so both receive the message. Why doesn't 101 get it? Within a single consumer group each partition is delivered to exactly one consumer, and since 100 is healthy the topic's only partition stays assigned to it. Next, let's cancel 100's listener.
*
* ------------------------------------------------------------------------------------------------------------------
* Cancel the listener:
* http://localhost:8180/test/kafka/cancel?id=100
* KafkaMessageListenerContainer.stop();
*
* Publish a message:
* http://localhost:8180/test/kafka/pub
*
* Test log:
* 04-May-2019 16:13:23.147 .onMessage 101#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
* 04-May-2019 16:13:23.147 .onMessage 102#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
* Note: now only 101 & 102 receive the message.
*
* @see TestKafkaMQController
*/
@Service
public class KafkaMQService extends CompactService implements IKafkaMQService, InitializingBean, DisposableBean {
private static final String uri = "127.0.0.1:9092";
public static final String TOPIC_DEFAULT = "lan_topic";
public static final String ROLE_DEFAULT = "app";
private final Map<String, Object> producerArg = new HashMap<>();
private final Map<String, Object> consumerArg = new HashMap<>();
private DefaultKafkaProducerFactory kafkaProducerFactory;
private DefaultKafkaConsumerFactory kafkaConsumerFactory;
private KafkaTemplate kafkaTemplate;
public KafkaMQService() {
}
/**
* Runs once on startup, after the bean's properties are set.
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
this.initArg();
this.getKafkaProducerFactory();
// this.getKafkaConsumerFactory();
kafkaTemplate = this.createKafkaTemplate(TOPIC_DEFAULT, this.getProducerListener());
}
/**
* Producer listener.
* @return a ProducerListener that logs send results
*/
private ProducerListener<String, String> getProducerListener() {
return new ProducerListener<String, String>() {
@Override
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
StringBuffer sb = new StringBuffer();
sb.append("success{")
.append("topic:" + topic)
.append(",partition:" + partition)
.append(",key:" + key)
.append(",value:" + value)
.append("}");
logger(sb.toString());
}
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
StringBuffer sb = new StringBuffer();
sb.append("error{")
.append("topic:" + topic)
.append(",partition:" + partition)
.append(",key:" + key)
.append(",value:" + value)
.append("}");
logger(sb.toString());
}
@Override
public boolean isInterestedInSuccess() {
// returning false tells the template not to invoke onSuccess; only onError will fire
return false;
}
};
}
/**
* Initialize parameters.
*/
private void initArg() {
producerArg.put("bootstrap.servers", uri);
producerArg.put("group.id", "100");
producerArg.put("compression.type", "gzip");
producerArg.put("reconnect.backoff.ms ", 20000);
producerArg.put("retry.backoff.ms", 20000);
producerArg.put("retries", 30);
producerArg.put("batch.size", "16384");
producerArg.put("linger.ms", "50");
producerArg.put("acks", "all");
producerArg.put("buffer.memory", "33554432");
producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String groupId = "20190504";
consumerArg.put("bootstrap.servers", uri);
consumerArg.put("group.id", groupId);//消费群组,如果需要所有消费者都能接收到消息,则为每个消费者设置不同的群组Id
consumerArg.put("enable.auto.commit", "false");
consumerArg.put("auto.commit.interval.ms", "1000");
consumerArg.put("auto.offset.reset", "latest");
consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
@Override
public Map<String, Object> getProducerFactoryArg() {
return producerArg;
}
@Override
public KafkaTemplate getKafkaTemplate() {
return this.getKafkaTemplate(TOPIC_DEFAULT);
}
@Override
public KafkaTemplate getKafkaTemplate(String topic) {
return this.getKafkaTemplate(topic, null);
}
@Override
public KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener) {
return this.createKafkaTemplate(topic, listener);
}
/**
* Create a message template.
* @param topic the default topic for this template
* @param listener producer listener; pass null if none is needed
* @return KafkaTemplate
*/
@Override
public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
if (kafkaTemplate == null) {
kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
kafkaTemplate.setDefaultTopic(topic);
kafkaTemplate.setProducerListener(listener);
}
return kafkaTemplate;
}
/**
* Publish a message.
* @param topic topic name
* @param message message body, usually a JSON string
* @return true if the send was acknowledged
*/
*/
@Override
public boolean send(String topic, String message) {
return this.send(topic, message, false, 0);
}
/**
* Publish a message.
* @param topic topic name
* @param message message body, usually a JSON string
* @param isUsePartition whether to pick a partition explicitly
* @param partitionNum total number of partitions
* @return true if the send was acknowledged
*/
@Override
public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum) {
return this.send(topic, message, isUsePartition, partitionNum, ROLE_DEFAULT);
}
/**
* Publish a message.
* @param topic topic name
* @param message message body, usually a JSON string
* @param isUsePartition whether to pick a partition explicitly
* @param partitionNum total number of partitions
* @param role prefix used to build the message key
* @return true if the send was acknowledged
*/
@Override
public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
if (role == null) {
role = ROLE_DEFAULT;
}
String key = role + "_" + message.hashCode();
ListenableFuture<SendResult<String, Object>> result;
if (isUsePartition) {
int index = getPartitionIndex(key, partitionNum);
result = kafkaTemplate.send(topic, index, key, message);
} else {
result = kafkaTemplate.send(topic, key, message);
}
return checkResult(result);
}
/**
* Check whether the send succeeded.
* @param result ListenableFuture
* @return true if the broker acknowledged with a valid offset
*/
private boolean checkResult(ListenableFuture<SendResult<String, Object>> result) {
if (result != null) {
try {
// get() blocks until the broker acknowledges, so send() is effectively synchronous
long offset = result.get().getRecordMetadata().offset();
if (offset >= 0) {
return true;
}
logger("unknown offset.");
} catch (InterruptedException e) {
e.printStackTrace();
logger("send interrupted.", e.getMessage());
} catch (ExecutionException e) {
e.printStackTrace();
logger("send failed.", e.getMessage());
}
}
return false;
}
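Because result.get() blocks until the broker responds, every send waits for the acknowledgement. Where that latency matters, Spring's ListenableFuture also supports a non-blocking callback; a sketch (the handler bodies are illustrative, not from the original service):
import org.springframework.util.concurrent.ListenableFutureCallback;

result.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    @Override
    public void onSuccess(SendResult<String, Object> sendResult) {
        // acknowledged; the assigned offset is now available
        long offset = sendResult.getRecordMetadata().offset();
    }

    @Override
    public void onFailure(Throwable ex) {
        // retries exhausted or a fatal error; log and decide whether to re-send
    }
});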
/**
* Pick a partition index derived from the key.
* @param hashCode the message key string; null picks a random partition
* @param partitionNum total number of partitions
* @return the partition index
*/
@Override
public int getPartitionIndex(String hashCode, int partitionNum) {
if (hashCode == null) {
return NumberUtils.nextInt(partitionNum);
}
// mask the sign bit so that Integer.MIN_VALUE hash codes cannot produce a negative index
return (hashCode.hashCode() & Integer.MAX_VALUE) % partitionNum;
}
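The point of hashing the key is that the same key always lands on the same partition, which preserves per-key ordering. A quick illustrative usage (the values are arbitrary):
int p = getPartitionIndex("app_" + "hello".hashCode(), 4); // deterministic index in [0, 4)
int q = getPartitionIndex(null, 4);                        // null key: random partition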
@Override
public Map<String, Object> getConsumerFactoryArg() {
return consumerArg;
}
@Override
public Map<String, Object> setConsumerFactoryArg(String key, Object val) {
consumerArg.put(key, val);
return consumerArg;
}
/**
* Add a consumer listener.
* @param listener the listener
* @param topic topic name(s) to listen on
* @return the KafkaMessageListenerContainer, which can be stopped or restarted later
*/
@Override
public KafkaMessageListenerContainer addListener(MessageListener listener, String topic) {
return this.addListener(listener, getConsumerFactoryArg().get("group.id").toString(), topic);
}
/**
* Add a consumer listener.
* @param listener the listener
* @param groupId consumer group id; give each consumer a different group id if all of them should receive every message
* @param topic topic name(s) to listen on
* @return the KafkaMessageListenerContainer, which can be stopped or restarted later
*/
@Override
public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
ContainerProperties properties = new ContainerProperties(topic);
properties.setMessageListener(listener);
properties.setGroupId(groupId);
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
container.start();
return container;
}
/**
* Create a producer factory.
* @return DefaultKafkaProducerFactory
*/
public DefaultKafkaProducerFactory getKafkaProducerFactory() {
if (kafkaProducerFactory == null) {
kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
}
return kafkaProducerFactory;
}
/**
* Create a consumer factory.
* @return DefaultKafkaConsumerFactory
*/
public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
if (kafkaConsumerFactory == null) {
kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
}
return kafkaConsumerFactory;
}
@Override
public void destroy() throws Exception {
}
}
I won't attach screenshots here; the test logs are included in the javadoc of the implementation class above.