Spring集成Apache ActiveMQ配置实现

Apache ActiveMQ是目前比较流行的开源、多协议、基于Java的消息服务器之一。它支持行业标准协议,因此用户可以在各种语言平台上获得范围广泛而高效客户端。C、C++、Python、.NET等编程语言。使用无所不在的AMQP协议集成您的多平台应用程序。使用STOMP替换WebSockets在Web应用程序之间交换消息。使用MQTT协议管理物联网设备。支持你现有及其他的JMS基础架构。ActiveMQ提供了支持任何消息传递用例的能力和灵活性。

下载地址:http://activemq.apache.org/components/classic/download/

官方文档:http://activemq.apache.org/components/classic/documentation

FAQ帮助文档:http://activemq.apache.org/faq

1.配置连接工厂

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    //factory.setBrokerURL("mqtt://0.0.0.0:1883");
    factory.setBrokerURL("tcp://0.0.0.0:61616");
    return factory;
}

2.配置JmsTemplate(生产者)

@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(this.activeMQConnectionFactory());
    return jmsTemplate;
}

3.配置监听器(消费者)

/**
 * new an activeMQ queue.
 * @return
 */
@Bean
public ActiveMQQueue activeMQQueue(){
    ActiveMQQueue queue = new ActiveMQQueue("queue/default");
    return queue;
}

/**
 * new an activeMQ message listener.
 * @return
 */
@Bean
public ActiveMQMessageListener activeMQMessageListener(){
    return new ActiveMQMessageListener();
}

/**
 * default message listener container.
 * @return
 */
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(){
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(this.activeMQConnectionFactory());
    container.setDestination(this.activeMQQueue());
    container.setMessageListener(this.activeMQMessageListener());
    return container;
}

配置起来非常简单,那么使用起来呢,下面来看一个例子:

@Autowired
private JmsTemplate jmsTemplate;//注入JMS模板类(生产者)

@Autowired
private ActiveMQQueue queue;//注入消息队列

@RequestMapping(value = "/test", method = RequestMethod.GET)
public ResultResp<Void> test(HttpServletRequest request) {

    ResultResp<Void> resp = new ResultResp<>();
    ActiveMQMessagePojo mqMessagePojo = new ActiveMQMessagePojo("Test for demo..." + System.currentTimeMillis());
    jmsTemplate.send(queue, mqMessagePojo);

    return resp;
}

//ActiveMQMessagePojo实体类:
import org.springframework.jms.core.MessageCreator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * MQ message
 * Created by alan on 2018/1/13.
 */
public class ActiveMQMessagePojo implements MessageCreator {

    private String msg;

    public ActiveMQMessagePojo(){

    }

    public ActiveMQMessagePojo(String msg){
        this.msg = msg;
    }

    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(msg);
    }

}

就是这样子简单。

 

Go 语言入门总结

Go 语言使用非常简单,是专门针对各种语言的痛点设计的,本人看的书是《Go语言实战》非常推荐给大家,笔记的作者是原水寒,我在查看他的笔记的时候觉得他总结的很到位,所以就拷贝了一份到本人的博客中,具体目录请参见原作者 Go 语言极速入门

实际上,还有一些关于 Go 语言的知识没有在极速入门中进行分析,例如 Go 语言的反射机制、Go 语言的测试体系、各种标准库的使用以及各种 Go 语言内建工具的使用。当然,Go 语言的表格驱动测试姿势在 Go 语言极速入门12 – 实战项目之单任务版爬虫 这一小节中做过简单的使用姿势的分析,但是性能测试没有进行分析。
使用 Go 语言实现的项目有以下这些
Docker
Kubernetes
Consul
Prometheus
etcd
istio
SOFAMosn
SOFAMesh
opentracing-go
grpc-go

Go 语言的基本使用姿势掌握后,后续我会去分析 SOFAStack 中的两个框架 – SOFAMosn 和 SOFAMesh,这两个框架由蚂蚁金服开发维护,前者是 ServiceMesh 架构下很好的 SideCar 实现,后者是 fork 自 istio,并作出了一些自己的优化,通过对这两个框架的分析,我们可以不只是从理论方面学习 ServiceMesh 架构,还可以从实际的代码实现来学习 ServiceMesh!!!

Go 语言入门14 – jsonrpc 最简姿势

服务定义

package rpcdemo

import "errors"

// 服务
type DemoService struct {
}

// 参数
type Args struct {
   A, B int
}

// jsonrpc 的服务需要定义为参入参数和传出参数的格式
// 传出参数必须为指针格式
func (e DemoService) DIV(args Args, result *float64) error {
   if args.B == 0 {
      return errors.New("division by zero")
   }

   *result = float64(args.A) / float64(args.B)
   return nil
}

 

服务端

package main

import (
   "github.com/zhaojigang/crawler/rpc"
   "log"
   "net"
   "net/rpc"
   "net/rpc/jsonrpc"
)

func main() {
   // 注册服务
   rpc.Register(rpcdemo.DemoService{})
   // 启动 server
   listener, err := net.Listen("tcp", "127.0.0.1:1234")

   if err != nil {
      panic(err)
   }

   for {
      // 不断连接服务
      conn, err := listener.Accept()
      if err != nil {
         log.Printf("accept error, %v", err)
         continue
      }
      // 使用 Goroutine:ServeConn runs the JSON-RPC server on a single connection.
      go jsonrpc.ServeConn(conn)
   }
}

客户端

package main

import (
   "fmt"
   "github.com/zhaojigang/crawler/rpc"
   "log"
   "net/rpc/jsonrpc"
)

func main() {
   // 连接server并创建jsonrpc-client
   client, err := jsonrpc.Dial("tcp", "127.0.0.1:1234")
   if err != nil {
      log.Printf("create jsonrpc client error")
   }

   var result float64
   // 发起调用
   err = client.Call("DemoService.DIV", rpcdemo.Args{3, 4}, &result)
   if err != nil {
      log.Printf("call error")
   }
   fmt.Println(result)
}

Go 语言入门13 – 实战项目之并发版爬虫

单任务版的爬虫很慢,因为只有一个 main Goroutine 在执行,最慢的地方就是爬取(fetch)和解析(parse),我们可以将同一个 url 的这两部分合并成一个任务(worker),然后使用多个 Goroutine 去并行的执行这多个 worker,这样速度就会有极大的提升。

一、并发版爬虫 – 并发调度器

1.1、架构

1556594489-4817-5842684-9885ac4881bc3a58

  1. 服务启动时,创建两个 chan,in(用于接收 Request)和 out(用于接收 ParseResult),并且将 in 赋值给 scheduler 的任务 chan(即 scheduler 后续操作的 chan 其实就是 in)
  2. 创建指定数量个 Goroutine,每一个 Goroutine 做以下几件事:

2.1. 从 in chan 中阻塞等待获取 Request
2.2. 使用 Worker 对 获取到的Request 做 fetch 和 parse 的操作,将 parseResult 阻塞发送到 out chan

  1. Engine 将 seeds 中的 Request 添加到调度器 chann,
  2. 之后开启死循环,不断从 out chan 中接收 parseResult,然后将 parseResult.items 进行打印,将 parseResult.requests 加入到 scheduler 的 chan,实际上就是 in chan(当然,每一个 Request 加入 in 的操作都是由一个新的 Goroutine 来完成的,原因为本小节末会讲)

1.2、实现

1556594496-2334-5842684-318244d050f83249

由于项目结构进行了调整,列出发生修改的所有代码。

爬取器 fetcher 和解析器 parser 与之前相同,模型类也不变。

调度器接口 scheduler.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

// 调度器接口
type Scheduler interface {
    // 提交 Request 到调度器的 request 任务通道中
    Submit(request model.Request)
    // 初始化当前的调度器实例的 request 任务通道
    ConfigureMasterWorkerChan(chan model.Request)
}

并发调度器 simple.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

type SimpleScheduler struct {
    workerChan chan model.Request
}

// 为什么使用指针接收者,需要改变 SimpleScheduler 内部的 workerChan
// https://stackoverflow.com/questions/27775376/value-receiver-vs-pointer-receiver-in-golang
// https://studygolang.com/articles/1113
// https://blog.csdn.net/suiban7403/article/details/78899671
func (s *SimpleScheduler) ConfigureMasterWorkerChan(in chan model.Request) {
    s.workerChan = in
}

func (s *SimpleScheduler) Submit(request model.Request) {
    // 每个 Request 一个 Goroutine
    go func() { s.workerChan <- request }()
}

注意:

  • 这里为每一个将 Request 添加到 chan 的任务都开启一个 Goroutine 来执行,为什么?
  • 方法为何使用指针接收者而不是值接收者?
package engine

import (
    "github.com/zhaojigang/crawler/fetcher"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "log"
)

// 并发引擎
type ConcurrentEngine struct {
    // 调度器
    Scheduler scheduler.Scheduler
    // 开启的 worker 数量
    WorkerCount int
}

func (e ConcurrentEngine) Run(seeds ...model.Request) {
    in := make(chan model.Request)
    out := make(chan model.ParseResult)
    // 初始化调度器的 chann
    e.Scheduler.ConfigureMasterWorkerChan(in)
    // 创建 WorkerCount 个 worker
    for i := 0; i < e.WorkerCount; i++ {
        createWorker(in, out);
    }
    // 将 seeds 中的 Request 添加到调度器 chann
    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }

    for {
        result := <-out // 阻塞获取
        for _, item := range result.Items {
            log.Printf("getItems, items: %v", item)
        }

        for _, r := range result.Requests {
            // 如果 submit 内部直接是 s.workerChan <- request,则阻塞等待发送,该方法阻塞在这里
            // 如果 submit 内部直接是 go func() { s.workerChan <- request }(),则为每个Request分配了一个Goroutine,这里不会阻塞在这里
            e.Scheduler.Submit(r)
        }
    }
}

func createWorker(in chan model.Request, out chan model.ParseResult) {
    go func() {
        for {
            r := <-in // 阻塞等待获取
            result, err := worker(r)
            if err != nil {
                continue
            }
            out <- result // 阻塞发送
        }
    }()
}

func worker(r model.Request) (model.ParseResult, error) {
    log.Printf("fetching url:%s", r.Url)
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Printf("fetch error, url: %s, err: %v", r.Url, err)
        return model.ParseResult{}, nil
    }
    return r.ParserFunc(body), nil
}

启动器 main.go

package main

import (
    "github.com/zhaojigang/crawler/engine"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
    engine.ConcurrentEngine{
        Scheduler:   &scheduler.SimpleScheduler{},
        WorkerCount: 1000,
    }.Run(model.Request{
        // 种子 Url
        Url:        "http://www.zhenai.com/zhenghun",
        ParserFunc: parser.ParseCityList,
    })
}

疑问:

Q1. 为什么在 scheduler 中每一个将 Request 添加到 chan 的任务都开启一个 Go routine 来执行?

A:在 Go 语言学习9 – Channel 一节描述过,对于无缓冲的 channel,如果两个 go routine 没有同时准备好,通道会导致先执行发送或接收操作的 go routine 阻塞等待,假设使用 s.workerChan <- request 而不是 go func() { s.workerChan <- request }(),假设开启了 10 个 Worker Go routine,这 10 个 go routine 阻塞在 r := <-in 阻塞等待获取 Request 上,假设 seeds 大于 10,例如 11,那么当 Engine 的这个循环执行到底 11 个的时候,将陷入等待,因为所有的10个 Worker go routine 此时都可能也处于等待中,即 in chan 没有接收方准备好接收数据,所以 engine 作为发送方也要阻塞等待;那么为什么10个 Worker go routine 都会处于等待中呢?

for _, r := range seeds {
        e.Scheduler.Submit(r)
}

 

 go func() {
        for {
            r := <-in // 阻塞等待获取
            result, err := worker(r)
            if err != nil {
                continue
            }
            out <- result // 阻塞发送
        }
    }()

Q2. scheduler 方法为何使用指针接收者而不是值接收者?

A:在 Go 语言学习5 – 面向接口 中我们详细的介绍了什么时候使用指针接收者,什么时候使用值接收者,其中最重要的两条就是 “1. 如果要改变接收者内部的属性值,必须使用指针接收者,因为值接收者是对接收者副本的操作;2. 如果 struct 内一个方法是指针接收者,那么其全部方法都是用指针接收者”,在 scheduler 中,我们要将外界的 in chan 赋值给 scheduler 的 workChann,所以需要改变 workChann 的值,需要使用指针接收者。

二、并发版爬虫架构 – 队列调度器

1.1、架构

1556594510-9572-5842684-bb0118a739fafe94

服务启动时,初始化 Scheduler 的 requestChann(用于接收 Request)和 workerChan(用于接收 Worker 的 chan Request,注意:每一个 Worker 都有一个 chan Request),然后 Scheduler 启动一个 go routine,不断的将 requestChann 接收到的 Request 存储到 requestQ 切片中,将 workerChan 接收到的 chan Request 存储到 workerQ 切片中,并将 requestQ 中的 Request 分配给 workerQ 中的 chan Request(即由某一个 Worker 来处理)。
注意:在 Scheduler 中有两个 Go routine。

一个是主 Go routine,用于接收 Engine 传来的 Request 到 requestChann 以及接收 Worker 传来的 chan Request 到 workerChan 中
另一个是 Scheduler 显示启动的一个 Go routine,用于将 requestChann 接收到的 Request 存储到 requestQ 切片中,将 workerChan 接收到的 chan Request 存储到 workerQ 切片中,并将 requestQ 中的 Request 分配给 workerQ 中的 chan Request

创建指定数量个 Go routine,每一个 Go routine 做以下几件事:

  • 1. 创建 Worker 自己的 w chan Request,并发送到 Scheduler 的 workerChan 中
  • 2. 从 w 中阻塞等待获取 Request(如果 Scheduler 的分配 Go routine 分配了 Request 到该 Worker 的 w,则获取成功)
  • 3. 使用 Worker 对 获取到的 Request 做 fetch 和 parse 的操作,将 parseResult 阻塞发送到 out chan

Engine 将 seeds 中的 Request 添加到 Scheduler 的 requestChann
之后开启死循环,不断从 out chan 中接收 parseResult,然后将 parseResult.items 进行打印,将 parseResult.requests 加入到 scheduler 的 requestChann

1.2、实现

1556594517-3253-5842684-ef18b36002d77a36

调度器 scheduler.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

// 调度器接口
type Scheduler interface {
    ReadyNotifier
    Submit(request model.Request)
    WorkerChann() chan model.Request
    Run()
}

通知器 readyNotifier.go

package scheduler

import "github.com/zhaojigang/crawler/model"

type ReadyNotifier interface {
    WorkerReady(chan model.Request)
}

队列调度器 queued.go

package scheduler

import (
   "github.com/zhaojigang/crawler/model"
)

type QueuedScheduler struct {
   requestChann chan model.Request
   // 每一个Worker都有一个自己的chan Request
   // workerChan中存放的是Worker们的chan
   workerChan chan chan model.Request
}

func (s *QueuedScheduler) WorkerChann() chan model.Request {
   return make(chan model.Request)
}

func (s *QueuedScheduler) Submit(request model.Request) {
   s.requestChann <- request
}

func (s *QueuedScheduler) WorkerReady(w chan model.Request) {
   s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
   // 初始化 requestChann
   s.requestChann = make(chan model.Request)
   // 初始化 workerChan
   s.workerChan = make(chan chan model.Request)

   // 创建一个 goroutine
   // 1. 进行request以及Worker的chan的存储
   // 2. 分发request到worker的chan中
   go func() {
      var requestQ []model.Request
      var workerQ []chan model.Request
      for {
         var activeRequest model.Request
         var activeWorker chan model.Request
         if len(requestQ) > 0 && len(workerQ) > 0 {
            activeRequest = requestQ[0]
            activeWorker = workerQ[0]
         }

         select {
         case r := <-s.requestChann:
            // 如果开始requestQ=nil,append之后就是包含一个r元素的切片
            requestQ = append(requestQ, r)
         case w := <-s.workerChan:
            workerQ = append(workerQ, w)
            // 进行request的分发
         case activeWorker <- activeRequest:
            requestQ = requestQ[1:]
            workerQ = workerQ[1:]
         }
      }
   }()
}

并发引擎 ConcurrentEngine.go

package engine

import (
   "github.com/zhaojigang/crawler/fetcher"
   "github.com/zhaojigang/crawler/model"
   "github.com/zhaojigang/crawler/scheduler"
   "log"
)

// 并发引擎
type ConcurrentEngine struct {
   // 调度器
   Scheduler scheduler.Scheduler
   // 开启的 worker 数量
   WorkerCount int
}

func (e ConcurrentEngine) Run(seeds ...model.Request) {
   // 初始化 Scheduler 的队列,并启动配对 goroutine
   e.Scheduler.Run()
   out := make(chan model.ParseResult)
   for i := 0; i < e.WorkerCount; i++ {
      // 每个 Worker 都创建自己的一个 chan Request
      createWorker(e.Scheduler.WorkerChann(), out, e.Scheduler);
   }
   for _, r := range seeds {
      e.Scheduler.Submit(r)
   }

   for {
      result := <-out // 阻塞获取
      for _, item := range result.Items {
         log.Printf("getItems, items: %v", item)
      }

      for _, r := range result.Requests {
         e.Scheduler.Submit(r)
      }
   }
}

func createWorker(in chan model.Request, out chan model.ParseResult, notifier scheduler.ReadyNotifier) {
   go func() {
      for {
         notifier.WorkerReady(in)
         r := <-in // 阻塞等待获取
         result, err := worker(r)
         if err != nil {
            continue
         }
         out <- result // 阻塞发送
      }
   }()
}

func worker(r model.Request) (model.ParseResult, error) {
   log.Printf("fetching url:%s", r.Url)
   body, err := fetcher.Fetch(r.Url)
   if err != nil {
      log.Printf("fetch error, url: %s, err: %v", r.Url, err)
      return model.ParseResult{}, nil
   }
   return r.ParserFunc(body), nil
}

启动器 main.go

package main

import (
    "github.com/zhaojigang/crawler/engine"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
    engine.ConcurrentEngine{
        Scheduler:   &scheduler.QueuedScheduler{},
        WorkerCount: 100,
    }.Run(model.Request{
        // 种子 Url
        Url:        "http://www.zhenai.com/zhenghun",
        ParserFunc: parser.ParseCityList,
    })
}

队列调度器与并发调度器的对比

  • 队列调度器不需要为每一个 “将 Request 添加到 chan的任务” 创建一个 Go routine 来执行
  • 队列调度器需要为每一个 Worker 创建一个 chan Request
  • 队列调度器编码较为复杂,并发调度器编码简单,且虽然会创建大量的
  • Go routine,但是由于协程的轻量性,一般而言,问题不大

 

Go 语言入门12 – 实战项目之单任务版爬虫

  • 单人版爬虫:一个 Goroutine 运行整个爬虫项目
  • 并发版爬虫:多个 Goroutine 在一台机器上实现爬虫项目
  • 分布式爬虫:多个 Goroutine 在多台机器上实现爬虫项目

一、爬虫整体算法

该爬虫项目爬取的是珍爱网的数据,总体算法如下

5842684-fbe951dcef43f94c

  • 首先根据城市列表 Url 爬取城市列表,爬取出来的内容通过城市列表解析器解析出来每一个城市的 Url
  • 然后根据每一个城市的 Url 爬取该城市的用户信息列表,通过城市解析器将用户信息列表中的用户 Url 解析出来
  • 最后根据每一个用户的 Url 爬取该用户的详细信息,并进行解析

三种 Url 示例:

  • 城市列表 Url:http://www.zhenai.com/zhenghun
  • 城市 Url:http://www.zhenai.com/zhenghun/aba
  • 用户 Url:http://album.zhenai.com/u/1902329077

二、单任务版爬虫架构

5842684-fbe951dcef43f94c

  • 1.会将种子 Url Seed 连同其解析器 Parser 封装为一个 Request,放入 Engine 引擎中的任务队列(其实就是 []Request 切片)中,启动爬取任务(这里的 Seed 就是城市列表 Url)
  • 2. Engine 使用 Fetcher 爬取该 Url 的内容 text,然后使用对应 Url 的Parser 解析该 text,将解析出来的 Url(例如,城市 Url)和其 Parser 封装为 Request 加入 Engine 任务队列,将解析出来的 items(例如,城市名)打印出来
  • 3. Engine 不断的从其任务队列中获取任务 Request 一个个进行串行执行(使用 Fetcher 对 Request.Url 进行爬取,使用 Request.Parser 对爬取出来的 text 进行解析,将解析出来的内容部分进行封装为Request,进行后续循环,部分进行打印)

三、代码实现

5842684-fbe951dcef43f94c

3.1 请求与解析结果类型结构 type.go

package engine

// 请求任务封装体
type Request struct {
   // 需爬取的 Url
   Url string
   // Url 对应的解析函数
   ParserFunc func([]byte) ParseResult
}

// 解析结果
type ParseResult struct {
   // 解析出来的多个 Request 任务
   Requests []Request
   // 解析出来的实体(例如,城市名),是任意类别(interface{},类比 java Object)
   Items    []interface{}
}

3.2 执行引擎 engine.go

package engine

import (
   "fetcher"
   "log"
)

func Run(seeds ...Request) {
   // Request 任务队列
   var requests []Request
   // 将 seeds Request 放入 []requests,即初始化 []requests
   for _, r := range seeds {
      requests = append(requests, r)
   }
   // 执行任务
   for len(requests) > 0 {
      // 1. 获取第一个 Request,并从 []requests 移除,实现了一个队列功能
      r := requests[0]
      requests = requests[1:]

      // 2. 使用爬取器进行对 Request.Url 进行爬取
      body, err := fetcher.Fetch(r.Url)
      // 如果爬取出错,记录日志
      if err != nil {
         log.Printf("fetch error, url: %s, err: %v", r.Url, err)
         continue
      }

      // 3. 使用 Request 的解析函数对怕渠道的内容进行解析
      parseResult := r.ParserFunc(body)
      // 4. 将解析体中的 []Requests 加到请求任务队列 requests 的尾部
      requests = append(requests, parseResult.Requests...)

      // 5. 遍历解析出来的实体,直接打印
      for _, item := range parseResult.Items {
         log.Printf("getItems, url: %s, items: %v", r.Url, item)
      }
   }
}

3.3 爬取器 fetcher.go

package fetcher

import (
   "fmt"
   "io/ioutil"
   "net/http"
)

func Fetch(url string) ([]byte, error) {
   // 1. 爬取 url
   resp, err := http.Get(url)
   if err != nil {
      return nil, err
   }
   defer resp.Body.Close()

   if resp.StatusCode != http.StatusOK {
      return nil, fmt.Errorf("wrong statusCode, %d", resp.StatusCode)
   }
   // 2. 读取响应体并返回
   return ioutil.ReadAll(resp.Body)
}

3.4 三种解析器

  • 城市列表解析器 citylist.go
package parser

import (
   "engine"
   "regexp"
)

const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)"[^>]*>([^<]*)</a>`

// cityList 的 ParserFunc func([]byte) ParseResult
// 解析种子页面 - 获取城市列表
func ParseCityList(contents []byte) engine.ParseResult {
   result := engine.ParseResult{}
   // 正则表达式:()用于提取
   rg := regexp.MustCompile(cityListRe)
   allSubmatch := rg.FindAllSubmatch(contents, -1)
   // 遍历每一个城市的匹配字段(城市 Url 和城市名),并且将 Url 和城市解析器封装为一个 Request
   // 最后将该 Request 添加到 ParseResult 中
   for _, m := range allSubmatch {
      result.Items = append(result.Items, "city "+string(m[2]))
      result.Requests = append(result.Requests, engine.Request{
         Url:        string(m[1]),
         ParserFunc: ParseCity,
      })
   }
   // 返回 ParseResult
   return result
}

学习 Go 正则表达式的使用

  • 城市解析器 city.go
package parser

import (
   "engine"
   "regexp"
)

// match[1]=url match[2]=name
const cityRe = `<a href="(http://album.zhenai.com/u/[0-9]+)"[^>]*>([^<]+)</a>`

// 解析单个城市 - 获取单个城市的用户列表
func ParseCity(contents []byte) engine.ParseResult {
   result := engine.ParseResult{}
   rg := regexp.MustCompile(cityRe)
   allSubmatch := rg.FindAllSubmatch(contents, -1)
   for _, m := range allSubmatch {
      name := string(m[2])
      result.Items = append(result.Items, "user "+name)
      result.Requests = append(result.Requests, engine.Request{
         Url: string(m[1]),
         ParserFunc: func(c []byte) engine.ParseResult {
            return ParseProfile(c, name) // 函数式编程,使用函数包裹函数
         },
      })
   }

   return result
}

学习函数式编程:使用函数包裹函数,即函数的返回值和入参都可以是函数。

  • 用户解析器 profile.go
package parser

import (
   "github.com/zhaojigang/crawler/engine"
   "github.com/zhaojigang/crawler/model"
   "regexp"
   "strconv"
)

var ageRe = regexp.MustCompile(`<td><span class=""label">年龄:</span>([\d])+岁</td>`)
var incomeRe = regexp.MustCompile(`<td><span class=""label">月收入:</span>([^<]+)</td>`)

// 解析单个人的主页
func ParseProfile(contents []byte, name string) engine.ParseResult {
   profile := model.Profile{}

   // 1. 年龄
   age, err := strconv.Atoi(extractString(contents, ageRe))
   if err == nil {
      profile.Age = age
   }

   // 2. 月收入
   profile.Income = extractString(contents, incomeRe)

   // 3. 姓名
   profile.Name = name

   result := engine.ParseResult{
      Items: []interface{}{profile},
   }
   return result
}

func extractString(body []byte, re *regexp.Regexp) string {
   match := re.FindSubmatch(body) // 只找到第一个match的
   if len(match) >= 2 {
      return string(match[1])
   }
   return ""
}

profile 实体类

package model

type Profile struct {
   // 姓名
   Name string
   // 年龄
   Age int
   // 收入
   Income string
}

3.5 启动器 main.go

package main

import (
   "github.com/zhaojigang/crawler/engine"
   "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
   engine.Run(engine.Request{
      // 种子 Url
      Url:        "http://www.zhenai.com/zhenghun",
      ParserFunc: parser.ParseCityList,
   })
}

解析器测试类

package parser

import (
   "io/ioutil"
   "testing"
)

func TestParseCityList(t *testing.T) {
   expectRequestsLen := 470
   expectCitiesLen := 470
   // 表格驱动测试
   expectRequestUrls := []string{
      "http://www.zhenai.com/zhenghun/aba",
      "http://www.zhenai.com/zhenghun/akesu",
      "http://www.zhenai.com/zhenghun/alashanmeng",
   }
   expectRequestCities := []string{
      "city 阿坝",
      "city 阿克苏",
      "city 阿拉善盟",
   }

   body, err := ioutil.ReadFile("citylist_test_data.html")
   if err != nil {
      panic(err)
   }
   result := ParseCityList(body)

   if len(result.Requests) != expectRequestsLen {
      t.Errorf("expect requestLen %d, but %d", expectRequestsLen, len(result.Requests))
   }
   if len(result.Items) != expectCitiesLen {
      t.Errorf("expect citiesLen %d, but %d", expectCitiesLen, len(result.Items))
   }

   for i, url := range expectRequestUrls {
      if url != result.Requests[i].Url {
         t.Errorf("expect url %s, but %s", url, result.Requests[i].Url)
      }
   }

   for i, city := range expectRequestCities {
      if city != result.Items[i] {
         t.Errorf("expect url %s, but %s", city, result.Items[i])
      }
   }
}

学习经典的 Go 表格驱动测试。

执行 main 函数发现执行的很慢,因为只有一个 main Go routine 在执行,还有网络 IO,所以比较慢,接下来,将单任务版的改造成多个 Go routine 共同执行的并发版的爬虫。

1141516171832
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |