Go 语言 – 实战简易文章系统

go语言很简单,只要有一定的编程基础都很容易使用它来编写一些程序,学完了go lang的语法,习惯写一个小程序,这里我写了一个简易的文章系统,非常简单。

目录结构如下:

a001

1、main.go

func main()  {
   //文件系统
   //fs := http.FileSystem(http.Dir("e:/tools"))
   //http.Handle("/", http.FileServer(fs))
   //log.Fatal(http.ListenAndServe(":8080", nil))

   port := "8080"
   web := http.Server{
      Addr:           ":"+port,
      Handler:        app.HttpHandler(),
      ReadTimeout:    10 * time.Second,
      WriteTimeout:   10 * time.Second,
      MaxHeaderBytes: 1 << 20,
   }

   app.Router()

   log.Printf("Listening on port %s", port)
   log.Printf("Open http://localhost:%s in the browser", port)

   log.Fatal(web.ListenAndServe())
}

首先我们从入口类开始,main()的方法,首先实例化了一个web服务器对象,传入了port跟Handler,handler使用的是一个全局性,也就是说所有的请求都会指向app.HttpHandler()。

接着调用app.Router()方法,初始化一些router,代码待会贴上。

2、Router.go

type route struct {
   path       string
   method     string
   authorized bool
   handler    func(http.ResponseWriter, *http.Request)
}

const (
   GET    = "GET"
   POST   = "POST"
   PUT    = "PUT"
   DELETE = "DELETE"
   OPTION = "OPTION"
   HEADER = "HEADER"
)

var (
   routes map[string]route
)

func Router() {
   //http.HandleFunc("/", indexHandler)
   routes = map[string]route{}
   routes["/"] = route{path: "/", method: GET, authorized: false, handler: indexHandler}
   routes["/view/res/*"] = route{path: "/", method: GET, authorized: false, handler: resourcesHandler}
   routes["/user"] = route{path: "/user", method: GET, authorized: true, handler: indexHandler}
   routes["/add"] = route{path: "/add", method: GET, authorized: true, handler: addHandler}
   routes["/save"] = route{path: "/edit", method: POST, authorized: true, handler: addSaveHandler}
   routes["/view"] = route{path: "/view", method: GET, authorized: false, handler: viewHandler}
   routes["/sign/in"] = route{path: "/sign/up", method: GET, authorized: false, handler: signInHandler}
   routes["/sign/up"] = route{path: "/sign/up", method: GET, authorized: false, handler: signUpHandler}
   routes["/doSignIn"] = route{path: "/doSignIn", method: POST, authorized: false, handler: signInSaveHandler}
   routes["/doSignUp"] = route{path: "/doSignUp", method: POST, authorized: false, handler: signUpSaveHandler}
}

func NewRouter(key string) (r route, ok bool) {
   if strings.Contains(key, "/view/res/") {
      key = "/view/res/*"
   }
   r, err := routes[key]
   return r, err
}

router需要一个类型来保存路由的基本信息,所以这里申明一个route类型对象,route类型:

  • path string  //路由的路径
  • method string  //方法名
  • authorized bool //是否授权
  • handler func(http.ResponseWriter, *http.Request) //处理函数

3、handler.go

const (
   ERROR_NOT_FOUND    = "ERROR_NOT_FOUND"
   ERROR_NOT_METHOD   = "ERROR_NOT_METHOD"
   ERROR_AUTH_INVALID = "ERROR_AUTH_INVALID"
)

var (
   mutex sync.Mutex
   wg    sync.WaitGroup
)

func init() {
   wg.Add(100)
}

func HttpHandler() http.HandlerFunc {
   return func(w http.ResponseWriter, r *http.Request) {
      log.Println(r.Header.Get("Accept"))
      log.Println(r.Header.Get("User-Agent"))
      log.Println(r.Header)
      log.Println(r.Proto, r.Host, r.Method, r.URL.Path)

      token, _ := r.Cookie("token")
      log.Println("token", token)

      //if strings.Index(r.URL.Path,"/view/res/") == 0 {
      // resourcesHandler(w,r)
      // return
      //}
      route, ok := NewRouter(r.URL.Path)
      if !ok {
         errorHandler(w, r, ERROR_NOT_FOUND)
         return
      }
      if r.Method != route.method {
         errorHandler(w, r, ERROR_NOT_METHOD)
         return
      }
      if route.authorized && token != nil && len(token.Value) < 32 {
         errorHandler(w, r, ERROR_AUTH_INVALID)
         return
      }

      route.handler(w, r)
   }
}

func errorHandler(w http.ResponseWriter, r *http.Request, s string) {
   if s == ERROR_NOT_FOUND {
      http.NotFound(w, r)
      return
   }
   if s == ERROR_NOT_METHOD {
      http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
      return
   }
   http.Error(w, http.StatusText(http.StatusNonAuthoritativeInfo), http.StatusNonAuthoritativeInfo)
   return
}

func resourcesHandler(w http.ResponseWriter, r *http.Request) {
   filePath := conf.ROOT + string([]byte(r.URL.Path)[1:])

   //file, err := os.OpenFile(filePath, os.O_RDONLY, 066)
   //defer file.Close()
   //if err != nil {
   // fmt.Println("not found", filePath)
   // return
   //}
   http.ServeFile(w, r, filePath)
}

这里没有用到WaitGroup,只是申明的时候忘记删除了。

主要的函数HttpHandler(),这时一个公共函数,类似于http的调度者,所有的请求都会call这个函数,然后再通过这个函数去分配控制器(route.handler(w, r))。

资源文件处理函数resourcesHandler(),这个函数是将go http服务器中的js、css、image等这些静态资源直接输出,开始不知道有http.ServeFile(w, r, string)这个函数,所以使用了最基本的os读取文件的方式把文件输出出去,其实如果全心投入到go语言,那么真的需要很好地去了解一下go语言的SDK。

3、Controller.go

func indexHandler(w http.ResponseWriter, r *http.Request) {
   util.Output(w, tmpl.Index(r), util.PUT_HTML)
}

这里我只展示了一个函数,其余的函数都是一样的,这里使用了工具类,把信息输出给用户,其中信息的处理交给了tmpl.go的文件。

4、tmpl.go

package tmpl

import (
   "book/model"
   "book/util"
   "fmt"
   "net/http"
   "strconv"
   "strings"
)

func init() {

}

func Index(r *http.Request) string {
   //return "Hello, World!"
   h := NewHtml();
   //h.body("<h1>Hello, World</h1>")
   //h.body(util.GetViewTpl("index.html"))

   list := model.GetArticles("select * from lx_article order by id desc")
   var str string
   tml := `
      <div class="row">
        <div class="col-left">
         <img src="/view/res/img/file_101.353.png" class="img128"/>
            <a href="/view?id=%d" target="_blank">%s</a>
      </div>
        <div class="col-right">%s</div>
        <div class="col-right1"><a href="#">%s</a></div>
    </div>`
   for _, s := range list {
      str += fmt.Sprintf(tml, s.Id, s.Title, s.CreateTimeF, s.User.Username)
   }
   h.body(strings.Replace(util.GetViewTpl("index.html"), "{{content}}", str, -1))
   return h.Show("首页")
}

这个文件比较负责,设计了html的代码,我没有时间去编写模板引擎,所以使用了比较简单的字符替换的方式,把模板输出出去,其实在生产环境中,我们很有必要编写一个模板引起,不过现在流行的是前后端分离,所有的请求,都是通过接口的形式去调去,那么在实际应用中这一层是用不上的,但是为了实现一个简易的文章系统,这里我还是编写出这样不人性化的代码。

核心代码还有很多比如:数据库、模型等到,这里不一一贴出,帖子的最后会附上整个项目的源代码地址,现在我们来看看截图:

用户登录

003

 

 

发表文章:

QQ截图20190507095119

首页的效果图:

001

 

查看文章:

002

项目地址:

https://github.com/AlanRo1986/go-book

Spring中集成RabbitMQ中的消息队列跟发布订阅

1.介绍

RabbitMQ是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定邮件先生或Mailperson女士最终会将邮件发送给您的收件人。在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。

RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据 – 消息

RabbitMQ和一般的消息传递使用了一些术语。

1.生产只意味着发送,发送消息的程序是生产者:

1556961989-3296-producer

2.queue是RabbitMQ中的邮箱的名称,虽然信息流经RabbitMQ和应用程序,但它们只能存储在queue中。queue仅由主机的存储器&磁盘限制约束,它本质上就是一个大的消息缓冲器。许多生产者可以发送消息到一个队列,并且许多消费者可以尝试从一个队列接收数据。这就是我们代表队列的方式:
1556961989-3357-queue
3.消费消息与接收消息有类似的意义。一个消费者是一个程序,主要是等待接收信息:
1556961990-9390-consumer
请注意,生产者,消费者和代理不必驻留在同一主机上; 实际上在大多数应用中他们没有。应用程序也可以是生产者和消费者。

2.工作队列(Queue)

1556963391-5142-python-two

工作队列(又称:任务队列)主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工作程序时,它们之间将共享这些队列。

这个概念在Web应用程序中特别有用,在这些应用程序中,在短HTTP请求窗口期间无法处理复杂任务。

3.发布/订阅(Publish/Subscribe)

工作队列实际上就是将每个任务都交付给一个工作者(只有一个能接收)。在这一部分,我们将做一些完全不同的事情 – 我们将向多个消费者传递信息。此模式称为“发布/订阅”。

为了说明这种模式,我们将构建一个简单的日志记录系统。它将包含两个程序 – 第一个将发出日志消息,第二个将接收和打印它们。

在我们的日志记录系统中,接收程序的每个运行副本都将获取消息。这样我们就可以运行一个接收器并将日志定向到磁盘; 同时我们将能够运行另一个接收器并在屏幕上看到日志。

基本上,发布的日志消息将被广播给所有接收者。

Exchanges

我们向队列发送消息和从队列接收消息。现在是时候在RabbitMQ中引入完整的消息传递模型了。

RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。

相反,生产者只能向exchange发送消息。exchange是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面将它们推送到Queue。exchanges必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到许多队列吗?或者它应该被丢弃。其规则由exchange类型定义 。

1556963392-3075-exchanges

有几种交换类型可供选择:directtopicheaders 和fanout。我们将专注于最后一个 – fanout。让我们创建一个这种类型的exchange,并将其称为logs:

channel.exchangeDeclare("logs", "fanout");

fanout exchange非常简单。正如您可能从名称中猜到的那样,它只是将收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。

现在,我们可以发布到我们的命名交换:

channel.basicPublish("logs","",null,message.getBytes());

临时队列

能够命名队列对我们来说至关重要 – 我们需要将工作人员指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名非常重要。

但我们的记录器并非如此。我们希望了解所有日志消息,而不仅仅是它们的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。

首先,每当我们连接到RabbitMQ时,我们都需要一个新的空队列。为此,我们可以使用随机名称创建队列,或者更好 – 让服务器为我们选择随机队列名称。

其次,一旦我们断开消费者,就应该自动删除队列

在Java客户端中,当我们没有向queueDeclare()提供参数时,我们使用生成的名称创建一个非持久的,独占的并能勾自动删除队列:

String queueName = channel.queueDeclare().getQueue();

此时,queueName包含随机队列名称。例如:它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定队列

1556963392-9099-bindings
我们已经创建了一个fanout exchange和一个队列。现在我们需要告诉exchange将消息发送到我们的队列。exchange和队列之间的关系称为绑定

channel.queueBind(queueName, "logs", "");

从现在开始,logs exchange会将消息附加到我们的队列中。

列出绑定

你可以使用rabbitmqctl列出现有的绑定

rabbitmqctl list_bindings

把它们放在一起

1556963393-7282-python-three-overall

以上的教材都来自官方文档的翻译,原文请查阅:

https://www.rabbitmq.com/getstarted.html

4.集成

maven

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.0</version>
</dependency>

配置生产者

private static final String host = "127.0.0.1";
private static final int prot = 5672;
/**
 * 缓存Chanel
 */
private Map<String, Channel> producerMap = new ConcurrentHashMap<>();

/**
 * 连接工厂
 */
private ConnectionFactory factory;

private Connection connection;
private Channel channel;

@Override
public void afterPropertiesSet() throws Exception {
    factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setPort(prot);
}

/**
 * 获取一个连接,如果为空或断开了连接则重新实例化
 *
 * @return Connection
 * @throws Exception
 */
@Override
public Connection getConnection() throws Exception {
    if (connection == null || !connection.isOpen()) {
        connection = factory.newConnection();
    }
    return connection;
}

/**
 * 返回一个通道
 *
 * @return Channel
 * @throws Exception
 */
@Override
public Channel getChannel() throws Exception {
    if (channel == null || !channel.isOpen()) {
        channel = this.getConnection().createChannel();
    }
    return channel;
}

/**
 * 创建一个生产者,如果缓存中没有,则重新创建
 *
 * @param exchange Queue name|exchange name
 * @param type     queue|fanout|topic|headers|direct
 * @param durable  是否持久性
 * @return Channel
 * @throws Exception
 */
@Override
public Channel createProducer(String exchange, String type, boolean durable) throws Exception {
    if (producerMap.containsKey(exchange + type + durable)) {
        logger("producer by cache.");
        Channel c1 = producerMap.get(exchange + type + durable);
        if (c1.isOpen()) {
            return c1;
        }
    }

    Channel c = this.getChannel();
    if (type == null || queue.equals(type)) {
        c.queueDeclare(exchange, durable, false, false, null);
    } else {
        c.exchangeDeclare(exchange, type, durable);
    }
    producerMap.put(exchange + type + durable, c);
    return c;
}

/**
 * 发送一条消息
 *
 * @param name    Queue name|exchange name
 * @param type    queue|fanout|topic|headers|direct
 * @param message content
 * @return boolean
 * @throws Exception
 */
@Override
public boolean send(String name, String type, String message) throws Exception {
    try {
        if (type == null || queue.equals(type)) {
            this.getProducer(name, type).basicPublish("", name, null, message.getBytes());
        } else {
            this.getProducer(name, type).basicPublish(name, "", null, message.getBytes());
        }
        return true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

配置消费者

/**
 * 添加一个exchange监听器
 *
 * @param exchange exchange name
 * @param autoAck  是否自动响应
 * @param listener DeliverCallback监听器
 */
@Override
public void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener) {
    try {
        Channel c = this.getChannel();
        String queue = c.queueDeclare().getQueue();
        c.queueBind(queue, exchange, "");
        c.basicConsume(queue, autoAck, listener, e -> {
            logger("exchange error:" + e);
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * 添加一个Queue监听器
 *
 * @param queue    Queue name
 * @param autoAck  是否自动响应
 * @param listener DeliverCallback监听器
 */
@Override
public void addQueueListener(String queue, boolean autoAck, DeliverCallback listener) {
    try {
        this.getProducer(queue).basicConsume(queue, autoAck, listener, e -> {
            logger("queue error:" + e);
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

因为exchange需要绑定临时队列,所以就用两个方法来分开绑定监听。

测试代码

@RestController
@RequestMapping(value = "/test")
public class TestRabbitMQController {

    private static final Logger logger = Logger.getLogger("RabbitMQ>");

    @Resource
    private IRabbitMQService rabbitMQService;


    /**
     * http://localhost:8180/test/rabbitmq/pub/add?name=lan.queue&type=queue
     * http://localhost:8180/test/rabbitmq/pub/add?name=lan.fanout&type=fanout
     * http://localhost:8180/test/rabbitmq/pub/add?name=lan.topic&type=topic
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/rabbitmq/pub/add")
    public ResultResp<Void> queue(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String name = request.getParameter("name");
        String type = request.getParameter("type");

        String msg = "test RabbitMQ " + type + " " + name + " " + DateTimeUtils.getTime();
        try {
            rabbitMQService.send(name, type, msg);
            resp.setInfo(msg);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }

        return resp;
    }

    /**
     * http://localhost:8180/test/rabbitmq/sub/add?id=100&name=lan.queue&type=queue
     * http://localhost:8180/test/rabbitmq/sub/add?id=101&name=lan.queue&type=queue
     * http://localhost:8180/test/rabbitmq/sub/add?id=102&name=lan.queue&type=queue
     *
     * http://localhost:8180/test/rabbitmq/sub/add?id=103&name=lan.fanout&type=fanout
     * http://localhost:8180/test/rabbitmq/sub/add?id=104&name=lan.fanout&type=fanout
     * http://localhost:8180/test/rabbitmq/sub/add?id=105&name=lan.fanout&type=fanout
     *
     * http://localhost:8180/test/rabbitmq/sub/add?id=106&name=lan.topic&type=topic
     * http://localhost:8180/test/rabbitmq/sub/add?id=107&name=lan.topic&type=topic
     * http://localhost:8180/test/rabbitmq/sub/add?id=108&name=lan.topic&type=topic
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/rabbitmq/sub/add")
    public ResultResp<Void> topic(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String id = request.getParameter("id");
        String name = request.getParameter("name");
        String type = request.getParameter("type");
        try {
            rabbitMQService.addListener(name, type, (s, c) -> {
                logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
            });
            resp.setInfo(id);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }

        return resp;
    }
}

5.完整代码

RabbitMQService接口

package com.lanxinbase.system.service.resource;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

/**
 * Created by alan on 2019/5/2.
 */
public interface IRabbitMQService {

    Connection getConnection() throws Exception;

    Channel getChannel() throws Exception;

    Channel getProducer(String name) throws Exception;

    Channel getProducer(String name, String type) throws Exception;

    Channel createProducer(String name, String type, boolean durable) throws Exception;

    boolean send(String name, String message) throws Exception;

    boolean send(String name, String type, String message) throws Exception;

    void addListener(String name, String type,DeliverCallback listener);

    void addListener(String name, String type, boolean autoAck, DeliverCallback listener);

    void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener);

    void addQueueListener(String queue, boolean autoAck, DeliverCallback listener);


}

RabbitMQService实现

package com.lanxinbase.system.service;

import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IRabbitMQService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by alan on 2019/5/3.
 * <p>
 * <p>
 * 0.需要下载Erlang,并且设置好ERLANG_HOME的环境变量,类似于JDK的配置方式。
 * 1.下载RabbitMQ
 * 3.运行RabbitMQ,like this:./sbin/rabbitmq-server.bat
 * <p>
 * Queue Test:
 * 生产者:
 * http://localhost:8180/test/rabbitmq/pub/add?name=lan.queue&type=queue
 * <p>
 * rabbitMQService.send(name, type, msg);
 * <p>
 * 消费者(3个):
 * http://localhost:8180/test/rabbitmq/sub/add?id=100&name=lan.queue&type=queue
 * http://localhost:8180/test/rabbitmq/sub/add?id=101&name=lan.queue&type=queue
 * http://localhost:8180/test/rabbitmq/sub/add?id=102&name=lan.queue&type=queue
 * <p>
 * rabbitMQService.addListener(name, type, (s, c) -> {
 * logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
 * });
 * <p>
 * Queue运行日志:
 * 03-May-2019 22:24:37.773 lambda$topic$0 101# message:test RabbitMQ queue lan.queue 1556893477772, routing:lan.queue
 * 03-May-2019 22:24:38.467 lambda$topic$0 102# message:test RabbitMQ queue lan.queue 1556893478466, routing:lan.queue
 * 03-May-2019 22:24:39.376 lambda$topic$0 100# message:test RabbitMQ queue lan.queue 1556893479374, routing:lan.queue
 * <p>
 * 这里生产者生产了3条信息,Queue消息不会丢失,如果生产者生产消息的时候没有消费者进入,那么消息会等到消费者进入后发送给消费者。
 * 如果有多个消费者监听同一个Queue,那么则会按照某种算法,将消息发送给其中一个消费者,如果接收成功后,通道会自动删除消息。
 * <p>
 * Exchange Test:
 * 生产者:
 * http://localhost:8180/test/rabbitmq/pub/add?name=lan.fanout&type=fanout
 * http://localhost:8180/test/rabbitmq/pub/add?name=lan.topic&type=topic
 * <p>
 * rabbitMQService.send(name, type, msg);
 * <p>
 * 消费者:
 * http://localhost:8180/test/rabbitmq/sub/add?id=103&name=lan.fanout&type=fanout
 * http://localhost:8180/test/rabbitmq/sub/add?id=104&name=lan.fanout&type=fanout
 * http://localhost:8180/test/rabbitmq/sub/add?id=105&name=lan.fanout&type=fanout
 * <p>
 * http://localhost:8180/test/rabbitmq/sub/add?id=106&name=lan.topic&type=topic
 * http://localhost:8180/test/rabbitmq/sub/add?id=107&name=lan.topic&type=topic
 * http://localhost:8180/test/rabbitmq/sub/add?id=108&name=lan.topic&type=topic
 * <p>
 * rabbitMQService.addListener(name, type, (s, c) -> {
 * logger.info(id + "# message:" + new String(c.getBody()) + ", routing:" + c.getEnvelope().getRoutingKey());
 * });
 * <p>
 * Exchange运行日志:
 * 03-May-2019 22:24:42.424 lambda$topic$0 104# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
 * 03-May-2019 22:24:42.425 lambda$topic$0 103# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
 * 03-May-2019 22:24:42.425 lambda$topic$0 105# message:test RabbitMQ fanout lan.fanout 1556893482420, routing:
 * <p>
 * 03-May-2019 22:24:46.077 lambda$topic$0 107# message:test RabbitMQ topic lan.topic 1556893486075, routing:
 * 03-May-2019 22:24:46.077 lambda$topic$0 108# message:test RabbitMQ topic lan.topic 1556893486075, routing:
 * 03-May-2019 22:24:46.077 lambda$topic$0 106# message:test RabbitMQ topic lan.topic 1556893486075, routing:
 * <p>
 * 从日志时间上可以看的出,生产者的消息,全部同时发送给了所有消费者。如果生产者生产消息的时候没有消费者进入,那么消息会丢失。
 * 当有消费者监听Topic时,可以收到消息,如果同时有多个消费者监听同一个topic,那么消息将分别发送给各个消费者。
 *
 * @See TestRabbitMQController
 */

@Service
public class RabbitMQService extends CompactService implements InitializingBean, DisposableBean, IRabbitMQService {

    private static final String host = "127.0.0.1";
    private static final int prot = 5672;

    public static final String TOPIC_DEFAULT = "lan.topic";
    public static final String DIRECT_DEFAULT = "lan.direct";
    public static final String HEADERS_DEFAULT = "lan.headers";
    public static final String FANOUT_DEFAULT = "lan.fanout";

    public static final String QUEUE_DEFAULT = "lan.queue";

    public static final String direct = "direct";
    public static final String topic = "topic";
    public static final String fanout = "fanout";
    public static final String headers = "headers";
    public static final String queue = "queue";

    /**
     * 缓存Chanel
     */
    private Map<String, Channel> producerMap = new ConcurrentHashMap<>();

    /**
     * 连接工厂
     */
    private ConnectionFactory factory;

    private Connection connection;
    private Channel channel;

    public RabbitMQService() {

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(prot);
    }

    /**
     * 获取一个连接,如果为空或断开了连接则重新实例化
     *
     * @return Connection
     * @throws Exception
     */
    @Override
    public Connection getConnection() throws Exception {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
        }
        return connection;
    }

    /**
     * 返回一个通道
     *
     * @return Channel
     * @throws Exception
     */
    @Override
    public Channel getChannel() throws Exception {
        if (channel == null || !channel.isOpen()) {
            channel = this.getConnection().createChannel();
        }
        return channel;
    }

    /**
     * 获取一个生产者
     *
     * @param name Queue name|exchange name
     * @return Channel
     * @throws Exception
     */
    @Override
    public Channel getProducer(String name) throws Exception {
        return this.getProducer(name, queue);
    }

    /**
     * 获取一个生产者
     *
     * @param name Queue name|exchange name
     * @param type queue|fanout|topic|headers|direct
     * @return Channel
     * @throws Exception
     */
    @Override
    public Channel getProducer(String name, String type) throws Exception {
        return this.createProducer(name, type, false);
    }

    /**
     * 创建一个生产者,如果缓存中没有,则重新创建
     *
     * @param exchange Queue name|exchange name
     * @param type     queue|fanout|topic|headers|direct
     * @param durable  是否持久性
     * @return Channel
     * @throws Exception
     */
    @Override
    public Channel createProducer(String exchange, String type, boolean durable) throws Exception {
        if (producerMap.containsKey(exchange + type + durable)) {
            logger("producer by cache.");
            Channel c1 = producerMap.get(exchange + type + durable);
            if (c1.isOpen()) {
                return c1;
            }
        }

        Channel c = this.getChannel();
        if (type == null || queue.equals(type)) {
            c.queueDeclare(exchange, durable, false, false, null);
        } else {
            c.exchangeDeclare(exchange, type, durable);
        }
        producerMap.put(exchange + type + durable, c);
        return c;
    }


    /**
     * 发送一条消息,默认只发送queue消息
     *
     * @param name    Queue name|exchange name
     * @param message content
     * @return boolean
     * @throws Exception
     */
    @Override
    public boolean send(String name, String message) throws Exception {
        return this.send(name, queue, message);
    }

    /**
     * 发送一条消息
     *
     * @param name    Queue name|exchange name
     * @param type    queue|fanout|topic|headers|direct
     * @param message content
     * @return boolean
     * @throws Exception
     */
    @Override
    public boolean send(String name, String type, String message) throws Exception {
        try {
            if (type == null || queue.equals(type)) {
                this.getProducer(name, type).basicPublish("", name, null, message.getBytes());
            } else {
                this.getProducer(name, type).basicPublish(name, "", null, message.getBytes());
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 设置消费者监听
     *
     * @param name     Queue name|exchange name
     * @param type     queue|fanout|topic|headers|direct
     * @param listener DeliverCallback监听器
     */
    @Override
    public void addListener(String name, String type, DeliverCallback listener) {
        this.addListener(name, type, true, listener);
    }

    /**
     * 设置消费者监听
     *
     * @param name     Queue name|exchange name
     * @param type     queue|fanout|topic|headers|direct
     * @param autoAck  是否自动响应
     * @param listener DeliverCallback监听器
     */
    @Override
    public void addListener(String name, String type, boolean autoAck, DeliverCallback listener) {
        if (type == null || queue.equals(type)) {
            this.addQueueListener(name, autoAck, listener);
        } else {
            this.addExchangeListener(name, autoAck, listener);
        }
    }

    /**
     * 添加一个exchange监听器
     *
     * @param exchange exchange name
     * @param autoAck  是否自动响应
     * @param listener DeliverCallback监听器
     */
    @Override
    public void addExchangeListener(String exchange, boolean autoAck, DeliverCallback listener) {
        try {
            Channel c = this.getChannel();
            String queue = c.queueDeclare().getQueue();
            c.queueBind(queue, exchange, "");
            c.basicConsume(queue, autoAck, listener, e -> {
                logger("exchange error:" + e);
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 添加一个Queue监听器
     *
     * @param queue    Queue name
     * @param autoAck  是否自动响应
     * @param listener DeliverCallback监听器
     */
    @Override
    public void addQueueListener(String queue, boolean autoAck, DeliverCallback listener) {
        try {
            this.getProducer(queue).basicConsume(queue, autoAck, listener, e -> {
                logger("queue error:" + e);
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void destroy() throws Exception {
        channel.close();
        connection.close();
    }
}

RabbitMQService服务类每个方法都写了说明,这里就不解释了。

Spring集成Apache Kafka

之前写了一篇文章,当然,来自Apache Kafka的翻译文档,让大家更能理解Kafka,地址是:

http://www.lanxinbase.com/?p=2314

Apache Kafka® 是一个分布式流媒体平台,很适合用来处理大并发的数据,本人使用了Kafka有两年多了,非常稳定。

1.下载Kafka

我测试及开发的系统是windows,所以现在要下载一个win版本的,JDK环境就不说了,本次测试的版本号:kafka_2.12-1.0.1。

下载地址:http://kafka.apache.org/downloads

由于Kafka的运行在zookeeper环境内,所以在启动之前,我们需要安装一下zookeeper,版本号:zookeeper-3.4.11。

下载地址:https://zookeeper.apache.org/releases.html

2.配置Kafka

如果下载下来不配置的话,会出现很多问题。

\kafka_2.12-1.0.1\config\server.properties

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

port=9092
host.name=127.0.0.1

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=D:\\kafka\\logs

示例的配置有删减,因为事实上我只配置了这几点而已:

  • port=9092
  • host.name=127.0.0.1
  • log.dirs=D:\\kafka\\logs

\zookeeper-3.4.11\conf\zoo.cfg

dataDir=D:\\kafka\\logs

\kafka_2.12-1.0.1\config\zookeeper.properties

dataDir=D:\\kafka\\logs

到这里,基本上就可以运行了,现在首先启动zookeeper服务器:

zkServer  //简单直接

./zookeeper-3.4.11/bin/zkServer.cmd //或许可以这样子

./zkServer.sh start  //linux 可能是这样子

然后启动kafka服务器:

.\kafka_2.12-1.0.1\bin\windows\kafka-server-start.bat .\kafka_2.12-1.0.1\config\server.properties

./kafka-server-start.sh ../config/server.properties  //linux 启动

./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &  //linux 后台启动

nohup ./kafka-server-start.sh ../config/server.properties &  //linux 后台启动

服务器启动完了,现在来写代码,配置kafka

3.集成Kafka

maven

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

 

配置生产者

private final Map<String, Object> producerArg = new HashMap<>();
private DefaultKafkaProducerFactory kafkaProducerFactory;
private KafkaTemplate kafkaTemplate;

private void initArg() {
    producerArg.put("bootstrap.servers", "127.0.0.1:9092");
    producerArg.put("group.id", "100");
    producerArg.put("compression.type", "gzip");
    producerArg.put("reconnect.backoff.ms ", 20000);
    producerArg.put("retry.backoff.ms", 20000);
    producerArg.put("retries", 30);
    producerArg.put("batch.size", "16384");
    producerArg.put("linger.ms", "50");
    producerArg.put("acks", "all");
    producerArg.put("buffer.memory", "33554432");
    producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
/**
 * 创建一个生产者工厂类
 * @return
 */
public DefaultKafkaProducerFactory getKafkaProducerFactory() {
    if (kafkaProducerFactory == null) {
        kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
    }
    return kafkaProducerFactory;
}

/**
 * 创建一个消息模板
 * @param topic 默认的TOPIC
 * @param listener 生产者监听,如不需要则传入null
 * @return KafkaTemplate
 */
@Override
public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
    if (kafkaTemplate == null) {
        kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
        kafkaTemplate.setDefaultTopic(TOPIC_DEFAULT);
        kafkaTemplate.setProducerListener(listener);
    }
    return kafkaTemplate;
}
/**
 * 发布消息
 * @param topic TopicName
 * @param message 消息字符串,通常为JSON string
 * @param isUsePartition 是否使用分区
 * @param partitionNum 分区的数量
 * @param role 用来区分消息key值
 * @return
 */
@Override
public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
    if (role == null) {
        role = ROLE_DEFAULT;
    }

    String key = role + "_" + message.hashCode();
    ListenableFuture<SendResult<String, Object>> result;
    if (isUsePartition) {
        int index = getPartitionIndex(key, partitionNum);
        result = kafkaTemplate.send(topic, index, key, message);

    } else {
        result = kafkaTemplate.send(topic, key, message);
    }

    return checkResult(result);
}

配置消费者

private final Map<String, Object> consumerArg = new HashMap<>();
private DefaultKafkaConsumerFactory kafkaConsumerFactory;

/**
 * 初始化参数
 */
private void initArg() {
    String groupId = "20190504";
    consumerArg.put("bootstrap.servers", "127.0.0.1:9092");
    consumerArg.put("group.id", groupId);//消费群组,如果需要所有消费者都能接收到消息,则为每个消费者设置不同的群组Id
    consumerArg.put("enable.auto.commit", "false");
    consumerArg.put("auto.commit.interval.ms", "1000");
    consumerArg.put("auto.offset.reset", "latest");
    consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}

/**
 * 创建一个消费者工厂类
 * @return
 */
public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
    if (kafkaConsumerFactory == null) {
        kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
    }
    return kafkaConsumerFactory;
}

/**
 * 添加一个消费者监听
 * @param listener 监听器
 * @param groupId 消费者Id,需要让所有的消费者接收消息,请指定不同的分组Id
 * @param topic 监听Topic名称
 * @return 返回KafkaMessageListenerContainer对象,可以进行stop或start
 */
@Override
public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
    ContainerProperties properties = new ContainerProperties(topic);
    properties.setMessageListener(listener);
    properties.setGroupId(groupId);

    KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
    container.start();
    return container;
}

非常简单,现在已经配置完,附上测试代码:

@RestController
@RequestMapping(value = "/test")
public class TestKafkaMQController {

    private static final Logger logger = Logger.getLogger("kafkaMQ>");
    
    @Autowired
    private IKafkaMQService kafkaMQService;

    private final Map<String, KafkaMessageListenerContainer> listenerContainerMap = new ConcurrentHashMap<>();

    /**
     * http://localhost:8180/test/kafka/pub
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/kafka/pub")
    public ResultResp<Void> kafkaPub(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String msg = "test kafka " + DateTimeUtils.getTime();
        try {
            kafkaMQService.send(KafkaMQService.TOPIC_DEFAULT, msg);
            resp.setInfo(msg);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }

        return resp;
    }

    /**
     * http://localhost:8180/test/kafka/sub?group=20190504&id=100
     * http://localhost:8180/test/kafka/sub?group=20190504&id=101
     * http://localhost:8180/test/kafka/sub?group=20190503&id=102
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/kafka/sub")
    public ResultResp<Void> kafkaSub(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String id = request.getParameter("id");
        String group = request.getParameter("group");

        try {
            KafkaMessageListenerContainer container = kafkaMQService.addListener(new MessageListener<String, Object>() {

                @Override
                public void onMessage(ConsumerRecord<String, Object> record) {
                    String log = "%s#{topic:%s, key:%s, value:%s, offset:%s, partition:%s, timestamp:%s }";
                    logger.info(String.format(log, id, record.topic(), record.key(), record.value(), record.offset(), record.partition(), record.timestamp()));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }, group, KafkaMQService.TOPIC_DEFAULT);

            listenerContainerMap.put(id, container);
            resp.setInfo(id);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setInfo(e.getMessage());
        }

        return resp;
    }

    /**
     * http://localhost:8180/test/kafka/cancel?id=100
     *
     * @param request
     * @return
     */
    @RequestMapping(value = "/kafka/cancel")
    public ResultResp<Void> kafkaCancel(HttpServletRequest request) {
        ResultResp<Void> resp = new ResultResp<>();

        String id = request.getParameter("id");

        if (listenerContainerMap.containsKey(id)) {
            listenerContainerMap.get(id).stop();
            listenerContainerMap.remove(id);
        }

        return resp;
    }

}

完整的KafkaService服务类

IKafkaMQService接口

package com.lanxinbase.system.service.resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.ProducerListener;

import java.util.Map;

/**
 * Created by alan on 2019/5/4.
 */
public interface IKafkaMQService {

    Map<String, Object> getProducerFactoryArg();

    KafkaTemplate getKafkaTemplate();

    KafkaTemplate getKafkaTemplate(String topic);

    KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener);

    KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener);

    boolean send(String topic, String message);

    boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum);

    boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role);

    int getPartitionIndex(String hashCode, int partitionNum);

    Map<String, Object> getConsumerFactoryArg();

    Map<String, Object> setConsumerFactoryArg(String key, Object val);

    KafkaMessageListenerContainer addListener(MessageListener listener, String topic);

    KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic);

}

KafkaMQService实现

package com.lanxinbase.system.service;

import com.lanxinbase.system.basic.CompactService;
import com.lanxinbase.system.service.resource.IKafkaMQService;
import com.lanxinbase.system.utils.NumberUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * Created by alan on 2019/5/4.
 *
 * 0.需要下载Kafka,这里我下载的版本是:kafka_2.12-1.0.1。
 * 1.配置kafka(主要是日志的路径,Socket Server Settings:{
 *      port=9092
 *      host.name=127.0.0.1
 * })
 * 2.启动zookeeper:zkServer
 * 3.启动Kafka:.\bin\windows\kafka-server-start.bat .\config\server.properties
 *
 * Topic Test:
 *
 * 发送消息:
 *      http://localhost:8180/test/kafka/pub
 *
 * Id=100 & 100 监听:
 *      http://localhost:8180/test/kafka/sub?group=20190504&id=100
 *      http://localhost:8180/test/kafka/sub?group=20190504&id=101
 *
 * 测试日志:
 * 04-May-2019 16:13:00.647 .onMessage 100#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
 *
 * Id=102 监听
 *      http://localhost:8180/test/kafka/sub?group=20190503&id=102
 *
 * 测试日志:
 * 04-May-2019 16:13:06.892 .onMessage 102#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }
 * 注:102监听的Topic跟Id=100的是一样的,但是group.id不一样,所有102会收到上一条消息,可以通过时间戳对比
 *
 * ------------------------------------------------------------------------------------------------------------------
 * 发送消息:
 *      http://localhost:8180/test/kafka/pub
 *
 * 测试日志:
 * 04-May-2019 16:13:11.292 .onMessage 102#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
 * 04-May-2019 16:13:11.293 .onMessage 100#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }
 * 注:由于100&102的group.id不一致,所以它们都收到了消息,但是为什么101收不到消息呢?因为是100的服务器状态良好,现在我们来取消100的监听
 *
 * ------------------------------------------------------------------------------------------------------------------
 * 取消监听:
 *      http://localhost:8180/test/kafka/cancel?id=100
 *      KafkaMessageListenerContainer.stop();
 *
 * 发送消息:
 *     http://localhost:8180/test/kafka/pub
 *
 * 测试日志:
 * 04-May-2019 16:13:23.147 .onMessage 101#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
 * 04-May-2019 16:13:23.147 .onMessage 102#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }
 * 注:这下只有101&102能收到消息了。
 *
 * @See TestKafkaMQController
 */
@Service
public class KafkaMQService extends CompactService implements IKafkaMQService, InitializingBean, DisposableBean {

    private static final String uri = "127.0.0.1:9092";
    public static final String TOPIC_DEFAULT = "lan_topic";
    public static final String ROLE_DEFAULT = "app";

    private final Map<String, Object> producerArg = new HashMap<>();
    private final Map<String, Object> consumerArg = new HashMap<>();

    private DefaultKafkaProducerFactory kafkaProducerFactory;
    private DefaultKafkaConsumerFactory kafkaConsumerFactory;

    private KafkaTemplate kafkaTemplate;

    public KafkaMQService() {

    }

    /**
     * 启动后执行
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.initArg();
        this.getKafkaProducerFactory();
//        this.getKafkaConsumerFactory();

        kafkaTemplate = this.createKafkaTemplate(TOPIC_DEFAULT, this.getProducerListener());

    }

    /**
     * 生产者监听
     * @return
     */
    private ProducerListener<String, String> getProducerListener() {
        return new ProducerListener<String, String>() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                StringBuffer sb = new StringBuffer();
                sb.append("success{")
                        .append("topic:" + topic)
                        .append(",partition:" + partition)
                        .append(",key:" + key)
                        .append(",value:" + value)
                        .append("}");
                logger(sb.toString());
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
                StringBuffer sb = new StringBuffer();
                sb.append("error{")
                        .append("topic:" + topic)
                        .append(",partition:" + partition)
                        .append(",key:" + key)
                        .append(",value:" + value)
                        .append("}");
                logger(sb.toString());
            }

            @Override
            public boolean isInterestedInSuccess() {
                return false;
            }
        };
    }

    /**
     * 初始化参数
     */
    private void initArg() {
        producerArg.put("bootstrap.servers", uri);
        producerArg.put("group.id", "100");
        producerArg.put("compression.type", "gzip");
        producerArg.put("reconnect.backoff.ms ", 20000);
        producerArg.put("retry.backoff.ms", 20000);
        producerArg.put("retries", 30);
        producerArg.put("batch.size", "16384");
        producerArg.put("linger.ms", "50");
        producerArg.put("acks", "all");
        producerArg.put("buffer.memory", "33554432");
        producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String groupId = "20190504";
        consumerArg.put("bootstrap.servers", uri);
        consumerArg.put("group.id", groupId);//消费群组,如果需要所有消费者都能接收到消息,则为每个消费者设置不同的群组Id
        consumerArg.put("enable.auto.commit", "false");
        consumerArg.put("auto.commit.interval.ms", "1000");
        consumerArg.put("auto.offset.reset", "latest");
        consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }


    @Override
    public Map<String, Object> getProducerFactoryArg() {
        return producerArg;
    }

    @Override
    public KafkaTemplate getKafkaTemplate() {
        return this.getKafkaTemplate(TOPIC_DEFAULT);
    }

    @Override
    public KafkaTemplate getKafkaTemplate(String topic) {
        return this.getKafkaTemplate(topic, null);
    }

    @Override
    public KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener) {
        return this.createKafkaTemplate(topic, listener);
    }

    /**
     * 创建一个消息模板
     * @param topic 默认的TOPIC
     * @param listener 生产者监听,如不需要则传入null
     * @return KafkaTemplate
     */
    @Override
    public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {
        if (kafkaTemplate == null) {
            kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());
            kafkaTemplate.setDefaultTopic(TOPIC_DEFAULT);
            kafkaTemplate.setProducerListener(listener);
        }
        return kafkaTemplate;
    }

    /**
     * 发布消息
     * @param topic TopicName
     * @param message 消息字符串,通常为JSON string
     * @return
     */
    @Override
    public boolean send(String topic, String message) {
        return this.send(topic, message, false, 0);
    }

    /**
     * 发布消息
     * @param topic TopicName
     * @param message 消息字符串,通常为JSON string
     * @param isUsePartition 是否使用分区
     * @param partitionNum 分区的数量
     * @return
     */
    @Override
    public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum) {
        return this.send(topic, message, isUsePartition, partitionNum, ROLE_DEFAULT);
    }

    /**
     * 发布消息
     * @param topic TopicName
     * @param message 消息字符串,通常为JSON string
     * @param isUsePartition 是否使用分区
     * @param partitionNum 分区的数量
     * @param role 用来区分消息key值
     * @return
     */
    @Override
    public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {
        if (role == null) {
            role = ROLE_DEFAULT;
        }

        String key = role + "_" + message.hashCode();
        ListenableFuture<SendResult<String, Object>> result;
        if (isUsePartition) {
            int index = getPartitionIndex(key, partitionNum);
            result = kafkaTemplate.send(topic, index, key, message);

        } else {
            result = kafkaTemplate.send(topic, key, message);
        }

        return checkResult(result);
    }

    /**
     * 检查是否发送成功
     * @param result ListenableFuture
     * @return
     */
    private boolean checkResult(ListenableFuture<SendResult<String, Object>> result) {
        if (result != null) {
            try {
                long offset = result.get().getRecordMetadata().offset();
                if (offset >= 0) {
                    return true;
                }
                logger("unknown offset.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                logger("send time out.", e.getMessage());

            } catch (ExecutionException e) {
                e.printStackTrace();
                logger("send time out.", e.getMessage());
            }
        }
        return false;
    }

    /**
     * 获取分区索引,根据key自动分配
     * @param hashCode key的hashCode
     * @param partitionNum 分区的总数量
     * @return 返回索引号
     */
    @Override
    public int getPartitionIndex(String hashCode, int partitionNum) {
        if (hashCode == null) {
            return NumberUtils.nextInt(partitionNum);
        }
        return Math.abs(hashCode.hashCode()) % partitionNum;
    }

    @Override
    public Map<String, Object> getConsumerFactoryArg() {
        return consumerArg;
    }

    @Override
    public Map<String, Object> setConsumerFactoryArg(String key, Object val) {
        consumerArg.put(key, val);
        return consumerArg;
    }

    /**
     * 添加一个消费者监听
     * @param listener 监听器
     * @param topic 监听Topic名称
     * @return 返回KafkaMessageListenerContainer对象,可以进行stop或start
     */
    @Override
    public KafkaMessageListenerContainer addListener(MessageListener listener, String topic) {
        return this.addListener(listener, getConsumerFactoryArg().get("group.id").toString(), topic);
    }

    /**
     * 添加一个消费者监听
     * @param listener 监听器
     * @param groupId 消费者Id,需要让所有的消费者接收消息,请指定不同的分组Id
     * @param topic 监听Topic名称
     * @return 返回KafkaMessageListenerContainer对象,可以进行stop或start
     */
    @Override
    public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {
        ContainerProperties properties = new ContainerProperties(topic);
        properties.setMessageListener(listener);
        properties.setGroupId(groupId);

        KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);
        container.start();
        return container;
    }

    /**
     * 创建一个生产者工厂类
     * @return
     */
    public DefaultKafkaProducerFactory getKafkaProducerFactory() {
        if (kafkaProducerFactory == null) {
            kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);
        }
        return kafkaProducerFactory;
    }

    /**
     * 创建一个消费者工厂类
     * @return
     */
    public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {
        if (kafkaConsumerFactory == null) {
            kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);
        }
        return kafkaConsumerFactory;
    }

    @Override
    public void destroy() throws Exception {

    }

}

这里就不附图片了,完整的实现类里面有测试日志。

 

Apache Kafka

Kafka说明

Apache Kafka® 是一个分布式流媒体平台,这是什么意思呢?

分布式流媒体平台它有三大特性:

  • 发布与订阅流媒体数据,类似于消息队列或企业消息传递系统
  • 能够容错并持久性存储流媒体数据
  • 处理流媒体数据

Kafka通常适用于两大类应用场景:

  • 在系统或应用程序之间构建实时数据流管道,使其可靠地获取数据
  • 在应用程序中对数据流进行实时转换或响应

要了解Kafka如何做这些事情,让我们深入探讨Kafka的能力。

首先是几个概念:

  • Kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
  • Kafka集群以称为topics类别存储记录流。
  • 每条记录流都由一个键、值和时间戳组成。

Kafka有四个核心API:

  • 生产者(Producer API )允许应用程序发布一条数据记录到一个或更多的Kafka topics。
  • 消费者(Consumer API)允许应用程序订阅一个或多个主题,并处理为其生成的记录流。
  • 数据流(Streams API)允许应用程序充当流处理器( stream processor),使用来自一个或多个topics输入流,并将输出流生成到一个或多个输出topics,从而有效地将输入流转换为输出流。
  • 连接器(Connector API)允许构建并运行可重用的生产者或消费者,将Kafka topics连接到现有的应用程序或数据系统。例如,数据库的连接器可以捕获每个表的更改。

1556898600-9189-kafka-apis

在Kafka中,客户机和服务器之间的通信是通过一种简单、高性能、与语言无关的TCP协议来完成。 此协议已经版本化,并保持与旧版本的向后兼容性。我们为Kafka提供Java客户端,客户端可以使用多语言版本。

Topics and Logs

让我们首先深入探讨Kafka为记录流提供的核心抽象 – topic。

topic是发布记录的类别或源名称。Kafka中的Topics总是多个订阅用户;也就是说,一个topic可以有0个、1个或多个订户订阅者。

对于每个topic,Kafka群集都维护一个分区日志,如下所示:

 

1556898598-1342-log-anatomy每个分区都是一个有序的,不可变的记录序列,不断附加到一个结构化的日志中。分区中的记录每个都被分配一个 offset 的顺序ID号,它标识分区中的每个记录。

Kafka集群持久地保留所有已发布的记录,无论它们是否已被消耗,可以使用可配置方式设置的过期时间。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。

1556898607-4067-log-consumer事实上,在消费者的日志中使用偏移量(offset)或分区来保持唯一的元数据。这种偏移(offset)由消费者控制:通常消费者在读取记录时会线性地提高其偏移量(offset),但事实上,由于消费者控制偏移量(offset)的位置,它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为已处理过或较旧的偏移量(offset)以重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。

这些功能组合意味着Kafka消费者非常简单,他们可以来来往往对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具“tail”任何主题的内容,而无需更改任何现有消费者所消费的内容。

日志中的分区有多种用途。首先,它们允许日志扩展到超出单个服务器的大小。每个单独的分区必须适合托管它的服务器,但topic可能有许多分区,因此它可以处理任意数量的数据。其次,在一点上他们更像是并行的单元。

分配

日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据并请求分区的共享。每个分区都在可配置数量的服务器上进行复制,以实现容错。

每个分区都有一个服务器充当“领导者(leader)”,0个或多个服务器充当“追随者(followers)”。领导者处理分区的所有读取和写入请求,而关注者被动地复制领导者。如果领导者出现故障,其中一个追随者将自动成为新的领导者。每个服务器都充当其某些分区的领导者和其他服务器的追随者,因此负载在群集中很均衡。

地理复制

Kafka MirrorMaker为群集提供地理复制的支持。使用MirrorMaker,消息跨多个数据中心或云区域进行复制。你可以使用它在active/passive方案中进行备份和恢复; 或者在active/active方案中,按地理的方式,使数据更接近用户,或支持数据位置要求。

生产者

生产者将数据发布到他们选择的topics。生产者负责选择要分配给topic中哪个分区的记录。这可以通过循环方式完成,只是为了平衡负载,或者可以根据一些语义分区功能(例如:基于记录中的某些键)来完成。

消费者

消费者使用 consumer group 名称标记自己,每一个记录都会发布到一个topic中,并传递给每一个订阅的 consumer group 中其中一个消费者。消费者实例可以在同一个进程中,也可以在不同的机器

如果所有消费者实例具有相同的 consumer group,那么记录将在消费者实例上进行负载平衡(只有其中一个能收到消息)。

如果所有消费者实例具有不同的 consumer groups,那么每个记录将使用广播的方式,发送到所有 consumer group 进程。

 

1556898599-7054-consumer-groups两个服务器Kafka群集,托管四个分区(P0-P3),包含两个 consumer groups。 group A有两个消费者, group B有四个消费者。

然而,更常见的是,我们发现topics具有少量的 consumer groups,每个“logical subscriber”一个。每个组由许多用于可伸缩性和容错的消费者实例组成。这只不过是发布 – 订阅语义,其中订阅者是消费者群集而不是单个进程。

在Kafka中实现消费的方式是通过在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分配的“公平份额”的独占消费者。这个维护组成员身份的过程是由kafka协议动态处理的。如果有新的消费者实例加入该组,他们将从该组的其他成员接管一些分区; 如果实例出现故障,那么其分区将分配给其余消费者实例。

Kafka仅提供分区内记录的总订单,而不是主题中不同分区之间的记录。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果需要对记录进行总排序,可以使用只有一个分区的主题来实现,但这将意味着每个 consumer group 只有一个消费者进程。

多租户

您可以将Kafka部署为多租户解决方案。通过配置哪些主题可以生成或使用数据来启用多租户。配额也有运营支持。管理员可以定义和强制执行配额,以控制客户端使用的代理资源。有关更多信息,请参阅安全文档

担保

在高级别Kafka提供以下保证:

  • 生产者按顺序将消息发送到特定主题分区。也就是说,如果记录数据M1由与数据M2是由同一个生产者发送,并且首先发送M1,则M1将具有比M2更低的偏移并在日志中更早出现。
  • 消费者实例按照它们存储在日志中的顺序查看记录。
  • 对于具有复制因子N的主题,我们将容忍最多N-1个服务器故障,而不会丢失任何提交到日志的记录。

下一篇,将Kafka集成到Spring中。

有关Kafka提供的API和功能的更多信息,请参阅官方文档

Apache ActiveMQ Artemis实战

虽然它们(ActiveMQ & Artemis)的设计目的是做同样的工作,但内部的工作方式却不同。以下是您在规划迁移时需要注意的一些最显着的体系结构差异。

在ActiveMQ中,我们有一些IO连接层的不同实现,如TCP(阻塞的)和NIO(非阻塞的)。在Artemis中,IO层是使用Netty实现的,Netty是一个NIO框架。这意味着不再需要在不同的实现之间进行选择,因为默认情况下使用非阻塞实现。

每个broker都是一个重要消息存储。大多数ActiveMQ用户都应该熟悉KahaDB。它由一个消息日志组成,用于快速存储消息(和其他命令包)日志,以及在需要时用于检索取回消息。

Artemis有自己的消息存储。它只包含仅追加的消息日志。由于分页方式的不同,不需要使用消息索引。我们一会儿再谈。在这一点上,很重要的一点是,这两个存储是不能互换的,如果需要的话,必须仔细规划数据迁移。

我们所说的分页差异是什么意思?分页是当broker无法在其内存中保存所有传入消息时发生的过程。如何处理这种情况的策略在两个broker之间是不同的。ActiveMQ有一些游标,这些游标基本上是准备发送给使用者的消息的缓存。它将尝试将所有传入的消息保存在缓存。当可用内存用完时,消息会添加到存储中,但缓存会停止。当空间再次可用时,broker将通过批量从存储中提取消息来再次填充缓存。因此,我们需要在broker运行时,经常地读取日志。为了做到这一点,我们需要维护日志索引,以便在日志中跟踪消息的位置。

在Artemis中,在这方面的工作是不同的。整个消息日志保存在内存中,并直接从中发送消息。当内存耗尽时,消息会在生产者端进行分页(在消息到达broker之前)。它们按到达时的顺序存储在连续的页面文件中。一旦释放内存,消息就会从这些页面文件移到日志中。对于这样的分页工作,只有在broker启动时才从文件日志读取消息,以便在内存中重新创建日志的这个版本。在这种情况下,日志只能按顺序读取,这意味着不需要在日志中保留消息索引。

12
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |