Skip to content

并发编程模型

一、并发模型概述

并发 vs 并行

┌─────────────────────────────────────────────────────────────────┐
│                   并发 (Concurrency) vs 并行 (Parallelism)       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   并发 - 同时处理多件事 (结构)                                    │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │                        单核 CPU                          │  │
│   │                                                          │  │
│   │   时间 ─▶                                                │  │
│   │   ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐  ┌──┐                   │  │
│   │   │A │  │B │  │A │  │C │  │B │  │A │  ← 时间片轮转      │  │
│   │   └──┘  └──┘  └──┘  └──┘  └──┘  └──┘                   │  │
│   │                                                          │  │
│   │   任务交替执行,看起来"同时"进行                           │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
│   并行 - 同时做多件事 (执行)                                     │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │                        多核 CPU                          │  │
│   │                                                          │  │
│   │   时间 ─▶                                                │  │
│   │   Core1: ┌───────────────────────────────┐              │  │
│   │          │           任务 A              │              │  │
│   │          └───────────────────────────────┘              │  │
│   │   Core2: ┌───────────────────────────────┐              │  │
│   │          │           任务 B              │              │  │
│   │          └───────────────────────────────┘              │  │
│   │   Core3: ┌───────────────────────────────┐              │  │
│   │          │           任务 C              │              │  │
│   │          └───────────────────────────────┘              │  │
│   │                                                          │  │
│   │   任务真正同时执行                                        │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
│   关系: 并发是并行的前提,并行是并发的一种执行方式                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

主流并发模型

┌─────────────────────────────────────────────────────────────────┐
│                      并发模型对比                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   1. 多线程模型 (共享内存)                                       │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │  Thread1   Thread2   Thread3                             │  │
│   │     │         │         │                                │  │
│   │     └─────────┼─────────┘                                │  │
│   │               ▼                                          │  │
│   │        ┌──────────────┐                                  │  │
│   │        │  共享内存     │  ← 需要锁保护                     │  │
│   │        └──────────────┘                                  │  │
│   │  语言: Java, C++, C#                                     │  │
│   │  优点: 直观,性能好                                       │  │
│   │  缺点: 容易死锁、竞态条件                                  │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
│   2. CSP 模型 (通信顺序进程)                                     │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │  Goroutine1 ──────▶ Channel ──────▶ Goroutine2          │  │
│   │                                                          │  │
│   │  "Don't communicate by sharing memory;                   │  │
│   │   share memory by communicating."                        │  │
│   │                                                          │  │
│   │  语言: Go                                                │  │
│   │  优点: 简单、安全                                         │  │
│   │  缺点: Channel 本身有开销                                 │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
│   3. Actor 模型                                                 │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │    ┌─────────┐         ┌─────────┐                      │  │
│   │    │ Actor A │ ──msg──▶│ Actor B │                      │  │
│   │    │ Mailbox │         │ Mailbox │                      │  │
│   │    └─────────┘         └─────────┘                      │  │
│   │                                                          │  │
│   │  每个 Actor 有自己的状态和邮箱                            │  │
│   │  语言: Erlang, Scala(Akka)                               │  │
│   │  优点: 天然分布式、容错                                   │  │
│   │  缺点: 编程范式转变大                                     │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
│   4. 协程/异步模型                                               │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │  async/await, Promise, Future                            │  │
│   │                                                          │  │
│   │  async function fetchData() {                            │  │
│   │      const a = await fetch('/api/a');                    │  │
│   │      const b = await fetch('/api/b');                    │  │
│   │      return process(a, b);                               │  │
│   │  }                                                       │  │
│   │                                                          │  │
│   │  语言: JavaScript, Python, Rust                          │  │
│   │  优点: I/O密集型场景效率高                                │  │
│   │  缺点: 传染性 (async 函数调用也需要 async)                │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

二、Go 并发编程

1. Goroutine 与 Channel

go
// 基础使用
func main() {
    ch := make(chan int, 10)  // 带缓冲的 channel

    // 生产者
    go func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        close(ch)  // 关闭 channel
    }()

    // 消费者
    for v := range ch {
        fmt.Println(v)
    }
}

// 多个 goroutine 同步
func processItems(items []Item) []Result {
    results := make([]Result, len(items))
    var wg sync.WaitGroup

    for i, item := range items {
        wg.Add(1)
        go func(idx int, it Item) {
            defer wg.Done()
            results[idx] = process(it)
        }(i, item)
    }

    wg.Wait()
    return results
}

2. 并发模式

go
// 扇出扇入 (Fan-out/Fan-in)
func fanOutFanIn(inputs []int) []int {
    numWorkers := runtime.NumCPU()

    // Fan-out: 分发到多个 worker
    inputCh := make(chan int, len(inputs))
    for _, v := range inputs {
        inputCh <- v
    }
    close(inputCh)

    // 每个 worker 的输出 channel
    workerChs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workerChs[i] = worker(inputCh)
    }

    // Fan-in: 合并所有 worker 的输出
    return collect(merge(workerChs...))
}

func worker(input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for v := range input {
            output <- process(v)
        }
    }()
    return output
}

func merge(chs ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range chs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}
go
// 工作池模式 (Worker Pool)
type WorkerPool struct {
    workers   int
    taskQueue chan Task
    results   chan Result
    wg        sync.WaitGroup
}

type Task func() Result
type Result struct {
    Value interface{}
    Err   error
}

func NewWorkerPool(workers, queueSize int) *WorkerPool {
    pool := &WorkerPool{
        workers:   workers,
        taskQueue: make(chan Task, queueSize),
        results:   make(chan Result, queueSize),
    }

    // 启动 workers
    for i := 0; i < workers; i++ {
        pool.wg.Add(1)
        go pool.worker()
    }

    return pool
}

func (p *WorkerPool) worker() {
    defer p.wg.Done()
    for task := range p.taskQueue {
        p.results <- task()
    }
}

func (p *WorkerPool) Submit(task Task) {
    p.taskQueue <- task
}

func (p *WorkerPool) Shutdown() {
    close(p.taskQueue)
    p.wg.Wait()
    close(p.results)
}

// 使用示例
func main() {
    pool := NewWorkerPool(10, 100)

    // 提交任务
    for i := 0; i < 50; i++ {
        id := i
        pool.Submit(func() Result {
            time.Sleep(100 * time.Millisecond)
            return Result{Value: id * 2}
        })
    }

    // 收集结果
    go func() {
        for result := range pool.results {
            fmt.Println(result.Value)
        }
    }()

    pool.Shutdown()
}

3. 并发控制

go
// 限制并发数
type Semaphore struct {
    ch chan struct{}
}

func NewSemaphore(n int) *Semaphore {
    return &Semaphore{
        ch: make(chan struct{}, n),
    }
}

func (s *Semaphore) Acquire() {
    s.ch <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.ch
}

// 使用示例
func processWithLimit(items []Item, limit int) {
    sem := NewSemaphore(limit)
    var wg sync.WaitGroup

    for _, item := range items {
        wg.Add(1)
        sem.Acquire()

        go func(it Item) {
            defer wg.Done()
            defer sem.Release()
            process(it)
        }(item)
    }

    wg.Wait()
}

// 使用 errgroup 处理错误
import "golang.org/x/sync/errgroup"

func processWithErrGroup(ctx context.Context, items []Item) error {
    g, ctx := errgroup.WithContext(ctx)

    // 限制并发数
    g.SetLimit(10)

    for _, item := range items {
        item := item  // 捕获变量
        g.Go(func() error {
            return process(ctx, item)
        })
    }

    return g.Wait()  // 返回第一个错误
}

4. Context 使用

go
// 超时控制
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    return io.ReadAll(resp.Body)
}

// 取消传播
func processWithCancel(ctx context.Context) error {
    // 子 goroutine 监听取消信号
    errCh := make(chan error, 1)

    go func() {
        errCh <- doWork(ctx)
    }()

    select {
    case err := <-errCh:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

// 传递请求相关数据
type contextKey string

const (
    requestIDKey contextKey = "request_id"
    userIDKey    contextKey = "user_id"
)

func middleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()
        ctx = context.WithValue(ctx, requestIDKey, uuid.New().String())
        ctx = context.WithValue(ctx, userIDKey, r.Header.Get("X-User-ID"))
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

func getRequestID(ctx context.Context) string {
    if v := ctx.Value(requestIDKey); v != nil {
        return v.(string)
    }
    return ""
}

三、锁与同步原语

1. 锁的类型

go
// 互斥锁 (Mutex)
type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// 读写锁 (RWMutex) - 读多写少场景
type SafeMap struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

func (m *SafeMap) Get(key string) (interface{}, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    v, ok := m.data[key]
    return v, ok
}

func (m *SafeMap) Set(key string, value interface{}) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

2. 原子操作

go
import "sync/atomic"

// 原子计数器 - 比锁更高效
type AtomicCounter struct {
    count int64
}

func (c *AtomicCounter) Inc() int64 {
    return atomic.AddInt64(&c.count, 1)
}

func (c *AtomicCounter) Get() int64 {
    return atomic.LoadInt64(&c.count)
}

// 原子指针 - 实现无锁数据结构
type AtomicValue struct {
    v atomic.Value
}

func (a *AtomicValue) Store(val interface{}) {
    a.v.Store(val)
}

func (a *AtomicValue) Load() interface{} {
    return a.v.Load()
}

// CAS 操作
func (c *AtomicCounter) CompareAndSwap(old, new int64) bool {
    return atomic.CompareAndSwapInt64(&c.count, old, new)
}

// 使用 CAS 实现自旋锁
type SpinLock struct {
    flag int32
}

func (l *SpinLock) Lock() {
    for !atomic.CompareAndSwapInt32(&l.flag, 0, 1) {
        runtime.Gosched()  // 让出 CPU
    }
}

func (l *SpinLock) Unlock() {
    atomic.StoreInt32(&l.flag, 0)
}

3. sync.Once 与 sync.Pool

go
// 单例模式 - 懒加载
var (
    instance *Database
    once     sync.Once
)

func GetDatabase() *Database {
    once.Do(func() {
        instance = connectToDatabase()
    })
    return instance
}

// 对象池 - 减少 GC 压力
var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 4096)
    },
}

func processRequest(data []byte) {
    buf := bufferPool.Get().([]byte)
    defer bufferPool.Put(buf)

    // 使用 buf 处理数据
    copy(buf, data)
    // ...
}

// 实际应用: bytes.Buffer 池
var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func formatJSON(v interface{}) string {
    buf := bufPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        bufPool.Put(buf)
    }()

    json.NewEncoder(buf).Encode(v)
    return buf.String()
}

4. 避免锁竞争

go
// ❌ 坑点 1: 锁粒度太大
type BadCache struct {
    mu   sync.Mutex
    data map[string]interface{}
}

func (c *BadCache) Get(key string) interface{} {
    c.mu.Lock()
    defer c.mu.Unlock()  // 读操作也加互斥锁
    return c.data[key]
}

// ✅ 解决: 使用读写锁
type GoodCache struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

func (c *GoodCache) Get(key string) interface{} {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}


// ❌ 坑点 2: 单一锁保护所有数据
type BadMap struct {
    mu   sync.Mutex
    data map[string]interface{}
}

// ✅ 解决: 分片锁 (Sharded Lock)
const shardCount = 32

type ShardedMap struct {
    shards [shardCount]*shard
}

type shard struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

func NewShardedMap() *ShardedMap {
    m := &ShardedMap{}
    for i := 0; i < shardCount; i++ {
        m.shards[i] = &shard{
            data: make(map[string]interface{}),
        }
    }
    return m
}

func (m *ShardedMap) getShard(key string) *shard {
    hash := fnv.New32a()
    hash.Write([]byte(key))
    return m.shards[hash.Sum32()%shardCount]
}

func (m *ShardedMap) Get(key string) (interface{}, bool) {
    shard := m.getShard(key)
    shard.mu.RLock()
    defer shard.mu.RUnlock()
    v, ok := shard.data[key]
    return v, ok
}

func (m *ShardedMap) Set(key string, value interface{}) {
    shard := m.getShard(key)
    shard.mu.Lock()
    defer shard.mu.Unlock()
    shard.data[key] = value
}

四、无锁编程

1. Lock-Free 队列

go
// 无锁队列 (基于 CAS)
type LockFreeQueue struct {
    head unsafe.Pointer
    tail unsafe.Pointer
}

type node struct {
    value interface{}
    next  unsafe.Pointer
}

func NewLockFreeQueue() *LockFreeQueue {
    n := unsafe.Pointer(&node{})
    return &LockFreeQueue{head: n, tail: n}
}

func (q *LockFreeQueue) Enqueue(value interface{}) {
    newNode := &node{value: value}
    newPtr := unsafe.Pointer(newNode)

    for {
        tail := atomic.LoadPointer(&q.tail)
        tailNode := (*node)(tail)
        next := atomic.LoadPointer(&tailNode.next)

        if tail == atomic.LoadPointer(&q.tail) {
            if next == nil {
                // 尝试将新节点链接到队尾
                if atomic.CompareAndSwapPointer(&tailNode.next, nil, newPtr) {
                    // 更新 tail 指针
                    atomic.CompareAndSwapPointer(&q.tail, tail, newPtr)
                    return
                }
            } else {
                // tail 落后了,帮助推进
                atomic.CompareAndSwapPointer(&q.tail, tail, next)
            }
        }
    }
}

func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
    for {
        head := atomic.LoadPointer(&q.head)
        tail := atomic.LoadPointer(&q.tail)
        headNode := (*node)(head)
        next := atomic.LoadPointer(&headNode.next)

        if head == atomic.LoadPointer(&q.head) {
            if head == tail {
                if next == nil {
                    return nil, false  // 队列为空
                }
                // tail 落后,帮助推进
                atomic.CompareAndSwapPointer(&q.tail, tail, next)
            } else {
                value := (*node)(next).value
                if atomic.CompareAndSwapPointer(&q.head, head, next) {
                    return value, true
                }
            }
        }
    }
}

2. Copy-on-Write

go
// COW Map - 读多写少场景
type COWMap struct {
    v atomic.Value  // 存储 map[string]interface{}
}

func NewCOWMap() *COWMap {
    m := &COWMap{}
    m.v.Store(make(map[string]interface{}))
    return m
}

func (m *COWMap) Get(key string) (interface{}, bool) {
    data := m.v.Load().(map[string]interface{})
    v, ok := data[key]
    return v, ok
}

func (m *COWMap) Set(key string, value interface{}) {
    // 复制整个 map
    for {
        oldData := m.v.Load().(map[string]interface{})
        newData := make(map[string]interface{}, len(oldData)+1)
        for k, v := range oldData {
            newData[k] = v
        }
        newData[key] = value

        // 原子替换
        if m.v.CompareAndSwap(oldData, newData) {
            return
        }
    }
}

// 适用场景: 配置中心、路由表等读多写少的场景

五、并发陷阱

常见问题

go
// ❌ 陷阱 1: goroutine 泄漏
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch  // 永远阻塞,因为没人发送
        fmt.Println(val)
    }()
    // 函数返回,ch 和 goroutine 都泄漏了
}

// ✅ 解决: 使用 context 取消
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}


// ❌ 陷阱 2: 在循环中捕获迭代变量
func badCapture() {
    for i := 0; i < 5; i++ {
        go func() {
            fmt.Println(i)  // 可能全部打印 5
        }()
    }
}

// ✅ 解决: 通过参数传递
func goodCapture() {
    for i := 0; i < 5; i++ {
        go func(n int) {
            fmt.Println(n)  // 正确打印 0-4
        }(i)
    }
}


// ❌ 陷阱 3: 忘记关闭 channel
func badClose() <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        // 忘记 close(ch)
    }()
    return ch
}

// 消费者会永远阻塞
for v := range badClose() {  // 死循环
    fmt.Println(v)
}


// ❌ 陷阱 4: 数据竞争
type BadCounter struct {
    count int  // 没有保护
}

func (c *BadCounter) Inc() {
    c.count++  // 数据竞争!
}

// ✅ 检测: go run -race main.go
// ✅ 解决: 使用 atomic 或 mutex


// ❌ 陷阱 5: select 中的 default 导致忙等
func badSelect(ch chan int) {
    for {
        select {
        case v := <-ch:
            process(v)
        default:
            // CPU 100%!
        }
    }
}

// ✅ 解决: 使用 time.Sleep 或去掉 default
func goodSelect(ch chan int) {
    for {
        select {
        case v := <-ch:
            process(v)
        case <-time.After(100 * time.Millisecond):
            // 超时处理
        }
    }
}

六、检查清单

并发设计检查

  • [ ] 是否选择了合适的并发模型?
  • [ ] goroutine 生命周期是否可控?
  • [ ] 是否有 goroutine 泄漏的风险?
  • [ ] 并发数是否有限制?

同步机制检查

  • [ ] 锁的粒度是否合适?
  • [ ] 是否使用了合适的锁类型(mutex vs rwmutex)?
  • [ ] 是否可以使用原子操作代替锁?
  • [ ] 是否存在死锁风险?

数据竞争检查

  • [ ] 是否运行过 race detector?
  • [ ] 共享数据是否都有保护?
  • [ ] channel 是否正确关闭?
  • [ ] context 是否正确传递和取消?

💬 讨论

使用 GitHub 账号登录后即可参与讨论

基于 MIT 许可发布