Go 语言入门13 – 实战项目之并发版爬虫

单任务版的爬虫很慢,因为只有一个 main Goroutine 在执行,最慢的地方就是爬取(fetch)和解析(parse),我们可以将同一个 url 的这两部分合并成一个任务(worker),然后使用多个 Goroutine 去并行的执行这多个 worker,这样速度就会有极大的提升。

一、并发版爬虫 – 并发调度器

1.1、架构

1556594489-4817-5842684-9885ac4881bc3a58

  1. 服务启动时,创建两个 chan,in(用于接收 Request)和 out(用于接收 ParseResult),并且将 in 赋值给 scheduler 的任务 chan(即 scheduler 后续操作的 chan 其实就是 in)
  2. 创建指定数量个 Goroutine,每一个 Goroutine 做以下几件事:

2.1. 从 in chan 中阻塞等待获取 Request
2.2. 使用 Worker 对 获取到的Request 做 fetch 和 parse 的操作,将 parseResult 阻塞发送到 out chan

  1. Engine 将 seeds 中的 Request 添加到调度器 chann,
  2. 之后开启死循环,不断从 out chan 中接收 parseResult,然后将 parseResult.items 进行打印,将 parseResult.requests 加入到 scheduler 的 chan,实际上就是 in chan(当然,每一个 Request 加入 in 的操作都是由一个新的 Goroutine 来完成的,原因为本小节末会讲)

1.2、实现

1556594496-2334-5842684-318244d050f83249

由于项目结构进行了调整,列出发生修改的所有代码。

爬取器 fetcher 和解析器 parser 与之前相同,模型类也不变。

调度器接口 scheduler.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

// 调度器接口
type Scheduler interface {
    // 提交 Request 到调度器的 request 任务通道中
    Submit(request model.Request)
    // 初始化当前的调度器实例的 request 任务通道
    ConfigureMasterWorkerChan(chan model.Request)
}

并发调度器 simple.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

type SimpleScheduler struct {
    workerChan chan model.Request
}

// 为什么使用指针接收者,需要改变 SimpleScheduler 内部的 workerChan
// https://stackoverflow.com/questions/27775376/value-receiver-vs-pointer-receiver-in-golang
// https://studygolang.com/articles/1113
// https://blog.csdn.net/suiban7403/article/details/78899671
func (s *SimpleScheduler) ConfigureMasterWorkerChan(in chan model.Request) {
    s.workerChan = in
}

func (s *SimpleScheduler) Submit(request model.Request) {
    // 每个 Request 一个 Goroutine
    go func() { s.workerChan <- request }()
}

注意:

  • 这里为每一个将 Request 添加到 chan 的任务都开启一个 Goroutine 来执行,为什么?
  • 方法为何使用指针接收者而不是值接收者?
package engine

import (
    "github.com/zhaojigang/crawler/fetcher"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "log"
)

// 并发引擎
type ConcurrentEngine struct {
    // 调度器
    Scheduler scheduler.Scheduler
    // 开启的 worker 数量
    WorkerCount int
}

func (e ConcurrentEngine) Run(seeds ...model.Request) {
    in := make(chan model.Request)
    out := make(chan model.ParseResult)
    // 初始化调度器的 chann
    e.Scheduler.ConfigureMasterWorkerChan(in)
    // 创建 WorkerCount 个 worker
    for i := 0; i < e.WorkerCount; i++ {
        createWorker(in, out);
    }
    // 将 seeds 中的 Request 添加到调度器 chann
    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }

    for {
        result := <-out // 阻塞获取
        for _, item := range result.Items {
            log.Printf("getItems, items: %v", item)
        }

        for _, r := range result.Requests {
            // 如果 submit 内部直接是 s.workerChan <- request,则阻塞等待发送,该方法阻塞在这里
            // 如果 submit 内部直接是 go func() { s.workerChan <- request }(),则为每个Request分配了一个Goroutine,这里不会阻塞在这里
            e.Scheduler.Submit(r)
        }
    }
}

func createWorker(in chan model.Request, out chan model.ParseResult) {
    go func() {
        for {
            r := <-in // 阻塞等待获取
            result, err := worker(r)
            if err != nil {
                continue
            }
            out <- result // 阻塞发送
        }
    }()
}

func worker(r model.Request) (model.ParseResult, error) {
    log.Printf("fetching url:%s", r.Url)
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Printf("fetch error, url: %s, err: %v", r.Url, err)
        return model.ParseResult{}, nil
    }
    return r.ParserFunc(body), nil
}

启动器 main.go

package main

import (
    "github.com/zhaojigang/crawler/engine"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
    engine.ConcurrentEngine{
        Scheduler:   &scheduler.SimpleScheduler{},
        WorkerCount: 1000,
    }.Run(model.Request{
        // 种子 Url
        Url:        "http://www.zhenai.com/zhenghun",
        ParserFunc: parser.ParseCityList,
    })
}

疑问:

Q1. 为什么在 scheduler 中每一个将 Request 添加到 chan 的任务都开启一个 Go routine 来执行?

A:在 Go 语言学习9 – Channel 一节描述过,对于无缓冲的 channel,如果两个 go routine 没有同时准备好,通道会导致先执行发送或接收操作的 go routine 阻塞等待,假设使用 s.workerChan <- request 而不是 go func() { s.workerChan <- request }(),假设开启了 10 个 Worker Go routine,这 10 个 go routine 阻塞在 r := <-in 阻塞等待获取 Request 上,假设 seeds 大于 10,例如 11,那么当 Engine 的这个循环执行到底 11 个的时候,将陷入等待,因为所有的10个 Worker go routine 此时都可能也处于等待中,即 in chan 没有接收方准备好接收数据,所以 engine 作为发送方也要阻塞等待;那么为什么10个 Worker go routine 都会处于等待中呢?

for _, r := range seeds {
        e.Scheduler.Submit(r)
}

 

 go func() {
        for {
            r := <-in // 阻塞等待获取
            result, err := worker(r)
            if err != nil {
                continue
            }
            out <- result // 阻塞发送
        }
    }()

Q2. scheduler 方法为何使用指针接收者而不是值接收者?

A:在 Go 语言学习5 – 面向接口 中我们详细的介绍了什么时候使用指针接收者,什么时候使用值接收者,其中最重要的两条就是 “1. 如果要改变接收者内部的属性值,必须使用指针接收者,因为值接收者是对接收者副本的操作;2. 如果 struct 内一个方法是指针接收者,那么其全部方法都是用指针接收者”,在 scheduler 中,我们要将外界的 in chan 赋值给 scheduler 的 workChann,所以需要改变 workChann 的值,需要使用指针接收者。

二、并发版爬虫架构 – 队列调度器

1.1、架构

1556594510-9572-5842684-bb0118a739fafe94

服务启动时,初始化 Scheduler 的 requestChann(用于接收 Request)和 workerChan(用于接收 Worker 的 chan Request,注意:每一个 Worker 都有一个 chan Request),然后 Scheduler 启动一个 go routine,不断的将 requestChann 接收到的 Request 存储到 requestQ 切片中,将 workerChan 接收到的 chan Request 存储到 workerQ 切片中,并将 requestQ 中的 Request 分配给 workerQ 中的 chan Request(即由某一个 Worker 来处理)。
注意:在 Scheduler 中有两个 Go routine。

一个是主 Go routine,用于接收 Engine 传来的 Request 到 requestChann 以及接收 Worker 传来的 chan Request 到 workerChan 中
另一个是 Scheduler 显示启动的一个 Go routine,用于将 requestChann 接收到的 Request 存储到 requestQ 切片中,将 workerChan 接收到的 chan Request 存储到 workerQ 切片中,并将 requestQ 中的 Request 分配给 workerQ 中的 chan Request

创建指定数量个 Go routine,每一个 Go routine 做以下几件事:

  • 1. 创建 Worker 自己的 w chan Request,并发送到 Scheduler 的 workerChan 中
  • 2. 从 w 中阻塞等待获取 Request(如果 Scheduler 的分配 Go routine 分配了 Request 到该 Worker 的 w,则获取成功)
  • 3. 使用 Worker 对 获取到的 Request 做 fetch 和 parse 的操作,将 parseResult 阻塞发送到 out chan

Engine 将 seeds 中的 Request 添加到 Scheduler 的 requestChann
之后开启死循环,不断从 out chan 中接收 parseResult,然后将 parseResult.items 进行打印,将 parseResult.requests 加入到 scheduler 的 requestChann

1.2、实现

1556594517-3253-5842684-ef18b36002d77a36

调度器 scheduler.go

package scheduler

import (
    "github.com/zhaojigang/crawler/model"
)

// 调度器接口
type Scheduler interface {
    ReadyNotifier
    Submit(request model.Request)
    WorkerChann() chan model.Request
    Run()
}

通知器 readyNotifier.go

package scheduler

import "github.com/zhaojigang/crawler/model"

type ReadyNotifier interface {
    WorkerReady(chan model.Request)
}

队列调度器 queued.go

package scheduler

import (
   "github.com/zhaojigang/crawler/model"
)

type QueuedScheduler struct {
   requestChann chan model.Request
   // 每一个Worker都有一个自己的chan Request
   // workerChan中存放的是Worker们的chan
   workerChan chan chan model.Request
}

func (s *QueuedScheduler) WorkerChann() chan model.Request {
   return make(chan model.Request)
}

func (s *QueuedScheduler) Submit(request model.Request) {
   s.requestChann <- request
}

func (s *QueuedScheduler) WorkerReady(w chan model.Request) {
   s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
   // 初始化 requestChann
   s.requestChann = make(chan model.Request)
   // 初始化 workerChan
   s.workerChan = make(chan chan model.Request)

   // 创建一个 goroutine
   // 1. 进行request以及Worker的chan的存储
   // 2. 分发request到worker的chan中
   go func() {
      var requestQ []model.Request
      var workerQ []chan model.Request
      for {
         var activeRequest model.Request
         var activeWorker chan model.Request
         if len(requestQ) > 0 && len(workerQ) > 0 {
            activeRequest = requestQ[0]
            activeWorker = workerQ[0]
         }

         select {
         case r := <-s.requestChann:
            // 如果开始requestQ=nil,append之后就是包含一个r元素的切片
            requestQ = append(requestQ, r)
         case w := <-s.workerChan:
            workerQ = append(workerQ, w)
            // 进行request的分发
         case activeWorker <- activeRequest:
            requestQ = requestQ[1:]
            workerQ = workerQ[1:]
         }
      }
   }()
}

并发引擎 ConcurrentEngine.go

package engine

import (
   "github.com/zhaojigang/crawler/fetcher"
   "github.com/zhaojigang/crawler/model"
   "github.com/zhaojigang/crawler/scheduler"
   "log"
)

// 并发引擎
type ConcurrentEngine struct {
   // 调度器
   Scheduler scheduler.Scheduler
   // 开启的 worker 数量
   WorkerCount int
}

func (e ConcurrentEngine) Run(seeds ...model.Request) {
   // 初始化 Scheduler 的队列,并启动配对 goroutine
   e.Scheduler.Run()
   out := make(chan model.ParseResult)
   for i := 0; i < e.WorkerCount; i++ {
      // 每个 Worker 都创建自己的一个 chan Request
      createWorker(e.Scheduler.WorkerChann(), out, e.Scheduler);
   }
   for _, r := range seeds {
      e.Scheduler.Submit(r)
   }

   for {
      result := <-out // 阻塞获取
      for _, item := range result.Items {
         log.Printf("getItems, items: %v", item)
      }

      for _, r := range result.Requests {
         e.Scheduler.Submit(r)
      }
   }
}

func createWorker(in chan model.Request, out chan model.ParseResult, notifier scheduler.ReadyNotifier) {
   go func() {
      for {
         notifier.WorkerReady(in)
         r := <-in // 阻塞等待获取
         result, err := worker(r)
         if err != nil {
            continue
         }
         out <- result // 阻塞发送
      }
   }()
}

func worker(r model.Request) (model.ParseResult, error) {
   log.Printf("fetching url:%s", r.Url)
   body, err := fetcher.Fetch(r.Url)
   if err != nil {
      log.Printf("fetch error, url: %s, err: %v", r.Url, err)
      return model.ParseResult{}, nil
   }
   return r.ParserFunc(body), nil
}

启动器 main.go

package main

import (
    "github.com/zhaojigang/crawler/engine"
    "github.com/zhaojigang/crawler/model"
    "github.com/zhaojigang/crawler/scheduler"
    "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
    engine.ConcurrentEngine{
        Scheduler:   &scheduler.QueuedScheduler{},
        WorkerCount: 100,
    }.Run(model.Request{
        // 种子 Url
        Url:        "http://www.zhenai.com/zhenghun",
        ParserFunc: parser.ParseCityList,
    })
}

队列调度器与并发调度器的对比

  • 队列调度器不需要为每一个 “将 Request 添加到 chan的任务” 创建一个 Go routine 来执行
  • 队列调度器需要为每一个 Worker 创建一个 chan Request
  • 队列调度器编码较为复杂,并发调度器编码简单,且虽然会创建大量的
  • Go routine,但是由于协程的轻量性,一般而言,问题不大

 

Go 语言入门11 – 并发之协程池

提供一个 go routine 池,每个 go routine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 go routine 池中的多个 go routine 会并发的处理这些任务

一、worker/workPool.go

import "sync"

type Worker interface {
   Task()
}

type Pool struct {
   wg sync.WaitGroup
   // 工作池
   taskPool chan Worker
}

func New(maxGoroutineNum int) *Pool {
   // 1. 初始化一个 Pool
   p := Pool{
      taskPool: make(chan Worker),
   }

   p.wg.Add(maxGoroutineNum)
   // 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务
   for i := 0; i < maxGoroutineNum; i++ {
      go func() {
         for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel
            // 3. 执行任务
            task.Task()
         }
         p.wg.Done()
      }()
   }

   return &p
}

// 提交任务到worker池中
func (p *Pool) Run(worker Worker) {
   p.taskPool <- worker
}

func (p *Pool) Shutdown() {
   // 关闭通道
   close(p.taskPool)
   p.wg.Wait()
}

 

二、namePrinter/namePrinter.go

import (
   "fmt"
   "time"
)

type NamePrinter struct {
   Name string
}

func (np *NamePrinter) Task()  {
   fmt.Println(np.Name)
   time.Sleep(time.Second)
}

三、main.go

import (
   "github.com/zhaojigang/worker/worker"
   "sync"
   "github.com/zhaojigang/worker/namePrinter"
)

var names = []string{
   "steve",
   "bob",
}

func main() {

   // 1. 启动两个 goroutine,等待执行任务
   p := worker.New(2)

   var wg sync.WaitGroup
   wg.Add(3 * len(names))

   // 2. 创建 worker,扔到 goroutine 池中
   for i := 0; i < 3; i++ {
      for _, namex := range names {
         worker := namePrinter.NamePrinter{
            Name:namex,
         }
         go func() {
            p.Run(&worker)
            wg.Done()
         }()
      }
   }

   // 3. 等待添加任务完毕
   wg.Wait()

   // 4. 关闭 goroutine 池
   p.Shutdown()
}

Go 语言入门10 – 并发之资源池

提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go

一、资源池

import "log"

package pool

import (
"sync"
"io"
"errors"
"log"
)

// 声明池类结构体
type Pool struct {
   // 锁
   lock sync.Mutex
   // 池中存储的资源
   resources chan io.Closer
   // 资源创建工厂函数
   factory func() (io.Closer, error)
   // 池是否已经被关闭
   closed bool
}

// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
   if size <= 0 {
      return nil, errors.New("size too small");
   }

   return &Pool{
      resources: make(chan io.Closer, size),
      factory:   fn,
   }, nil
}

// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
   // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
   // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
   select {
   // 检查是否有空闲的资源
   case r, ok := <-p.resources:
      log.Println("Acquire:", "Shared Resource")
      if !ok {
         return nil, errors.New("pool already closed")
      }
      return r, nil
   default:
      log.Println("Acquire:", "New Resource")
      // 调用资源创建函数创建资源
      return p.factory()
   }
}

// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
   // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
   // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
   p.lock.Lock()
   defer p.lock.Unlock()

   // 如果池已经被关闭,销毁这个资源
   if p.closed {
      r.Close()
      return
   }

   select {
   // 试图将这个资源放入队列
   case p.resources <- r:
      log.Println("Release:", "In Queue")
   default:
      log.Println("Release:", "Closing")
      r.Close()
   }
}

// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
   p.lock.Lock()
   defer p.lock.Unlock()

   if p.closed {
      return
   }

   p.closed = true

   // 在清空通道里的资源之前,将通道关闭
   close(p.resources)

   // 关闭资源
   for r := range p.resources {
      r.Close()
   }
}

select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources

二、具体的资源类

import (
   "io"
   "log"
   "sync/atomic"
)

package db

import (
"log"
"io"
"sync/atomic"
)

// 给每个连接分配一个独一无二的id
var idCounter int32

// 资源 - 数据库连接
type DBConnection struct {
   ID int32
}

// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
   log.Println("conn closed")
   return nil
}

// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
   id := atomic.AddInt32(&idCounter, 1)
   log.Println("Create conn, id:", id)
   return &DBConnection{
      ID: id,
   }, nil
}

三、使用资源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的数量
    pooledResources = 2 // 池中的资源的数量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 获取连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用结束后,释放链接
    defer p.Release(conn)

    // 该 log 模拟对连接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 创建一个 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 开启 goroutine 执行任务
    for query := 0; query < maxGoroutines; query++ {
        // 每个goroutine需要自己复制一份要、查询值的副本,
        // 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
    }

    // 3. 关闭连接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并发的创建 go routine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如:提前创建好资源放入池中,go routine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。

 

Go 语言入门9 – Channel(通道)

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。(这是除了 atomic 和 mutex 之外的第三种处理竞态资源的方式)
Channel分为两种:

  • 无缓冲的 Channel
    • 无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。
  • 有缓冲的 Channel
    • 有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

一、无缓冲的通道

import (
    "sync"
    "fmt"
    "math/rand"
)

var wg sync.WaitGroup

// Channel 完整的类型是 "chan 数据类型"
func player(name string, court chan int) {
    defer wg.Done()

    for {
        // 1. 阻塞等待接球,如果通道关闭,ok返回false
        ball, ok := <-court
        if !ok {
            fmt.Printf("channel already closed! Player %s won\n", name)
            return
        }

        random := rand.Intn(100)
        if random%13 == 0 {
            fmt.Printf("Player %s Lose\n", name)
            // 关闭通道
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball ++
        // 2. 发球,阻塞等待对方接球
        court <- ball
    }
}

// 两个 player 打网球,即生产者和消费者模式(互为生产者和消费者)
func main() {
    wg.Add(2)

    // 1. 创建一个无缓冲的通道
    // Channel 完整的类型是 "chan 数据类型"
    court := make(chan int)

    // 2. 创建两个 goroutine
    go player("zhangsan", court)
    go player("lisi", court)

    // 3. 发球:向通道发送数据,阻塞等待通道对端接收
    court <- 1

    // 4. 等待输家出现
    wg.Wait()
}

二、有缓冲的通道

import (
   "sync"
   "fmt"
   "time"
)

// 使用4个goroutine来完成10个任务
const (
   taskNum      = 10
   goroutineNum = 4
)

var countDownLatch sync.WaitGroup

func worker(name string, taskChannel chan string) {
   defer countDownLatch.Done()
   for {
      // 1. 不断的阻塞等待分配工作
      task, ok := <-taskChannel
      if !ok {
         fmt.Printf("channel closed and channel is empty\n")
         return
      }

      //fmt.Printf("worker %s start %s\n", name, task)
      time.Sleep(100 * time.Millisecond)
      fmt.Printf("worker %s complete %s\n", name, task)
   }
}

func main() {
   countDownLatch.Add(goroutineNum)
   // 1. 创建有缓冲区的string channel
   taskChannel := make(chan string, taskNum)

   // 2. 创建 4 个goroutine去干活
   for i := 0; i < goroutineNum; i++ {
      go worker(fmt.Sprintf("worker %d", i), taskChannel)
   }

   // 3. 向通道加入task
   for i := 0; i < taskNum; i++ {
      taskChannel <- fmt.Sprintf("task %d", i)
   }

   // 4. 关闭通道:
   // 当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
   // 能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
   // 从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
   close(taskChannel)

   // 5. 等待
   countDownLatch.Wait()
}

当通道关闭后,go routine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。

从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。

Go 语言入门8 – 多线程

Go routine 基于协程 Coroutine,原理总结:

如果创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调度器的全局运行队列中。之后,调度器就将这些队列中的 goroutine 分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。本地运行队列中的 goroutine 会一直等待直到自己被分配的逻辑处理器执行。

Go routine 机制原理如下图所示:

1556591431-8545-5842684-caf2ddc8d7ed6338

一、示例

import (
   "runtime"
   "sync"
   "fmt"
)

func main() {
   // 1. 分配一个逻辑处理器给调度器使用
   runtime.GOMAXPROCS(1)

   // 2. 设定等待器,类比 Java CountDownLatch
   var waitGroup sync.WaitGroup
   waitGroup.Add(2)

   fmt.Println("=== start ===")
   // 3. 创建第一个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 4. 创建第二个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 5. 阻塞 main goroutine
   waitGroup.Wait() // CountDownLatch#await()
   fmt.Println("=== end ===")
}

使用 go 关键字创建 Go routine

  • 匿名函数实现方式 go func() {xxx}()
  • 普通函数 funcA 实现方式 go funcA()

二、打断正在运行的 Go routine

  • 基于调度器的内部算法,一个正运行的 go routine 在工作结束前,可以被停止并重新调度。
  • 调度器这样做的目的是防止某个 go routine 长时间占用逻辑处理器。当 go routine 占用时间过长时,调度器会停止当前正运行的 go routine,并给其他可运行的 go routine 运行的机会。

该机制的原理如下图所示:
1556591439-2367-5842684-7023f4c17581356e
步骤:

  • 在第 1 步,调度器开始运行 go routine A,而 go routine B 在运行队列里等待调度。
  • 在第 2 步,调度器交换了 go routine A 和 go routine B。 由于 go routine A 并没有完成工作,因此被放回到运行队列。
  • 在第 3 步,go routine B 完成了它的工作并被系统销毁。这也让 go routine A 继续之前的工作。

注意:上述步骤都是由调度器内部实现的,我们不需要编写代码去实现。

三、设置逻辑处理器数量

    // 为每个物理处理器分配一个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(runtime.NumCPU())

四、竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。

同一时刻只能有一个 goroutine 对共享资源进行读和写操作

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   // 两个 goroutine 同时操作的变量,竞态变量
   counter     int
   waitGroup sync.WaitGroup
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      value := counter
      // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
      // 让其他 goroutine 进行执行
      runtime.Gosched()
      value ++
      counter = value
   }
}

func main() {
   runtime.GOMAXPROCS(1)
   waitGroup.Add(2)

   go incCount(1)
   go incCount(2)

   waitGroup.Wait()
   fmt.Println(counter) // 正确为4,实际上为2
}

代码执行图:
1556591451-4320-5842684-0609cefc2b7bb6a5

 

五、锁机制

5.1、原子类 atomic

import (
   "runtime"
   "sync/atomic"
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      // 使用原子类
      atomic.AddInt64(&counter, 1)
      runtime.Gosched()
   }
}

另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

import (
   "sync"
   "time"
   "sync/atomic"
   "fmt"
)

var (
   shutdown  int64
   waitGroup sync.WaitGroup
)

func doWork(name string) {
   defer waitGroup.Done()
   for {
      time.Sleep(250 * time.Millisecond)
      // 记载关机标志
      if atomic.LoadInt64(&shutdown) == 1 {
         fmt.Println("shutDown, ", name)
         break
      }
   }
}

func main() {
   waitGroup.Add(2)

   go doWork("A")
   go doWork("B")

   // 给定goroutine执行的时间
   time.Sleep(1000 * time.Millisecond)

   // 设定关机标志
   atomic.StoreInt64(&shutdown, 1)

   waitGroup.Wait()
}

5.2、互斥锁 mutex

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码

 

var (
    // 两个 goroutine 同时操作的变量,竞态变量
    counter     int
    waitGroup sync.WaitGroup
        // 锁,定义一段临界区
    lock sync.Mutex
)

func incCount(int) {
    defer waitGroup.Done()
    for count := 0; count < 2; count++ {
        lock.Lock()
        { // Lock() 与 UnLock() 之间的代码都属于临界区,{}是可以省略的,加上看起来清晰
            value := counter
            // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
            // 让其他 goroutine 进行执行
            // 但是因为锁没有释放,调度器还会继续安排执行该 goroutine
            runtime.Gosched()
            value ++
            counter = value
        }
        lock.Unlock()
        // 释放锁,允许其他正在等待的 goroutine 进入临界区
    }
}
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |