MENU
  • HOME
  • 取引実績
  • お問い合わせ
  • 資料DL
  • 会社概要
  • FAQ
  • Struccle記事
  • BigQuery記事
データ流通、検索エンジン開発のプロフェッショナル
DataStructor
  • HOME
  • 取引実績
  • お問い合わせ
  • 資料DL
  • 会社概要
  • FAQ
  • Struccle記事
  • BigQuery記事
DataStructor
  • HOME
  • 取引実績
  • お問い合わせ
  • 資料DL
  • 会社概要
  • FAQ
  • Struccle記事
  • BigQuery記事
  1. ホーム
  2. 当ブログのコーディング実行環境設定
  3. AI Agent開発日記 2025/06/29

AI Agent開発日記 2025/06/29

2025 7/01
当ブログのコーディング実行環境設定
2025年7月1日

やりたいこと

データを読み込むだけで、以下が自動で行われる仕組みを構築したい。

  • 読み込んだデータがElasticsearchに自動登録される
  • データと指示内容がGeminiに渡され、PlaybookやTool/Schemaが自動生成される
  • 生成された内容が自動でPlaybookやToolに反映され、管理用エージェントと紐づいて構成される

前回の調査日記

あわせて読みたい
AI Agent開発日記 2025/06/28 やりたいこと データを読み込むだけで、以下が自動で行われる仕組みを構築したい。 読み込んだデータがElasticsearchに自動登録される データと指示内容がGeminiに渡さ...
目次

コンソールに表示されるテキストの順序と内容を、分かりやすくなるように整理、調整する

  • やりたいこと
    • t.py実行時のテキスト全てを整理する
  • 現状
    • 下記コードで検証に成功した

コード

# -*- coding: utf-8 -*-
"""
Integration Service API サンプル実行スクリプト

このスクリプトは、カスタムAPIサーバー(Integration Service)を介して、
Google Cloud Dialogflow CX の Tool や Playbook をプログラムで操作するサンプルです。

主な機能:
1. BigQuery から Elasticsearch へのデータ同期
2. Elasticsearch のデータ構造から OpenAPI スキーマをAIで生成し、Dialogflow Tool を作成
3. ドキュメント例から Playbook の Instructions (指示) をAIで生成し、Dialogflow Playbook を作成
4. 作成した Playbook を既存の親 Playbook に子として関連付ける

■ 前提条件
1. Python 3.8以上
2. 必要なライブラリのインストール:
   pip install requests
3. 実行中の Integration Service API サーバー
   (このスクリプトは、デフォルトで http://localhost:8080 にアクセスします)
4. Google Cloud プロジェクトおよび Dialogflow CX エージェントのセットアップ
5. (任意) Elasticsearch クラスタおよび BigQuery データセットのセットアップ

■ 設定方法
1. 下記の「<<< ユーザー設定 >>>」セクションを、ご自身の環境に合わせて編集してください。
   - APIキー (GEMINI_API_KEY, ELASTIC_CLOUD_ENCODED_API_KEY)
   - Dialogflow関連ID (DF_PROJECT_ID, DF_AGENT_ID など)
   - 各機能で使用するデータソース名 (インデックス名、テーブル名など)
2. APIキーは、コードに直接書き込む代わりに、環境変数として設定することも推奨されます。
   (例: export GEMINI_API_KEY="your_gemini_api_key_here")

■ 実行方法
1. ターミナルでスクリプトを実行します:
   python your_script_name.py
2. スクリプト下部の `if __name__ == "__main__":` ブロックで、実行したいフローの
   コメントを解除して実行してください。
"""
import requests
import json
import os
import unicodedata

# --- グローバル設定 ---
# APIサーバーが稼働しているポートに合わせて変更してください
PORT = os.getenv("PORT", "8080")
BASE_URL = f"http://localhost:{PORT}"
APP_PREFIX = "integration_service"

# ==============================================================================
# <<< ユーザー設定 >>>
# ご自身の環境に合わせて、以下のプレースホルダーを実際の値に書き換えてください。
# 環境変数に設定している場合は、自動的にその値が使用されます。
# ==============================================================================

# --- APIキー設定 ---
# Gemini APIキー。Playbook/ToolのAI生成機能で必須です。
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY_HERE")
# Elasticsearch APIキー (Base64エンコード済み)。データ同期機能で使用します。
ELASTIC_CLOUD_ENCODED_API_KEY = os.getenv("ELASTIC_CLOUD_ENCODED_API_KEY", "YOUR_ELASTIC_API_KEY_HERE")

# --- Dialogflow設定 ---
DF_PROJECT_ID = os.getenv("DF_PROJECT_ID", "your-gcp-project-id")
DF_LOCATION_ID = os.getenv("DF_LOCATION_ID", "global") # 通常は "global" またはリージョン名
DF_AGENT_ID = os.getenv("DF_AGENT_ID", "your-dialogflow-agent-id")
# 新しく作成したPlaybookを追加する先の「親」となるPlaybookのID
PARENT_PLAYBOOK_ID = os.getenv("PARENT_PLAYBOOK_ID", "parent-playbook-id-to-add-children")

# --- データソース設定 (データ同期フロー用) ---
BQ_DATASET = "your_test_dataset"
BQ_TABLE = "your_source_table"
ES_URL = "http://localhost:9200" # またはElastic Cloudのエンドポイント
ES_INDEX_FOR_SYNC = "synced_data_index"

# --- Tool/Playbook自動生成フロー用設定 ---
# Tool/Playbookの基になるデータソースのインデックス名
ES_INDEX_FOR_GENERATION = "my_product_index"
# 上記インデックスに含まれるドキュメントの例 (AIがデータ構造を理解するために使用)
DOC_EXAMPLES_FOR_GENERATION = [
    {"item_id": "AB-123", "productName": "Super Duper Gadget", "price_usd": 199.99, "inStock": True,
     "details": {"color": "red", "weight_grams": 250}, "tags": ["electronics", "new", "popular"]},
    {"item_id": "XY-789", "productName": "Basic Utility Tool", "price_usd": 25.00, "inStock": False,
     "description": "A simple tool for everyday tasks.", "manufacturer_code": "MFR-002"}
]
# 生成するTool/Playbookの表示名や説明 (スクリプト内で自動的にインデックス名が付与されます)
GENERATED_TOOL_DISPLAY_NAME_BASE = "Generated_Tool"
GENERATED_TOOL_DESCRIPTION = "AIが生成したスキーマに基づく汎用検索ツール(スクリプト経由)。"
GENERATED_PLAYBOOK_DISPLAY_NAME_BASE = "物件検索Playbook"
GENERATED_PLAYBOOK_GOAL = "ユーザーの指示に基づいて適切な物件を提案する"
GEMINI_MODEL = "gemini-1.5-flash"

# --- スタンドアロンPlaybook作成フロー用設定 ---
# Playbookが参照する、既存のToolのIDリスト
STANDALONE_PLAYBOOK_REFERENCED_TOOL_IDS = ["existing-tool-id-1", "existing-tool-id-2"]

# ==============================================================================
# <<< 設定ここまで >>>
# ==============================================================================


# --- コンソール出力ヘルパー ---
def print_flow_header(title):
    """フローの開始をコンソールに表示する"""
    print(f"\n{'='*10} {title} {'='*10}")

def print_flow_footer(success=True):
    """フローの終了をコンソールに表示する"""
    status = "完了" if success else "失敗または中断"
    print(f"========== フロー{status} ==========")

def print_step_header(message):
    """フロー内のステップの開始を表示する"""
    print(f"\n--- {message} ---")

def print_process(message, indent=0):
    """進行中の処理を表示する"""
    prefix = "  " * indent
    print(f"{prefix}⏳ {message}...")

def print_api_result(path, success, indent=0):
    """APIリクエストの結果を表示する"""
    prefix = "  " * indent
    status = "[OK]" if success else "[ERROR]"
    print(f"{prefix}--> API Call: {path} {status}")

def print_success(message, indent=0):
    """成功メッセージを表示する"""
    prefix = "  " * indent
    print(f"{prefix}✅ {message}")

def print_warning(message, indent=0):
    """警告メッセージを表示する"""
    prefix = "  " * indent
    print(f"{prefix}⚠️ {message}")

def print_info_block(title, content, indent=0):
    """整形された情報ブロックを表示する"""
    prefix = "  " * indent
    print(f"{prefix}--- {title} ---")
    # contentがNoneの場合を考慮
    content_str = str(content) if content is not None else "N/A"
    for line in content_str.splitlines():
        print(f"{prefix}  {line}")
    print(f"{prefix}{'-' * (len(title) + 8)}")


# --- API通信ヘルパー ---
def make_api_request(endpoint_path, payload, method="POST", timeout=240, indent=0):
    """
    指定されたエンドポイントにAPIリクエストを送信する

    Args:
        endpoint_path (str): APIのエンドポイントパス (例: /integration_service/data/sync)
        payload (dict): 送信するJSONペイロード
        method (str): HTTPメソッド (現在はPOSTのみ対応)
        timeout (int): リクエストのタイムアウト時間(秒)
        indent (int): コンソール出力のインデントレベル

    Returns:
        dict or None: 成功した場合はレスポンスのJSON、失敗した場合はNone
    """
    url = f"{BASE_URL}{endpoint_path}"
    response = None
    try:
        if method.upper() == "POST":
            # APIキーをペイロードに追加
            if GEMINI_API_KEY and "gemini_api_key" not in payload:
                payload["gemini_api_key"] = GEMINI_API_KEY
            if ELASTIC_CLOUD_ENCODED_API_KEY and "elasticsearch_encoded_api_key" not in payload:
                 payload["elasticsearch_encoded_api_key"] = ELASTIC_CLOUD_ENCODED_API_KEY

            response = requests.post(url, json=payload, timeout=timeout)
        else:
            print_warning(f"Unsupported HTTP method: {method}", indent)
            return None

        response.raise_for_status()
        print_api_result(endpoint_path, success=True, indent=indent)
        return response.json()

    except requests.exceptions.HTTPError as http_err:
        print_api_result(endpoint_path, success=False, indent=indent)
        print_warning(f"HTTP Error: {http_err}", indent + 1)
        if response is not None:
            try:
                error_body = json.dumps(response.json(), indent=2, ensure_ascii=False)
                print_info_block("Error Response Body", error_body, indent + 1)
            except json.JSONDecodeError:
                print_info_block("Error Response Body (Non-JSON)", response.text, indent + 1)
    except requests.exceptions.RequestException as req_err:
        print_api_result(endpoint_path, success=False, indent=indent)
        print_warning(f"Request Error: {req_err}", indent + 1)
    except Exception as e:
        print_api_result(endpoint_path, success=False, indent=indent)
        print_warning(f"An unexpected error occurred: {e}", indent + 1)

    return None


# --- 設定チェックヘルパー ---
def check_api_key(key_value, key_name):
    """APIキーが設定されているかチェックする"""
    if not key_value or "YOUR_" in key_value:
        print_warning(f"設定値 '{key_name}' が未設定です。このキーを必要とする処理は失敗します。")
        return False
    return True

def check_dialogflow_ids():
    """Dialogflow関連のIDが設定されているかチェックする"""
    valid = True
    if "your-gcp-project-id" in DF_PROJECT_ID:
        print_warning("DF_PROJECT_ID がデフォルト値のままです。")
        valid = False
    if "your-dialogflow-agent-id" in DF_AGENT_ID:
        print_warning("DF_AGENT_ID がデフォルト値のままです。")
        valid = False
    return valid


# --- メインフロー関数 ---

def run_sync_bq_to_es_flow():
    """BigQueryからElasticsearchへのデータ同期フローを実行する"""
    print_flow_header("データ同期 (BigQuery → Elasticsearch)")
    if not check_api_key(ELASTIC_CLOUD_ENCODED_API_KEY, "ELASTIC_CLOUD_ENCODED_API_KEY"):
        print_flow_footer(success=False)
        return

    payload = {
        "dataset_id": BQ_DATASET,
        "table_id": BQ_TABLE,
        "elasticsearch_url": ES_URL,
        "elasticsearch_index": ES_INDEX_FOR_SYNC,
    }

    print_process("データを同期しています")
    response = make_api_request(f"/{APP_PREFIX}/data/sync_bq_to_es", payload, timeout=180)

    if response:
        print_success("データ同期が完了しました")
        print_info_block("API Response", response.get('status', 'No details available'))
        print_flow_footer(success=True)
    else:
        print_warning("データ同期に失敗しました。詳細は上記のエラーを確認してください。")
        print_flow_footer(success=False)

def run_tool_and_playbook_creation_flow():
    """スキーマ生成からTool作成、Playbook作成、親Playbookへの追加までを一貫して実行する"""
    print_flow_header("Tool & Playbook 連続作成フロー")

    if not check_api_key(GEMINI_API_KEY, "GEMINI_API_KEY") or not check_dialogflow_ids():
        print_flow_footer(success=False)
        return

    # --- ステップ1: OpenAPIスキーマ生成 ---
    print_step_header("ステップ1: OpenAPIスキーマ生成")
    print_process("AIにスキーマ生成をリクエストしています...", indent=1)
    schema_payload = {
        "elasticsearch_index_name": ES_INDEX_FOR_GENERATION,
        "elasticsearch_document_examples": DOC_EXAMPLES_FOR_GENERATION,
        "gemini_model_name": GEMINI_MODEL
    }
    generated_schema = make_api_request(f"/{APP_PREFIX}/generate/openapi_schema", schema_payload, indent=1)

    if not generated_schema:
        print_warning("スキーマ生成に失敗しました。フローを中断します。", indent=1)
        print_flow_footer(success=False)
        return
    print_success("スキーマ生成に成功しました。", indent=1)
    print_info_block("Generated OpenAPI Schema", json.dumps(generated_schema, indent=2, ensure_ascii=False), indent=1)

    # --- ステップ2: Dialogflow Tool作成 ---
    print_step_header("ステップ2: Dialogflow Tool作成")
    # インデックス名をサニタイズしてTool名に追加
    clean_es_index_name = ES_INDEX_FOR_GENERATION.replace('-', '_').replace('.', '_')
    tool_display_name = f"{GENERATED_TOOL_DISPLAY_NAME_BASE}_{clean_es_index_name}"
    print_info_block("作成するTool名", tool_display_name, indent=1)

    print_process(f"Tool「{tool_display_name}」を作成しています...", indent=1)
    tool_payload = {
        "project_id": DF_PROJECT_ID,
        "location_id": DF_LOCATION_ID,
        "agent_id": DF_AGENT_ID,
        "tool_display_name": tool_display_name,
        "tool_description": GENERATED_TOOL_DESCRIPTION,
        "openapi_schema_string": json.dumps(generated_schema)
    }
    tool_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_tool", tool_payload, indent=1)

    if not (tool_response and "tool_id" in tool_response):
        print_warning("Toolの作成に失敗しました。フローを中断します。", indent=1)
        print_flow_footer(success=False)
        return

    created_tool_id = tool_response['tool_id']
    created_tool_name = tool_response.get('display_name', tool_display_name)
    print_success(f"Tool「{created_tool_name}」(ID: {created_tool_id}) を作成しました。", indent=1)

    # --- ステップ3: Dialogflow Playbook作成 ---
    print_step_header("ステップ3: 関連Playbook作成")
    # Playbook Instructions生成
    print_process("AIにPlaybookの指示を生成させています...", indent=1)
    instr_payload = {
        "elasticsearch_index_name": ES_INDEX_FOR_GENERATION,
        "elasticsearch_document_examples": DOC_EXAMPLES_FOR_GENERATION,
        "gemini_model_name": GEMINI_MODEL,
        "tool_name_in_playbook": created_tool_name
    }
    instr_response = make_api_request(f"/{APP_PREFIX}/generate/playbook_instructions", instr_payload, indent=1)

    if not (instr_response and "playbook_instructions_markdown" in instr_response):
        print_warning("Playbookの指示生成に失敗しました。フローを中断します。", indent=1)
        print_flow_footer(success=False)
        return

    generated_instructions = instr_response["playbook_instructions_markdown"]
    print_success("Playbookの指示生成に成功しました。", indent=1)
    print_info_block("Generated Playbook Instructions", generated_instructions, indent=1)

    # Playbook作成
    playbook_display_name = f"{GENERATED_PLAYBOOK_DISPLAY_NAME_BASE}_{clean_es_index_name}"
    print_info_block("作成するPlaybook名", playbook_display_name, indent=1)

    print_process(f"Playbook「{playbook_display_name}」を作成しています...", indent=1)
    playbook_payload = {
        "project_id": DF_PROJECT_ID,
        "location_id": DF_LOCATION_ID,
        "agent_id": DF_AGENT_ID,
        "playbook_display_name": playbook_display_name,
        "playbook_goal": GENERATED_PLAYBOOK_GOAL,
        "playbook_instruction_text": generated_instructions,
        "tool_ids": [created_tool_id] # 作成したToolのIDを使用
    }
    playbook_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_playbook", playbook_payload, indent=1)

    if not (playbook_response and "playbook_id" in playbook_response):
        print_warning("Playbookの作成に失敗しました。", indent=1)
        print_flow_footer(success=False)
        return

    created_playbook_id = playbook_response['playbook_id']
    created_playbook_name = playbook_response.get('display_name', playbook_display_name)
    print_success(f"Playbook「{created_playbook_name}」(ID: {created_playbook_id}) を作成しました。", indent=1)

    # --- ステップ4: 親Playbookへの追加 ---
    if PARENT_PLAYBOOK_ID and "parent-playbook-id" not in PARENT_PLAYBOOK_ID:
        add_playbook_to_parent_flow(
            parent_playbook_id=PARENT_PLAYBOOK_ID,
            child_playbook_info={"playbook_id": created_playbook_id, "display_name": created_playbook_name}
        )
    else:
        print_info_block("Skip", "PARENT_PLAYBOOK_IDが設定されていないため、親Playbookへの追加はスキップします。", indent=0)

    print_flow_footer(success=True)

def add_playbook_to_parent_flow(parent_playbook_id, child_playbook_info):
    """指定された子Playbookを親Playbookに追加する"""
    print_step_header("ステップ4: 親Playbookへの追加")
    if not check_dialogflow_ids(): return

    # 親Playbookの現在の指示を取得
    print_process(f"親Playbook (ID: {parent_playbook_id}) の情報を取得しています...", indent=1)
    get_instr_payload = {
        "project_id": DF_PROJECT_ID, "location_id": DF_LOCATION_ID, "agent_id": DF_AGENT_ID,
        "playbook_id": parent_playbook_id
    }
    parent_instr_response = make_api_request(f"/{APP_PREFIX}/dialogflow/get_playbook_instructions", get_instr_payload, indent=1)

    if not (parent_instr_response and "instructions_text" in parent_instr_response):
        print_warning("親Playbookの指示取得に失敗しました。更新を中止します。", indent=1)
        return

    existing_parent_instructions = parent_instr_response["instructions_text"]
    print_success("親Playbookの現在の指示を取得しました。", indent=1)

    # 新しく追加する指示を生成
    child_name = child_playbook_info["display_name"]
    child_description = f"ユーザーの要望が「{child_name}」に関連する場合、このPlaybookを実行します。" # サンプルの説明文
    instruction_to_add = f"\n- - ${{PLAYBOOK:{child_name}}} - {child_description}"
    new_parent_instructions = existing_parent_instructions + instruction_to_add

    print_info_block("更新後の親Playbookの指示プレビュー", new_parent_instructions, indent=1)

    # 親Playbookを更新
    print_process("親Playbookを更新しています...", indent=1)
    update_instr_payload = {
        "project_id": DF_PROJECT_ID, "location_id": DF_LOCATION_ID, "agent_id": DF_AGENT_ID,
        "playbook_id": parent_playbook_id,
        "new_instructions_text": new_parent_instructions
    }
    update_response = make_api_request(f"/{APP_PREFIX}/dialogflow/update_playbook_instructions", update_instr_payload, indent=1)

    if update_response:
        print_success("親Playbookの更新に成功しました。", indent=1)
        if "new_instructions_preview" in update_response:
            print_info_block("Final Updated Instructions", update_response['new_instructions_preview'], indent=1)
    else:
        print_warning("親Playbookの更新に失敗しました。", indent=1)


if __name__ == "__main__":
    print("Integration Service API サンプルスクリプトを開始します。")
    print(f"対象APIサーバー: {BASE_URL}")

    # ==============================================================================
    # <<< 実行するフローを選択 >>>
    # 実行したいフローの関数のコメントを解除してください。
    # ==============================================================================

    # --- フロー1: BigQueryからElasticsearchへのデータ同期 ---
    # run_sync_bq_to_es_flow()

    # --- フロー2: ToolとPlaybookの連続作成&親Playbookへの追加 ---
    # このフローは、スキーマ生成→Tool作成→Playbook作成→親への追加、を全て行います。
    run_tool_and_playbook_creation_flow()

    # --- (オプション) フロー3: 作成済みのPlaybookを親に追加する ---
    # `run_tool_and_playbook_creation_flow`の実行後に、再度別の親に追加したい場合などに使用します。
    # child_info = {
    #     "playbook_id": "ここに作成済みの子PlaybookのID",
    #     "display_name": "ここに作成済みの子Playbookの表示名"
    # }
    # another_parent_id = "別の親PlaybookのID"
    # add_playbook_to_parent_flow(parent_playbook_id=another_parent_id, child_playbook_info=child_info)


    print("\nスクリプトの実行が完了しました。")

人気記事

  • BigQueryの無料枠を活用しよう!制限と注意点、活用方法を解説
  • BigQueryでエラー解決!よくあるエラーと対処法
  • BigQueryのレベル別学習リソースまとめ!初心者から上級者まで役立つ情報源
  • 【SUUMOスクレイピング】Struccleで物件データを全件収集
  • BigQuery入門!無料データでSQLの基本文字列関数をマスター
当ブログのコーディング実行環境設定
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次
カテゴリー
  • AI_Agent (137)
    • Agent開発 (137)
  • BigQuery (100)
    • BigQueryTips (11)
    • BigQueryでデータ分析 (49)
    • BigQueryのFAQ (1)
    • BigQuery入門 (8)
    • BigQuery学習教材 (22)
    • BigQuery導入ガイド (3)
    • BigQuery最新情報 (3)
    • BigQuery活用事例 (4)
  • Struccle (163)
    • Struccleでスクレイピング (10)
      • suumoの物件データを収集&分析 (1)
      • アニマルジョブの電話番号、メールアドレスを全件収集 (1)
      • データ集計 (6)
      • ホットペッパービューティーのヘアサロンデータを収集&分析 (1)
      • 食べログの飲食店データを収集&分析 (1)
    • Struccleデータ料金事例 (152)
      • 商品分析 (15)
      • 営業リスト (98)
      • 競合分析&市場調査 (58)
      • 自動車 (11)
      • 自社活用 (7)
    • Struccle活用企業様の紹介 (1)
  • 当ブログのコーディング実行環境設定 (4)
目次