Go Goroutine

Go 并发

Goroutine

协程,轻量级线程,包括匿名协程和命名协程

func main() {
	go test1()

	go func() {
		fmt.Println("Hello")
	}()
}

func test1() {
	fmt.Println("Hello")
}

无法获取协程结束标识

互斥

协程之间通过Channel进行通信

单任务

func test2() {
  done := make(chan bool)

	go func(){
		fmt.Println("Hello")
		done <- true
 	}()

	<-done
}

无缓冲Channel,接收动完成生在对Channel的发送完成之前。

多任务

Channel

func test3() {
	done := make(chan bool, 3)

	for i := 0; i < cap(done); i++ {
		go func(){
			fmt.Println("Hello")
			done <- true
		}()
	}

	for i := 0; i < cap(done); i++ {
		<-done
	}
}

带缓冲Channel,第K个接收完成发生在第K+C个发送完成之前,其中C是Channel的缓存大小。

WaitGroup

func test4() {
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)

		go func() {
			fmt.Println("Hello")
			wg.Done()
		}()
	}

	wg.Wait()
}

sync.WaitGroup等待一组事件

  • wg.Add(1)用于增加等待事件的个数,必须确保在后台线程启动之前执行
  • wg.Done()表示完成一个事件
  • wg.Wait()是等待全部的事件完成

并发

限流

func test5() {
	var limit = make(chan int, 2)
	for i := 0; i < 10; i++ {
		go func(index int) {
			limit <- 1
			fmt.Printf("Hello %d\n", index)
			<-limit
		}(i)
	}
}

通过带缓冲Channel实现

超时

select

func main() {
	cancel := make(chan bool)

	for i := 0; i < 10; i++ {
		go test6(cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
}

func test6(cannel chan bool) {
	for {
		select {
		default:
			fmt.Println("hello")
			// 正常工作
		case <-cannel:
			// 退出
		}
	}
}

当select有多个分支时,会随机选择一个可用的管道分支,如果没有可用的则选择default分支,否则会一直保存阻塞状态。

close关闭一个管道,所有从关闭管道接收的操作均会收到一个零值和一个可选的失败标志,用来实现广播的效果。

退出缺少清理动作

WaitGroup

func main() {
	cancel := make(chan bool)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go test7(&wg, cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
	wg.Wait()
}

func test7(wg *sync.WaitGroup, cannel chan bool) {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-cannel:
			return
		}
	}
}

close广播结束信号

defer wg.Done(),完成清理动作后,退出任务

context

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go test8(ctx, &wg)
	}

	time.Sleep(time.Second)
	cancel()

	wg.Wait()
}

func test8(ctx context.Context, wg *sync.WaitGroup) error {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

context指定超时,cancel()广播结束信号,case <-ctx.Done()获取结束信号

defer wg.Done(),完成清理动作后,退出任务

模型

生产者->消费者

采用单个Channel作为消息缓冲队列

func main() {
	ch := make(chan int, 64)

	go Producer(3, ch)
	go Producer(5, ch)
	go Consumer(ch)

	time.Sleep(3 * time.Second)
}

func Producer(factor int, out chan<- int) {
	for i := 0; ; i++ {
		out <- i * factor
	}
}

func Consumer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

通过带缓冲Channel实现

发布->订阅

采用多个Channel外加过滤器作为消息缓冲队列

func main() {
	p := NewPublisher(time.Second, 10)

	all := p.Subscribe()
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if s, ok := v.(string); ok {
			return strings.Contains(s, "golang")
		}
		return false
	})

	p.Publish("hello, world!")
	p.Publish("hello, golang!")

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		for msg := range all {
			v, _ := msg.(string)
			fmt.Printf("topic all: %s\n", v)
		}
		wg.Done()
	}()

	go func() {
		for msg := range golang {
			v, _ := msg.(string)
			fmt.Printf("topic golang: %s\n", v)
		}
		wg.Done()
	}()

	p.Close()
	wg.Wait()
}

type (
	Subscriber chan interface{}
	TopicFunc func(v interface{}) bool
)

type Publisher struct {
	m           sync.RWMutex
	subscribers map[Subscriber]TopicFunc
	buffer      int
	timeout     time.Duration
}

func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[Subscriber]TopicFunc),
	}
}

func (p *Publisher) Subscribe() Subscriber {
	return p.SubscribeTopic(nil)
}

func (p *Publisher) SubscribeTopic(topic TopicFunc) Subscriber {
	ch := make(Subscriber, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()

	return ch
}

func (p *Publisher) Evict(sub Subscriber) {
	p.m.Lock()
	defer p.m.Unlock()

	delete(p.subscribers, sub)
	close(sub)
}

func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	var wg sync.WaitGroup
	for sub, topic := range p.subscribers {
		wg.Add(1)
		go p.sendTopic(sub, topic, v, &wg)
	}

	wg.Wait()
}

func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
}

func (p *Publisher) sendTopic(sub Subscriber, topic TopicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	if topic != nil && !topic(v) {
		return
	}

	select {
	case sub <- v:
	case <-time.After(p.timeout):
	}
}

通过Channel作为消息管道

通过TopicFunc自定义函数作为消息过滤器

参考: