ものづくりのブログ

うちのネコを題材にしたものづくりができたらいいなと思っていろいろ奮闘してます。

【python】BigQuery にカウントアップするIDを登録する処理

Google BigQuery に対してカウントアップするIDを登録する処理を考えてみました。

準備

クレデンシャル設定

GCPの認証を設定します。(GOOGLE_APPLICATION_CREDENTIALS 環境変数でサービスアカウントキーを設定)

google-cloud-bigquery ライブラリのインストール

$ pip install google-cloud-bigquery

コード

サンプルコードは以下の通りです。

  • create_table_if_not_exists()関数では、指定されたテーブルが存在するかを確認します。存在しない場合は、指定されたスキーマを持つテーブルを作成します。
  • get_next_id()関数では、テーブルのid列の最大値を取得し、それに1を加えた次のidを生成します。
  • insert_id()関数で、次のidとその他のフィールドを含むデータをテーブルに挿入します。
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import uuid

# BigQueryクライアントを作成
client = bigquery.Client()

# プロジェクトID、データセットID、テーブルIDを指定
project_id = "your-project-id"
dataset_id = "your_dataset_id"
table_id = "your_table_id"

# BigQueryの完全なテーブル名を構成
table_ref = client.dataset(dataset_id).table(table_id)

# テーブルが存在するか確認し、存在しない場合は作成
def create_table_if_not_exists():
    try:
        client.get_table(table_ref)  # テーブルを取得して存在を確認
        print(f"Table {table_id} already exists.")

    except NotFound:
        print(f"Table {table_id} not found. Creating table...")
        schema = [
            bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("uuid", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
        ]
        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table)  # テーブルを作成
        print(f"Created table {table_id}.")

# 既存のデータに基づいて最大のIDを取得し、それに1を加える
def get_next_id():
    query = f"""
        SELECT COALESCE(MAX(id), 0) + 1 AS next_id
        FROM `{project_id}.{dataset_id}.{table_id}`
    """
    query_job = client.query(query)
    result = query_job.result()
    for row in result:
        return row['next_id']

# カウントアップしたIDをテーブルに挿入する
def insert_id():
    next_id = get_next_id()

    # 新しい行のデータ
    rows_to_insert = [
        {
            "id": next_id,
            "name": "Sample Name",
            "uuid": str(uuid.uuid4()),
            "created_at": bigquery.Timestamp.now()
        }
    ]

    # データを挿入
    errors = client.insert_rows_json(table_ref, rows_to_insert)

    # エラーがあれば表示
    if errors == []:
        print(f"Successfully inserted row with id {next_id}")
    else:
        print(f"Encountered errors while inserting rows: {errors}")

# テーブルが存在しなければ作成
create_table_if_not_exists()

# データを挿入
insert_id()