Apache Mina快速入门

Mina是什么

Mina是一个基于NIO的网络框架,使用它编写程序时,可以专注于业务处理,而不用过于关心IO操作。不论应用程序采用什么协议(TCP、UDP)或者其它的,Mina提供了一套公用的接口,来支持这些协议。目前可以处理的协议有:HTTP, XML, TCP, LDAP, DHCP, NTP, DNS, XMPP, SSH, FTP… 。从这一点来说,Mina不仅仅是一个基于NIO的框架,更是一个网络层协议的实现。

区块链JAVA版的demo

先简单的说一下区块链是个什么(相信你早就知道了)。

区块链就是一个链表。把一堆区块串起来就是区块链。每个block有自己的数字签名(就是一串不规则看起来叼叼的字符串),同时包含有上一个block的数字签名,然后包含一些其他的data。

大体就长这样:

1524105998-8111-ic5t7tvoiz

是不是很熟悉,链表。

好,继续。

数字签名是什么?就是hash。

而且每个block含有前一个block的hash值,而且每个block自己的hash也是由前一个的hash计算得来的。如果前一个block(数据块)的数据发生改变,那么前一个的hash值也改变了,由此就会影响到之后的数据块的所有hash值。

所以,通过计算和对比hash值这种方式我们就可以知道区块链是不是合法的,是不是已经被篡改。

什么意思呢?意味着只要你修改了区块链中的任何一个块中的数据,都将会改变hash,从而破坏了整个链。

好,不多说。上代码:

block块定义

先新建个block块:

public class Block {
   
   public String hash;
   public String previousHash; 
   private String data; //our data will be a simple message.
   private long timeStamp; //as number of milliseconds since 1/1/1970.
   
   //Block Constructor.  
   public Block(String data,String previousHash ) {
      this.data = data;
      this.previousHash = previousHash;
      this.timeStamp = new Date().getTime();
   }
}

你也看到了我们的Block里有四个字段,hash就是这个块自己的hash值,previousHash就是上一个块的hash值,data就是这个块所持有的数据,timeStamp就是一个时间记录。

数字签名生成

接下来我们就需要生成数字签名。

有很多种的加密算法来生成数字签名。这里我们就选择SHA256。这里先新建一个工具类用来搞定这个件事情:

import java.security.MessageDigest;//通过导入MessageDigest来使用SHA256

public class StringUtil {
   
   //Applies Sha256 to a string and returns the result. 
   public static String applySha256(String input){
      
      try {
         MessageDigest digest = MessageDigest.getInstance("SHA-256");
           
         //Applies sha256 to our input, 
         byte[] hash = digest.digest(input.getBytes("UTF-8"));
           
         StringBuffer hexString = new StringBuffer(); // This will contain hash as hexidecimal
         for (int i = 0; i < hash.length; i++) {
            String hex = Integer.toHexString(0xff & hash[i]);
            if(hex.length() == 1) hexString.append('0');
            hexString.append(hex);
         }
         return hexString.toString();
      }
      catch(Exception e) {
         throw new RuntimeException(e);
      }
   }
   
   //Short hand helper to turn Object into a json string
   public static String getJson(Object o) {
      return new GsonBuilder().setPrettyPrinting().create().toJson(o);
   }
   
   //Returns difficulty string target, to compare to hash. eg difficulty of 5 will return "00000"  
   public static String getDificultyString(int difficulty) {
      return new String(new char[difficulty]).replace('\0', '0');
   }
   
   
}

好,现在我们在Block里添加生成hash的方法:

//Calculate new hash based on blocks contents
public String calculateHash() {
   String calculatedhash = StringUtil.applySha256( 
         previousHash +
         Long.toString(timeStamp) +
         Integer.toString(nonce) + 
         data 
         );
   return calculatedhash;
}

然后我们在构造函数里添加hash值的计算:

//Block Constructor.  
public Block(String data,String previousHash ) {
   this.data = data;
   this.previousHash = previousHash;
   this.timeStamp = new Date().getTime();
   
   this.hash = calculateHash(); //Making sure we do this after we set the other values.
}

一试身手

现在是时候一试身手了。我们新建一个main类来玩耍一次:

public static void main(String[] args) {
   Block genesisBlock = new Block("Hi im the first block", "0");
   System.out.println("block 1的hash值 : " + genesisBlock.hash);

   Block secondBlock = new Block("Yo im the second block",genesisBlock.hash);
   System.out.println("block 2的hash值: " + secondBlock.hash);

   Block thirdBlock = new Block("Hey im the third block",secondBlock.hash);
   System.out.println("block 3的hash值: " + thirdBlock.hash);

}

输出结果如下:

1524105999-4597-xttfrx4uia

hash值是不一样的,因为每个block的时间戳不同。

现在每个块都有了自己的数字签名,并且这些数字签名都是基于每个块自身的信息以及前一个块的数字签名联合起来生成的数字签名。

但,现在还不能叫区块链。只是一个个区块。接下来就让我们把这些块装入一个ArrayList中:

public static ArrayList<Block> blockchain = new ArrayList<Block>();

public static void main(String[] args) {
    //add our blocks to the blockchain ArrayList:
    blockchain.add(new Block("Hi im the first block", "0"));
    blockchain.add(new Block("Yo im the second block",blockchain.get(blockchain.size()-1).hash));
    blockchain.add(new Block("Hey im the third block",blockchain.get(blockchain.size()-1).hash));

    String blockchainJson = new GsonBuilder().setPrettyPrinting().create().toJson(blockchain);
    System.out.println(blockchainJson);
}

现在看起来就比较紧凑了,也像个区块链的样子了:

1524105999-4788-j8bsf5xmwi

检查区块链的完整性

现在就让我们在ImportChain中创建一个isChainValid()方法,它会遍历链中每个块,然后对比hash值。这个方法做的事情就是检查hash变量的值是否等于计算出来的hash值以及上一个块的hash是否等于previousHash变量的值。

public static Boolean isChainValid() {
   Block currentBlock; 
   Block previousBlock;
   String hashTarget = new String(new char[difficulty]).replace('\0', '0');
   
   //循环遍历每个块检查hash
   for(int i=1; i < blockchain.size(); i++) {
      currentBlock = blockchain.get(i);
      previousBlock = blockchain.get(i-1);
      //比较注册的hash和计算的hash:
      if(!currentBlock.hash.equals(currentBlock.calculateHash()) ){
         System.out.println("Current Hashes not equal");          
         return false;
      }
      //比较上一个块的hash和注册的上一个hash(也就是previousHash)
      if(!previousBlock.hash.equals(currentBlock.previousHash) ) {
         System.out.println("Previous Hashes not equal");
         return false;
      }
      //检查hash是否被处理
      if(!currentBlock.hash.substring( 0, difficulty).equals(hashTarget)) {
         System.out.println("This block hasn't been mined");
         return false;
      }
      
   }
   return true;
}

对区块链中的块的任何更改都将导致此方法返回false。

On the bitcoin network nodes share their blockchains and the longest valid chain is accepted by the network. What’s to stop someone tampering with data in an old block then creating a whole new longer blockchain and presenting that to the network ? Proof of work. The hashcash proof of work system means it takes considerable time and computational power to create new blocks. Hence the attacker would need more computational power than the rest of the peers combined.

上面说的就是POW 。之后会介绍。

好,上面基本上把区块链搞完了。

现在我们开始新的征程吧!

挖矿

我们将要求矿工们来做POW,具体就是通过尝试不同的变量直到块的hash以几个0开头。

然后我们添加一个nonce(Number once)到calculateHash() 方法以及mineBlock()方法:

public class ImportChain {
   
   public static ArrayList<Block> blockchain = new ArrayList<Block>();
   public static int difficulty = 5;

   public static void main(String[] args) {
      //add our blocks to the blockchain ArrayList:

      System.out.println("正在尝试挖掘block 1... ");
      addBlock(new Block("Hi im the first block", "0"));

      System.out.println("正在尝试挖掘block 2... ");
      addBlock(new Block("Yo im the second block",blockchain.get(blockchain.size()-1).hash));

      System.out.println("正在尝试挖掘block 3... ");
      addBlock(new Block("Hey im the third block",blockchain.get(blockchain.size()-1).hash));

      System.out.println("\nBlockchain is Valid: " + isChainValid());

      String blockchainJson = StringUtil.getJson(blockchain);
      System.out.println("\nThe block chain: ");
      System.out.println(blockchainJson);
   }

   public static Boolean isChainValid() {
      Block currentBlock; 
      Block previousBlock;
      String hashTarget = new String(new char[difficulty]).replace('\0', '0');
      
      //loop through blockchain to check hashes:
      for(int i=1; i < blockchain.size(); i++) {
         currentBlock = blockchain.get(i);
         previousBlock = blockchain.get(i-1);
         //compare registered hash and calculated hash:
         if(!currentBlock.hash.equals(currentBlock.calculateHash()) ){
            System.out.println("Current Hashes not equal");          
            return false;
         }
         //compare previous hash and registered previous hash
         if(!previousBlock.hash.equals(currentBlock.previousHash) ) {
            System.out.println("Previous Hashes not equal");
            return false;
         }
         //check if hash is solved
         if(!currentBlock.hash.substring( 0, difficulty).equals(hashTarget)) {
            System.out.println("This block hasn't been mined");
            return false;
         }
         
      }
      return true;
   }
   
   public static void addBlock(Block newBlock) {
      newBlock.mineBlock(difficulty);
      blockchain.add(newBlock);
   }
}
import java.util.Date;

public class Block {
   
   public String hash;
   public String previousHash; 
   private String data; //our data will be a simple message.
   private long timeStamp; //as number of milliseconds since 1/1/1970.
   private int nonce;
   
   //Block Constructor.  
   public Block(String data,String previousHash ) {
      this.data = data;
      this.previousHash = previousHash;
      this.timeStamp = new Date().getTime();
      
      this.hash = calculateHash(); //Making sure we do this after we set the other values.
   }
   
   //Calculate new hash based on blocks contents
   public String calculateHash() {
      String calculatedhash = StringUtil.applySha256( 
            previousHash +
            Long.toString(timeStamp) +
            Integer.toString(nonce) + 
            data 
            );
      return calculatedhash;
   }
   
   //Increases nonce value until hash target is reached.
   public void mineBlock(int difficulty) {
      String target = StringUtil.getDificultyString(difficulty); //Create a string with difficulty * "0" 
      while(!hash.substring( 0, difficulty).equals(target)) {
         nonce ++;
         hash = calculateHash();
      }
      System.out.println("Block已挖到!!! : " + hash);
   }
   
}

执行main,输出如下:

1524105999-9372-juxnhgirwz

挖掘每一个块都需要一些时间,大概3秒钟。你可以调整难度,看看是如何影响挖矿时间的。

如果有人要窜改区块链中的数据,那么他们的区块链将是无效的,invalid。

他们将无法创建更长的区块链。

在你的网络中诚实的区块链有更大的时间优势来创建一个最长的链。

被篡改的区块链将无法追上更长、更有效的链。

除非它们比网络中的所有其他节点具有更快的计算速度。比如未来的量子计算机之类的东西。

好,我们已经完成了一个基本的区块链!

总结一下我们的这个区块链:

  • 每个区块上携带数据。
  • 有数字签名。
  • 必须通过POW来挖掘来验证新的区块。
  • 可以验证数据是否合法和是否被修改。

 

转载自云栖社区

 

Spring+MyBatis实现数据库读写分离4种方案

通过MyBatis配置文件创建读写分离两个DataSource,每个SqlSessionFactoryBean对象的mapperLocations属性制定两个读写数据源的配置文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。

优点:实现简单
缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展
实现方式

AOP面向切片编程的简单理解

面向切面编程(AOP是Aspect Oriented Program的首字母缩写) ,我们知道,面向对象的特点是继承、多态和封装。而封装就要求将功能分散到不同的对象中去,这在软件设计中往往称为职责分配。实际上也就是说,让不同的类设计不同的方法。这样代码就分散到一个个的类中去了。这样做的好处是降低了代码的复杂程度,使类可重用。

懒得去用JDK动态代理,也懒得用CGLIB动态代理,直接简单的反射做个例子。

public static void main(String[] args) {
// TODO Auto-generated method stub

String method = “add”;
try {
Class classz = Class.forName(testService.class.getName());
// Object obj = classz.newInstance();
Object obj = classz.getDeclaredConstructor(String.class).newInstance(“我是参数”);

Method[] methods = classz.getDeclaredMethods();
for (int i = 0; i < methods.length; i++) {
if (methods[i].getName().equals(method)) {
System.out.println(“——befor——:You can do something in here.”);
methods[i].invoke(obj, args);
System.out.println(“——after——:You can do something in here too.”);

}
}

} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InstantiationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NoSuchMethodException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

就一个类实例化工厂,然后再调用方法前设置一个before方法,当类方法调用完毕后,再设置一个after方法。

package test;

public class testService {

private final String name;

public testService(String n) {
this.name = n;
}

public void add() {
System.out.println(“invoke Class:”+this.getClass().getName()+” and call the method:add->”+name);
}
}

当然,如果使用CGLIB动态代理会更接单,这里出示网上的一个例子:

代理类

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

public class CglibProxy implements MethodInterceptor {

private Enhancer enhancer = new Enhancer();

public Object getProxy(Class clazz) {
enhancer.setSuperclass(clazz);
enhancer.setCallback(this);
return enhancer.create();
}

public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy)
throws Throwable {
System.out.println(“start…”);
Object result = proxy.invokeSuper(obj, args);
System.out.println(“end…”);
return result;
}
}

接着需要一个实现类:

public class CgService {
public void say(){
System.out.println(“CgService hello”);
}

public void miss(){
System.out.println(“miss game”);
}
}

然后测试:

public class Test {
public static void main(String[] args) throws IOException {
CgService cgService = (CgService) cglibProxy.getProxy(CgService.class);
cgService.say();
cgService.miss();
}
}

 

Kafka、SpringMVC整合例子

一、安装zookeeper

1.下载安装包:http://zookeeper.apache.org/releases.html#download;

2.进入Zookeeper设置目录,笔者D:\kafka\zookeeper-3.4.11\conf;

3. 将“zoo_sample.cfg”重命名为“zoo.cfg” ;

3. 编辑zoo.cfg配置文件;

4. 找到并编辑

dataDir=/tmp/zookeeper 并更改成您当前的路径;

5. 系统环境变量:

a. 在系统变量中添加ZOOKEEPER_HOME = D:\kafka\zookeeper-3.4.11

b. 编辑path系统变量,添加为路径%ZOOKEEPER_HOME%\bin;

6. 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181);

7.打开新的cmd,输入zkServer,运行Zookeeper;

出现如下图片表示成功:

002

 

二、安装并运行Kafka

1.下载Kafka:http://kafka.apache.org/downloads.html

2. 进入Kafka配置目录,D:\kafka\kafka_2.12-1.0.1\config;

3. 编辑文件“server.properties” ;

4. 找到并编辑log.dirs=/tmp/kafka-logs 改成您当前可用的目录;

5. 找到并编辑zookeeper.connect=localhost:2181;

6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

运行Kafka代码:.\bin\windows\kafka-server-start.bat .\config\server.properties 

003

 

 

注:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。

三、Kafka代码的实现

1.生产者配置文件:

@Bean
public Map<String,Object> getDefaultFactoryArg(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("group.id","100");
    arg.put("retries","1");
    arg.put("batch.size","16384");
    arg.put("linger.ms","1");
    arg.put("buffer.memory","33554432");
    arg.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    arg.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    return arg;
}

@Bean
public DefaultKafkaProducerFactory defaultKafkaProducerFactory(){
    DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(this.getDefaultFactoryArg());
    return factory;
}

@Bean
public KafkaTemplate kafkaTemplate(){
    KafkaTemplate template = new KafkaTemplate(defaultKafkaProducerFactory());
    template.setDefaultTopic(ConstantKafka.KAFKA_TOPIC1);
    template.setProducerListener(kafkaProducerListener());
    return template;
}

@Bean
public KafkaProducerListener kafkaProducerListener(){
    KafkaProducerListener listener = new KafkaProducerListener();
    return listener;
}

2.消费者配置文件:

@Bean
public Map<String,Object> getDefaultArgOfConsumer(){
    Map<String,Object> arg = new HashMap<>();
    arg.put("bootstrap.servers",ConstantKafka.KAFKA_SERVER);
    arg.put("group.id","100");
    arg.put("enable.auto.commit","false");
    arg.put("auto.commit.interval.ms","1000");
    arg.put("auto.commit.interval.ms","15000");
    arg.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    arg.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    arg.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    return arg;
}

@Bean
public DefaultKafkaConsumerFactory defaultKafkaConsumerFactory(){
    DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(getDefaultArgOfConsumer());
    return factory;
}

@Bean
public KafkaConsumerMessageListener kafkaConsumerMessageListener(){
    KafkaConsumerMessageListener listener = new KafkaConsumerMessageListener();
    return listener;
}

/**
 * 监听频道-log
 * @return
 */
@Bean
public ContainerProperties containerPropertiesOfLog(){
    ContainerProperties properties = new ContainerProperties(ConstantKafka.KAFKA_TOPIC1);
    properties.setMessageListener(kafkaConsumerMessageListener());
    return properties;
}

/**
 * 监听频道-other
 * @return
 */
@Bean
public ContainerProperties containerPropertiesOfOther(){
    ContainerProperties properties = new ContainerProperties(ConstantKafka.KAFKA_TOPIC2);
    properties.setMessageListener(kafkaConsumerMessageListener());
    return properties;
}

@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfLog(){
    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfLog());
    return container;
}

@Bean(initMethod = "doStart")
public KafkaMessageListenerContainer kafkaMessageListenerContainerOfOther(){
    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(defaultKafkaConsumerFactory(),containerPropertiesOfOther());
    return container;
}

3.生产消息服务

@Service
public class KafkaProducerServer implements IKafkaProducerServer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public static final String ROLE_LOG = "log";
    public static final String ROLE_web = "web";
    public static final String ROLE_APP = "app";

    /**
     * 发送消息
     * @param topic 频道
     * @param msg 消息对象
     * @param isUsePartition 是否使用分区
     * @param partitionNum 分区数,如果isUsePartition为true,此数值必须>0
     * @param role 角色:app,web
     * @return
     * @throws IllegalServiceException
     */
    @Override
    public ResultResp<Void> send(String topic, Object msg, boolean isUsePartition, Integer partitionNum, String role) throws IllegalServiceException {
        if (role == null){
            role = ROLE_LOG;
        }

        String key = role + "_" + msg.hashCode();
        String valueString = JsonUtil.ObjectToJson(msg, true);

        if (isUsePartition) {
            //表示使用分区
            int partitionIndex = getPartitionIndex(key, partitionNum);
            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, partitionIndex, key, valueString);
            return checkProRecord(result);
        } else {
            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, key, valueString);
            return checkProRecord(result);
        }
    }

    /**
     * 根据key值获取分区索引
     *
     * @param key
     * @param num
     * @return
     */
    private int getPartitionIndex(String key, int num) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(num);
        } else {
            int result = Math.abs(key.hashCode()) % num;
            return result;
        }
    }

    /**
     * 检查发送返回结果record
     *
     * @param res
     * @return
     */

    private ResultResp<Void> checkProRecord(ListenableFuture<SendResult<String, Object>> res) {
        ResultResp<Void> resp = new ResultResp<>();
        resp.setCode(ConstantKafka.KAFKA_NO_RESULT_CODE);
        resp.setInfo(ConstantKafka.KAFKA_NO_RESULT_MES);

        if (res != null) {
            try {
                SendResult r = res.get();//检查result结果集
                /*检查recordMetadata的offset数据,不检查producerRecord*/
                Long offsetIndex = r.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    resp.setCode(ConstantKafka.SUCCESS_CODE);
                    resp.setInfo(ConstantKafka.SUCCESS_MSG);
                } else {
                    resp.setCode(ConstantKafka.KAFKA_NO_OFFSET_CODE);
                    resp.setInfo(ConstantKafka.KAFKA_NO_OFFSET_MES);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
                resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
                resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());

            } catch (ExecutionException e) {
                e.printStackTrace();
                resp.setCode(ConstantKafka.KAFKA_SEND_ERROR_CODE);
                resp.setInfo(ConstantKafka.KAFKA_SEND_ERROR_MES + ":" + e.getMessage());
            }
        }

        return resp;
    }

}

4.生产者监听服务

public class KafkaProducerListener implements ProducerListener {

    protected final Logger logger = Logger.getLogger(KafkaProducerListener.class.getName());

    public KafkaProducerListener(){

    }

    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        logger.info("-----------------kafka发送数据成功");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("----------RecordMetadata:"+recordMetadata);
        logger.info("-----------------kafka发送数据结束");
    }

    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        logger.info("-----------------kafka发送数据失败");
        logger.info("----------topic:"+topic);
        logger.info("----------partition:"+partition);
        logger.info("----------key:"+key);
        logger.info("----------value:"+value);
        logger.info("-----------------kafka发送数据失败结束");
        e.printStackTrace();
    }

    /**
     * 是否启动Producer监听器
     * @return
     */
    @Override
    public boolean isInterestedInSuccess() {
        return false;
    }
}

5.消费者监听服务

public class KafkaConsumerMessageListener implements MessageListener<String,Object> {

    private Logger logger = Logger.getLogger(KafkaConsumerMessageListener.class.getName());

    public KafkaConsumerMessageListener(){

    }

    /**
     * 消息接收-LOG日志处理
     * @param record
     */
    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        logger.info("=============kafka消息订阅=============");

        String topic = record.topic();
        String key = record.key();
        Object value = record.value();
        long offset = record.offset();
        int partition = record.partition();

        if (ConstantKafka.KAFKA_TOPIC1.equals(topic)){
            doSaveLogs(value.toString());
        }

        logger.info("-------------topic:"+topic);
        logger.info("-------------value:"+value);
        logger.info("-------------key:"+key);
        logger.info("-------------offset:"+offset);
        logger.info("-------------partition:"+partition);
        logger.info("=============kafka消息订阅=============");
    }

    private void doSaveLogs(String data){
        SocketIOPojo<BikeLogPojo> logs = JsonUtil.JsonToObject(data.toString(),SocketIOPojo.class);
        /**
         * 写入到数据库中
         */
    }
}

 

测试图片:

004

 

 

1262728293032
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |