BigQueryワークフロー実行用に”Cloud Composerもどき”を構築する

このエントリーをはてなブックマークに追加

こんにちは、データデザイン部でデータエンジニアをしている矢部です。

前回の記事ではBigQueryのワークフローを実行する方法を模索しました。その中で Cloud Composerを使わずにApache AirflowをGoogle Cloud Platform上で動かす場合について軽く触れました。
今回はその具体的な方法を紹介します。

この記事ではGoogle Cloud Platformの詳しい構築方法や Apache Airflow の具体的なDAG記述方法については述べませんので詳しくは、それぞれの製品のドキュメントを参照してください。

目次

準備

ファイアウォールルールの設定

Apache Airflow の管理UIが待受しているポートに接続できるようにファイアウォールを構成しておきます。
Airflow の管理UIはデフォルトではTCP 8080 番ポートを使います。

Compute Engine のファイアウォールルールの作成で以下のルールを追加します。

  • トラフィックの方向:上り
  • 一致したときのアクション:許可
  • ターゲットタグ:airflow (VMインスタンスのタグと合わせる)
  • ソースフィルタ:IP範囲(適宜設定してください)
  • ソースIPの範囲:(<管理画面にアクセスするPC等のグローバルIPアドレス>/32など)
  • プロトコルとポート:TCP 8080

VMインスタンスの作成

Apache Airflow などのコンテナを動作させるVMインスタンスを以下の構成で作成します。

マシンタイプについてはどこまで小さくできるか試したのですが、メモリまわりをいじってないのでコンテナ類をすべて立ち上げるとメモリを1GBは消費してしまいます。

  • マシンタイプ:e2-small(2vCPU、2GBメモリ)
  • OS:Container-Optimized OS
  • ブートディスク:標準の永続ディスク 10GB
  • ネットワーク タグ:airflow (ファイアウォールのタグと合わせる)

Cloud Storage バケットの作成

Apache AirflowのDAGファイルやプラグインを配置するバケットを作成します。
わかりやすい適当な名前を設定してください。(このバケット名は後ほど使用します。)

Cloud Composer もどきの構築

Apache Airflow を Google Compuet Engine のVM上に構築します。

ディレクトリ構成

この説明では、VM内のディレクトリ構成を以下のようにします。

$ tree
.
└── airflow
    └── docker-compose.yml
1 directory, 1 file

docker-compose.ymlファイルの作成

airflowディレクトリを作成し、その中にdocker-compose.yml ファイルを以下の内容で作成します。このファイルは puckel/docker-airflow の docker-compose-LocalExecutor.yml を参考に作成しています。

version: '3.7'
services:
    gcsfuse:
        image: ekino/gcsfuse:latest
        command: ['ash', '-c', "\
            apk add unison && \
            mkdir -p /mnt/gcs && \
            gcsfuse -o allow_other <バケット名> /mnt/gcs && \
            while true; do
                unison /mnt/gcs /mnt/airflow -batch -silent;
                sleep 10;
            done"]
        restart: always
        logging:
            driver: gcplogs
        privileged: true
        volumes:
            - airflow:/mnt/airflow

    postgres:
        image: postgres:9.6
        restart: always
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
        logging:
            driver: gcplogs

    webserver:
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - gcsfuse
            - postgres
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
            - AIRFLOW__CORE__DAGS_FOLDER=/mnt/airflow/dags
            - AIRFLOW__CORE__PLUGINS_FOLDER=/mnt/airflow/plugins
        logging:
            driver: gcplogs
        volumes:
            - airflow:/mnt/airflow
        ports:
            - "8080:8080"
        command: ['webserver']
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3
volumes:
  airflow:
    driver: local

ファイルの次の部分を適宜編集します。

  • <バケット名>:準備で作成しておいたCloud Storage のバケット名に置き換え

コンテナの起動

docker-compose をインストールしても良いのですが、 Container-Optimized OS を使用している関係上 以下のコマンドを実行してdocker-composeもコンテナ内で実行します。

$ docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v "$PWD:/rootfs/$PWD" -w="/rootfs/$PWD" docker/compose:1.26.2 up -d

しばらく待っていると 3つのコンテナが立ち上がり、Airflow にアクセスできるようになります。

DAGの実行

Apache Airflow のUIは http://<グローバルIPアドレス>:8080 で開くことができます。
グローバルIPはCompute Engine の VMインスタンスに表示されています。

 
ブラウザで先ほど説明したUI の URL を開くと、Airlfow の管理画面が開きます。

 
Cloud Composer もどきの構築で作成しておいた Cloud Storage のバケットに DAGファイルを追加します。

 
しばらく待つと、Airflow で確認できるようになりますので、通常のAirflow の操作で実行するだけです。

各コンテナの説明

各コンテナがどのような役割をしているのか説明します。
この Docker Compose もどきに使用しているコンテナイメージは Docker Hub から取得しています。検証用途であれば問題ありませんが、プロダクション用途で使用する場合はコンテナイメージを自前で用意することをおすすめします。

webserver

はじめはAirflow の動作するコンテナイメージを新たに作成しようとしていましたが、DBにPostgreSQLを用意してAirflow の設定を一部変更することでpuckel/docker-airflowをそのまま使用できました。

DAG ファイルとプラグインを外部から差し込めるようにボリューム(airflow)をマウントし、環境変数(AIRFLOW__CORE__DAGS_FOLDER、AIRFLOW__CORE__PLUGINS_FOLDER)でAirflow の設定を上書きしています。

postgres

PostgreSQLの公式 Docker イメージを使用しています。
Apache Airflow のチュートリアルなどでは、 DBMS に SQLite、Executor に Sequencial Executor が設定されています。タスクの並列実行のためにLocal Executorを使用する場合は SQLite を使用することができませんでした(※)。そのため、SqlAlchemy で使用できる別のDBを使うことになりますが、ドキュメントで推奨されている MySQL か PostgreSQL を使用するのが良さそうです。
puckel/docker-airflow では PostgreSQL が指定されているようなのでそれに従います。

※起動時に airflow.exceptions.AirflowConfigException: error: cannot use sqlite with the LocalExecutor という例外が発生します。

gcsfuse

ekino/gcsfuse のDockerイメージを使用しています。

Google Cloud Storage のバケットをマウントすることができるGCSFUSEを用いてDAGファイルやプラグインをCloud Storage から読み込めるようにしています。

GCSFUSEでマウントしたディレクトリをDocker のボリュームで共有することができないため、GCSFUSE でマウントしたディレクトリとDocker の共有ディレクトリを unison コマンドを定期的に実行することで同期しています。

動作ログの確認方法

実行中のコンテナから出力されるログは Stackdriver Logging から確認することができます。

Stackdriver LoggingのログビューアをCloud ConsoleのLoggingから開きます。
ソースはVM Instance に設定してください。

おわりに

いかがだったでしょうか?Google Cloud Platform 上で Apache Airflow を実行する環境を構築する事ができました。

Cloud Composer のようにクラスターを構築しているわけではないので、コンピューティングに負荷のかかるタスクは実行できませんが、BigQuery のワークフローの実行であれば問題なくこなせると思います。

もしスケールアウトや信頼性を重視したCloud Composerの環境が必要になった場合にはDAGファイルをCloud Composer のバケットに移動するだけで簡単に移行できます。


Apache Airflow, Apache, Airflow, the Airflow logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.

Portions of this page are reproduced from work created and shared by Google and used according to terms described in the Creative Commons 4.0 Attribution License.

【お知らせ】
隠れた“高コスパエリア”を見つけられる「Starflake retail」が今なら20%割引でご利用できます

「Starflake retail」は、任意のエリアごとの人口密度と経済的な価値(地価など)を比較、クラスタリングしたデータです。流通業の店舗開発や不動産ディベロッパーの分譲地開拓などで、エリアごとのコスパを把握できます。  

本サービスのリリースを記念して、20%OFFでご利用いただけるキャンペーンを9/30まで実施中。
興味のある方は詳細ページからお問い合わせください。

「Starflake retail」の詳細はこちら

WRITER
Kazunari Yabe

データエンジニア

矢部   一成 Kazunari Yabe

主にデータエンジニアとして、データ分析基盤の構築やAI活用のシステム開発を担当。  情報処理安全確保支援士、応用情報技術者

最新記事

ページTOPへ