Skip to content

Instantly share code, notes, and snippets.

@nownabe
Created August 9, 2022 07:43
Show Gist options
  • Save nownabe/695ee9e86b7bd2ceacdb3fd67581260c to your computer and use it in GitHub Desktop.
Save nownabe/695ee9e86b7bd2ceacdb3fd67581260c to your computer and use it in GitHub Desktop.
Quickstart: Pub/Sub BigQuery Subscription

Pub/Sub BigQuery Subscription の作り方

Pub/Sub BigQuery Subscription を利用する最小限の手順です。

Project 準備

利用するプロジェクトを設定してください。

gcloud config set project YOUR-PROJECT

BigQuery 準備

利用するデータセットを作成してください。

参考: データセットの作成  |  BigQuery  |  Google Cloud

bq --location asia-northeast1 \
  mk --dataset \
  $(gcloud config get-value project):pubsubtest

テーブルを作成してください。

参考: テーブルの作成と使用  |  BigQuery  |  Google Cloud

bq mk --table \
  $(gcloud config get-value project):pubsubtest.dsttable \
  name:STRING,age:INT64

Pub/Sub Topic 準備

Pub/Sub トピックのスキーマ定義ファイルを作成してください。

参考: スキーマの作成と管理  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

cat <<EOF > topic-schema.json
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "long"}
  ]
}
EOF

Pub/Sub トピックのスキーマを作成してください。

参考: スキーマの作成と管理  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

gcloud pubsub schemas create test-schema \
  --type AVRO \
  --definition "$(cat topic-schema.json)"

スキーマを使用してトピックを作成してください。

参考: トピックの作成と使用  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

gcloud pubsub topics create test-topic \
  --message-encoding JSON \
  --schema test-schema

Pub/Sub Subscription 準備

Pub/Sub サービスアカウントに BigQuery のロールを割り当ててください。

参考: サブスクリプションを作成して使用する  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

sa=service-$(gcloud projects describe $(gcloud config get-value project) --format "value(projectNumber)")@gcp-sa-pubsub.iam.gserviceaccount.com
cat <<EOF > table-policy.json
{
  "bindings": [
    {"members": ["serviceAccount:$sa"], "role": "roles/bigquery.dataEditor"},
    {"members": ["serviceAccount:$sa"], "role": "roles/bigquery.metadataViewer"}
  ]
}
EOF
bq set-iam-policy \
  $(gcloud config get-value project):pubsubtest.dsttable \
  table-policy.json

BigQuery Subscription を作成してください。

参考: サブスクリプションを作成して使用する  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

gcloud pubsub subscriptions create test-subscription \
  --topic test-topic \
  --bigquery-table $(gcloud config get-value project):pubsubtest.dsttable \
  --use-topic-schema

Pub/Sub トピックにメッセージを Publish する

main.go を実行してメッセージを Publish してください。

参考: クイックスタート: クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

PROJECT_ID=$(gcloud config get-value project) go run main.go

BigQuery で確認する

データが BigQuery に保存されているか確認してください。

参考: インタラクティブ クエリとバッチクエリのジョブの実行  |  BigQuery  |  Google Cloud

bq query \
  --use_legacy_sql=false \
  "SELECT * FROM argolis-shogow.pubsubtest.dsttable"
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"cloud.google.com/go/pubsub"
)
type message struct {
Name string `json:"name"`
Age int64 `json:"age"`
}
func main() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, os.Getenv("PROJECT_ID"))
if err != nil {
log.Fatalf("Failed to create Pub/Sub client: %v", err)
}
defer client.Close()
t := client.Topic("test-topic")
messages := []*message{
{"Alice", 12},
{"Bob", 34},
{"Charlie", 21},
{"Dave", 46},
{"Ellen", 98},
{"Frank", 5},
}
for _, m := range messages {
if err := publish(ctx, t, m); err != nil {
log.Printf("Error: %v", err)
}
}
}
func publish(ctx context.Context, t *pubsub.Topic, msg *message) error {
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
result := t.Publish(ctx, &pubsub.Message{Data: data})
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %w", err)
}
log.Printf("Published a message '%+v' [id=%s]", msg, id)
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment