dbt には Python model という仕組みがある。これを使うことで SQL ではなく Python で model を書くことができ、SQL では不可能な処理が可能になる。
データウェアハウスとして BigQuery を使っている場合、 Dataproc を利用して Python model を動かすのが公式が示す方法。
Dataproc クラスタと Dataproc サーバレス、どちらでも使える。Dataproc サーバレスのほうが手軽に使えるが、自由度は Dataproc クラスタのほうが高く、Python model で使用したい任意のライブラリをインストールすることができる。
この記事では Dataproc クラスタを使って Python model を実行する方法を書いていく。必要な作業は要件によって異なるが、今回は BigQuery のテーブルの内容を読み取りただそれを別のテーブルに書き込むだけの、シンプルな Python model の実行を目指す。
この記事の内容は以下の環境で動作確認している。
- dbt-core@1.9.1
- dbt-bigquery@1.9.0
前提
dbt-coreやdbt-bigqueryのインストール手順などはこの記事では扱わない。既にdbt runコマンドを実行できる状態であるとする。
また、Dataproc クラスタを使うための準備も既に終わっているものとして話を進める。
具体的には、対象のプロジェクト(この記事ではsample-pjとする)で、以下のことを行っていることを前提としている。
- プロジェクトに対して請求先アカウントを設定する
- Dataproc API を有効にする
- 「限定公開の Google アクセス」を有効にする
これらをどのように行えばよいかは、以下の記事に書いている。
基本的に gcloud CLI で Google Cloud に対して操作を行う。
gcloud CLI のカレントプロジェクト(core/project)はsample-pjに設定してある。
gcloud CLI については以下の記事に書いた。
BigQuery にデータセットやテーブルを用意する
まずは、必要なデータセットやテーブルを BigQuery に用意していく。
src.userというテーブルを用意し、そこにデータを入れる。これをデータソースとして使う。
$ bq mk --dataset --location=asia-northeast1 src $ bq mk --table --location=asia-northeast1 src.user id:INTEGER,name:STRING $ bq query --use_legacy_sql=false "INSERT INTO src.user (id, name) VALUES (1, 'Alice'), (2, 'Bob')"
データが入っていることを確認。
$ bq query --use_legacy_sql=false "SELECT * FROM src.user LIMIT 10" +----+-------+ | id | name | +----+-------+ | 1 | Alice | | 2 | Bob | +----+-------+
次にdestデータセットを作る。このデータセットを dbt の出力先とする。
$ bq mk --dataset --location=asia-northeast1 dest
Cloud Storage バケットを用意する
Dataproc クラスタを使って Python model を実行する場合、Cloud Storage のバケットを用意する必要がある。
このバケットは、Dataproc クラスタが Python model を実行する過程で発生する一時的なファイルを格納するために使われる。
今回はsample-bucketというバケットを作成しそれを使うことにする。
$ gcloud storage buckets create gs://sample-bucket --location=asia-northeast1 $ gcloud storage ls gs://sample-bucket/
dbt プロジェクトの設定を行う
次に、dbt プロジェクトの設定を行っていく。
profiles.ymlとdbt_project.ymlをそれぞれ以下の内容にする。
gcs_bucketに、先程用意したバケットを指定する。
# profiles.yml my_profile_1: # profile の名前 target: dev # デフォルトで使う target の名前 outputs: dev: # target の名前 type: bigquery # 接続先のデータウェアハウスの種類 method: oauth # 認証に使う方法 project: sample-pj # 使用するプロジェクト dataset: dest # 使用するデータセット gcs_bucket: sample-bucket # 一時的なデータを保存するためのバケット dataproc_region: asia-northeast1 # 使用する Dataproc クラスタのリージョン
# dbt_project.yml name: 'my_dbt_project' config-version: 2 version: '1.0.0' profile: 'my_profile_1' # profiles.yml で定義した my_profile_1 を指定
ソースファイル。
# models/sources.yml version: 2 sources: - name: src tables: - name: user columns: - name: id - name: name
実行する Python model は以下。
# models/python_sample.py def model(dbt, session): dbt.config( submission_method="cluster", # Dataproc クラスタを使う場合はこの記述が必要 dataproc_cluster_name="my-cluster", # 使用する Dataproc クラスタ(このあと作成する)の名前 ) source_df = dbt.source('src', 'user') return source_df
src.userの内容を読み取ってそれを返している。
profiles.ymlのdatasetでdestを指定しておりそれを上書きしていないので、この Python model を実行するとdest.python_sampleが作られる。
サービスアカウントを用意する
最後に、サービスアカウントを用意する。
Dataproc クラスタが利用するサービスアカウントと ADC で利用するサービスアカウントの 2 つを用意する必要がある。
これらのサービスアカウントについても以下の記事で解説している。
まずは Dataproc クラスタが利用するサービスアカウントを作成する。
今回のケースでは以下のロールが付与されていれば動作する。
roles/bigquery.dataEditorroles/bigquery.jobUserroles/bigquery.readSessionUserroles/dataproc.worker
dataproc-cluster-saというサービスアカウントを作成してロールを付与する。
$ gcloud iam service-accounts create dataproc-cluster-sa $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com" --role="roles/bigquery.dataEditor" $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com" --role="roles/bigquery.jobUser" $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com" --role="roles/bigquery.readSessionUser" $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com" --role="roles/dataproc.worker"
ロールが付与されていることを確認する。
$ gcloud projects get-iam-policy sample-pj --flatten="bindings[].members" --filter="bindings.members:serviceAccount:dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com" --format="table(bindings.role)" ROLE roles/bigquery.dataEditor roles/bigquery.jobUser roles/bigquery.readSessionUser roles/dataproc.worker
次に、ADC で利用するサービスアカウントを用意する。
profiles.ymlにmethod: oauthと書いたように、今回は OAuth を使って認証を行う。
具体的には ADC の仕組みを利用する。
ADC については以下の記事で解説した。
今回のケースでは以下のロールが付与されていれば動作する。
roles/bigquery.dataEditorroles/bigquery.jobUserroles/dataproc.editorgcs_bucketで指定したバケットに対するroles/storage.legacyBucketWriter
adc-saというサービスアカウントを作成してロールを付与する。
$ gcloud iam service-accounts create adc-sa $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com" --role="roles/bigquery.dataEditor" $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com" --role="roles/bigquery.jobUser" $ gcloud projects add-iam-policy-binding sample-pj --member="serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com" --role="roles/dataproc.editor" $ gcloud storage buckets add-iam-policy-binding gs://sample-bucket --member="serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com" --role="roles/storage.legacyBucketWriter"
ロールが付与されていることを確認する。
$ gcloud projects get-iam-policy sample-pj --flatten="bindings[].members" --filter="bindings.members:serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com" --format="table(bindings.role)" ROLE roles/bigquery.dataEditor roles/bigquery.jobUser roles/dataproc.editor $ gcloud storage buckets get-iam-policy gs://sample-bucket --format=json | jq '.bindings[] | select(.members[] == "serviceAccount:adc-sa@sample-pj.iam.gserviceaccount.com") | {role: .role}' { "role": "roles/storage.legacyBucketWriter" }
adc-saのサービスアカウントキーを発行しそれを ADC に設定する。
$ gcloud iam service-accounts keys create adc-sa-key.json --iam-account=adc-sa@sample-pj.iam.gserviceaccount.com $ export GOOGLE_APPLICATION_CREDENTIALS=adc-sa-key.json
Python model を実行する
これで準備が整ったので Dataproc クラスタを作成し Python model を実行する。
以下のコマンドでmy-clusterという名前の Dataproc クラスタを作成する。サービスアカウントは先程作成したdataproc-cluster-saを指定する。
$ gcloud dataproc clusters create my-cluster --service-account=dataproc-cluster-sa@sample-pj.iam.gserviceaccount.com --region=asia-northeast1 --single-node --master-machine-type=e2-standard-2
あとはdbt runすれば Dataproc クラスタで Python model が処理される。
$ dbt run -s python_sample.py
destを確認してみるとpython_sampleというテーブルが作成されており、中身も入っている。
$ bq ls dest tableId Type Labels Time Partitioning Clustered Fields --------------- ------- -------- ------------------- ------------------ python_sample TABLE $ bq query --use_legacy_sql=false 'SELECT * FROM dest.python_sample LIMIT 10' +----+-------+ | id | name | +----+-------+ | 1 | Alice | | 2 | Bob | +----+-------+
Dataproc クラスタをそのままにしておくと費用が発生し続けるので、忘れずに削除する。
$ gcloud dataproc clusters delete my-cluster --region=asia-northeast1
クラスタ一覧を取得して、削除できていることを確認する。
$ gcloud dataproc clusters list --region=asia-northeast1 Listed 0 items.