今回は、Pub/Subメッセージングモデルについてまとめていこうと思います。非同期なイベントやりとりでは、必ずといって話題に上がるものですので、どんな仕組みなのかという触りの部分だけでも押さえておきたいところです。
目次
Pub/Subメッセージングモデル
ソフトウェアアーキテクチャで利用されるメッセージングモデルについて簡単に説明します。下記のような3つの要素で構成されるメッセージのやりとりのモデルです。
- メッセージを仲介するBroker
メッセージを貯める部分
Topicという論理チャンネルにメッセージを紐付けて管理する - メッセージ発信するPublisher
Topicを指定してメッセージをBrockerに投げる - メッセージを受信するSubscriber
Topicを指定してメッセージを取りに行く
利点
このモデルには、以下の利点があります。
- メッセージを発信する人、メッセージを受信する人の部分で分断できるため、疎結合になりやすい。
- pubshiler、subscriberが状態を保持していないため、スケール可能である。
このように疎結合にできるという点、スケール可能という点から最近では、マイクロサービス感の非同期イベントベース通信などでよく利用されています。
難しさ
このモデルには、以下の難しさがあります。
- 順序を制御することが難しい
一人のpublisherがシーケンシャルに書き込んだデータをsubscriberが読み込んだときの順序を保っているとは限らない。 - publisher、subscriberがスケール可能だからといって、アプリケーションがスケールできるとは限らない。
実際には、スケール可能なアプリケーションを組めるかは、アプリケーション設計をうまく行う必要がある。 - Publisherが複数で書き込む際に同一のデータが書き込まれる可能性がある。
こちらも同一のメッセージ発行時に、アプリケーションがどう振る舞うかをきちっと設計する必要があります。
MQTT
よく利用されるPub/Subモデルには、MQTTがあります。MQTTは、非同期に1対多の通信ができるプロトコルです。軽量かつシンプルなので、IOTなどでも利用されています。
cloud Pub/Subについて
Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。AWSに詳しい方は、Amazon SQS(AWS Simple Message Queue)サービスを思い浮かべていただければよいかと思います。cloud Pub/Subでは、以下の用語の用語が頻繁に利用されます。
- トピック
パブリッシャーがメッセージを送信する名前付きリソース - サブスクリプション
特定の単一のトピックからサブスクライブするアプリケーションに配信されるメッセージのストリームを表す、名前付きリソース - メッセージ
パブリッシャーがトピックに送信し、最終的にはサブスクライバーに配信されるデータ - メッセージ属性
パブリッシャーがメッセージに対して定義できる Key-Value ペア
パブリッシャーとサブスクライバーの関係がかなり柔軟に設定できるので、最初は戸惑ったのですが、GCPに記載されている下記図を見るとわかりやすいと思います。
この図に記載されている通り、Cloud Pub/Subとしては、Topic、SubscriptionがGCPで管理され、メッセージを発行するPublisher、メッセージを受信するSubscriberの部分が管理外になっていることがわかると思います。
Pub/Subメッセージフロー
下記にGCPの公式サイトで利用されている図を引用してきました。
TopicとSubscriptionの紐付け、メッセージを発行、topic、Subscriptionを指定して、Subscribeするという仕組みになっています。メッセージ自体は、Message Storegaeとして保持し、Ackを送ることで、そのストレージから削除されるという仕組みです。それでは、サンプルコードを見てみましょう。
使ってみる
cloud pub/sub自体は、pub/subモデルのアーキテクチャですので、同一言語で記載する必要はありません。下記例は、メッセージを発信するアプリケーションをpython、メッセージを受信するアプリケーションをgolangで記載しています。
import sys
import os
import time
import datetime
from google.cloud import pubsub_v1
project_id = "dev"
topic_name = "pubsub-test-resource"
cred_file = "hogehoge.json"
publisher = pubsub_v1.publisher.Client.from_service_account_file(cred_file)
topic_path = publisher.topic_path(project_id, topic_name)
cnt = 0
while True:
data = u"Message from test publisher {}".format(
cnt) + datetime.datetime.now().isoformat(" ")
data = data.encode("utf-8")
print("Publish: " + data.decode("utf-8", "ignore"))
future = publisher.publish(topic_path, data=data)
print("return ", future.result())
time.sleep(0.25)
cnt = cnt + 1
package main
import (
"context"
"log"
"os"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
// ReceiveMessageCB トピックメッセージ受信時のCallback型
type ReceiveMessageCB func(context.Context, []byte)
// Subscriber メッセージ観測者
type Subscriber interface {
Receive(ctx context.Context, topicName string, subscriptionName string, callback ReceiveMessageCB)
}
type subscriber struct {
}
// NewSubscriber サブスクラーバを生成する
func NewSubscriber(ctx context.Context) (Subscriber, error) {
return &subscriber{}, nil
}
// Receive メッセージ受信
func (subscriber *subscriber) Receive(ctx context.Context, topicName string, subscriptionName string, callback ReceiveMessageCB) {
topic, err := createTopicIfNotExists(ctx, topicName)
if err != nil {
log.Printf("create topic failed error %v\n", err.Error())
return
}
sub := createSubscription(subscriptionName, topic)
err = sub.Receive(ctx, func(cctx context.Context, msg *pubsub.Message) {
msg.Ack()
log.Printf("Got message: %q\n", string(msg.Data))
if callback != nil {
callback(cctx, msg.Data)
}
})
if err != nil {
log.Printf("subscriber recive error %v\n", err)
}
}
func newPubsubClient(ctx context.Context) (*pubsub.Client, error) {
projectID := os.Getenv("PUBSUB_PROJECT_ID")
pubsubKeyfilePath := os.Getenv("PUBSUB_KEYFILE_PATH")
sa := option.WithCredentialsFile(pubsubKeyfilePath)
client, err := pubsub.NewClient(ctx, projectID, sa)
if err != nil {
log.Fatal(err)
}
return client, nil
}
func createTopicIfNotExists(ctx context.Context, topicName string) (*pubsub.Topic, error) {
client, err := newPubsubClient(ctx)
if err != nil {
log.Fatal(err)
}
// Create a topic to subscribe to.
t := client.Topic(topicName)
ok, err := t.Exists(ctx)
if err != nil {
log.Printf("Topic Exists func failed %v", err)
return t, err
}
if ok {
return t, nil
}
log.Printf("create Topic Name[%v]", topicName)
t, err = client.CreateTopic(ctx, topicName)
if err != nil {
log.Printf("Failed to create the topic: %v", err)
return t, err
}
return t, nil
}
func createSubscription(subName string, topic *pubsub.Topic) *pubsub.Subscription {
ctx := context.Background()
client, err := newPubsubClient(ctx)
if err != nil {
log.Fatal(err)
}
// Create a topic to subscribe to.
sub := client.Subscription(subName)
ok, err := sub.Exists(ctx)
if err != nil {
log.Printf("Subscription Exists func failed %v\n", err)
return sub
}
if ok {
log.Printf("Subscription Exists func ok\n")
return sub
}
sub, err = client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
})
if err != nil {
log.Printf("erro:%v", err)
return nil
}
log.Printf("Created subscription: %v\n", sub)
return sub
}
func main() {
ctx := context.Background()
subscriber, err := NewSubscriber(ctx)
if err != nil {
log.Fatalf("subscriber create failed %v", err.Error())
}
for {
subscriber.Receive(ctx, "pubsub-test-resource", "pubsub-test-resource-sub", func(cctx context.Context, data []byte) {
log.Printf("recived")
})
}
}
メッセージを受信したことは、msg.Ack()で行います。これを呼び出すことでメッセージがサブスクリプションのメッセージキューから削除されます。
実装自体は、そこまで難しいありません。Pub/Subメッセージングモデルの難しさで紹介した、スケール可能なアプリケーションを組むための設計&実装の部分で考慮が必要にあります。
さらに詳しく知りたい方へ
もう少し内部実装について学びたい方は、Microsoftの技術記事が参考になるかと思います。マイクロサービス間での非同期メッセージ通信に関する実装については、マイクロサービス間でイベント ベースの通信を実装する (統合イベント)で紹介されています。こちらを読むとイベントを通知するようなパターンが網羅されているので、参考にしてみて下さい。
今回は、pub/subメッセージングモデルという観点で、GCPが提供するcloud Pub/Subについての勉強と簡単ですが、利用方法について紹介しました。お疲れ様でした。

