やりたいこと
データを読み込むだけで、以下が自動で行われる仕組みを構築したい。
- 読み込んだデータがElasticsearchに自動登録される
- データと指示内容がGeminiに渡され、PlaybookやTool/Schemaが自動生成される
- 生成された内容が自動でPlaybookやToolに反映され、管理用エージェントと紐づいて構成される
前回の調査日記
あわせて読みたい


AI Agent開発日記 2025/06/28
やりたいこと データを読み込むだけで、以下が自動で行われる仕組みを構築したい。 読み込んだデータがElasticsearchに自動登録される データと指示内容がGeminiに渡さ...
目次
コンソールに表示されるテキストの順序と内容を、分かりやすくなるように整理、調整する
- やりたいこと
- t.py実行時のテキスト全てを整理する
- 現状
- 下記コードで検証に成功した
コード
# -*- coding: utf-8 -*-
"""
Integration Service API サンプル実行スクリプト
このスクリプトは、カスタムAPIサーバー(Integration Service)を介して、
Google Cloud Dialogflow CX の Tool や Playbook をプログラムで操作するサンプルです。
主な機能:
1. BigQuery から Elasticsearch へのデータ同期
2. Elasticsearch のデータ構造から OpenAPI スキーマをAIで生成し、Dialogflow Tool を作成
3. ドキュメント例から Playbook の Instructions (指示) をAIで生成し、Dialogflow Playbook を作成
4. 作成した Playbook を既存の親 Playbook に子として関連付ける
■ 前提条件
1. Python 3.8以上
2. 必要なライブラリのインストール:
pip install requests
3. 実行中の Integration Service API サーバー
(このスクリプトは、デフォルトで http://localhost:8080 にアクセスします)
4. Google Cloud プロジェクトおよび Dialogflow CX エージェントのセットアップ
5. (任意) Elasticsearch クラスタおよび BigQuery データセットのセットアップ
■ 設定方法
1. 下記の「<<< ユーザー設定 >>>」セクションを、ご自身の環境に合わせて編集してください。
- APIキー (GEMINI_API_KEY, ELASTIC_CLOUD_ENCODED_API_KEY)
- Dialogflow関連ID (DF_PROJECT_ID, DF_AGENT_ID など)
- 各機能で使用するデータソース名 (インデックス名、テーブル名など)
2. APIキーは、コードに直接書き込む代わりに、環境変数として設定することも推奨されます。
(例: export GEMINI_API_KEY="your_gemini_api_key_here")
■ 実行方法
1. ターミナルでスクリプトを実行します:
python your_script_name.py
2. スクリプト下部の `if __name__ == "__main__":` ブロックで、実行したいフローの
コメントを解除して実行してください。
"""
import requests
import json
import os
import unicodedata
# --- グローバル設定 ---
# APIサーバーが稼働しているポートに合わせて変更してください
PORT = os.getenv("PORT", "8080")
BASE_URL = f"http://localhost:{PORT}"
APP_PREFIX = "integration_service"
# ==============================================================================
# <<< ユーザー設定 >>>
# ご自身の環境に合わせて、以下のプレースホルダーを実際の値に書き換えてください。
# 環境変数に設定している場合は、自動的にその値が使用されます。
# ==============================================================================
# --- APIキー設定 ---
# Gemini APIキー。Playbook/ToolのAI生成機能で必須です。
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY_HERE")
# Elasticsearch APIキー (Base64エンコード済み)。データ同期機能で使用します。
ELASTIC_CLOUD_ENCODED_API_KEY = os.getenv("ELASTIC_CLOUD_ENCODED_API_KEY", "YOUR_ELASTIC_API_KEY_HERE")
# --- Dialogflow設定 ---
DF_PROJECT_ID = os.getenv("DF_PROJECT_ID", "your-gcp-project-id")
DF_LOCATION_ID = os.getenv("DF_LOCATION_ID", "global") # 通常は "global" またはリージョン名
DF_AGENT_ID = os.getenv("DF_AGENT_ID", "your-dialogflow-agent-id")
# 新しく作成したPlaybookを追加する先の「親」となるPlaybookのID
PARENT_PLAYBOOK_ID = os.getenv("PARENT_PLAYBOOK_ID", "parent-playbook-id-to-add-children")
# --- データソース設定 (データ同期フロー用) ---
BQ_DATASET = "your_test_dataset"
BQ_TABLE = "your_source_table"
ES_URL = "http://localhost:9200" # またはElastic Cloudのエンドポイント
ES_INDEX_FOR_SYNC = "synced_data_index"
# --- Tool/Playbook自動生成フロー用設定 ---
# Tool/Playbookの基になるデータソースのインデックス名
ES_INDEX_FOR_GENERATION = "my_product_index"
# 上記インデックスに含まれるドキュメントの例 (AIがデータ構造を理解するために使用)
DOC_EXAMPLES_FOR_GENERATION = [
{"item_id": "AB-123", "productName": "Super Duper Gadget", "price_usd": 199.99, "inStock": True,
"details": {"color": "red", "weight_grams": 250}, "tags": ["electronics", "new", "popular"]},
{"item_id": "XY-789", "productName": "Basic Utility Tool", "price_usd": 25.00, "inStock": False,
"description": "A simple tool for everyday tasks.", "manufacturer_code": "MFR-002"}
]
# 生成するTool/Playbookの表示名や説明 (スクリプト内で自動的にインデックス名が付与されます)
GENERATED_TOOL_DISPLAY_NAME_BASE = "Generated_Tool"
GENERATED_TOOL_DESCRIPTION = "AIが生成したスキーマに基づく汎用検索ツール(スクリプト経由)。"
GENERATED_PLAYBOOK_DISPLAY_NAME_BASE = "物件検索Playbook"
GENERATED_PLAYBOOK_GOAL = "ユーザーの指示に基づいて適切な物件を提案する"
GEMINI_MODEL = "gemini-1.5-flash"
# --- スタンドアロンPlaybook作成フロー用設定 ---
# Playbookが参照する、既存のToolのIDリスト
STANDALONE_PLAYBOOK_REFERENCED_TOOL_IDS = ["existing-tool-id-1", "existing-tool-id-2"]
# ==============================================================================
# <<< 設定ここまで >>>
# ==============================================================================
# --- コンソール出力ヘルパー ---
def print_flow_header(title):
"""フローの開始をコンソールに表示する"""
print(f"\n{'='*10} {title} {'='*10}")
def print_flow_footer(success=True):
"""フローの終了をコンソールに表示する"""
status = "完了" if success else "失敗または中断"
print(f"========== フロー{status} ==========")
def print_step_header(message):
"""フロー内のステップの開始を表示する"""
print(f"\n--- {message} ---")
def print_process(message, indent=0):
"""進行中の処理を表示する"""
prefix = " " * indent
print(f"{prefix}⏳ {message}...")
def print_api_result(path, success, indent=0):
"""APIリクエストの結果を表示する"""
prefix = " " * indent
status = "[OK]" if success else "[ERROR]"
print(f"{prefix}--> API Call: {path} {status}")
def print_success(message, indent=0):
"""成功メッセージを表示する"""
prefix = " " * indent
print(f"{prefix}✅ {message}")
def print_warning(message, indent=0):
"""警告メッセージを表示する"""
prefix = " " * indent
print(f"{prefix}⚠️ {message}")
def print_info_block(title, content, indent=0):
"""整形された情報ブロックを表示する"""
prefix = " " * indent
print(f"{prefix}--- {title} ---")
# contentがNoneの場合を考慮
content_str = str(content) if content is not None else "N/A"
for line in content_str.splitlines():
print(f"{prefix} {line}")
print(f"{prefix}{'-' * (len(title) + 8)}")
# --- API通信ヘルパー ---
def make_api_request(endpoint_path, payload, method="POST", timeout=240, indent=0):
"""
指定されたエンドポイントにAPIリクエストを送信する
Args:
endpoint_path (str): APIのエンドポイントパス (例: /integration_service/data/sync)
payload (dict): 送信するJSONペイロード
method (str): HTTPメソッド (現在はPOSTのみ対応)
timeout (int): リクエストのタイムアウト時間(秒)
indent (int): コンソール出力のインデントレベル
Returns:
dict or None: 成功した場合はレスポンスのJSON、失敗した場合はNone
"""
url = f"{BASE_URL}{endpoint_path}"
response = None
try:
if method.upper() == "POST":
# APIキーをペイロードに追加
if GEMINI_API_KEY and "gemini_api_key" not in payload:
payload["gemini_api_key"] = GEMINI_API_KEY
if ELASTIC_CLOUD_ENCODED_API_KEY and "elasticsearch_encoded_api_key" not in payload:
payload["elasticsearch_encoded_api_key"] = ELASTIC_CLOUD_ENCODED_API_KEY
response = requests.post(url, json=payload, timeout=timeout)
else:
print_warning(f"Unsupported HTTP method: {method}", indent)
return None
response.raise_for_status()
print_api_result(endpoint_path, success=True, indent=indent)
return response.json()
except requests.exceptions.HTTPError as http_err:
print_api_result(endpoint_path, success=False, indent=indent)
print_warning(f"HTTP Error: {http_err}", indent + 1)
if response is not None:
try:
error_body = json.dumps(response.json(), indent=2, ensure_ascii=False)
print_info_block("Error Response Body", error_body, indent + 1)
except json.JSONDecodeError:
print_info_block("Error Response Body (Non-JSON)", response.text, indent + 1)
except requests.exceptions.RequestException as req_err:
print_api_result(endpoint_path, success=False, indent=indent)
print_warning(f"Request Error: {req_err}", indent + 1)
except Exception as e:
print_api_result(endpoint_path, success=False, indent=indent)
print_warning(f"An unexpected error occurred: {e}", indent + 1)
return None
# --- 設定チェックヘルパー ---
def check_api_key(key_value, key_name):
"""APIキーが設定されているかチェックする"""
if not key_value or "YOUR_" in key_value:
print_warning(f"設定値 '{key_name}' が未設定です。このキーを必要とする処理は失敗します。")
return False
return True
def check_dialogflow_ids():
"""Dialogflow関連のIDが設定されているかチェックする"""
valid = True
if "your-gcp-project-id" in DF_PROJECT_ID:
print_warning("DF_PROJECT_ID がデフォルト値のままです。")
valid = False
if "your-dialogflow-agent-id" in DF_AGENT_ID:
print_warning("DF_AGENT_ID がデフォルト値のままです。")
valid = False
return valid
# --- メインフロー関数 ---
def run_sync_bq_to_es_flow():
"""BigQueryからElasticsearchへのデータ同期フローを実行する"""
print_flow_header("データ同期 (BigQuery → Elasticsearch)")
if not check_api_key(ELASTIC_CLOUD_ENCODED_API_KEY, "ELASTIC_CLOUD_ENCODED_API_KEY"):
print_flow_footer(success=False)
return
payload = {
"dataset_id": BQ_DATASET,
"table_id": BQ_TABLE,
"elasticsearch_url": ES_URL,
"elasticsearch_index": ES_INDEX_FOR_SYNC,
}
print_process("データを同期しています")
response = make_api_request(f"/{APP_PREFIX}/data/sync_bq_to_es", payload, timeout=180)
if response:
print_success("データ同期が完了しました")
print_info_block("API Response", response.get('status', 'No details available'))
print_flow_footer(success=True)
else:
print_warning("データ同期に失敗しました。詳細は上記のエラーを確認してください。")
print_flow_footer(success=False)
def run_tool_and_playbook_creation_flow():
"""スキーマ生成からTool作成、Playbook作成、親Playbookへの追加までを一貫して実行する"""
print_flow_header("Tool & Playbook 連続作成フロー")
if not check_api_key(GEMINI_API_KEY, "GEMINI_API_KEY") or not check_dialogflow_ids():
print_flow_footer(success=False)
return
# --- ステップ1: OpenAPIスキーマ生成 ---
print_step_header("ステップ1: OpenAPIスキーマ生成")
print_process("AIにスキーマ生成をリクエストしています...", indent=1)
schema_payload = {
"elasticsearch_index_name": ES_INDEX_FOR_GENERATION,
"elasticsearch_document_examples": DOC_EXAMPLES_FOR_GENERATION,
"gemini_model_name": GEMINI_MODEL
}
generated_schema = make_api_request(f"/{APP_PREFIX}/generate/openapi_schema", schema_payload, indent=1)
if not generated_schema:
print_warning("スキーマ生成に失敗しました。フローを中断します。", indent=1)
print_flow_footer(success=False)
return
print_success("スキーマ生成に成功しました。", indent=1)
print_info_block("Generated OpenAPI Schema", json.dumps(generated_schema, indent=2, ensure_ascii=False), indent=1)
# --- ステップ2: Dialogflow Tool作成 ---
print_step_header("ステップ2: Dialogflow Tool作成")
# インデックス名をサニタイズしてTool名に追加
clean_es_index_name = ES_INDEX_FOR_GENERATION.replace('-', '_').replace('.', '_')
tool_display_name = f"{GENERATED_TOOL_DISPLAY_NAME_BASE}_{clean_es_index_name}"
print_info_block("作成するTool名", tool_display_name, indent=1)
print_process(f"Tool「{tool_display_name}」を作成しています...", indent=1)
tool_payload = {
"project_id": DF_PROJECT_ID,
"location_id": DF_LOCATION_ID,
"agent_id": DF_AGENT_ID,
"tool_display_name": tool_display_name,
"tool_description": GENERATED_TOOL_DESCRIPTION,
"openapi_schema_string": json.dumps(generated_schema)
}
tool_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_tool", tool_payload, indent=1)
if not (tool_response and "tool_id" in tool_response):
print_warning("Toolの作成に失敗しました。フローを中断します。", indent=1)
print_flow_footer(success=False)
return
created_tool_id = tool_response['tool_id']
created_tool_name = tool_response.get('display_name', tool_display_name)
print_success(f"Tool「{created_tool_name}」(ID: {created_tool_id}) を作成しました。", indent=1)
# --- ステップ3: Dialogflow Playbook作成 ---
print_step_header("ステップ3: 関連Playbook作成")
# Playbook Instructions生成
print_process("AIにPlaybookの指示を生成させています...", indent=1)
instr_payload = {
"elasticsearch_index_name": ES_INDEX_FOR_GENERATION,
"elasticsearch_document_examples": DOC_EXAMPLES_FOR_GENERATION,
"gemini_model_name": GEMINI_MODEL,
"tool_name_in_playbook": created_tool_name
}
instr_response = make_api_request(f"/{APP_PREFIX}/generate/playbook_instructions", instr_payload, indent=1)
if not (instr_response and "playbook_instructions_markdown" in instr_response):
print_warning("Playbookの指示生成に失敗しました。フローを中断します。", indent=1)
print_flow_footer(success=False)
return
generated_instructions = instr_response["playbook_instructions_markdown"]
print_success("Playbookの指示生成に成功しました。", indent=1)
print_info_block("Generated Playbook Instructions", generated_instructions, indent=1)
# Playbook作成
playbook_display_name = f"{GENERATED_PLAYBOOK_DISPLAY_NAME_BASE}_{clean_es_index_name}"
print_info_block("作成するPlaybook名", playbook_display_name, indent=1)
print_process(f"Playbook「{playbook_display_name}」を作成しています...", indent=1)
playbook_payload = {
"project_id": DF_PROJECT_ID,
"location_id": DF_LOCATION_ID,
"agent_id": DF_AGENT_ID,
"playbook_display_name": playbook_display_name,
"playbook_goal": GENERATED_PLAYBOOK_GOAL,
"playbook_instruction_text": generated_instructions,
"tool_ids": [created_tool_id] # 作成したToolのIDを使用
}
playbook_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_playbook", playbook_payload, indent=1)
if not (playbook_response and "playbook_id" in playbook_response):
print_warning("Playbookの作成に失敗しました。", indent=1)
print_flow_footer(success=False)
return
created_playbook_id = playbook_response['playbook_id']
created_playbook_name = playbook_response.get('display_name', playbook_display_name)
print_success(f"Playbook「{created_playbook_name}」(ID: {created_playbook_id}) を作成しました。", indent=1)
# --- ステップ4: 親Playbookへの追加 ---
if PARENT_PLAYBOOK_ID and "parent-playbook-id" not in PARENT_PLAYBOOK_ID:
add_playbook_to_parent_flow(
parent_playbook_id=PARENT_PLAYBOOK_ID,
child_playbook_info={"playbook_id": created_playbook_id, "display_name": created_playbook_name}
)
else:
print_info_block("Skip", "PARENT_PLAYBOOK_IDが設定されていないため、親Playbookへの追加はスキップします。", indent=0)
print_flow_footer(success=True)
def add_playbook_to_parent_flow(parent_playbook_id, child_playbook_info):
"""指定された子Playbookを親Playbookに追加する"""
print_step_header("ステップ4: 親Playbookへの追加")
if not check_dialogflow_ids(): return
# 親Playbookの現在の指示を取得
print_process(f"親Playbook (ID: {parent_playbook_id}) の情報を取得しています...", indent=1)
get_instr_payload = {
"project_id": DF_PROJECT_ID, "location_id": DF_LOCATION_ID, "agent_id": DF_AGENT_ID,
"playbook_id": parent_playbook_id
}
parent_instr_response = make_api_request(f"/{APP_PREFIX}/dialogflow/get_playbook_instructions", get_instr_payload, indent=1)
if not (parent_instr_response and "instructions_text" in parent_instr_response):
print_warning("親Playbookの指示取得に失敗しました。更新を中止します。", indent=1)
return
existing_parent_instructions = parent_instr_response["instructions_text"]
print_success("親Playbookの現在の指示を取得しました。", indent=1)
# 新しく追加する指示を生成
child_name = child_playbook_info["display_name"]
child_description = f"ユーザーの要望が「{child_name}」に関連する場合、このPlaybookを実行します。" # サンプルの説明文
instruction_to_add = f"\n- - ${{PLAYBOOK:{child_name}}} - {child_description}"
new_parent_instructions = existing_parent_instructions + instruction_to_add
print_info_block("更新後の親Playbookの指示プレビュー", new_parent_instructions, indent=1)
# 親Playbookを更新
print_process("親Playbookを更新しています...", indent=1)
update_instr_payload = {
"project_id": DF_PROJECT_ID, "location_id": DF_LOCATION_ID, "agent_id": DF_AGENT_ID,
"playbook_id": parent_playbook_id,
"new_instructions_text": new_parent_instructions
}
update_response = make_api_request(f"/{APP_PREFIX}/dialogflow/update_playbook_instructions", update_instr_payload, indent=1)
if update_response:
print_success("親Playbookの更新に成功しました。", indent=1)
if "new_instructions_preview" in update_response:
print_info_block("Final Updated Instructions", update_response['new_instructions_preview'], indent=1)
else:
print_warning("親Playbookの更新に失敗しました。", indent=1)
if __name__ == "__main__":
print("Integration Service API サンプルスクリプトを開始します。")
print(f"対象APIサーバー: {BASE_URL}")
# ==============================================================================
# <<< 実行するフローを選択 >>>
# 実行したいフローの関数のコメントを解除してください。
# ==============================================================================
# --- フロー1: BigQueryからElasticsearchへのデータ同期 ---
# run_sync_bq_to_es_flow()
# --- フロー2: ToolとPlaybookの連続作成&親Playbookへの追加 ---
# このフローは、スキーマ生成→Tool作成→Playbook作成→親への追加、を全て行います。
run_tool_and_playbook_creation_flow()
# --- (オプション) フロー3: 作成済みのPlaybookを親に追加する ---
# `run_tool_and_playbook_creation_flow`の実行後に、再度別の親に追加したい場合などに使用します。
# child_info = {
# "playbook_id": "ここに作成済みの子PlaybookのID",
# "display_name": "ここに作成済みの子Playbookの表示名"
# }
# another_parent_id = "別の親PlaybookのID"
# add_playbook_to_parent_flow(parent_playbook_id=another_parent_id, child_playbook_info=child_info)
print("\nスクリプトの実行が完了しました。")