【OSS探訪記】hq

背景・モチベーション

Goで並行処理について公式ドキュメントや書籍を読んだが、実際にどのように使われているのか、そしてどのようなプロダクトがあるのか気になった。そこで、awesome-goを見て並行処理をしていそうなOSSを読んでみようと思う。
ざっくり気ままに読んでいきたいので誤読していたり大事なところを読んでいなかったりするかもしれない。OSSの紹介ではなく自己満足でただ読んでいくだけなので、解説についてはそこまで期待しないで頂ければと思う。気になった方が自分も読んでみようかなと思ってもらえると嬉しい。

対象OSS

今回はhqを読む。(さっそくawesome-goに含まれていないOSSなのだが、以前に作者の記事を読んでいて非常に面白かったのでコードも読んでみた)

github.com

これは何をするソフトウェア?

hqはジョブキューのライブラリ。詳細は作者が解説した記事があるのでそれを読むのが正確で理解は早い。あと、この記事は作者の解説記事を読んでいることはある程度前提にして話が進むので、流し読みせずに読まれる方は解説記事を一読された方が良いだろうと思う。

kohkimakimoto.hatenablog.com

hqは下記の特徴がある。

・Goによる実装で、シングルバイナリ

スタンドアロンのHTTP APIサーバー。ジョブのデータベースも読み込みであるため、別途特別な依存を必要としないで動作する

・シンプルでプログラミング言語非依存。HTTP APIでジョブを投入し、ジョブはHTTP POSTメッセージをワーカーアプリケーション(Webアプリ)に送信するというアーキテクチャ

・フロントエンドとしてCLIとWebUIを組み込みでサポート

処理の流れはこんな感じ。

HTTP APIでジョブ(JSON)を投入します。HQはジョブを取り出し、ジョブに記載されたURLにHTTP POSTして、別途用意されたワーカー用のWebアプリケーションにジョブを実行させる、という流れになっています。

解説記事がめっちゃわかりやすい。いきなりREADMEや実装を読む前に解説があると非常に助かる。

実装を追う

ジョブキューとしてのおおまかな流れはわかったので、下記の実装を見てみようと思う。

  • Webサーバとして起動している箇所
  • ジョブをenqueueしている箇所
  • キューの構造
  • ジョブをdequeueしている箇所
  • ジョブの構造

Webサーバの起動

サーバの起動なのでそこまでmain関数から離れて定義されてはいないだろう。ということでmain関数を発見。mainの中でrealMainを呼び出している。

func main() {
    os.Exit(realMain())
}

https://github.com/kohkimakimoto/hq/blob/master/cmd/hq/hq.go#L11-L13

realMainの中ではurfave/cliを使ってCLIアプリとしてコマンドの設定を行いコマンドを実行している。設定に従いapp.Run(os.Args)でコマンドが実行される。

   app := cli.NewApp()
    app.Name = hq.Name
    app.HelpName = hq.Name
    app.Version = hq.Version + " (" + hq.CommitHash + ")"
    app.Usage = "Simplistic job queue engine"
    app.Commands = command.Commands

    if err := app.Run(os.Args); err != nil {
        printError(err)
        status = 1
    }

https://github.com/kohkimakimoto/hq/blob/master/cmd/hq/hq.go#L23-L33

コマンドの一覧は下記。ServeCommandがサーバの起動だろう。

// Command set
var Commands = []cli.Command{
    DeleteCommand,
    InfoCommand,
    ListCommand,
    PushCommand,
    RestartCommand,
    ServeCommand,
    StatsCommand,
    StopCommand,
}

https://github.com/kohkimakimoto/hq/blob/master/command/commands.go#L13-L23

さて、ServeCommandとは何なのか。定義に飛んでみるとserverActionがハンドラとして登録されており、serverActionではListenAndServe()を呼び出しサーバを起動していることがわかる。その過程でserver.NewApp(config)Appを作っているようだ。

var ServeCommand = cli.Command{
    Name:   "serve",
    Usage:  "Starts the HQ server process",
    Action: serverAction,
    Flags: []cli.Flag{
        configFileFlag,
        logLevelFlag,
    },
}

func serverAction(ctx *cli.Context) error {
    config := server.NewConfig()

    if err := loadServerConfigFiles(ctx, config); err != nil {
        return err
    }

    applyLogLevel(ctx, config)

    app := server.NewApp(config)
    defer app.Close()

    return app.ListenAndServe()
}

https://github.com/kohkimakimoto/hq/blob/master/command/serve.go#L11-L34

NewAppを見るとAppという構造にEchoがラップされている。つまりWebサーバの実態はEchoだ。

func NewApp(config ...*Config) *App {
    var c *Config
    if len(config) == 0 {
        c = NewConfig()
    } else {
        c = config[0]
    }

    // create app instance
    app := &App{
        Config:  c,
        Echo:    echo.New(),
        DataDir: c.DataDir,
    }

    app.Echo.HideBanner = true
    app.Echo.HidePort = true
    app.Echo.Server.Addr = app.Config.Addr

    return app
}

https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L58-L78

ついでなのでAppがどういう構造なのかも見てみる。ほとんど設定関連だがDB *bolt.DBStore *StoreQueueManager *QueueManagerなどがある。(GenはジョブのIDを採番するためのもの。本筋とそこまで関わらないのでここで補足しておく)

type App struct {
    // Configuration of the application instance
    Config *Config
    // Logger
    Logger echo.Logger
    // LogfileWriter
    LogfileWriter reopen.Writer
    // LogLevel
    LogLevel log.Lvl
    // Echo web framework
    Echo *echo.Echo
    // AccessLog
    AccessLogFile *os.File
    // AccessLogFile
    AccessLogFileWriter reopen.Writer
    // DataDir
    DataDir string
    // UseTempDataDir
    UseTempDataDir bool
    // DB
    DB *bolt.DB
    // Store
    Store *Store
    // Background
    Background *Background
    // katsubushi
    Gen katsubushi.Generator
    // QueueManager
    QueueManager *QueueManager
}

https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L27-L56

Appがわかったので先ほどのListenAndServeの中身を見ていく。ちょっと行数が多いが基本的にはEchoの起動設定と起動など。ジョブキューとしての本筋と関連しそうなのはメソッドの先頭で行っているapp.Open()だ。

func (app *App) ListenAndServe() error {
    // open resources such as log files, database, temporary directory, etc.
    if err := app.Open(); err != nil {
        return err
    }

https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L203-L207

Openでは先ほどのスニペットのコメントにある通り、Echo以外で必要なリソースの初期化処理を行っている。100行近いコードなので抜粋。下記のように初期化してAppに代入している。QueueManagerについては起動も同時に行っている。これでジョブを受け付け実効する準備が完了するのだろう。

   // setup bolt database
    db, err := bolt.Open(app.BoltDBPath(), 0600, nil)
    if err != nil {
        return err
    }
    app.DB = db
    logger.Infof("Opened boltdb: %s", db.Path())

    // store
    app.Store = &Store{
        app:    app,
        db:     db,
        logger: logger,
    }

    if err := app.Store.Init(); err != nil {
        return err
    }

    // queue
    app.QueueManager = NewQueueManager(app)
    app.QueueManager.Start()

https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L137-L158

ということで、コマンドでserveを渡すとEchoのWebサーバが起動するようだ。

Enqueue

次にこのWebサーバでどのようにEnqueueしているのかを見ていく。これはREADME.mdに記載があり、/jobに対してPOSTするとのこと。 https://github.com/kohkimakimoto/hq/blob/master/README.md#post-job

ではさっそくPOST /jobを探そう。サーバに設定されているハンドラの一覧は下記。POST /jobに対応するのはCreateJobHandlerだ。

func setupAPIHandlers(e *echo.Echo, prefix string) {
    e.Any(prefix, InfoHandler)
    e.GET(prefix+"stats", StatsHandler)
    e.POST(prefix+"job", CreateJobHandler)
    e.GET(prefix+"job", ListJobsHandler)
    e.GET(prefix+"job/:id", GetJobHandler)
    e.DELETE(prefix+"job/:id", DeleteJobHandler)
    e.POST(prefix+"job/:id/stop", StopJobHandler)
    e.POST(prefix+"job/:id/restart", RestartJobHandler)
}

https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L58-L78

CreateJobHandlerの中でリクエストの中身からJobを作ってapp.QueueManager.EnqueueAsync(job)という記述がある。また、app.Store.CreateJobという記述もある。解説記事に書いてあるが、このジョブキューはメモリとKVSのどちらにもジョブを登録するので、これらはその話と対応する処理なのだろう。メソッド全部を載せるにはやや長いので抜粋。

   job := &hq.Job{}
    job.ID = id
    job.CreatedAt = katsubushi.ToTime(id)
    job.Name = req.Name
    job.Comment = req.Comment
    job.URL = req.URL
    job.Payload = req.Payload
    job.Headers = req.Headers
    job.Timeout = req.Timeout

    if err := app.Store.CreateJob(job); err != nil {
        return err
    }

    app.QueueManager.EnqueueAsync(job)

https://github.com/kohkimakimoto/hq/blob/master/server/handlers.go#L50-L64

まずapp.Store.CreateJobを見てみる。メソッドの最初の行に以下のように書いてある。s.dbUpdateに渡している関数のtx *bolt.Txから、Storeboltのクライアントとして定義されていそうな予感。

   return s.db.Update(func(tx *bolt.Tx) error {

https://github.com/kohkimakimoto/hq/blob/master/server/store.go#L74

Storeの定義に飛んでみるとやはりそうだった。app.Store.CreateJobの中で色々やっていそうだけど、boltにジョブを突っ込んでいる処理だとは思うので、一旦これで理解を留めて先に進む。

type Store struct {
    app    *App
    db     *bolt.DB
    logger echo.Logger
}

https://github.com/kohkimakimoto/hq/blob/master/server/store.go#L15-L19

先ほどのCreateJobHandlerapp.QueueManager.EnqueueAsyncに話を戻そう。呼び出しているEnqueueAsyncの定義を読むと、これはジョブをキューに登録しているものだ。また、同時にgoroutineでQueueという変数のchanにjobを積んでいることがわかる。m.RegisterWaitingJob(job)はいまいちわからないので読む必要がありそう。

func (m *QueueManager) EnqueueAsync(job *hq.Job) {
    m.RegisterWaitingJob(job)

    go func() {
        m.Queue <- job
    }()
}

https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L51-L57

RegisterWaitingJobがやってることはmapにジョブを登録する処理だった。ジョブは待機ジョブとして登録される様子。

func (m *QueueManager) RegisterWaitingJob(job *hq.Job) {
    m.statusMutex.Lock()
    defer m.statusMutex.Unlock()

    m.WaitingJobs[job.ID] = &WaitingJob{
        Job: job,
    }
}

https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L79-L86

ついでなので、RegisterRunningJobRemoveRunningJobも見ておく。やってることはジョブのステータス変更だが、Mapで管理しているので、各Mapから該当するIDのデータをdeleteしている。

func (m *QueueManager) RegisterRunningJob(job *hq.Job, cancel context.CancelFunc) {
    m.statusMutex.Lock()
    defer m.statusMutex.Unlock()

    m.RunningJobs[job.ID] = &RunningJob{
        Job:    job,
        Cancel: cancel,
    }

    // remove waiting jobs
    delete(m.WaitingJobs, job.ID)
}

func (m *QueueManager) RemoveRunningJob(job *hq.Job) {
    m.statusMutex.Lock()
    defer m.statusMutex.Unlock()

    delete(m.RunningJobs, job.ID)
}

https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L59-L77

ここまでで、boltにデータを格納しつつQueueManagerQueueというchanにジョブを積む、という流れがわかった。作者の解説記事にもあるが、途中でプロセスが死んでもboltから取り出せるのでジョブをロストしないようにするための設計だ。

Queueの構造

では、さっきから度々登場していたQueueManagerの定義を確認する。先ほどまではEnqueueを見ていたが、ここからはDequeueのための定義も含まれている。

type QueueManager struct {
    App         *App
    Queue       chan *hq.Job
    Dispatchers []*Dispatcher
    WorkerWg    *sync.WaitGroup

    // job status
    statusMutex *sync.Mutex
    WaitingJobs map[uint64]*WaitingJob
    RunningJobs map[uint64]*RunningJob
}

https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L9-L19

Dispatcherは名前からして明らかにDequeue関連なので一旦後回し。他にはWatingJobRunningJobが気になる。これらの定義を見てみるとJobをラップしたものだった。これらの構造体でラップすることでJobのステータスを表しているようだ。RunningJobの方はジョブとCancelを抱き合わせているので取り回しが便利そう。

type WaitingJob struct {
    Job *hq.Job
}

type RunningJob struct {
    Job    *hq.Job
    Cancel context.CancelFunc
}

https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L114-L121

キューの構造自体はシンプルだった。状態毎のMapにジョブのIDで登録・削除をしていく。sync.Mapを使えばMap操作前後でロックの確保・開放の処理が必要なくなるのかな、と思うなどした。けれどこれは実装当時はsync.Mapがなかったのかもしれないし、もしかしたらsync.Mapでは実現できない何かがあるかもしれない。これは細かい話なので次に進もう。

Dequeue

さて、QueueManagerが保持していたDispatcherの定義を確認しよう。と思ってみてみたらかなり小さい構造だった。

type Dispatcher struct {
    manager    *QueueManager
    NumWorkers int64
}

https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L20-L23

構造体の定義の少し下にある実装を見てみるとloopというメソッドがdispatcherには生えており、これがディスパッチャとしての実態だった。ここではQueueからジョブを取り出して、Worker数に応じて同期・非同期に処理を行っていくようだ。dispatchAsyncの方はgoroutineで処理されるのだろう。

func (d *Dispatcher) loop() {
    m := d.manager
    logger := m.App.Logger
    config := m.App.Config

    for {
        job := <-m.Queue
        logger.Debugf("dequeue job: %d", job.ID)

        if atomic.LoadInt64(&config.MaxWorkers) <= 0 {
            // sync
            d.dispatch(job)
        } else if atomic.LoadInt64(&d.NumWorkers) < atomic.LoadInt64(&config.MaxWorkers) {
            // async
            d.dispatchAsync(job)
        } else {
            // sync
            d.dispatch(job)
        }
    }
}

https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L25-L45

dispatchdispatchAsyncの中身を確認する。どちらの処理も大差はなくgoroutineで処理を行うかどうかが違うだけで、workを呼び出していることがわかる。

func (d *Dispatcher) dispatchAsync(job *hq.Job) {
    manager := d.manager

    manager.WorkerWg.Add(1)
    atomic.AddInt64(&d.NumWorkers, 1)

    go func() {
        defer func() {
            manager.WorkerWg.Done()
            atomic.AddInt64(&d.NumWorkers, -1)
        }()

        d.work(job)
    }()
}

func (d *Dispatcher) dispatch(job *hq.Job) {
    manager := d.manager

    manager.WorkerWg.Add(1)
    atomic.AddInt64(&d.NumWorkers, 1)
    defer func() {
        manager.WorkerWg.Done()
        atomic.AddInt64(&d.NumWorkers, -1)
    }()

    d.work(job)
}

https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L47-L74

workを見てみる。ここでは主な処理はジョブのステータス変更とジョブの処理を行っている。ジョブの処理がworkだと思っていたけど、本体はrunHttpWorkerだった。結構長いのでコードを適宜抜粋する。ちなみにスニペットと実装されている順序は異なる。(最後のスニペットはdeferで囲まれており記述はメソッドの中間あたり。ただ処理は最後にされるのでスニペットも最後に載せた)

   // keep running job status.
    manager.RegisterRunningJob(job, cancel)
    defer manager.RemoveRunningJob(job)

    if job.Canceled {
        return
    }

    now := time.Now().UTC().Truncate(time.Millisecond)
    // update startedAt
    job.StartedAt = &now
    if e := store.UpdateJob(job); e != nil {
        logger.Error(e)
    }
   // worker
    err = d.runHttpWorker(job, ctx)
       // Update result status (success, failure or canceled).
        // If the evaluator has an error, write it to the output buf.
        if err != nil {
            logger.Errorf("worker error: %v", err)
            job.Success = false
            job.Failure = true
            job.Err = err.Error()
        } else {
            job.Success = true
            job.Failure = false
        }

        // Truncate millisecond. It is compatible time for katsubushi ID generator time stamp.
        now := time.Now().UTC().Truncate(time.Millisecond)

        // update finishedAt
        job.FinishedAt = &now

        if e := store.UpdateJob(job); e != nil {
            logger.Error(e)
        }

https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L79-L139

では、本丸のrunHttpWorkerを見てみる。やってることは案外単純でJobからPayloadURLを取り出してその通りにnet/httpでリクエストを送っている。これ以上の説明のしようがないのでコードを全部載せてしまった方がわかりやすい気がしてきた。

func (d *Dispatcher) runHttpWorker(job *hq.Job, ctx context.Context) error {
    var reqBody io.Reader
    if job.Payload != nil && !bytes.Equal(job.Payload, []byte("null")) {
        reqBody = bytes.NewReader(job.Payload)
    }

    // worker
    req, err := http.NewRequest(
        "POST",
        job.URL,
        reqBody,
    )
    if err != nil {
        return errors.Wrap(err, "failed to create new request")
    }

    // set context
    req = req.WithContext(ctx)

    // common headers
    req.Header.Add("Content-Type", "application/json")
    req.Header.Add("User-Agent", WorkerDefaultUserAgent)
    req.Header.Add("X-Hq-Job-Id", fmt.Sprintf("%d", job.ID))

    // job specific headers
    for k, v := range job.Headers {
        req.Header.Add(k, v)
    }

    // http client
    client := &http.Client{
        Timeout: time.Duration(job.Timeout) * time.Second,
    }

    resp, err := client.Do(req)
    if err != nil {
        return errors.Wrap(err, "failed to do http request")
    }
    defer resp.Body.Close()

    statusCode := resp.StatusCode

    job.StatusCode = &statusCode
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return errors.Wrap(err, "failed to read http response body")
    }
    job.Output = string(body)

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf(http.StatusText(resp.StatusCode))
    }

    return nil
}

https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L141-L195

ふむふむ。Dequeueの雰囲気が掴めた。

ジョブの構造

最後にジョブがどういう定義になっているのかを確認する。URLPayloadは先ほどのrunHttpWorkerで見た。他はジョブの状態を表していることがわかる。

type Job struct {
    ID         uint64            `json:"id,string"`
    Name       string            `json:"name"`
    Comment    string            `json:"comment"`
    URL        string            `json:"url"`
    Payload    json.RawMessage   `json:"payload"`
    Headers    map[string]string `json:"headers"`
    Timeout    int64             `json:"timeout"`
    CreatedAt  time.Time         `json:"createdAt"`
    StartedAt  *time.Time        `json:"startedAt"`
    FinishedAt *time.Time        `json:"finishedAt"`
    Failure    bool              `json:"failure"`
    Success    bool              `json:"success"`
    Canceled   bool              `json:"canceled"`
    StatusCode *int              `json:"statusCode"`
    Err        string            `json:"err"`
    Output     string            `json:"output"`
    // status properties.
    Waiting bool `json:"waiting"`
    Running bool `json:"running"`
}

https://github.com/kohkimakimoto/hq/blob/master/hq/structs.go#L25-L45

まとめ

ということでhqをざっと読んでみた。hqは作者のありあまる親切心がREADMEや解説記事ににじみ出ており非常にわかりやすい。OSSを読んでいく上で入門的だ。内容はWebサーバを立ててKVSとchanを活用してジョブを登録・取出を行い、ジョブに保存されているURLとPayloadでリクエストを投げるというものだった。並行処理なのでロックであったり状態管理については非常に参考になる。
なんだかまとめるまでもなく記事の前半で作者の解説記事から引用してきた処理の流れ通りの説明になってしまった。それだけあの解説記事が親切でわかりやすいということで。

次回に読むOSSは決めているのだが、次回がいつやってくるのか怪しいので伏せておく。読んでいて楽しかったので失踪せずに続けたいものだ。