BigQuery のワークフローを Cloud Composer 以外で動かす方法を模索する

このエントリーをはてなブックマークに追加

Cloud Composer

データ処理基盤を構築していて BigQuery で複雑なワークフローを実行したいと思ったことはないでしょうか?
GCP 上でデータ基盤を構築しているのであれば Cloud Composer を使うことで簡単にワークフロー化できます。ですが BigQuery のフローを制御するためだけに GKE 上で Cloud Composer を動かすのは避けたいと思うかもしれません。(私は思いました。)過去には Cloud Functions と Cloud Pub/Sub を組み合わせてワークフローを組み立てたこともありますが、フローが複雑になってくると全体を把握し、管理することが難しくなってしまいました。
そこで BigQuery のクエリで構成されたワークフローを GCP 上で実行する際に、 Cloud Composer 以外の選択肢にはどんな物があるのか?について調査しました。結論は「予算に余裕があるなら Cloud Composer」ですが、ニッチな用途で BigQuery を実行したいとか、同じようなことを思いついた方の参考になれば幸いです。

この記事では各サービスの詳しい構築方法については述べませんので、Apache Airflow、 Apache Beam、そしてGCP の各サービスの詳しい利用方法につきましては、それぞれの製品のドキュメントを参照してください。

目次

Cloud Composer で BiqQuery のワークフローを実行する

GCPには Cloud Composer という Python で記述したワークフロー (DAG、有向非巡回グラフ) を実行するサービスがあります。
これは GCP上でApache Airflow を扱えるようにしたフルマネージド・サービスです。 フルマネージド・サービスなのでCloud Composer 環境を構築したら GCS に DAG をアップロードするだけでワークフローが実行できる状態になります。

Cloud Composer の環境を構築すると10分程度で GKE 上にノードが最低3台起動します。
構築が完了すると環境が表示されます。 ここからDAGの追加やWeb UI の表示などができるようになっており、操作は非常に簡単です。
ただし、環境が存在している間は常にGCEの利用料金が発生していることに注意する必要があります。
Cloud Composer をコンソールのUIから構築した場合、GKEのクラスターを構成する n1-standard-1 のインスタンスが最低でも 3台起動します。 (APIから作成した場合は未調査です)

追記: Cloud Console からリソースは見えませんが、調整できない部分でDBやWebサーバーなどが作成されています。料金などはこちらをご覧ください。

ページトップの画像にもあったように、Cloud Composer の環境には裏で様々なサービスが動いていて、ユーザーはそれを意識することなく利用できるようになっています。例えば、DBMS として Cloud SQL に接続していますが、コンソールを見ても存在はわかりませんでした。

Cloud Dataflow で BiqQuery のワークフロー(パイプライン)を実行する

Cloud DataflowApache Beam の Pipeline を実行できるフルマネージドのデータ処理サービスです。

簡単に特徴を挙げますと、

  • 水平自動スケーリングが可能であり許容するサーバーの数を設定するだけで分散データ処理をすることができる
  • ストリーミング処理とバッチ処理の両方に対応している
  • パイプラインのコードはJavaか Python で記述できる
  • 実行状態を確認する UI を備えてる

などがあります。

Cloud Dataflow はワークフローを実行するというより、データ処理パイプラインを実行するためのもので、BiqQuery をその処理に組み込めるという位置づけですが、 BigQuery に対してクエリを実行することができます(くわしくはこちら)。BigQuery の実行のためだけに使うのはあまり想定されていないかもしれません。クエリはGCEで動作するワーカーから投げているため、実行中はワーカーのリソースが消費されます。しかし、Cloud Dataflow の最低ワーカー数は1台なので、Cloud Composer の GKEクラスター(GCEインスタンス3台)を構築するのに比べて費用を抑えることができます。運用コストだけを考えれば、マネージド・サービスで BigQuery のワークフロー実行環境を作ろうとすると Cloud Dataflow を利用することが現実的な解なのかもしれません。

SQLのクエリを実行するだけの用途で Cloud Composer と Dataflow を使う場合を比較すると、 Apache Beam でパイプラインを記述する Dataflow は Python コードの記述が面倒に感じました。トリッキーな使い方をしている感じが否めず、個人的には費用が安くてもBigQuery のためだけにこの方法は使いたくありません。

そこで、別の方法でさらに軽量な実行方法を探ってみます。

Cloud Functions で BiqQuery のクエリを実行する

BigQuery で実行するクエリのうち、数分以内に確実に完了するような簡単なものであれば Cloud FunctionsCloud Pub/Sub を組み合わせて実現する事もできます。頑張って依存関係を作り出せばワークフローっぽいことができるかもしれません。

以下の図は外部から GCS にデータが追加されるたびにCloud Functions を使って何か処理をしている命令とデータの流れを示したものです。Cloud Functions のコーディングは BigQuery のクエリ実行が完了したら、Cloud Functions の関数を抜ける直前に Cloud Pub/Sub にトピックを Publish するように記述します。
Cloud Pub/Sub を間に挟むことで、Cloud Functions 同士の結合は疎になりフローを容易に変更することができます。

Cloud Funcsions や Pub/Sub を BigQuery の実行に使う程度であれば、ストリーム処理のように大量のデータが流れるわけではないため、ほとんどの場合 Always Free 枠の範囲内でできてしまいます。(某案件で実績があります)

しかし、以下のような構成になると Cloud Functions の実装は一気に難しくなります。
(上記のワークフローから Cloud Functions が1つ増えただけです)
1番最後に実行される Cloud Functions は、どうやって依存している2つの Cloud Functions からの Pub/Sub メッセージを受け取るのでしょうか?
Cloud Functions 自体は状態を持たないので「両方から呼ばれた」という状態を作り出すためには外部の DB やストレージに状態をもたせる必要があります。

Cloud Functions を使った方法では複数の結果を待って処理をするワークフローを表現することが難しいということがわかりました。この方法ではワークフロー全体の見通しが悪くなるので別途図を作る必要も出てきて2重管理になりがちです。

さらに、Cloud Functions は長時間の処理を想定していないため、タイムアウトまでの時間を最長でも 540 秒までしか伸ばすことができないという制限があります。また、BigQuery は処理内容とデータ量によっては実行に1時間以上かかるケースもあります(こちらも某案件でありました)。このようなクエリを Cloud Functions で実行してしまうと、途中で強制終了してしまうため注意が必要です。

Cloud Functions は手軽に利用できる上にアプリケーションの開発に集中できるため非常に便利なものですが、ワークフローを含め、実行に長い時間を必要とするバッチ処理には向いていないことがわかります。

BigQuery のワークフローを GCP 上で実行する方法をさらに探す

ワークフローを実行するにあたり、フルマネージド・サービス以外にも範囲を広げて Cloud Composer ほどリソースを必要としないやり方が無いか調査する必要がありそうです。

フルマネージド・サービスではスケールアウトできる環境を構築しようとすると、Kubernetes 上に構築するのが一般的になりつつありますが、Kubernetes のクラスターはその管理自体にある程度のリソースを割り当てる必要があります。Cloud Composer の場合では先ほど説明した通り、 n1-standard-1 3台の GKE クラスターが最小構成になっています (API から直接作成した場合は未調査です)。

そこで GKE を使わずに使えそうなものを調査します。なお、独断と偏見により Python でワークフローを記述できる前提となっています(チームで使用している言語が主に Python であること、SQL やワークフローのコードを Git で管理したいというのが理由です)。

Python でワークフローを記述できるワークフローエンジン

名称 WebUI 内蔵スケジューラー デーモンとして動作 外部依存 並列実行 拡張性
Apache Airflow
Luigi ◯(luigid)

ソース: 独自調査

Apache Airflow は Quick Start レベルのお試し実行であれば単体で動作しますが、単体ではDAGの動作確認くらいしかできません。「Apache Airflow を GCP 上で動かす」で詳しく解説しますが、並列実行できないのが BigQuery のワークフローを実行する上で問題になります。

Luigi はそれ単体でもワークフローの並列実行が可能で簡単に使えます。△になっているところについては luigid と Central Scheduler を導入することで解消できますが、Apache Airflow と同様にSQLAlchemy から DBMS に接続する必要が生じるため、外部依存が少ない利点が失われてしまいます。

上記の2つを比較して検討してみましたが、ワークフローの管理や状態を確認しやすいWebUI の存在は捨てがたく、外部依存を少なくするのは難しそうです。優劣つけ難いですが GCP 上に限れば、いざとなれば Cloud Composer に実行環境を移行できるApache Airflow を GKE なしで使うのが良さそうです。

Apache Airflow を GCP上で動かす

とりあえず、マネージドなコンテナ実行環境で動かせないか考えてみます。
Apache Airflow は Web Server、 Scheduler、Worker の3つで構成されていて最低でも Web Server と Scheduler の2つをコンテナとして実行する必要があります。(1コンテナで1サービスとした場合)

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

引用元: https://airflow.apache.org/docs/stable/start.html

複数コンテナをまとめて実行しようとすると柔軟性の高いコンテナ実行環境が求められます。また、デーモンとして動作させたほうが良いものがいくつかあるのでリクエストが有ったときだけコンテナを起動するような方法は使えません。公式ドキュメントに目を通すと他にも様々な制限が書いてあります。Apache Airflow を(BigQuery のワークフロー管理目的で) GCP 上で動かすために必要なことをまとめると、

  • Cloud Functions や Cloud Run のようにリクエストを受けてコンテナを起動するような方法は使えない → サーバーを構築する
  • Quickstart の通りにやっただけでは SQLAlchemy の接続先が SQLite になるため同時実行可能数が1になってしまう → 外部の DB を用意して接続する
  • GCP 上で動かすことを考えると Web サーバー上で動作している UI ページのアクセス制限が必要になる → Cloud IAP の設定

ということになります。

Apache Airflow のドキュメントを読むと Cloud Composer の構成がページトップの図のようになっている理由がわかりますね。 Worker も使うように構成すると、さらに以下のことも必要になります。

  • Executor をCeleryExecutor に変更する
  • Worker ノードへは分散キューを経由してジョブが投げられるので Redis を構築する

スクラッチは面倒なのでいい感じの Docker コンテナを探す

1から Apache Airflow の動く環境を作ってもいいのですが、とりあえずいい感じの Docker Image を探してみると、2つの Docker イメージが目に止まりました。

どちらもよくできていて、DAG を 用意すれば簡単にワークフローを実行する事ができました。特に airflow-docker の方は DBMS も選べるので スクラッチで環境構築するよりも断然お手軽です。

しかし、 1. BigQuery のワークフローを実行する 2. 軽量な環境を構築するという目的を達成するには Executor を LocalExecutor に切り替え、分散キュー(Redis)を使わない構成にしたり、MySQL の設定をチューニングしたり(メモリを切り詰める)する必要がありそうです。そうすると、Docker Image に結構手を入れる必要がありそうです。設定ファイルの置換えを始めてしまうと、自分で1からイメージを作ったほうが、いろんな問題に悩まされずに済むのでは(環境変数の追加とか)と頭をよぎります。

なんとか Apache Airflow on GCP を実現したいのでまとまった時間ができたら簡単に GCE 上にデプロイできるものを作りたいですが、それはまた別の記事にします。

結論

BigQuery のワークフローを満足に実行できる Apache Airflow を動かすのはとても大変です。
せめて 外部のDBMS なしで各ジョブを並列実行してくれれば、データの処理自体は BigQuery がするので問題ないのですが。

GCP上でワークフローを実行するにあたって、いくつかのパターンが見えてきましたが、すべてをカバーできる最強の環境が Cloud Composer ということになります。ここで、ワークフローの性質に合わせた実行方法の比較結果をまとめてみます。

    ワークフローの性質 Cloud Composer Cloud Dataflow Cloud Functions Apache Airflow on GCE 主な理由
    ワンショット、複雑 × × 実行が終わり次第環境をすぐ壊すのであればGKEの維持費は問題になりにくい。他は構築に手間がかかる。
    高頻度で実行、単純 環境が常時存在しているのでGKEの維持費用が問題になる。構築の手間と費用のバランスが取れた Dataflow を使う。Cloud Functions でもできないことはない。
    高頻度で実行、複雑 × 複雑な依存関係を可視化して実行できる Cloud Composer のメリットが活かせる。他は構築に手間がかかる。Cloud Functions ではコーディングと依存関係を確認する難易度が指数関数的に上昇する。
    低頻度で実行、単純 環境が常時存在しているのでGKEの維持費用が気になる。GCEの費用もチョット気になる。構築の手間がかかるが費用が極限まで安価になる Cloud Functions を使う。Apache Airflow on GCE でもインスタンスを小さく抑えられればメリットはある。
    低頻度で実行、複雑 × 環境が常時存在しているのでGKEの維持費用が気になる。GCEの費用もチョット気になる。が、ワークフローが複雑で Cloud Functions が使えない以上、初期投資をして Apache Airflow を構築してしまうメリットがある。運用が短期間であれば GKE の 従量課金で Cloud Composer 環境を作ってしまうほうが良いだろう。

    ソース: 独自調査

    ※あくまでも GCP 上で BigQuery のワークフローを実行する前提での比較です。


    Portions of this page are reproduced from work created and shared by Google and used according to terms described in the Creative Commons 4.0 Attribution License.

    【お知らせ】
    給食の献立作成やメニュー計画にお悩みの方を支援します

    栄養士が作成した過去の献立データを元に、献立計画を生成する学習済みモデルを構築するサービス「献立・メニュー計画 AI 生成 API 構築サービス」をご提供しています。

    「献立作成やメニュー計画に時間がかかる」「献立作りが属人化している」などの課題解決のお手伝いをします。

    無料でダウンロード可能な資料をご用意しておりますので、献立・メニュー作成にお悩みの方はぜひご覧ください。

    資料の無料ダウンロードはこちら

    WRITER
    Kazunari Yabe

    データエンジニア

    矢部   一成 Kazunari Yabe

    主にデータエンジニアとして、データ分析基盤の構築やAI活用のシステム開発を担当。  情報処理安全確保支援士、応用情報技術者

    最新記事

    ページTOPへ