golangは、goroutine/channel を使って並行処理が簡単に実行できるので、重たい処理を非同期で実行しておくのは、非常に簡単です。ただし、並列処理するためのスレッドをいくらでも立てられるが、その立てスレッドが終わったのかを把握するためには、ちゃんと設計する必要があります。今回は、非同期処理で重たい処理を行っているプログラムがSIGTERMを受信した際に、安全に処理を継続して停止する仕組みについて、紹介します。

きっかけ

DockerAPI自体も終了時にSIGTERMを送ってくるので、そのSIGTERMハンドラを実装しないとイケないなということとで調べたのがきっかです。

Graceful shutdownとは

Graceful shutdownとは、一般的に以下の停止動作を指します。

  • 停止要求(例えば、SIGTERM)受信後に、新しい処理リクエストを受け付けないようにする
  • 停止要求受信時に処理中の処理は、完了するまで継続する

例えば、バッチ処理や動画コンバート、DB更新などを伴う処理の場合は、処理中にアプリーケーションが中止してしまった場合、中途半端な状態に陥ってしまいます。結果の整合がとれる形でアプリーケーションが終了するようにしましょうというのが、Graceful shutdownです。

Dispatcher-Workerパターンとは

Dispatch-Workerパターンは、プロセス間通信で利用されるデザインパターンです。プロセス間通信では、タスク(やってほしい処理)を依頼するための、メッセージキューを経由して、別のworkerスレッドがタスクを処理するという形で実行します。Dispatcher-Workerパターンでは、そのWorkerへ指示を出す人にDispatcherという専用の人をおき、その処理で必要に応じて、空いているworkerへタスクを配るという形式です。図で表すと下記のような形になります。

間にDispacterがいることで、起動するWorkerスレッドの上限を制御できます。

この2つの処理を組み合わせたgo言語での実装例

goでは、goroutineという軽量スレッド、メッセージのやりとりにchannelが利用できます。下記にDispacherとWorkerの実装例を示します。全体は下記のような形です。

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type (
    // Dispatcher
    Dispatcher struct {
        pool  chan *worker
        queue chan interface{}   // メッセージキュー
        workers []*worker
        wg      sync.WaitGroup
        quit    chan struct{}
    }

    // Worker
    worker struct {
        dispatcher *Dispatcher
        data chan interface{}
        quit chan struct{}
    }
)

func (d *Dispatcher) Start() {
    // ワーカを起動して処理まちの状態にしておく
    for _, w := range d.workers {
        w.start()
    }

    go func() {
        for {
            select {
                // メッセージがキューイングされた場合
            case v := <-d.queue:
                //  処理できるまworkerを取得する
                worker := <-d.pool
                // リクエスト内容をworkerへ転送する
                worker.data <- v
            case <-d.quit:
                return
            }
        }
    }()
}

// adds a given value to the queue of the dispatcher.
func (d *Dispatcher) Add(v interface{}) {
    // キューイングされた場合に処理を待機するために WaitGroup をカウントアップ
    d.wg.Add(1)
    d.queue <- v
}

// waits for the dispatcher to exit. It must have been started by Start.
func (d *Dispatcher) Wait() {
    d.wg.Wait()
}

func (w *worker) start() {
    go func() {
        for {
            // dispatcherに処理が受けつけられることを通知する
            w.dispatcher.pool <- w

            select {
            // メッセージがキューイングされた場合、 v にメッセージを設定
            case v := <-w.data:
                if str, ok := v.(string); ok {
                    // 5分間、適当に処理する
                    for i := 0; i < 30; i++ {
                        message := fmt.Sprintf("%v Working... %v", i, str)
                        fmt.Println(message)
                        time.Sleep(10 * time.Second)
                    }
                }

                // WaitGroupのカウントダウン
                w.dispatcher.wg.Done()

            case <-w.quit:
                return
            }
        }
    }()
}

func NewDispatcher(workerMax int, queueMessageMax int) *Dispatcher {
    d := &Dispatcher {
        pool: make(chan *worker, workerMax),
        queue: make(chan interface{}, queueMessageMax),
        quit: make(chan struct{}),
    }

    d.workers = make([]*worker, cap(d.pool))
    for i := 0; i < cap(d.pool); i++ {
        w := worker{
            dispatcher: d,
            data:       make(chan interface{}),
            quit:       make(chan struct{}),
        }
        d.workers[i] = &w
    }
    return d
}

メッセージを積むところからメッセージを取り出す流れ

利用者がメッセージを積む処理は、Addです。Addすることで、メッセージキュー(queue)へ書き込みが行われ、Dipatcherとして別途立てた別スレッド内で書き込みまちしている箇所(case v := <-d.queue)でメッセージを受信します。selectでは、queueに書き込まれるまでブロックされ、メッセージ受信するとメッセージを取り出します。

取り出しメッセージをworker配ってやってもらう流れ

現在処理を受け付けることができるworker(=IDLE状態)のものを取り出し(worker := <-d.pool)し、リクエスト内容をworkerへ送信(worker.data <- v)することで、worker側は、メッセージを受信(case v := <-w.data:)して、必要な処理を行います。

利用する側

下記のように必要なタイミングでDispacherのAddを呼ぶだけです。処理完了もWait()をコールするだけです。

    go func() {
        for i := 0; i < 10; i++ {
            dumyMessage := fmt.Sprintf("Mesage:%v", i)
            d.Add(dumyMessage)
        }
    }()

〜
   // 処理完了をまつ
   d.Wait()

SIGTERMを受け取る方法

Linuxのシステムプログラミング等でよく出てくるSIGTERMですが、Goでは、signal.Notifyで受け取るチャンネルを指定します。イベント発生するとそのチャンネル経由でイベント発生を検知できます。

SIGTERMのハンドリング、Dispatcher-Workerパターンを組み合わせると

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

type (
    // Dispatcher
    Dispatcher struct {
        pool  chan *worker
        queue chan interface{}   // メッセージキュー
        workers []*worker
        wg      sync.WaitGroup
        quit    chan struct{}
    }

    // Worker
    worker struct {
        dispatcher *Dispatcher
        data chan interface{}
        quit chan struct{}
    }
)

func (d *Dispatcher) Start() {
    // ワーカを起動して処理まちの状態にしておく
    for _, w := range d.workers {
        w.start()
    }

    go func() {
        for {
            select {
                // メッセージがキューイングされた場合
            case v := <-d.queue:
                //  処理できるまworkerを取得する
                worker := <-d.pool
                // リクエスト内容をworkerへ転送する
                worker.data <- v
            case <-d.quit:
                return
            }
        }
    }()
}

// Add adds a given value to the queue of the dispatcher.
func (d *Dispatcher) Add(v interface{}) {
    // キューイングされた場合に処理を待機するために WaitGroup をカウントアップ
    d.wg.Add(1)
    d.queue <- v
}

// Wait waits for the dispatcher to exit. It must have been started by Start.
func (d *Dispatcher) Wait() {
    d.wg.Wait()
}

func (w *worker) start() {
    go func() {
        for {
            // dispatcherに処理が受けつけられることを通知する
            w.dispatcher.pool <- w

            select {
            // メッセージがキューイングされた場合、 v にメッセージを設定
            case v := <-w.data:
                if str, ok := v.(string); ok {
                    // 5分間、適当に処理する
                    for i := 0; i < 30; i++ {
                        message := fmt.Sprintf("%v Working... %v", i, str)
                        fmt.Println(message)
                        time.Sleep(10 * time.Second)
                    }
                }

                // WaitGroupのカウントダウン
                w.dispatcher.wg.Done()

            case <-w.quit:
                return
            }
        }
    }()
}

func NewDispatcher(workerMax int, queueMessageMax int) *Dispatcher {
    d := &Dispatcher {
        pool: make(chan *worker, workerMax),
        queue: make(chan interface{}, queueMessageMax),
        quit: make(chan struct{}),
    }

    d.workers = make([]*worker, cap(d.pool))
    for i := 0; i < cap(d.pool); i++ {
        w := worker{
            dispatcher: d,
            data:       make(chan interface{}),
            quit:       make(chan struct{}),
        }
        d.workers[i] = &w
    }

    return d
}


func main() {
    d := NewDispatcher(3, 1000)
    d.Start()

    shutdown := make(chan int)
    sigChannel := make(chan os.Signal, 1)

    go func() {
        for i := 0; i < 10; i++ {
            dumyMessage := fmt.Sprintf("Mesage:%v", i)
            d.Add(dumyMessage)
        }
    }()


    signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM)
    go func() {
        <- sigChannel
        fmt.Println("Shut down...")
        d.Wait()
        fmt.Println("Shut down... wait finish")
        shutdown <- 1
    }()
    <-shutdown
}

上の例では、SIGTERMを受信した際に、処理中の処理が完了するのを待っています。リクエストの部分は、このSIGTERMを受け取ったタイミング等で、ハンドラを解除するなどすることでリクエストも受け取らなくなるので、このような形でGraceful shutdownも実現できます。

おわりに

goでは、 goroutineでスレッド化が容易なことから安易な実装がされているケースもたまに見かけます。今回紹介したDispatcher-Workerパターンは、workerの数が制限できる為、スレッド数が無限に増えてしまう等の問題も回避できます。プロセス間通信を導入する際には、検討してみてはいかがでしょうか。