【OSS探訪記】hq
背景・モチベーション
Goで並行処理について公式ドキュメントや書籍を読んだが、実際にどのように使われているのか、そしてどのようなプロダクトがあるのか気になった。そこで、awesome-goを見て並行処理をしていそうなOSSを読んでみようと思う。
ざっくり気ままに読んでいきたいので誤読していたり大事なところを読んでいなかったりするかもしれない。OSSの紹介ではなく自己満足でただ読んでいくだけなので、解説についてはそこまで期待しないで頂ければと思う。気になった方が自分も読んでみようかなと思ってもらえると嬉しい。
対象OSS
今回はhqを読む。(さっそくawesome-goに含まれていないOSSなのだが、以前に作者の記事を読んでいて非常に面白かったのでコードも読んでみた)
これは何をするソフトウェア?
hqはジョブキューのライブラリ。詳細は作者が解説した記事があるのでそれを読むのが正確で理解は早い。あと、この記事は作者の解説記事を読んでいることはある程度前提にして話が進むので、流し読みせずに読まれる方は解説記事を一読された方が良いだろうと思う。
hqは下記の特徴がある。
・Goによる実装で、シングルバイナリ
・スタンドアロンのHTTP APIサーバー。ジョブのデータベースも読み込みであるため、別途特別な依存を必要としないで動作する
・シンプルでプログラミング言語非依存。HTTP APIでジョブを投入し、ジョブはHTTP POSTメッセージをワーカーアプリケーション(Webアプリ)に送信するというアーキテクチャ
・フロントエンドとしてCLIとWebUIを組み込みでサポート
処理の流れはこんな感じ。
HTTP APIでジョブ(JSON)を投入します。HQはジョブを取り出し、ジョブに記載されたURLにHTTP POSTして、別途用意されたワーカー用のWebアプリケーションにジョブを実行させる、という流れになっています。
解説記事がめっちゃわかりやすい。いきなりREADMEや実装を読む前に解説があると非常に助かる。
実装を追う
ジョブキューとしてのおおまかな流れはわかったので、下記の実装を見てみようと思う。
- Webサーバとして起動している箇所
- ジョブをenqueueしている箇所
- キューの構造
- ジョブをdequeueしている箇所
- ジョブの構造
Webサーバの起動
サーバの起動なのでそこまでmain関数から離れて定義されてはいないだろう。ということでmain関数を発見。mainの中でrealMain
を呼び出している。
func main() {
os.Exit(realMain())
}
https://github.com/kohkimakimoto/hq/blob/master/cmd/hq/hq.go#L11-L13
realMain
の中ではurfave/cli
を使ってCLIアプリとしてコマンドの設定を行いコマンドを実行している。設定に従いapp.Run(os.Args)
でコマンドが実行される。
app := cli.NewApp() app.Name = hq.Name app.HelpName = hq.Name app.Version = hq.Version + " (" + hq.CommitHash + ")" app.Usage = "Simplistic job queue engine" app.Commands = command.Commands if err := app.Run(os.Args); err != nil { printError(err) status = 1 }
https://github.com/kohkimakimoto/hq/blob/master/cmd/hq/hq.go#L23-L33
コマンドの一覧は下記。ServeCommand
がサーバの起動だろう。
// Command set var Commands = []cli.Command{ DeleteCommand, InfoCommand, ListCommand, PushCommand, RestartCommand, ServeCommand, StatsCommand, StopCommand, }
https://github.com/kohkimakimoto/hq/blob/master/command/commands.go#L13-L23
さて、ServeCommand
とは何なのか。定義に飛んでみるとserverAction
がハンドラとして登録されており、serverAction
ではListenAndServe()
を呼び出しサーバを起動していることがわかる。その過程でserver.NewApp(config)
でApp
を作っているようだ。
var ServeCommand = cli.Command{ Name: "serve", Usage: "Starts the HQ server process", Action: serverAction, Flags: []cli.Flag{ configFileFlag, logLevelFlag, }, } func serverAction(ctx *cli.Context) error { config := server.NewConfig() if err := loadServerConfigFiles(ctx, config); err != nil { return err } applyLogLevel(ctx, config) app := server.NewApp(config) defer app.Close() return app.ListenAndServe() }
https://github.com/kohkimakimoto/hq/blob/master/command/serve.go#L11-L34
NewApp
を見るとApp
という構造にEcho
がラップされている。つまりWebサーバの実態はEcho
だ。
func NewApp(config ...*Config) *App { var c *Config if len(config) == 0 { c = NewConfig() } else { c = config[0] } // create app instance app := &App{ Config: c, Echo: echo.New(), DataDir: c.DataDir, } app.Echo.HideBanner = true app.Echo.HidePort = true app.Echo.Server.Addr = app.Config.Addr return app }
https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L58-L78
ついでなのでApp
がどういう構造なのかも見てみる。ほとんど設定関連だがDB *bolt.DB
やStore *Store
、QueueManager *QueueManager
などがある。(Gen
はジョブのIDを採番するためのもの。本筋とそこまで関わらないのでここで補足しておく)
type App struct { // Configuration of the application instance Config *Config // Logger Logger echo.Logger // LogfileWriter LogfileWriter reopen.Writer // LogLevel LogLevel log.Lvl // Echo web framework Echo *echo.Echo // AccessLog AccessLogFile *os.File // AccessLogFile AccessLogFileWriter reopen.Writer // DataDir DataDir string // UseTempDataDir UseTempDataDir bool // DB DB *bolt.DB // Store Store *Store // Background Background *Background // katsubushi Gen katsubushi.Generator // QueueManager QueueManager *QueueManager }
https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L27-L56
App
がわかったので先ほどのListenAndServe
の中身を見ていく。ちょっと行数が多いが基本的にはEcho
の起動設定と起動など。ジョブキューとしての本筋と関連しそうなのはメソッドの先頭で行っているapp.Open()
だ。
func (app *App) ListenAndServe() error { // open resources such as log files, database, temporary directory, etc. if err := app.Open(); err != nil { return err }
https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L203-L207
Open
では先ほどのスニペットのコメントにある通り、Echo
以外で必要なリソースの初期化処理を行っている。100行近いコードなので抜粋。下記のように初期化してApp
に代入している。QueueManager
については起動も同時に行っている。これでジョブを受け付け実効する準備が完了するのだろう。
// setup bolt database db, err := bolt.Open(app.BoltDBPath(), 0600, nil) if err != nil { return err } app.DB = db logger.Infof("Opened boltdb: %s", db.Path()) // store app.Store = &Store{ app: app, db: db, logger: logger, } if err := app.Store.Init(); err != nil { return err } // queue app.QueueManager = NewQueueManager(app) app.QueueManager.Start()
https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L137-L158
ということで、コマンドでserve
を渡すとEcho
のWebサーバが起動するようだ。
Enqueue
次にこのWebサーバでどのようにEnqueueしているのかを見ていく。これはREADME.md
に記載があり、/job
に対してPOSTするとのこと。
https://github.com/kohkimakimoto/hq/blob/master/README.md#post-job
ではさっそくPOST /job
を探そう。サーバに設定されているハンドラの一覧は下記。POST /job
に対応するのはCreateJobHandler
だ。
func setupAPIHandlers(e *echo.Echo, prefix string) { e.Any(prefix, InfoHandler) e.GET(prefix+"stats", StatsHandler) e.POST(prefix+"job", CreateJobHandler) e.GET(prefix+"job", ListJobsHandler) e.GET(prefix+"job/:id", GetJobHandler) e.DELETE(prefix+"job/:id", DeleteJobHandler) e.POST(prefix+"job/:id/stop", StopJobHandler) e.POST(prefix+"job/:id/restart", RestartJobHandler) }
https://github.com/kohkimakimoto/hq/blob/master/server/app.go#L58-L78
CreateJobHandler
の中でリクエストの中身からJob
を作ってapp.QueueManager.EnqueueAsync(job)
という記述がある。また、app.Store.CreateJob
という記述もある。解説記事に書いてあるが、このジョブキューはメモリとKVSのどちらにもジョブを登録するので、これらはその話と対応する処理なのだろう。メソッド全部を載せるにはやや長いので抜粋。
job := &hq.Job{} job.ID = id job.CreatedAt = katsubushi.ToTime(id) job.Name = req.Name job.Comment = req.Comment job.URL = req.URL job.Payload = req.Payload job.Headers = req.Headers job.Timeout = req.Timeout if err := app.Store.CreateJob(job); err != nil { return err } app.QueueManager.EnqueueAsync(job)
https://github.com/kohkimakimoto/hq/blob/master/server/handlers.go#L50-L64
まずapp.Store.CreateJob
を見てみる。メソッドの最初の行に以下のように書いてある。s.db
とUpdate
に渡している関数のtx *bolt.Tx
から、Store
はbolt
のクライアントとして定義されていそうな予感。
return s.db.Update(func(tx *bolt.Tx) error {
https://github.com/kohkimakimoto/hq/blob/master/server/store.go#L74
Store
の定義に飛んでみるとやはりそうだった。app.Store.CreateJob
の中で色々やっていそうだけど、bolt
にジョブを突っ込んでいる処理だとは思うので、一旦これで理解を留めて先に進む。
type Store struct { app *App db *bolt.DB logger echo.Logger }
https://github.com/kohkimakimoto/hq/blob/master/server/store.go#L15-L19
先ほどのCreateJobHandler
のapp.QueueManager.EnqueueAsync
に話を戻そう。呼び出しているEnqueueAsync
の定義を読むと、これはジョブをキューに登録しているものだ。また、同時にgoroutineでQueue
という変数のchanにjobを積んでいることがわかる。m.RegisterWaitingJob(job)
はいまいちわからないので読む必要がありそう。
func (m *QueueManager) EnqueueAsync(job *hq.Job) { m.RegisterWaitingJob(job) go func() { m.Queue <- job }() }
https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L51-L57
RegisterWaitingJob
がやってることはmapにジョブを登録する処理だった。ジョブは待機ジョブとして登録される様子。
func (m *QueueManager) RegisterWaitingJob(job *hq.Job) { m.statusMutex.Lock() defer m.statusMutex.Unlock() m.WaitingJobs[job.ID] = &WaitingJob{ Job: job, } }
https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L79-L86
ついでなので、RegisterRunningJob
とRemoveRunningJob
も見ておく。やってることはジョブのステータス変更だが、Mapで管理しているので、各Mapから該当するIDのデータをdelete
している。
func (m *QueueManager) RegisterRunningJob(job *hq.Job, cancel context.CancelFunc) { m.statusMutex.Lock() defer m.statusMutex.Unlock() m.RunningJobs[job.ID] = &RunningJob{ Job: job, Cancel: cancel, } // remove waiting jobs delete(m.WaitingJobs, job.ID) } func (m *QueueManager) RemoveRunningJob(job *hq.Job) { m.statusMutex.Lock() defer m.statusMutex.Unlock() delete(m.RunningJobs, job.ID) }
https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L59-L77
ここまでで、bolt
にデータを格納しつつQueueManager
のQueue
というchanにジョブを積む、という流れがわかった。作者の解説記事にもあるが、途中でプロセスが死んでもbolt
から取り出せるのでジョブをロストしないようにするための設計だ。
Queueの構造
では、さっきから度々登場していたQueueManager
の定義を確認する。先ほどまではEnqueueを見ていたが、ここからはDequeueのための定義も含まれている。
type QueueManager struct { App *App Queue chan *hq.Job Dispatchers []*Dispatcher WorkerWg *sync.WaitGroup // job status statusMutex *sync.Mutex WaitingJobs map[uint64]*WaitingJob RunningJobs map[uint64]*RunningJob }
https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L9-L19
Dispatcher
は名前からして明らかにDequeue関連なので一旦後回し。他にはWatingJob
とRunningJob
が気になる。これらの定義を見てみるとJob
をラップしたものだった。これらの構造体でラップすることでJob
のステータスを表しているようだ。RunningJob
の方はジョブとCancel
を抱き合わせているので取り回しが便利そう。
type WaitingJob struct { Job *hq.Job } type RunningJob struct { Job *hq.Job Cancel context.CancelFunc }
https://github.com/kohkimakimoto/hq/blob/master/server/queue.go#L114-L121
キューの構造自体はシンプルだった。状態毎のMapにジョブのIDで登録・削除をしていく。sync.Map
を使えばMap操作前後でロックの確保・開放の処理が必要なくなるのかな、と思うなどした。けれどこれは実装当時はsync.Map
がなかったのかもしれないし、もしかしたらsync.Map
では実現できない何かがあるかもしれない。これは細かい話なので次に進もう。
Dequeue
さて、QueueManager
が保持していたDispatcher
の定義を確認しよう。と思ってみてみたらかなり小さい構造だった。
type Dispatcher struct { manager *QueueManager NumWorkers int64 }
https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L20-L23
構造体の定義の少し下にある実装を見てみるとloop
というメソッドがdispatcher
には生えており、これがディスパッチャとしての実態だった。ここではQueue
からジョブを取り出して、Worker数に応じて同期・非同期に処理を行っていくようだ。dispatchAsync
の方はgoroutineで処理されるのだろう。
func (d *Dispatcher) loop() { m := d.manager logger := m.App.Logger config := m.App.Config for { job := <-m.Queue logger.Debugf("dequeue job: %d", job.ID) if atomic.LoadInt64(&config.MaxWorkers) <= 0 { // sync d.dispatch(job) } else if atomic.LoadInt64(&d.NumWorkers) < atomic.LoadInt64(&config.MaxWorkers) { // async d.dispatchAsync(job) } else { // sync d.dispatch(job) } } }
https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L25-L45
dispatch
とdispatchAsync
の中身を確認する。どちらの処理も大差はなくgoroutineで処理を行うかどうかが違うだけで、work
を呼び出していることがわかる。
func (d *Dispatcher) dispatchAsync(job *hq.Job) { manager := d.manager manager.WorkerWg.Add(1) atomic.AddInt64(&d.NumWorkers, 1) go func() { defer func() { manager.WorkerWg.Done() atomic.AddInt64(&d.NumWorkers, -1) }() d.work(job) }() } func (d *Dispatcher) dispatch(job *hq.Job) { manager := d.manager manager.WorkerWg.Add(1) atomic.AddInt64(&d.NumWorkers, 1) defer func() { manager.WorkerWg.Done() atomic.AddInt64(&d.NumWorkers, -1) }() d.work(job) }
https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L47-L74
work
を見てみる。ここでは主な処理はジョブのステータス変更とジョブの処理を行っている。ジョブの処理がwork
だと思っていたけど、本体はrunHttpWorker
だった。結構長いのでコードを適宜抜粋する。ちなみにスニペットと実装されている順序は異なる。(最後のスニペットはdeferで囲まれており記述はメソッドの中間あたり。ただ処理は最後にされるのでスニペットも最後に載せた)
// keep running job status. manager.RegisterRunningJob(job, cancel) defer manager.RemoveRunningJob(job) if job.Canceled { return } now := time.Now().UTC().Truncate(time.Millisecond) // update startedAt job.StartedAt = &now if e := store.UpdateJob(job); e != nil { logger.Error(e) }
// worker
err = d.runHttpWorker(job, ctx)
// Update result status (success, failure or canceled). // If the evaluator has an error, write it to the output buf. if err != nil { logger.Errorf("worker error: %v", err) job.Success = false job.Failure = true job.Err = err.Error() } else { job.Success = true job.Failure = false } // Truncate millisecond. It is compatible time for katsubushi ID generator time stamp. now := time.Now().UTC().Truncate(time.Millisecond) // update finishedAt job.FinishedAt = &now if e := store.UpdateJob(job); e != nil { logger.Error(e) }
https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L79-L139
では、本丸のrunHttpWorker
を見てみる。やってることは案外単純でJob
からPayload
やURL
を取り出してその通りにnet/http
でリクエストを送っている。これ以上の説明のしようがないのでコードを全部載せてしまった方がわかりやすい気がしてきた。
func (d *Dispatcher) runHttpWorker(job *hq.Job, ctx context.Context) error { var reqBody io.Reader if job.Payload != nil && !bytes.Equal(job.Payload, []byte("null")) { reqBody = bytes.NewReader(job.Payload) } // worker req, err := http.NewRequest( "POST", job.URL, reqBody, ) if err != nil { return errors.Wrap(err, "failed to create new request") } // set context req = req.WithContext(ctx) // common headers req.Header.Add("Content-Type", "application/json") req.Header.Add("User-Agent", WorkerDefaultUserAgent) req.Header.Add("X-Hq-Job-Id", fmt.Sprintf("%d", job.ID)) // job specific headers for k, v := range job.Headers { req.Header.Add(k, v) } // http client client := &http.Client{ Timeout: time.Duration(job.Timeout) * time.Second, } resp, err := client.Do(req) if err != nil { return errors.Wrap(err, "failed to do http request") } defer resp.Body.Close() statusCode := resp.StatusCode job.StatusCode = &statusCode body, err := ioutil.ReadAll(resp.Body) if err != nil { return errors.Wrap(err, "failed to read http response body") } job.Output = string(body) if resp.StatusCode != http.StatusOK { return fmt.Errorf(http.StatusText(resp.StatusCode)) } return nil }
https://github.com/kohkimakimoto/hq/blob/master/server/dispatcher.go#L141-L195
ふむふむ。Dequeueの雰囲気が掴めた。
ジョブの構造
最後にジョブがどういう定義になっているのかを確認する。URL
とPayload
は先ほどのrunHttpWorker
で見た。他はジョブの状態を表していることがわかる。
type Job struct { ID uint64 `json:"id,string"` Name string `json:"name"` Comment string `json:"comment"` URL string `json:"url"` Payload json.RawMessage `json:"payload"` Headers map[string]string `json:"headers"` Timeout int64 `json:"timeout"` CreatedAt time.Time `json:"createdAt"` StartedAt *time.Time `json:"startedAt"` FinishedAt *time.Time `json:"finishedAt"` Failure bool `json:"failure"` Success bool `json:"success"` Canceled bool `json:"canceled"` StatusCode *int `json:"statusCode"` Err string `json:"err"` Output string `json:"output"` // status properties. Waiting bool `json:"waiting"` Running bool `json:"running"` }
https://github.com/kohkimakimoto/hq/blob/master/hq/structs.go#L25-L45
まとめ
ということでhqをざっと読んでみた。hqは作者のありあまる親切心がREADMEや解説記事ににじみ出ており非常にわかりやすい。OSSを読んでいく上で入門的だ。内容はWebサーバを立ててKVSとchanを活用してジョブを登録・取出を行い、ジョブに保存されているURLとPayloadでリクエストを投げるというものだった。並行処理なのでロックであったり状態管理については非常に参考になる。
なんだかまとめるまでもなく記事の前半で作者の解説記事から引用してきた処理の流れ通りの説明になってしまった。それだけあの解説記事が親切でわかりやすいということで。
次回に読むOSSは決めているのだが、次回がいつやってくるのか怪しいので伏せておく。読んでいて楽しかったので失踪せずに続けたいものだ。