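I previously published a translation of the Apache Kafka documentation to help readers understand Kafka better. It is available here:
http://www.lanxinbase.com/?p=2314
Apache Kafka® is a distributed streaming platform that is well suited to handling highly concurrent data. I have been using Kafka for more than two years, and it has been very stable.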
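1. Download Kafka
I develop and test on Windows. The Kafka download is the same archive for every platform, and the Windows scripts live under bin\windows. I will not cover setting up the JDK here. The version used in this test is kafka_2.12-1.0.1.
Download: http://kafka.apache.org/downloads
This Kafka version depends on ZooKeeper, so ZooKeeper has to be installed before Kafka is started. The version used here is zookeeper-3.4.11.
Download: https://zookeeper.apache.org/releases.html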
2. Configure Kafka
Running the download without adjusting the configuration leads to a lot of problems.
\kafka_2.12-1.0.1\config\server.properties
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
port=9092
host.name=127.0.0.1
…
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=D:\\kafka\\logs
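The sample configuration above is abridged, because in practice I only set the following items (port and host.name are the older settings; later Kafka versions use listeners instead):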
- port=9092
- host.name=127.0.0.1
- log.dirs=D:\\kafka\\logs
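The doubled backslashes in log.dirs matter because the backslash is an escape character in Java .properties files. The sketch below is a minimal illustration using java.util.Properties directly, on the assumption that the broker parses server.properties the same way; the class and method names are made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Shows how java.util.Properties treats backslashes in a Windows path. */
class PropertiesPathSketch {

    /** Parses one "key=value" line and returns the value stored under the key. */
    static String parse(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            // Reading from a String should not fail; rethrow defensively.
            throw new IllegalStateException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // File content: log.dirs=D:\\kafka\\logs (every backslash escaped)
        System.out.println(parse("log.dirs=D:\\\\kafka\\\\logs", "log.dirs"));
        // File content: log.dirs=D:\kafka\logs (backslashes are silently dropped)
        System.out.println(parse("log.dirs=D:\\kafka\\logs", "log.dirs"));
        // File content: log.dirs=D:/kafka/logs (forward slashes need no escaping)
        System.out.println(parse("log.dirs=D:/kafka/logs", "log.dirs"));
    }
}
```

With single backslashes the parsed value loses its separators, which is why the path is written as D:\\kafka\\logs. Forward slashes are usually accepted by Java on Windows as well, but I have not tested that with this Kafka version.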
\zookeeper-3.4.11\conf\zoo.cfg
dataDir=D:\\kafka\\logs
\kafka_2.12-1.0.1\config\zookeeper.properties
dataDir=D:\\kafka\\logs
At this point everything is basically ready to run. Start the ZooKeeper server first:
zkServer //simple and direct
.\zookeeper-3.4.11\bin\zkServer.cmd //or possibly like this
./zkServer.sh start //on Linux, probably like this
Then start the Kafka server:
.\kafka_2.12-1.0.1\bin\windows\kafka-server-start.bat .\kafka_2.12-1.0.1\config\server.properties
./kafka-server-start.sh ../config/server.properties //start on Linux
./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 & //start in the background on Linux
nohup ./kafka-server-start.sh ../config/server.properties & //start in the background on Linux
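Before moving on to the application code, it can help to confirm that both servers are actually listening. The following is a small sketch that only tries to open a TCP connection. Port 9092 comes from the configuration above, while 2181 is an assumption (the usual ZooKeeper clientPort, which this article does not show). The class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Checks whether something accepts TCP connections on a host and port. */
class PortCheckSketch {

    /** Returns true if a TCP connection can be opened within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host is unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is assumed to be the ZooKeeper client port; adjust to your zoo.cfg.
        System.out.println("ZooKeeper (2181): " + isListening("127.0.0.1", 2181, 1000));
        System.out.println("Kafka (9092): " + isListening("127.0.0.1", 9092, 1000));
    }
}
```

An open port only shows that a process is listening there; it does not prove that the broker has finished starting up.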
With both servers running, it is time to write the code and configure Kafka on the application side.
3. Integrate Kafka
Maven
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
Configure the producer
private final Map<String, Object> producerArg = new HashMap<>();
private DefaultKafkaProducerFactory kafkaProducerFactory;
private KafkaTemplate kafkaTemplate;

private void initArg() {
    producerArg.put("bootstrap.servers", "127.0.0.1:9092");
    // group.id is a consumer setting; the producer ignores it and logs a warning
    producerArg.put("group.id", "100");
    producerArg.put("compression.type", "gzip");
    producerArg.put("reconnect.backoff.ms", 20000);
    producerArg.put("retry.backoff.ms", 20000);
    producerArg.put("retries", 30);
    producerArg.put("batch.size", "16384");
    producerArg.put("linger.ms", "50");
    producerArg.put("acks", "all");
    producerArg.put("buffer.memory", "33554432");
    producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}

/**
 * Creates the producer factory (lazily, once).
 * @return the shared DefaultKafkaProducerFactory
 */
public DefaultKafkaProducerFactory getKafkaProducerFactory() {
    if (kafkaProducerFactory == null) {
        kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
    }
    return kafkaProducerFactory;
}

/**
 * Creates the message template (lazily, once).
 * @param topic    the default topic
 * @param listener producer listener; pass null if none is needed
 * @return KafkaTemplate
 */
@Override
public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
    if (kafkaTemplate == null) {
        kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
        // use the topic passed in rather than always TOPIC_DEFAULT
        kafkaTemplate.setDefaultTopic(topic);
        kafkaTemplate.setProducerListener(listener);
    }
    return kafkaTemplate;
}

/**
 * Publishes a message.
 * @param topic          topic name
 * @param message        message string, usually a JSON string
 * @param isUsePartition whether to choose the partition explicitly
 * @param partitionNum   number of partitions
 * @param role           prefix used to distinguish message keys
 * @return true if the send was confirmed
 */
@Override
public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
    if (role == null) {
        role = ROLE_DEFAULT;
    }

    String key = role + "_" + message.hashCode();
    ListenableFuture<SendResult<String, Object>> result;
    if (isUsePartition) {
        int index = getPartitionIndex(key, partitionNum);
        result = kafkaTemplate.send(topic, index, key, message);
    } else {
        result = kafkaTemplate.send(topic, key, message);
    }
    return checkResult(result);
}
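When the partition is chosen by hand from a key's hash, the mapping has to stay inside 0..partitionNum-1 for every possible hash. The sketch below compares two ways of doing that mapping. It is only an illustration with hypothetical class and method names, and it is not how Kafka's own default partitioner works (that one hashes the serialized key with its own hash function, so its choices will generally differ).

```java
/** Maps a key hash onto a partition index. */
class PartitionIndexSketch {

    /** Naive variant: Math.abs(Integer.MIN_VALUE) is still negative, so the index can be too. */
    static int naiveIndex(int hash, int partitions) {
        return Math.abs(hash) % partitions;
    }

    /** floorMod always returns a value in [0, partitions) for a positive partition count. */
    static int safeIndex(int hash, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        return Math.floorMod(hash, partitions);
    }

    /** Convenience overload that hashes a String key first. */
    static int indexForKey(String key, int partitions) {
        return safeIndex(key.hashCode(), partitions);
    }

    public static void main(String[] args) {
        System.out.println(naiveIndex(Integer.MIN_VALUE, 3)); // prints -2
        System.out.println(safeIndex(Integer.MIN_VALUE, 3));  // prints 1
        System.out.println(indexForKey("app_42", 4));         // some value from 0 to 3
    }
}
```

A negative partition index would be rejected when the record is sent, so the floorMod variant is the safer choice whenever the hash comes from arbitrary keys.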
Configure the consumer
private final Map<String, Object> consumerArg = new HashMap<>();
private DefaultKafkaConsumerFactory kafkaConsumerFactory;

/**
 * Initialises the parameters.
 */
private void initArg() {
    String groupId = "20190504";
    consumerArg.put("bootstrap.servers", "127.0.0.1:9092");
    // consumer group; to have every consumer receive each message, give each consumer a different group id
    consumerArg.put("group.id", groupId);
    consumerArg.put("enable.auto.commit", "false");
    consumerArg.put("auto.commit.interval.ms", "1000");
    consumerArg.put("auto.offset.reset", "latest");
    consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}

/**
 * Creates the consumer factory (lazily, once).
 * @return the shared DefaultKafkaConsumerFactory
 */
public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
    if (kafkaConsumerFactory == null) {
        kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
    }
    return kafkaConsumerFactory;
}

/**
 * Adds a consumer listener.
 * @param listener the listener
 * @param groupId  consumer group id; to have every consumer receive each message, use a different group id per consumer
 * @param topic    names of the topics to listen on
 * @return the KafkaMessageListenerContainer, which can be stopped or started
 */
@Override
public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
    ContainerProperties properties = new ContainerProperties(topic);
    properties.setMessageListener(listener);
    properties.setGroupId(groupId);
    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
    container.start();
    return container;
}
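The group id decides who receives a message: every group gets its own copy, but inside one group a partition is read by only one member at a time. The toy model below illustrates that rule with plain collections. It is not the Kafka client, and the "partition modulo member count" assignment is a simplifying assumption; in a real cluster the assignment is decided during group rebalancing, and which member ends up with a partition is not guaranteed. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery; not a Kafka client. */
class ConsumerGroupSketch {

    // group id -> member ids, in the order they subscribed
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    /** Adds a member to a group, creating the group if needed. */
    void subscribe(String groupId, String memberId) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(memberId);
    }

    /** Removes a member from whichever group it is in and drops empty groups. */
    void cancel(String memberId) {
        groups.values().forEach(members -> members.remove(memberId));
        groups.values().removeIf(List::isEmpty);
    }

    /** Returns one receiving member per group for a record on the given partition. */
    List<String> receivers(int partition) {
        List<String> result = new ArrayList<>();
        for (List<String> members : groups.values()) {
            // Simplified assignment: exactly one member per group owns the partition.
            result.add(members.get(partition % members.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch sim = new ConsumerGroupSketch();
        sim.subscribe("20190504", "100");
        sim.subscribe("20190504", "101");
        sim.subscribe("20190503", "102");
        System.out.println(sim.receivers(0)); // prints [100, 102]
        sim.cancel("100");
        System.out.println(sim.receivers(0)); // prints [101, 102]
    }
}
```

The group ids and member ids are the ones used in the test URLs of this article, on a topic with a single partition.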
That is all the configuration needed. Here is the test code:
@RestController
@RequestMapping(value = "/test")
public class TestKafkaMQController {

    private static final Logger logger = Logger.getLogger("kafkaMQ>");

    @Autowired
    private IKafkaMQService kafkaMQService;

    // listener containers by id, so that they can be stopped later
    private final Map<String, KafkaMessageListenerContainer> listenerContainerMap = new ConcurrentHashMap<>();

    /**
     * Publishes a test message.
     * http://localhost:8180/test/kafka/pub
     */
    @RequestMapping(value = "/kafka/pub")
    public ResultResp<Void> kafkaPub(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();
        String msg = "test kafka " + DateTimeUtils.getTime();
        try {
            kafkaMQService.send(KafkaMQService.TOPIC_DEFAULT, msg);
            resp.setInfo(msg);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }
        return resp;
    }

    /**
     * Registers a listener under the given id and consumer group.
     * http://localhost:8180/test/kafka/sub?group=20190504&id=100
     * http://localhost:8180/test/kafka/sub?group=20190504&id=101
     * http://localhost:8180/test/kafka/sub?group=20190503&id=102
     */
    @RequestMapping(value = "/kafka/sub")
    public ResultResp<Void> kafkaSub(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();
        String id = request.getParameter("id");
        String group = request.getParameter("group");
        try {
            KafkaMessageListenerContainer container = kafkaMQService.addListener(new MessageListener<String, Object>() {
                @Override
                public void onMessage(ConsumerRecord<String, Object> record) {
                    String log = "%s#{topic:%s, key:%s, value:%s, offset:%s, partition:%s, timestamp:%s }";
                    logger.info(String.format(log, id, record.topic(), record.key(), record.value(), record.offset(), record.partition(), record.timestamp()));
                    try {
                        // simulate slow processing
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                }
            }, group, KafkaMQService.TOPIC_DEFAULT);
            listenerContainerMap.put(id, container);
            resp.setInfo(id);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }
        return resp;
    }

    /**
     * Stops and removes the listener registered under the given id.
     * http://localhost:8180/test/kafka/cancel?id=100
     */
    @RequestMapping(value = "/kafka/cancel")
    public ResultResp<Void> kafkaCancel(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();
        String id = request.getParameter("id");
        if (listenerContainerMap.containsKey(id)) {
            listenerContainerMap.get(id).stop();
            listenerContainerMap.remove(id);
        }
        return resp;
    }
}
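Keeping the running listener containers in a map by id is what makes the cancel endpoint possible. The sketch below isolates that bookkeeping with a hypothetical Stoppable interface standing in for KafkaMessageListenerContainer, so it runs without Spring. Compared with the controller, it also stops a container that gets replaced under the same id and tolerates a missing id; both are additions of this sketch rather than behaviour of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Registry of running listeners keyed by id. */
class ListenerRegistrySketch {

    /** Hypothetical stand-in for KafkaMessageListenerContainer; only stop() is modelled. */
    interface Stoppable {
        void stop();
    }

    private final Map<String, Stoppable> containers = new ConcurrentHashMap<>();

    /** Stores a container; one already registered under the same id is stopped and replaced. */
    void register(String id, Stoppable container) {
        if (id == null) {
            // ConcurrentHashMap does not accept null keys
            throw new IllegalArgumentException("id is required");
        }
        Stoppable previous = containers.put(id, container);
        if (previous != null) {
            previous.stop();
        }
    }

    /** Stops and removes a container; returns false if the id is null or unknown. */
    boolean cancel(String id) {
        if (id == null) {
            return false;
        }
        Stoppable removed = containers.remove(id);
        if (removed == null) {
            return false;
        }
        removed.stop();
        return true;
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("100", () -> System.out.println("listener 100 stopped"));
        System.out.println(registry.cancel("100")); // prints "listener 100 stopped", then true
        System.out.println(registry.cancel("100")); // prints false
    }
}
```

Using remove() and checking its return value does the lookup and the removal in one step, which avoids the small gap between containsKey, get, and remove when two requests cancel the same id at once.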
The complete KafkaService class
The IKafkaMQService interface
package com.lanxinbase.system.service.resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.ProducerListener;

import java.util.Map;

/**
 * Created by alan on 2019/5/4.
 */
public interface IKafkaMQService {

    Map<String, Object> getProducerFactoryArg();

    KafkaTemplate getKafkaTemplate();

    KafkaTemplate getKafkaTemplate(String topic);

    KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener);

    KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener);

    boolean send(String topic, String message);

    boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum);

    boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role);

    int getPartitionIndex(String hashCode, int partitionNum);

    Map<String, Object> getConsumerFactoryArg();

    Map<String, Object> setConsumerFactoryArg(String key, Object val);

    KafkaMessageListenerContainer addListener(MessageListener listener, String topic);

    KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic);
}
The KafkaMQService implementation
package com.lanxinbase.system.service;

import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IKafkaMQService;
import com.lanxinbase.system.utils.NumberUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * Created by alan on 2019/5/4.
 *
 * 0. Download Kafka; the version used here is kafka_2.12-1.0.1.
 * 1. Configure Kafka (mainly the log path and the Socket Server Settings: {
 *      port=9092
 *      host.name=127.0.0.1
 *    })
 * 2. Start ZooKeeper: zkServer
 * 3. Start Kafka: .\bin\windows\kafka-server-start.bat .\config\server.properties
 *
 * Topic test:
 *
 * Send a message:
 * http://localhost:8180/test/kafka/pub
 *
 * Listeners with id=100 and id=101:
 * http://localhost:8180/test/kafka/sub?group=20190504&id=100
 * http://localhost:8180/test/kafka/sub?group=20190504&id=101
 *
 * Test log:
 * 04-May-2019 16:13:00.647 .onMessage 100#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
 *
 * Listener with id=102:
 * http://localhost:8180/test/kafka/sub?group=20190503&id=102
 *
 * Test log:
 * 04-May-2019 16:13:06.892 .onMessage 102#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
 * Note: 102 listens on the same topic as 100 but with a different group.id, so 102 also receives the previous message; compare the timestamps.
 *
 * ------------------------------------------------------------------------------------------------------------------
 * Send a message:
 * http://localhost:8180/test/kafka/pub
 *
 * Test log:
 * 04-May-2019 16:13:11.292 .onMessage 102#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
 * 04-May-2019 16:13:11.293 .onMessage 100#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
 * Note: 100 and 102 have different group.id values, so both receive the message. Why does 101 receive nothing?
 * 101 is in the same group as 100, and within a group a partition is consumed by only one member; as long as
 * 100 is healthy it keeps partition 0. Now cancel the listener for 100.
 *
 * ------------------------------------------------------------------------------------------------------------------
 * Cancel the listener:
 * http://localhost:8180/test/kafka/cancel?id=100
 * KafkaMessageListenerContainer.stop();
 *
 * Send a message:
 * http://localhost:8180/test/kafka/pub
 *
 * Test log:
 * 04-May-2019 16:13:23.147 .onMessage 101#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
 * 04-May-2019 16:13:23.147 .onMessage 102#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
 * Note: now only 101 and 102 receive the message.
 *
 * @see TestKafkaMQController
 */
@Service
public class KafkaMQService extends CompactService implements IKafkaMQService, InitializingBean, DisposableBean {

    private static final String uri = "127.0.0.1:9092";
    public static final String TOPIC_DEFAULT = "lan_topic";
    public static final String ROLE_DEFAULT = "app";

    private final Map<String, Object> producerArg = new HashMap<>();
    private final Map<String, Object> consumerArg = new HashMap<>();

    private DefaultKafkaProducerFactory kafkaProducerFactory;
    private DefaultKafkaConsumerFactory kafkaConsumerFactory;

    private KafkaTemplate kafkaTemplate;

    public KafkaMQService() {
    }

    /**
     * Runs after the bean has been initialised.
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.initArg();
        this.getKafkaProducerFactory();
        // this.getKafkaConsumerFactory();
        kafkaTemplate = this.createKafkaTemplate(TOPIC_DEFAULT, this.getProducerListener());
    }

    /**
     * Producer listener that logs the outcome of each send.
     * @return the listener
     */
    private ProducerListener<String, String> getProducerListener() {
        return new ProducerListener<String, String>() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                StringBuilder sb = new StringBuilder();
                sb.append("success{")
                        .append("topic:" + topic)
                        .append(",partition:" + partition)
                        .append(",key:" + key)
                        .append(",value:" + value)
                        .append("}");
                logger(sb.toString());
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
                StringBuilder sb = new StringBuilder();
                sb.append("error{")
                        .append("topic:" + topic)
                        .append(",partition:" + partition)
                        .append(",key:" + key)
                        .append(",value:" + value)
                        .append("}");
                logger(sb.toString());
            }

            @Override
            public boolean isInterestedInSuccess() {
                // false: onSuccess is not invoked; return true to log successful sends as well
                return false;
            }
        };
    }

    /**
     * Initialises the producer and consumer parameters.
     */
    private void initArg() {
        producerArg.put("bootstrap.servers", uri);
        // group.id is a consumer setting; the producer ignores it and logs a warning
        producerArg.put("group.id", "100");
        producerArg.put("compression.type", "gzip");
        producerArg.put("reconnect.backoff.ms", 20000);
        producerArg.put("retry.backoff.ms", 20000);
        producerArg.put("retries", 30);
        producerArg.put("batch.size", "16384");
        producerArg.put("linger.ms", "50");
        producerArg.put("acks", "all");
        producerArg.put("buffer.memory", "33554432");
        producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String groupId = "20190504";
        consumerArg.put("bootstrap.servers", uri);
        // consumer group; to have every consumer receive each message, give each consumer a different group id
        consumerArg.put("group.id", groupId);
        consumerArg.put("enable.auto.commit", "false");
        consumerArg.put("auto.commit.interval.ms", "1000");
        consumerArg.put("auto.offset.reset", "latest");
        consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    @Override
    public Map<String, Object> getProducerFactoryArg() {
        return producerArg;
    }

    @Override
    public KafkaTemplate getKafkaTemplate() {
        return this.getKafkaTemplate(TOPIC_DEFAULT);
    }

    @Override
    public KafkaTemplate getKafkaTemplate(String topic) {
        return this.getKafkaTemplate(topic, null);
    }

    @Override
    public KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener) {
        return this.createKafkaTemplate(topic, listener);
    }

    /**
     * Creates the message template (lazily, once).
     * @param topic    the default topic
     * @param listener producer listener; pass null if none is needed
     * @return KafkaTemplate
     */
    @Override
    public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
        if (kafkaTemplate == null) {
            kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
            // use the topic passed in rather than always TOPIC_DEFAULT
            kafkaTemplate.setDefaultTopic(topic);
            kafkaTemplate.setProducerListener(listener);
        }
        return kafkaTemplate;
    }

    /**
     * Publishes a message.
     * @param topic   topic name
     * @param message message string, usually a JSON string
     * @return true if the send was confirmed
     */
    @Override
    public boolean send(String topic, String message) {
        return this.send(topic, message, false, 0);
    }

    /**
     * Publishes a message.
     * @param topic          topic name
     * @param message        message string, usually a JSON string
     * @param isUsePartition whether to choose the partition explicitly
     * @param partitionNum   number of partitions
     * @return true if the send was confirmed
     */
    @Override
    public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum) {
        return this.send(topic, message, isUsePartition, partitionNum, ROLE_DEFAULT);
    }

    /**
     * Publishes a message.
     * @param topic          topic name
     * @param message        message string, usually a JSON string
     * @param isUsePartition whether to choose the partition explicitly
     * @param partitionNum   number of partitions
     * @param role           prefix used to distinguish message keys
     * @return true if the send was confirmed
     */
    @Override
    public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
        if (role == null) {
            role = ROLE_DEFAULT;
        }

        String key = role + "_" + message.hashCode();
        ListenableFuture<SendResult<String, Object>> result;
        if (isUsePartition) {
            int index = getPartitionIndex(key, partitionNum);
            result = kafkaTemplate.send(topic, index, key, message);
        } else {
            result = kafkaTemplate.send(topic, key, message);
        }
        return checkResult(result);
    }

    /**
     * Checks whether the send succeeded by waiting for the broker's acknowledgement.
     * @param result ListenableFuture
     * @return true if an offset was returned
     */
    private boolean checkResult(ListenableFuture<SendResult<String, Object>> result) {
        if (result != null) {
            try {
                long offset = result.get().getRecordMetadata().offset();
                if (offset >= 0) {
                    return true;
                }
                logger("unknown offset.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                // restore the interrupt flag for the caller
                Thread.currentThread().interrupt();
                logger("send interrupted.", e.getMessage());
            } catch (ExecutionException e) {
                e.printStackTrace();
                logger("send failed.", e.getMessage());
            }
        }
        return false;
    }

    /**
     * Returns a partition index derived from the key.
     * @param hashCode     the message key whose hash selects the partition; null picks a random partition
     * @param partitionNum total number of partitions
     * @return the partition index
     */
    @Override
    public int getPartitionIndex(String hashCode, int partitionNum) {
        if (hashCode == null) {
            return NumberUtils.nextInt(partitionNum);
        }
        // floorMod stays non-negative even when the hash is Integer.MIN_VALUE
        return Math.floorMod(hashCode.hashCode(), partitionNum);
    }

    @Override
    public Map<String, Object> getConsumerFactoryArg() {
        return consumerArg;
    }

    @Override
    public Map<String, Object> setConsumerFactoryArg(String key, Object val) {
        consumerArg.put(key, val);
        return consumerArg;
    }

    /**
     * Adds a consumer listener using the default group id.
     * @param listener the listener
     * @param topic    name of the topic to listen on
     * @return the KafkaMessageListenerContainer, which can be stopped or started
     */
    @Override
    public KafkaMessageListenerContainer addListener(MessageListener listener, String topic) {
        return this.addListener(listener, getConsumerFactoryArg().get("group.id").toString(), topic);
    }

    /**
     * Adds a consumer listener.
     * @param listener the listener
     * @param groupId  consumer group id; to have every consumer receive each message, use a different group id per consumer
     * @param topic    names of the topics to listen on
     * @return the KafkaMessageListenerContainer, which can be stopped or started
     */
    @Override
    public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
        ContainerProperties properties = new ContainerProperties(topic);
        properties.setMessageListener(listener);
        properties.setGroupId(groupId);
        KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
        container.start();
        return container;
    }

    /**
     * Creates the producer factory (lazily, once).
     * @return the shared DefaultKafkaProducerFactory
     */
    public DefaultKafkaProducerFactory getKafkaProducerFactory() {
        if (kafkaProducerFactory == null) {
            kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
        }
        return kafkaProducerFactory;
    }

    /**
     * Creates the consumer factory (lazily, once).
     * @return the shared DefaultKafkaConsumerFactory
     */
    public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
        if (kafkaConsumerFactory == null) {
            kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
        }
        return kafkaConsumerFactory;
    }

    @Override
    public void destroy() throws Exception {
    }
}
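Confirming a send by calling get() on the returned future blocks the calling thread until the broker answers. Since ListenableFuture extends java.util.concurrent.Future, the timed get(timeout, unit) is also available. The sketch below shows that pattern on a plain Future<Long> that stands in for the record offset; the class name, method name, and timeout values are assumptions for illustration, not part of the service above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Waits for a send result with an upper bound on the waiting time. */
class SendResultCheckSketch {

    /** Returns true if the future yields a non-negative offset within timeoutMs. */
    static boolean confirmed(Future<Long> offsetFuture, long timeoutMs) {
        try {
            Long offset = offsetFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return offset != null && offset >= 0;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // The send failed, or no answer arrived in time.
            return false;
        }
    }

    public static void main(String[] args) {
        // Acknowledged send: the future already holds an offset.
        System.out.println(confirmed(CompletableFuture.completedFuture(115L), 100)); // prints true

        // Failed send: the future completes exceptionally.
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        System.out.println(confirmed(failed, 100)); // prints false

        // No answer at all: the timed get gives up after 50 ms.
        System.out.println(confirmed(new CompletableFuture<Long>(), 50)); // prints false
    }
}
```

Whether a bounded wait is worth it depends on the caller: in an HTTP handler like the test controller, a timeout keeps a slow broker from holding the request thread indefinitely.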
I will not attach screenshots here; the test logs are included in the comments of the full implementation class.