やりたいこと
データを読み込むだけで、以下が自動で行われる仕組みを構築したい。
- 読み込んだデータがElasticsearchに自動登録される
- データと指示内容がGeminiに渡され、PlaybookやTool/Schemaが自動生成される
- 生成された内容が自動でPlaybookやToolに反映され、管理用エージェントと紐づいて構成される
前回の調査日記
あわせて読みたい


AI Agent開発日記 2025/06/17
やりたいこと データを読み込むだけで、以下が自動で行われる仕組みを構築したい。 読み込んだデータがElasticsearchに自動登録される データと指示内容がGeminiに渡さ...
目次
InstructionsとPlaybookを作成した後、そのPlaybookの情報を引き継ぎ、そのまま管理用Playbookに追加できるようにしたい。
- やりたいこと
- 過去に作成した下記コードに加えて作成したplaybookをその管理用playbookに追加する機構を作成したい
- bq > elastic同期
openapi and tool作成
instructions and playbook作成
- bq > elastic同期
- main.py t.pyとは別の親Playbook用Instructionsファイルに、新たに作成したPlaybookを下記の形式で追加し、その内容を親Playbookに反映させる手法のコードを作成する
- – – 曲を検索するplaybookです
– – – ${PLAYBOOK:有名な曲検索_playbook}
- – – 曲を検索するplaybookです
- ローカルの親Playbook用Instructionsファイルに新たに作成したPlaybookの情報を追加し、その内容を親Playbookに反映させる方法ではなく、親Playbookの現在のInstructionsを取得して、そこに新たな子Playbookを追加して反映させる方法の方が適していそうなので、その方法を採用することにして、コードを編集する
- 以下のコードにより、やりたかったことを達成出来た
- 過去に作成した下記コードに加えて作成したplaybookをその管理用playbookに追加する機構を作成したい
現状のコード
t.py
# t.py (サンプル版 - 親Playbook更新機能改善版, argparse default設定)
import requests
import json
import os
import argparse
try:
import vars
except ImportError:
vars = None
print("警告: vars.py が見つかりません。APIキーが設定されない可能性があります。")
# --- グローバル設定 ---
PORT = os.getenv("PORT", "8080")
BASE_URL = f"http://localhost:{PORT}"
APP_PREFIX = "integration_service_sample" # main.py の APP_NAME に合わせる
# --- デフォルトテスト設定値 (サンプル用) ---
DEFAULT_BQ_DATASET = "your_bq_dataset"
DEFAULT_BQ_TABLE = "your_bq_table"
DEFAULT_ES_URL = "http://localhost:9200" # ローカルESを想定
DEFAULT_ES_INDEX_BQ = "my_sample_bq_index"
DEFAULT_ES_INDEX_SCHEMA = "my_sample_schema_index"
DEFAULT_DOC_EXAMPLES_SCHEMA = [
{"id": "item-001", "name": "Sample Product A", "value": 100, "is_active": True,
"specs": {"color": "blue", "size": "M"}, "categories": ["cat1", "cat2"]},
{"id": "item-002", "description": "Another sample item.", "tags": ["sample", "test"],
"inventory_count": 50}
]
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash" # または "gemini-1.5-pro" など
DEFAULT_ES_INDEX_PLAYBOOK_INSTRUCTIONS = "my_sample_instructions_index"
DEFAULT_DOC_EXAMPLES_PLAYBOOK_INSTRUCTIONS = [
{"record_id": "rec-abc", "category": "Category1", "data_value": 123.45, "processed": False,
"timestamp": "2024-01-15T10:30:00Z", "source_system": "SystemA"},
{"record_id": "rec-xyz", "notes": "Some notes here.", "priority": 3,
"status_code": "pending", "related_ids": ["rel-001", "rel-002"]}
]
DEFAULT_CUSTOM_TOOL_NAME_PLAYBOOK_INSTRUCTIONS = "GenericSearchTool"
DEFAULT_DF_PROJECT_ID = "your-gcp-project-id" # ここにご自身のGCPプロジェクトIDを設定
DEFAULT_DF_LOCATION_ID = "global" # または "us-central1" など、エージェントのリージョン
DEFAULT_DF_AGENT_ID = "your-dialogflow-agent-id" # ここにご自身のエージェントIDを設定
DEFAULT_PARENT_PLAYBOOK_ID = "your-parent-playbook-id" # 親PlaybookのID (存在する場合)
DEFAULT_GATC_TOOL_DISPLAY_NAME_BASE = "SampleGeneratedTool"
DEFAULT_GATC_TOOL_DESCRIPTION = "Tool with AI-generated schema (via script sample)."
DEFAULT_GACP_PLAYBOOK_DISPLAY_NAME_BASE = "SampleDataSearchPlaybook"
DEFAULT_GACP_PLAYBOOK_GOAL = "Search for sample data based on user criteria."
DEFAULT_DF_REFERENCED_TOOL_IDS = ["your-tool-id-1", "your-tool-id-2"] # 実際のツールIDに置き換えてください
# --- ヘルパー関数 ---
def make_api_request(endpoint_path, payload, method="POST", timeout=240, expected_response_key=None):
"""APIリクエストを行い、レスポンスを処理する共通関数"""
url = f"{BASE_URL}{endpoint_path}"
print(f"リクエスト: {method} {url}")
log_payload = payload.copy()
if "elasticsearch_encoded_api_key" in log_payload:
log_payload["elasticsearch_encoded_api_key"] = "****REDACTED_API_KEY****"
# print(f"ペイロード:\n{json.dumps(log_payload, indent=2, ensure_ascii=False)}") # 必要に応じてコメント解除
response = None
try:
if method.upper() == "POST":
response = requests.post(url, json=payload, timeout=timeout)
else:
print(f"エラー: 未対応のHTTPメソッドです: {method}")
return None
response.raise_for_status()
print(f"レスポンス: {response.status_code}")
response_json = response.json()
if expected_response_key:
if expected_response_key in response_json:
print(f"\n--- レスポンスキー '{expected_response_key}' の内容 ---")
if expected_response_key in ["playbook_instructions_markdown", "instructions_text",
"new_instructions_preview"]:
print(response_json[expected_response_key])
else:
print(json.dumps(response_json[expected_response_key], indent=2, ensure_ascii=False))
print(f"--- '{expected_response_key}' ここまで ---")
else:
print(f"警告: レスポンスに期待されるキー '{expected_response_key}' が見つかりませんでした。")
print("レスポンス全体:")
print(json.dumps(response_json, indent=2, ensure_ascii=False))
return response_json
except requests.exceptions.HTTPError as http_err:
print(f"エラー: HTTPエラーが発生しました: {http_err}")
if response is not None:
print(f"ステータスコード (エラー時): {response.status_code}")
try:
print(f"レスポンスボディ (エラー時): {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except json.JSONDecodeError:
print(f"レスポンスボディ (エラー時, 非JSON): {response.text}")
except requests.exceptions.RequestException as req_err:
print(f"エラー: リクエストエラーが発生しました: {req_err}")
except Exception as e:
print(f"エラー: 予期せぬエラーが発生しました: {e}")
return None
def check_api_key(key_name, placeholder_value="YOUR_KEY_HERE", key_type="API"):
key_value = getattr(vars, key_name, None) if vars else None
key_present = key_value and key_value != placeholder_value and "YOUR_" not in str(key_value).upper()
if not key_present:
print(f"警告: vars.py の {key_name} が正しく設定されていない、またはプレースホルダーのままのようです。")
print(f" この{key_type}関連のテストに失敗する可能性が高いです。vars.pyを適切に設定してください。")
return key_present
def check_dialogflow_ids(project_id, agent_id, skip_prompt):
is_placeholder_project = "your-gcp-project-id" in project_id.lower()
is_placeholder_agent = "your-dialogflow-agent-id" in agent_id.lower()
if is_placeholder_project or is_placeholder_agent:
warning_msg = "警告: Dialogflowの "
if is_placeholder_project: warning_msg += "project_id "
if is_placeholder_project and is_placeholder_agent: warning_msg += "および "
if is_placeholder_agent: warning_msg += "agent_id "
warning_msg += "がプレースホルダーのままです。ご自身の値に置き換えてください。"
print(warning_msg)
if not (skip_prompt or input(
"この設定でDialogflow関連テストを続行しますか? (yes/no): ").strip().lower() in [
"yes", "y"]):
print("テストをスキップしました。")
return False
return True
def check_playbook_tool_ids(tool_ids, skip_prompt):
if not tool_ids or any("your-tool-id" in tid.lower() for tid in tool_ids): # プレースホルダーチェック
print("警告: Playbook作成に使用するtool_idsがプレースホルダーのままか、空です。")
print(
" DEFAULT_DF_REFERENCED_TOOL_IDS または --gacp_tool_ids オプションで実際のツールIDを設定してください。")
if not (skip_prompt or input("このtool_idsでPlaybook作成を続行しますか? (yes/no): ").strip().lower() in ["yes",
"y"]):
print("テストをスキップしました。")
return False
return True
def prompt_for_action(prompt_message, skip_prompt):
"""汎用的な確認プロンプト"""
if skip_prompt:
print(f"{prompt_message} (自動実行 - yes)")
return True
else:
user_input = input(f"{prompt_message} (yes/no): ").strip().lower()
return user_input in ["yes", "y"]
def print_test_header(title):
print(f"\n--- テスト開始: {title.upper()} ---")
# --- テスト関数 ---
def test_sync_bq_to_es(
dataset_id=DEFAULT_BQ_DATASET, table_id=DEFAULT_BQ_TABLE,
es_url=DEFAULT_ES_URL, es_index=DEFAULT_ES_INDEX_BQ,
skip_prompt=False
):
endpoint_path = f"/{APP_PREFIX}/data/sync_bq_to_es"
# vars.py から ELASTIC_CLOUD_ENCODED_API_KEY を取得する想定
# 実際のキーは vars.py に "YOUR_ELASTIC_API_KEY_HERE" 以外の値を設定
elastic_api_key_name = "ELASTIC_CLOUD_ENCODED_API_KEY" # vars.py でのキー名
elastic_api_key_placeholder = "YOUR_ELASTIC_API_KEY_HERE"
elastic_api_key = getattr(vars, elastic_api_key_name, None) if vars else None
if not (elastic_api_key and elastic_api_key != elastic_api_key_placeholder and "YOUR_" not in str(elastic_api_key).upper()) and \
"localhost" not in es_url and "127.0.0.1" not in es_url:
print(f"警告: Elasticsearch APIキー ({elastic_api_key_name}) がvars.pyに未設定かプレースホルダーのままで、ES URLもローカルホストではありません。")
if not (skip_prompt or input("この設定でテストを続行しますか? (yes/no): ").strip().lower() in ["yes", "y"]):
print("テストをスキップしました。")
return
payload = {
"dataset_id": dataset_id, "table_id": table_id,
"elasticsearch_url": es_url, "elasticsearch_index": es_index,
"elasticsearch_encoded_api_key": elastic_api_key if (elastic_api_key and elastic_api_key != elastic_api_key_placeholder) else None,
}
make_api_request(endpoint_path, payload, timeout=180)
print("--- BigQuery -> Elasticsearch 同期テスト終了 ---\n")
def test_generate_schema_and_create_tool_interactive(
es_index_for_schema_gen=DEFAULT_ES_INDEX_SCHEMA,
doc_examples_for_schema_gen=DEFAULT_DOC_EXAMPLES_SCHEMA,
gemini_model_for_schema_gen=DEFAULT_GEMINI_MODEL,
df_project_id_interactive=DEFAULT_DF_PROJECT_ID,
df_location_id_interactive=DEFAULT_DF_LOCATION_ID,
df_agent_id_interactive=DEFAULT_DF_AGENT_ID,
df_tool_display_name_base_interactive=DEFAULT_GATC_TOOL_DISPLAY_NAME_BASE,
df_tool_description_interactive=DEFAULT_GATC_TOOL_DESCRIPTION,
skip_prompts_interactive=False
):
print_test_header("インタラクティブ: OpenAPIスキーマ生成 & Dialogflow Tool作成")
if not check_api_key("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY_HERE", "Gemini"): return None
if not check_dialogflow_ids(df_project_id_interactive, df_agent_id_interactive,
skip_prompts_interactive): return None
if not skip_prompts_interactive:
print("このテストはユーザーの確認が必要です。")
generated_openapi_schema_obj = None
max_retries = 3
retry_count = 0
while retry_count < max_retries:
print(f"\nステップ {retry_count + 1}/{max_retries}: OpenAPIスキーマを生成中...")
schema_payload = {
"elasticsearch_index_name": es_index_for_schema_gen,
"elasticsearch_document_examples": doc_examples_for_schema_gen,
"gemini_model_name": gemini_model_for_schema_gen
}
schema_response_json = make_api_request(f"/{APP_PREFIX}/generate/openapi_schema", schema_payload, timeout=180)
if schema_response_json: # Assuming the response itself is the schema object
generated_openapi_schema_obj = schema_response_json
print("\n--- 生成されたOpenAPIスキーマ ---")
print(json.dumps(generated_openapi_schema_obj, indent=2, ensure_ascii=False))
print("--- スキーマここまで ---")
if skip_prompts_interactive:
user_choice = "yes"
else:
while True:
user_choice = input(
"\nこのスキーマでDialogflow Toolを作成しますか? (yes/no/retry_schema): ").strip().lower()
if user_choice in ["yes", "y", "no", "n", "retry_schema", "r"]: break
print("無効な入力です。'yes', 'no', または 'retry_schema' を入力してください。")
if user_choice in ["yes", "y"]:
break
elif user_choice in ["no", "n"]:
print("ユーザーによりTool作成がキャンセルされました。")
return None
elif user_choice in ["retry_schema", "r"]:
retry_count += 1
if retry_count >= max_retries:
print("スキーマ生成の最大再試行回数に達しました。処理を中止します。")
return None
print("スキーマ生成を再試行します...")
continue
else:
print("OpenAPIスキーマの生成に失敗しました。")
if retry_count < max_retries - 1 and not skip_prompts_interactive:
if input("スキーマ生成を再試行しますか? (yes/no): ").strip().lower() in ["yes", "y"]:
retry_count += 1
continue
else:
print("処理を中止します。")
return None
elif skip_prompts_interactive:
print("スキーマ生成失敗のため処理を中止します (プロンプトスキップ)。")
return None
else:
print("スキーマ生成失敗のため処理を中止します。")
return None
if not generated_openapi_schema_obj:
print("有効なスキーマが得られませんでした。Tool作成を中止します。")
return None
print("\nステップ 2: Dialogflow Toolを作成中...")
clean_es_index_name = es_index_for_schema_gen.replace('-', '_').replace('.', '_')
final_tool_name = f"{df_tool_display_name_base_interactive}_{clean_es_index_name}"
tool_payload = {
"project_id": df_project_id_interactive,
"location_id": df_location_id_interactive,
"agent_id": df_agent_id_interactive,
"tool_display_name": final_tool_name,
"tool_description": df_tool_description_interactive,
"openapi_schema_string": json.dumps(generated_openapi_schema_obj) # スキーマ全体を文字列として渡す
}
tool_creation_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_tool", tool_payload, timeout=120,
expected_response_key="tool_id")
if tool_creation_response and "tool_id" in tool_creation_response:
print(
f"Dialogflow Tool '{tool_creation_response.get('display_name')}' (ID: {tool_creation_response['tool_id']}) が正常に作成されました。")
print("--- インタラクティブ: OpenAPIスキーマ生成 & Dialogflow Tool作成 終了 ---")
return {"tool_id": tool_creation_response['tool_id'],
"display_name": tool_creation_response.get('display_name')}
else:
print("Dialogflow Toolの作成に失敗しました。")
print("--- インタラクティブ: OpenAPIスキーマ生成 & Dialogflow Tool作成 終了 ---")
return None
def test_generate_instructions_and_create_playbook_interactive(
es_index_for_instr_gen=DEFAULT_ES_INDEX_PLAYBOOK_INSTRUCTIONS,
doc_examples_for_instr_gen=DEFAULT_DOC_EXAMPLES_PLAYBOOK_INSTRUCTIONS,
gemini_model_for_instr_gen=DEFAULT_GEMINI_MODEL,
tool_name_for_instr_gen=DEFAULT_CUSTOM_TOOL_NAME_PLAYBOOK_INSTRUCTIONS,
df_project_id_interactive=DEFAULT_DF_PROJECT_ID,
df_location_id_interactive=DEFAULT_DF_LOCATION_ID,
df_agent_id_interactive=DEFAULT_DF_AGENT_ID,
df_playbook_display_name_base_interactive=DEFAULT_GACP_PLAYBOOK_DISPLAY_NAME_BASE,
df_playbook_goal_interactive=DEFAULT_GACP_PLAYBOOK_GOAL,
df_tool_ids_interactive=DEFAULT_DF_REFERENCED_TOOL_IDS,
skip_prompts_interactive=False
):
print_test_header("インタラクティブ: Playbook Instructions生成 & Dialogflow Playbook作成")
if not check_api_key("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY_HERE", "Gemini"): return None
if not check_dialogflow_ids(df_project_id_interactive, df_agent_id_interactive,
skip_prompts_interactive): return None
if not check_playbook_tool_ids(df_tool_ids_interactive, skip_prompts_interactive): return None
if not skip_prompts_interactive:
print("このテストはユーザーの確認が必要です。")
generated_instructions_markdown = None
max_retries = 3
retry_count = 0
while retry_count < max_retries:
print(f"\nステップ {retry_count + 1}/{max_retries}: Playbook Instructionsを生成中...")
instr_payload = {
"elasticsearch_index_name": es_index_for_instr_gen,
"elasticsearch_document_examples": doc_examples_for_instr_gen,
"gemini_model_name": gemini_model_for_instr_gen,
"tool_name_in_playbook": tool_name_for_instr_gen
}
instr_response_json = make_api_request(f"/{APP_PREFIX}/generate/playbook_instructions", instr_payload,
timeout=240, expected_response_key="playbook_instructions_markdown")
if instr_response_json and "playbook_instructions_markdown" in instr_response_json:
generated_instructions_markdown = instr_response_json["playbook_instructions_markdown"]
if skip_prompts_interactive:
user_choice = "yes"
else:
print("\n--- 生成されたPlaybook Instructions (Markdown) ---")
print(generated_instructions_markdown)
print("--- Instructionsここまで ---")
while True:
user_choice = input(
"\nこのInstructionsでDialogflow Playbookを作成しますか? (yes/no/retry_instructions): ").strip().lower()
if user_choice in ["yes", "y", "no", "n", "retry_instructions", "r"]: break
print("無効な入力です。'yes', 'no', または 'retry_instructions' を入力してください。")
if user_choice in ["yes", "y"]:
break
elif user_choice in ["no", "n"]:
print("ユーザーによりPlaybook作成がキャンセルされました。")
return None
elif user_choice in ["retry_instructions", "r"]:
retry_count += 1
if retry_count >= max_retries:
print("Instructions生成の最大再試行回数に達しました。処理を中止します。")
return None
print("Instructions生成を再試行します...")
continue
else:
print("Playbook Instructionsの生成に失敗しました。")
if retry_count < max_retries - 1 and not skip_prompts_interactive:
if input("Instructions生成を再試行しますか? (yes/no): ").strip().lower() in ["yes", "y"]:
retry_count += 1
continue
else:
print("処理を中止します。")
return None
elif skip_prompts_interactive:
print("Instructions生成失敗のため処理を中止します (プロンプトスキップ)。")
return None
else:
print("Instructions生成失敗のため処理を中止します。")
return None
if not generated_instructions_markdown:
print("有効なPlaybook Instructionsが得られませんでした。Playbook作成を中止します。")
return None
print("\nステップ 2: Dialogflow Playbook を作成中...")
clean_es_index_name = es_index_for_instr_gen.replace('-', '_').replace('.', '_')
final_playbook_name = f"{df_playbook_display_name_base_interactive}_{clean_es_index_name}"
playbook_payload = {
"project_id": df_project_id_interactive,
"location_id": df_location_id_interactive,
"agent_id": df_agent_id_interactive,
"playbook_display_name": final_playbook_name,
"playbook_goal": df_playbook_goal_interactive,
"playbook_instruction_text": generated_instructions_markdown,
"tool_ids": [tid for tid in df_tool_ids_interactive if "your-tool-id" not in tid.lower()] # プレースホルダーを除外
}
playbook_creation_response = make_api_request(f"/{APP_PREFIX}/dialogflow/create_playbook", playbook_payload,
timeout=120, expected_response_key="playbook_id")
if playbook_creation_response and "playbook_id" in playbook_creation_response:
playbook_id = playbook_creation_response['playbook_id']
display_name = playbook_creation_response.get('display_name', final_playbook_name)
print(f"Dialogflow Playbook '{display_name}' (ID: {playbook_id}) が正常に作成されました。")
print("--- インタラクティブ: Playbook Instructions生成 & Dialogflow Playbook作成 終了 ---")
return {"playbook_id": playbook_id, "display_name": display_name}
else:
print("Dialogflow Playbookの作成に失敗しました。")
print("--- インタラクティブ: Playbook Instructions生成 & Dialogflow Playbook作成 終了 ---")
return None
def add_child_playbooks_to_parent_interactive(
parent_playbook_id,
created_child_playbooks,
df_project_id,
df_location_id,
df_agent_id,
skip_prompts
):
if not parent_playbook_id or "your-parent-playbook-id" in parent_playbook_id.lower():
print("親Playbook IDが指定されていないか、プレースホルダーのままのため、追加処理をスキップします。")
return
if not created_child_playbooks:
print("追加対象の子Playbookがありません。")
return
print_test_header(f"親Playbook (ID: {parent_playbook_id}) への子Playbook追加処理")
print(f"親Playbook (ID: {parent_playbook_id}) のInstructionsを取得中...")
get_instr_payload = {
"project_id": df_project_id,
"location_id": df_location_id,
"agent_id": df_agent_id,
"playbook_id": parent_playbook_id
}
parent_instr_response = make_api_request(
f"/{APP_PREFIX}/dialogflow/get_playbook_instructions",
get_instr_payload,
timeout=120,
expected_response_key="instructions_text"
)
if not (parent_instr_response and "instructions_text" in parent_instr_response):
print(f"親Playbook (ID: {parent_playbook_id}) のInstructions取得に失敗しました。更新は行われません。")
return
existing_parent_instructions = parent_instr_response["instructions_text"]
print(f"取得した親PlaybookのInstructions:\n---\n{existing_parent_instructions}\n---")
instructions_to_add_list = []
for child_playbook_info in created_child_playbooks:
child_id = child_playbook_info["playbook_id"]
child_name = child_playbook_info["display_name"]
prompt_msg = (f"\n子Playbook '{child_name}' (ID: {child_id}) を"
f"親Playbook (ID: {parent_playbook_id}) のInstructionsに追加しますか?")
if prompt_for_action(prompt_msg, skip_prompts):
if skip_prompts:
child_description = f"This is an auto-generated description for '{child_name}' Playbook."
print(f"説明 (自動入力): {child_description}")
else:
while True:
child_description = input(
f"子Playbook '{child_name}' (ID: {child_id}) の説明を入力してください: ").strip()
if child_description:
break
print("説明は空にできません。再度入力してください。")
# Dialogflow CXではPlaybookを参照する際にPlaybook IDを使用します。
# 表示名での参照は通常サポートされていません。
# ここでは child_id を使用して正しいフォーマットにします。
string_for_one_child = f"\n- - ${{PLAYBOOK:{child_id}}} - {child_description}"
instructions_to_add_list.append(string_for_one_child)
print(f"追加予定のInstruction行: {string_for_one_child.strip()}")
else:
print(f"子Playbook '{child_name}' (ID: {child_id}) の親Playbookへの追加はスキップされました。")
if not instructions_to_add_list:
print("親Playbookに追加する子Playbookが選択されませんでした。更新処理をスキップします。")
return
new_parent_instructions = existing_parent_instructions
for item_to_add in instructions_to_add_list:
new_parent_instructions += item_to_add
print(f"\n結合後の新しい親PlaybookのInstructionsプレビュー:\n---\n{new_parent_instructions}\n---")
if not new_parent_instructions.strip(): # 空のInstructionsは更新しない
print("エラー: 新しい親PlaybookのInstructions内容が空です。更新をスキップします。")
return
update_confirm_msg = "上記のプレビュー内容で親PlaybookのInstructionsを更新しますか?"
if prompt_for_action(update_confirm_msg, skip_prompts):
print(f"親Playbook (ID: {parent_playbook_id}) のInstructionsを更新中...")
update_instr_payload = {
"project_id": df_project_id,
"location_id": df_location_id,
"agent_id": df_agent_id,
"playbook_id": parent_playbook_id,
"new_instructions_text": new_parent_instructions
}
update_response = make_api_request(
f"/{APP_PREFIX}/dialogflow/update_playbook_instructions",
update_instr_payload,
timeout=120,
expected_response_key="updated_playbook_name_resource"
)
if update_response and "updated_playbook_name_resource" in update_response:
print(f"親Playbook (ID: {parent_playbook_id}) のInstructions更新に成功しました。")
if "new_instructions_preview" in update_response:
print(f"更新後のInstructions (APIからのプレビュー):\n{update_response['new_instructions_preview']}")
else:
print(f"親Playbook (ID: {parent_playbook_id}) のInstructions更新に失敗しました。")
else:
print("親PlaybookのInstructions更新はキャンセルされました。")
print(f"--- 親Playbook (ID: {parent_playbook_id}) への子Playbook追加処理 終了 ---")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Integration Service API (Sample) テストスクリプト")
parser.add_argument(
"--test",
choices=["sync", "generate_and_create_tool", "generate_and_create_playbook", "all"],
nargs="+",
help="実行するテストを選択 (複数指定可)",
default=["all"] # デフォルトは全テスト実行
)
parser.add_argument("-y", "--yes", action="store_true",
help="全ての確認プロンプトをスキップし、自動で 'yes' と回答します")
# Dialogflow関連の引数 (デフォルト値はプレースホルダー)
parser.add_argument("--df_project_id", default=DEFAULT_DF_PROJECT_ID,
help=f"Dialogflow Project ID (デフォルト: {DEFAULT_DF_PROJECT_ID})")
parser.add_argument("--df_location_id", default=DEFAULT_DF_LOCATION_ID,
help=f"Dialogflow Agent Location ID (デフォルト: {DEFAULT_DF_LOCATION_ID})")
parser.add_argument("--df_agent_id", default=DEFAULT_DF_AGENT_ID,
help=f"Dialogflow Agent ID (デフォルト: {DEFAULT_DF_AGENT_ID})")
parser.add_argument("--parent_playbook_id",
default=DEFAULT_PARENT_PLAYBOOK_ID,
help=f"子Playbookを追加する先の親PlaybookのID (デフォルト: {DEFAULT_PARENT_PLAYBOOK_ID})")
# Tool生成関連の引数
parser.add_argument("--gact_es_index", default=DEFAULT_ES_INDEX_SCHEMA,
help=f"Tool作成テスト用ESインデックス (デフォルト: {DEFAULT_ES_INDEX_SCHEMA})")
parser.add_argument("--gact_tool_name_base", default=DEFAULT_GATC_TOOL_DISPLAY_NAME_BASE,
help=f"Tool作成テスト用ツール名のベース名 (デフォルト: {DEFAULT_GATC_TOOL_DISPLAY_NAME_BASE})")
# Playbook生成関連の引数
parser.add_argument("--gacp_es_index", default=DEFAULT_ES_INDEX_PLAYBOOK_INSTRUCTIONS,
help=f"Playbook作成テスト用ESインデックス (デフォルト: {DEFAULT_ES_INDEX_PLAYBOOK_INSTRUCTIONS})")
parser.add_argument("--gacp_playbook_name_base", default=DEFAULT_GACP_PLAYBOOK_DISPLAY_NAME_BASE,
help=f"Playbook作成テスト用Playbook名のベース名 (デフォルト: {DEFAULT_GACP_PLAYBOOK_DISPLAY_NAME_BASE})")
parser.add_argument("--gacp_playbook_goal", default=DEFAULT_GACP_PLAYBOOK_GOAL,
help=f"Playbook作成テスト用Playbookのゴール (デフォルト: {DEFAULT_GACP_PLAYBOOK_GOAL})")
parser.add_argument("--gacp_tool_ids", nargs="*", default=DEFAULT_DF_REFERENCED_TOOL_IDS,
help=f"Playbook作成テスト用参照ツールIDリスト (デフォルト: {' '.join(DEFAULT_DF_REFERENCED_TOOL_IDS)})")
# BigQuery/Elasticsearch関連の引数
parser.add_argument("--bq_dataset", default=DEFAULT_BQ_DATASET, help=f"BigQuery Dataset ID (デフォルト: {DEFAULT_BQ_DATASET})")
parser.add_argument("--bq_table", default=DEFAULT_BQ_TABLE, help=f"BigQuery Table ID (デフォルト: {DEFAULT_BQ_TABLE})")
parser.add_argument("--es_url", default=DEFAULT_ES_URL, help=f"Elasticsearch URL (デフォルト: {DEFAULT_ES_URL})")
parser.add_argument("--es_index_bq", default=DEFAULT_ES_INDEX_BQ, help=f"Elasticsearch index for BQ sync (デフォルト: {DEFAULT_ES_INDEX_BQ})")
args = parser.parse_args()
print(f"テスト対象サービスURL: {BASE_URL}")
print(f"実行対象テスト: {', '.join(args.test)}")
if args.yes:
print("注意: 全ての確認プロンプトはスキップされます。")
if args.parent_playbook_id and "your-parent-playbook-id" not in args.parent_playbook_id.lower():
print(f"親Playbook ID (generate_and_create_playbookテスト後に使用): {args.parent_playbook_id}")
else:
print("親Playbook IDは指定されていないか、プレースホルダーのままです。子Playbookの追加処理は行われない可能性があります。")
tests_to_run_flags = {
"sync": False,
"generate_and_create_tool": False,
"generate_and_create_playbook": False
}
if "all" in args.test:
for key in tests_to_run_flags: tests_to_run_flags[key] = True
else:
for test_name in args.test:
if test_name in tests_to_run_flags:
tests_to_run_flags[test_name] = True
else:
print(f"警告: 不明なテスト名 '{test_name}' は無視されます。")
print("\nテストを開始します...")
print("注意: このスクリプトはAPIリクエストを実行し、Dialogflowリソース等を作成/変更する可能性があります。")
print(" テスト環境やサンドボックス環境での実行を強く推奨します。")
print(" vars.py にGEMINI_API_KEYやELASTIC_CLOUD_ENCODED_API_KEYを正しく設定してください。")
print(" また、Dialogflow関連のID (project_id, agent_idなど) をご自身のものに置き換えてください。")
created_playbooks_for_parent = []
if tests_to_run_flags["sync"]:
print_test_header("BigQueryからElasticsearchへのデータ同期")
if prompt_for_action("BigQuery -> Elasticsearch 同期テストを実行しますか?", args.yes):
test_sync_bq_to_es(
dataset_id=args.bq_dataset,
table_id=args.bq_table,
es_url=args.es_url,
es_index=args.es_index_bq,
skip_prompt=args.yes
)
else:
print("スキップされました。")
if tests_to_run_flags["generate_and_create_tool"]:
print_test_header("インタラクティブ: OpenAPIスキーマ生成 & Dialogflow Tool作成")
prompt_message = f"インタラクティブ: OpenAPIスキーマ生成 & Dialogflow Tool作成テストを実行しますか? (Agent: {args.df_agent_id})"
if prompt_for_action(prompt_message, args.yes):
test_generate_schema_and_create_tool_interactive(
es_index_for_schema_gen=args.gact_es_index,
gemini_model_for_schema_gen=DEFAULT_GEMINI_MODEL, # モデルは固定で良いか、引数にするか
df_project_id_interactive=args.df_project_id,
df_location_id_interactive=args.df_location_id,
df_agent_id_interactive=args.df_agent_id,
df_tool_display_name_base_interactive=args.gact_tool_name_base,
skip_prompts_interactive=args.yes
)
else:
print("スキップされました。")
if tests_to_run_flags["generate_and_create_playbook"]:
print_test_header("インタラクティブ: Playbook Instructions生成 & Dialogflow Playbook作成")
prompt_message = f"インタラクティブ: Playbook Instructions生成 & Dialogflow Playbook作成テストを実行しますか? (Agent: {args.df_agent_id})"
if prompt_for_action(prompt_message, args.yes):
created_playbook_info = test_generate_instructions_and_create_playbook_interactive(
es_index_for_instr_gen=args.gacp_es_index,
gemini_model_for_instr_gen=DEFAULT_GEMINI_MODEL,
tool_name_for_instr_gen=DEFAULT_CUSTOM_TOOL_NAME_PLAYBOOK_INSTRUCTIONS, # 固定か引数か
df_project_id_interactive=args.df_project_id,
df_location_id_interactive=args.df_location_id,
df_agent_id_interactive=args.df_agent_id,
df_playbook_display_name_base_interactive=args.gacp_playbook_name_base,
df_playbook_goal_interactive=args.gacp_playbook_goal,
df_tool_ids_interactive=args.gacp_tool_ids,
skip_prompts_interactive=args.yes
)
if created_playbook_info:
created_playbooks_for_parent.append(created_playbook_info)
else:
print("スキップされました。")
print("\nすべての選択されたテストの実行試行が完了しました。")
if created_playbooks_for_parent and args.parent_playbook_id and "your-parent-playbook-id" not in args.parent_playbook_id.lower():
add_child_playbooks_to_parent_interactive(
parent_playbook_id=args.parent_playbook_id,
created_child_playbooks=created_playbooks_for_parent,
df_project_id=args.df_project_id,
df_location_id=args.df_location_id,
df_agent_id=args.df_agent_id,
skip_prompts=args.yes
)
elif args.parent_playbook_id and "your-parent-playbook-id" not in args.parent_playbook_id.lower() and not created_playbooks_for_parent:
print(
f"\n親Playbook ID ({args.parent_playbook_id}) が使用される予定でしたが、追加対象の子Playbookが作成されませんでした。")
elif (not args.parent_playbook_id or "your-parent-playbook-id" in args.parent_playbook_id.lower()) and created_playbooks_for_parent:
print(
f"\n子Playbookが作成されましたが、親Playbook IDが指定されていないかプレースホルダーのままのため、追加処理は行われませんでした。")
main.py
# main.py (全内容 - 親Playbook Instruction 更新機能追加版 - サンプル化)
import os
import json
import traceback
import uuid
from fastapi import FastAPI, status, Request, HTTPException, APIRouter
from fastapi.responses import JSONResponse
from elasticsearch import Elasticsearch
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from sub import db # BigQueryクライアント (実際のプロジェクトでは適切に設定ください)
# vars.py からAPIキーを読み込む試み
try:
import vars
except ImportError:
vars = None
print("警告: vars.py が見つかりません。APIキーが設定されない可能性があります。")
# プロンプトテンプレートをインポート
from prompt_template import PROMPT_FOR_OPENAPI_SCHEMA
from prompt_template_playbook import PROMPT_FOR_PLAYBOOK_INSTRUCTIONS
# --- Gemini ---
import google.generativeai as genai
# --- Dialogflow CX ---
try:
from google.cloud import dialogflowcx_v3beta1
from google.protobuf import field_mask_pb2 # UpdateMask のために必要
DialogflowToolType = dialogflowcx_v3beta1.types.Tool
DialogflowPlaybookType = dialogflowcx_v3beta1.types.Playbook
DialogflowPlaybookInstruction = dialogflowcx_v3beta1.types.Playbook.Instruction # Instruction型
DialogflowPlaybookStep = dialogflowcx_v3beta1.types.Playbook.Step # Step型
except ImportError:
dialogflowcx_v3beta1 = None
field_mask_pb2 = None
DialogflowToolType = Any
DialogflowPlaybookType = Any
DialogflowPlaybookInstruction = Any
DialogflowPlaybookStep = Any
print(
"警告: google-cloud-dialogflow-cx がインストールされていないか、インポートできませんでした。Dialogflow関連機能は利用できません。")
# --- FastAPI アプリケーションとグローバル設定 ---
app = FastAPI(
title="Integration Service API (Sample)",
description="BigQuery to Elasticsearch integration, AI-powered generator for OpenAPI schemas and Playbook Instructions, Dialogflow CX Tool/Playbook creator, and Playbook Instructions updater.",
version="1.0.5-sample"
)
APP_NAME = "integration_service_sample" # アプリケーション名をサンプル用に変更
INFO = "INFO"
ERROR = "ERROR"
WARNING = "WARNING"
# --- Gemini APIキー設定とクライアント ---
GEMINI_API_KEY = None
if vars and hasattr(vars,
"GEMINI_API_KEY") and vars.GEMINI_API_KEY and vars.GEMINI_API_KEY != "YOUR_GEMINI_API_KEY_HERE":
GEMINI_API_KEY = vars.GEMINI_API_KEY
try:
genai.configure(api_key=GEMINI_API_KEY)
print(json.dumps({"severity": INFO, "message": "Gemini APIキーがvars.pyから正常に設定されました。"}))
except Exception as e:
print(json.dumps({"severity": ERROR, "message": f"Gemini APIキーの設定中にエラーが発生しました: {e}"}))
GEMINI_API_KEY = None
else:
missing_key_msg = "vars.pyにGEMINI_API_KEYの定義が見つからないか、プレースホルダーのままです。AI関連機能は利用できません。"
print(json.dumps({"severity": WARNING, "message": missing_key_msg}))
# --- ロギング関数 ---
def log_text(severity: str, message: str, **extra_data):
log_entry = {"severity": severity, "message": message}
if extra_data:
log_entry.update(extra_data)
print(json.dumps(log_entry, ensure_ascii=False))
# --- ヘルパー関数 ---
def to_pascal_case(text: str) -> str:
if not text: return ""
return "".join(word.capitalize() for word in text.replace("-", "_").split("_"))
def generate_schema_markdown_from_examples(doc_examples: List[Dict[str, Any]]) -> str:
if not doc_examples: return "- (データ例が提供されなかったため、スキーマを推測できませんでした)"
field_types = {}
all_keys = set()
for doc in doc_examples: all_keys.update(doc.keys())
for key in sorted(list(all_keys)):
value_type_str = "unknown"
value = None
for doc in doc_examples:
if key in doc and doc[key] is not None: value = doc[key]; break
if value is not None:
if isinstance(value, str):
value_type_str = "string"
elif isinstance(value, bool):
value_type_str = "boolean"
elif isinstance(value, int):
value_type_str = "integer"
elif isinstance(value, float):
value_type_str = "number"
elif isinstance(value, list):
if value and all(isinstance(item, str) for item in value):
value_type_str = "array of string"
elif value and all(isinstance(item, dict) for item in value):
value_type_str = "array of object"
else:
value_type_str = "array"
elif isinstance(value, dict):
value_type_str = "object"
field_types[key] = value_type_str
markdown_lines = [f" - `{field}` ({f_type})" for field, f_type in field_types.items()]
return "\n".join(markdown_lines) if markdown_lines else "- (フィールドが見つかりませんでした)"
async def _call_gemini_api_with_prompt(model_name: str, prompt: str, context: str = "AI") -> str:
if not GEMINI_API_KEY:
log_text(ERROR, f"{context}生成エラー: Gemini APIキーが未設定です。")
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Gemini APIキーがサーバーに設定されていません。")
try:
model = genai.GenerativeModel(model_name)
log_text(INFO, f"Geminiモデル ({model_name}) を{context}生成用に初期化。", context=context)
log_text(INFO, f"Gemini APIに{context}生成をリクエスト...", context=context)
response = await model.generate_content_async(prompt)
response_text = ""
if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
response_text = "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text'))
response_text = response_text.strip()
log_text(INFO, f"Geminiから{context}を正常に取得。", context=context, response_length=len(response_text))
return response_text
except Exception as e:
log_text(ERROR, f"{context}生成中に予期せぬエラー。", error=str(e), traceback=traceback.format_exc(),
context=context)
detail_message = f"{context}の生成中にサーバー内部エラーが発生しました。"
if hasattr(e, 'message') and isinstance(getattr(e, 'message'), str):
detail_message += f" Detail: {e.message}"
elif hasattr(e, 'args') and e.args:
detail_message += f" Detail: {str(e.args[0])}"
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=detail_message)
def _clean_ai_response_markdown_block(text: str, block_type: str = "json") -> str:
cleaned_text = text
if cleaned_text.startswith(f"```{block_type}"):
cleaned_text = cleaned_text[len(f"```{block_type}"):].strip()
elif cleaned_text.startswith("```"):
first_newline = cleaned_text.find('\n')
if first_newline != -1: cleaned_text = cleaned_text[first_newline + 1:].strip()
if cleaned_text.endswith("```"): cleaned_text = cleaned_text[:-len("```")].strip()
return cleaned_text
def get_elasticsearch_client(url: str, api_key: Optional[str] = None):
if not url:
log_text(ERROR, "Elasticsearch URLが指定されていません。")
return None
try:
if api_key and api_key != "YOUR_ELASTIC_API_KEY_HERE": # プレースホルダーでないことを確認
log_text(INFO, "Elasticsearch APIキーを使用してクライアントを初期化します。", es_url=url)
es_client = Elasticsearch(url, api_key=api_key)
else:
log_text(INFO, "Elasticsearch APIキーなしでクライアントを初期化します(ローカル接続等を想定)。", es_url=url)
es_client = Elasticsearch(url)
if not es_client.ping():
log_text(WARNING, "Elasticsearchサーバーへのpingに失敗しました。", es_url=url)
else:
log_text(INFO, "Elasticsearchサーバーへのpingに成功しました。", es_url=url)
return es_client
except Exception as e:
log_text(ERROR, "Elasticsearchクライアントの初期化中にエラーが発生しました。", es_url=url, error=str(e))
return None
# --- Pydanticモデル定義 ---
class BqEsRequest(BaseModel):
dataset_id: str = Field(..., example="your_bq_dataset")
table_id: str = Field(..., example="your_bq_table")
elasticsearch_url: str = Field(..., example="http://localhost:9200")
elasticsearch_index: str = Field(..., example="my_sample_bq_index")
elasticsearch_encoded_api_key: Optional[str] = Field(None, example="YOUR_ELASTIC_API_KEY_HERE")
class OpenAPISchemaRequest(BaseModel):
elasticsearch_index_name: str = Field(..., example="my_sample_index")
elasticsearch_document_examples: List[Dict[str, Any]] = Field(..., example=[
{"id": "item-001", "name": "Sample Product A", "value": 100, "is_active": True},
{"id": "item-002", "description": "Another sample item.", "tags": ["sample", "test"]}
])
gemini_model_name: Optional[str] = Field("gemini-1.5-flash", example="gemini-1.5-pro")
class PlaybookInstructionsRequest(BaseModel):
elasticsearch_index_name: str = Field(..., example="my_sample_data_index")
elasticsearch_document_examples: List[Dict[str, Any]] = Field(..., example=[
{"record_id": "rec-abc", "category": "Category1", "data_value": 123.45, "processed": False},
{"record_id": "rec-xyz", "notes": "Some notes here.", "priority": 3}
])
gemini_model_name: Optional[str] = Field("gemini-1.5-flash", example="gemini-1.5-pro")
tool_name_in_playbook: Optional[str] = Field(None, example="GenericSearchTool")
class DialogflowToolCreateRequest(BaseModel):
project_id: str = Field(..., example="your-gcp-project-id")
location_id: str = Field(..., example="global")
agent_id: str = Field(..., example="your-dialogflow-agent-id")
tool_display_name: str = Field(..., example="My Sample API Tool")
tool_description: Optional[str] = Field(None, example="A description for my sample API tool.")
openapi_schema_string: str = Field(..., example="openapi: 3.0.0\ninfo:\n title: Sample API\n version: v1...")
class DialogflowPlaybookCreateRequest(BaseModel):
project_id: str = Field(..., example="your-gcp-project-id")
location_id: str = Field(..., example="global")
agent_id: str = Field(..., example="your-dialogflow-agent-id")
playbook_display_name: str = Field(..., example="My Sample Playbook")
playbook_goal: str = Field(..., example="The goal of this sample playbook is to demonstrate functionality.")
playbook_instruction_text: Optional[str] = Field(None, example="- Step 1: Perform action A.\n- Step 2: Then perform action B.")
tool_ids: Optional[List[str]] = Field(None, example=["your-tool-id-1", "your-tool-id-2"])
class DialogflowPlaybookGetInstructionsRequest(BaseModel):
project_id: str = Field(..., example="your-gcp-project-id")
location_id: str = Field(..., example="global")
agent_id: str = Field(..., example="your-dialogflow-agent-id")
playbook_id: str = Field(..., example="your-playbook-id")
class DialogflowPlaybookGetInstructionsResponse(BaseModel):
playbook_name_resource: str
instructions_text: str
class DialogflowPlaybookUpdateInstructionsRequest(BaseModel):
project_id: str = Field(..., example="your-gcp-project-id")
location_id: str = Field(..., example="global")
agent_id: str = Field(..., example="your-dialogflow-agent-id")
playbook_id: str = Field(..., example="your-playbook-id-to-update")
new_instructions_text: str = Field(...,
example="- Step 1: New instruction.\n- - ${PLAYBOOK:your-child-playbook-id} - Child Playbook Description")
class DialogflowPlaybookUpdateInstructionsResponse(BaseModel):
message: str
updated_playbook_name_resource: str
new_instructions_preview: str
# --- APIRouter の設定 ---
router_bq_es = APIRouter(prefix=f"/{APP_NAME}/data", tags=["Data Integration (BQ to ES)"])
router_ai_generators = APIRouter(prefix=f"/{APP_NAME}/generate", tags=["AI Generators"])
router_dialogflow = APIRouter(prefix=f"/{APP_NAME}/dialogflow", tags=["Dialogflow CX"])
# --- Dialogflow Tool Creation Function ---
def _create_dialogflow_api_tool_internal(project_id: str, location_id: str, agent_id: str,
tool_display_name: str, tool_description: Optional[str],
openapi_yaml_string: str) -> Optional[DialogflowToolType]:
if dialogflowcx_v3beta1 is None:
log_text(ERROR, "Dialogflow CXライブラリがロードされていません。Tool作成はスキップされます。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK is not available on the server.")
client = dialogflowcx_v3beta1.ToolsClient()
parent = f"projects/{project_id}/locations/{location_id}/agents/{agent_id}"
openapi_tool_spec = dialogflowcx_v3beta1.types.Tool.OpenApiTool(text_schema=openapi_yaml_string)
tool_config = dialogflowcx_v3beta1.types.Tool(
display_name=tool_display_name,
description=tool_description,
open_api_spec=openapi_tool_spec,
tool_type=dialogflowcx_v3beta1.types.Tool.ToolType.CUSTOMIZED_TOOL
)
try:
log_text(INFO, f"Dialogflow CXにTool作成をリクエスト: {tool_display_name}", project=project_id, agent=agent_id)
created_tool = client.create_tool(parent=parent, tool=tool_config)
log_text(INFO, f"Dialogflow CX Toolが正常に作成されました: {created_tool.name}")
return created_tool
except Exception as e:
log_text(ERROR, "Dialogflow CX Tool作成中にエラー発生。", error=str(e), traceback=traceback.format_exc(),
tool_name=tool_display_name)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating Dialogflow tool: {e}")
# --- Dialogflow Playbook Creation Function ---
def _create_dialogflow_playbook_internal(
project_id: str, location_id: str, agent_id: str,
playbook_display_name: str, playbook_goal: str,
playbook_instruction_text: Optional[str],
tool_ids: Optional[List[str]]
) -> Optional[DialogflowPlaybookType]:
if dialogflowcx_v3beta1 is None:
log_text(ERROR, "Dialogflow CXライブラリがロードされていません。Playbook作成はスキップされます。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK is not available on the server.")
client = dialogflowcx_v3beta1.PlaybooksClient()
parent = f"projects/{project_id}/locations/{location_id}/agents/{agent_id}"
instruction_obj = None
if playbook_instruction_text:
instruction_step = DialogflowPlaybookStep(text=playbook_instruction_text)
instruction_obj = DialogflowPlaybookInstruction(steps=[instruction_step])
referenced_tools_list = []
if tool_ids:
# プレースホルダーでないことを確認するロジックはここには含めず、呼び出し元で処理
referenced_tools_list = [
f"projects/{project_id}/locations/{location_id}/agents/{agent_id}/tools/{tool_id}"
for tool_id in tool_ids if "your-tool-id" not in tool_id.lower() # 簡易的なプレースホルダーチェック
]
playbook_config = dialogflowcx_v3beta1.types.Playbook(
display_name=playbook_display_name,
goal=playbook_goal,
instruction=instruction_obj,
referenced_tools=referenced_tools_list if referenced_tools_list else None
)
try:
log_text(INFO, f"Dialogflow CXにPlaybook作成をリクエスト: {playbook_display_name}", project=project_id,
agent=agent_id)
created_playbook = client.create_playbook(parent=parent, playbook=playbook_config)
log_text(INFO, f"Dialogflow CX Playbookが正常に作成されました: {created_playbook.name}")
return created_playbook
except Exception as e:
log_text(ERROR, "Dialogflow CX Playbook作成中にエラー発生。", error=str(e), traceback=traceback.format_exc(),
playbook_name=playbook_display_name)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating Dialogflow playbook: {e}")
# --- Dialogflow Playbook Instructions 関連関数 ---
def _get_dialogflow_playbook_instructions_internal(
project_id: str, location_id: str, agent_id: str, playbook_id: str
) -> Dict[str, str]:
if dialogflowcx_v3beta1 is None:
log_text(ERROR, "Dialogflow CXライブラリがロードされていません。Playbook Instructions取得はスキップされます。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK is not available on the server.")
client = dialogflowcx_v3beta1.PlaybooksClient()
playbook_resource_name = f'projects/{project_id}/locations/{location_id}/agents/{agent_id}/playbooks/{playbook_id}'
try:
log_text(INFO, f"Dialogflow CXからPlaybookを取得中: {playbook_resource_name}")
playbook = client.get_playbook(name=playbook_resource_name)
log_text(INFO, "Playbook取得成功。")
existing_instructions_text = ""
text_parts = []
if (playbook.instruction and
hasattr(playbook.instruction, 'steps') and
playbook.instruction.steps):
for step in playbook.instruction.steps:
if hasattr(step, 'text') and step.text:
text_parts.append(step.text)
if text_parts:
existing_instructions_text = "\n".join(text_parts)
log_text(INFO,
f"既存のInstructionsテキスト内容(テキストステップのみ)を取得: {len(existing_instructions_text)}文字")
else:
log_text(INFO, "Playbookにテキスト内容を持つInstructionsステップが見つかりませんでした。")
else:
log_text(INFO, "PlaybookにInstructionsまたはそのステップが見つかりませんでした。")
return {
"playbook_name_resource": playbook.name,
"instructions_text": existing_instructions_text
}
except Exception as e:
if "NotFound" in str(type(e)) or "NotFound" in str(e):
log_text(ERROR, f"指定されたPlaybookが見つかりませんでした: {playbook_resource_name}")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Playbook not found: {playbook_resource_name}")
log_text(ERROR, "Dialogflow CX Playbook Instructions取得中にエラー発生。", error=str(e),
traceback=traceback.format_exc(), playbook_name=playbook_resource_name)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting Playbook Instructions: {e}")
def _update_dialogflow_playbook_instructions_internal(
project_id: str, location_id: str, agent_id: str, playbook_id: str, new_instructions_text: str
) -> DialogflowPlaybookType:
if dialogflowcx_v3beta1 is None or field_mask_pb2 is None:
log_text(ERROR,
"Dialogflow CXライブラリまたはFieldMaskがロードされていません。Playbook Instructions更新はスキップされます。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK or FieldMask is not available on the server.")
client = dialogflowcx_v3beta1.PlaybooksClient()
playbook_resource_name = f'projects/{project_id}/locations/{location_id}/agents/{agent_id}/playbooks/{playbook_id}'
try:
log_text(INFO, f"Dialogflow CX Playbook Instructionsを更新します: {playbook_resource_name}")
updated_playbook = dialogflowcx_v3beta1.types.Playbook()
updated_playbook.name = playbook_resource_name
new_step = DialogflowPlaybookStep(text=new_instructions_text)
new_instruction_pb = DialogflowPlaybookInstruction(steps=[new_step])
updated_playbook.instruction = new_instruction_pb
update_mask = field_mask_pb2.FieldMask(paths=['instruction.steps'])
response = client.update_playbook(
playbook=updated_playbook,
update_mask=update_mask
)
log_text(INFO, f"Playbook Instructions更新成功: {response.name}")
return response
except Exception as e:
if "NotFound" in str(type(e)) or "NotFound" in str(e):
log_text(ERROR, f"更新対象のPlaybookが見つかりませんでした: {playbook_resource_name}")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Playbook to update not found: {playbook_resource_name}")
log_text(ERROR, "Dialogflow CX Playbook Instructions更新中にエラー発生。", error=str(e),
traceback=traceback.format_exc(), playbook_name=playbook_resource_name)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error updating Playbook Instructions: {e}")
# --- エンドポイント定義 ---
@router_bq_es.post("/sync_bq_to_es")
async def sync_bq_to_es(payload: BqEsRequest):
es_client = None
log_text(INFO, "BQ to ES同期リクエスト受信。", dataset=f"{payload.dataset_id}.{payload.table_id}",
es_target=f"{payload.elasticsearch_url}/{payload.elasticsearch_index}")
if db is None:
log_text(ERROR, "BigQueryクライアント未初期化。")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="サーバー設定エラー: BigQueryクライアント利用不可。")
try:
if payload.elasticsearch_encoded_api_key and \
payload.elasticsearch_encoded_api_key != "YOUR_ELASTIC_API_KEY_HERE":
log_text(INFO, "Elasticsearch APIキー提供あり。")
es_client = get_elasticsearch_client(payload.elasticsearch_url, payload.elasticsearch_encoded_api_key)
query = f"SELECT * FROM `{payload.dataset_id}.{payload.table_id}`" # GCPプロジェクトIDはBigQueryクライアント設定に依存
log_text(INFO, "BigQueryクエリ実行開始。", query=query)
query_job = db.query(query)
bq_data = [dict(row) for row in query_job.result()]
if not bq_data:
log_text(INFO, "BigQueryテーブルからデータが見つかりませんでした。",
table=f"{payload.dataset_id}.{payload.table_id}")
return JSONResponse(content={"bq_result_count": 0, "elasticsearch_indexing_results": [],
"message": "BigQueryからデータが見つかりませんでした。"},
status_code=status.HTTP_200_OK)
log_text(INFO, f"BigQueryから {len(bq_data)} 件のデータを取得。",
table=f"{payload.dataset_id}.{payload.table_id}")
es_indexing_results = []
if es_client:
log_text(INFO, f"Elasticsearchインデックス {payload.elasticsearch_index} への登録開始。")
for doc in bq_data:
doc_id_candidate = doc.get("id") or doc.get("idx") # common ID fields
doc_id = str(doc_id_candidate if doc_id_candidate is not None else uuid.uuid4())
try:
resp = es_client.index(index=payload.elasticsearch_index, id=doc_id, document=doc)
es_indexing_results.append(
{"id": doc_id, "status": resp.get("result", "success"), "_shards": resp.get("_shards")})
except Exception as es_e:
error_message = f"ドキュメント {doc_id} のインデックス登録失敗: {str(es_e)}"
es_indexing_results.append({"id": doc_id, "status": "failed", "error": error_message})
log_text(ERROR, error_message, doc_id=doc_id, es_index=payload.elasticsearch_index,
error_details=str(es_e))
log_text(INFO, f"{len(es_indexing_results)}件のESインデックス処理完了。")
else:
es_indexing_results.append({"status": "skipped_due_to_es_client_issue",
"message": "ESクライアントが利用できないためインデックス処理をスキップしました。"})
log_text(WARNING, "ESクライアントが利用できないためインデックス処理をスキップ。")
return JSONResponse(
content={"bq_result_count": len(bq_data), "elasticsearch_indexing_results": es_indexing_results,
"message": "処理完了。"}, status_code=status.HTTP_200_OK)
except Exception as e:
log_text(ERROR, "BQ to ES同期処理中に予期せぬエラー。", error=str(e), traceback=traceback.format_exc())
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"サーバー内部エラー: {str(e)}")
finally:
if es_client and hasattr(es_client, 'close'):
try:
es_client.close()
log_text(INFO, f"Elasticsearchクライアント ({payload.elasticsearch_url}) をクローズ。")
except Exception as e_close:
log_text(WARNING, "ESクライアントクローズ中にエラー。", error=str(e_close))
@router_ai_generators.post("/openapi_schema", response_model=Dict[str, Any])
async def generate_openapi_schema_endpoint(payload: OpenAPISchemaRequest):
log_text(INFO, "OpenAPIスキーマ生成リクエスト受信。", index_name=payload.elasticsearch_index_name,
model=payload.gemini_model_name)
try:
json_examples_string = json.dumps(payload.elasticsearch_document_examples, indent=2, ensure_ascii=False)
except TypeError as te:
log_text(ERROR, "ドキュメント例のJSONシリアライズ失敗 (OpenAPI)。", error=str(te))
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=f"ドキュメント例をJSONに変換できませんでした: {te}")
pascal_case_index = to_pascal_case(payload.elasticsearch_index_name)
final_prompt = PROMPT_FOR_OPENAPI_SCHEMA.format(index_name=payload.elasticsearch_index_name,
pascal_case_index_name=pascal_case_index,
json_examples_string=json_examples_string)
raw_response_text = await _call_gemini_api_with_prompt(payload.gemini_model_name, final_prompt,
context="OpenAPIスキーマ")
cleaned_response_text = _clean_ai_response_markdown_block(raw_response_text, block_type="json")
try:
openapi_schema_json = json.loads(cleaned_response_text)
log_text(INFO, "OpenAPIスキーマを正常にパース。", index_name=payload.elasticsearch_index_name)
return openapi_schema_json
except json.JSONDecodeError as jde:
log_text(ERROR, "GeminiのOpenAPIスキーマ応答が有効なJSONではありません。", error=str(jde),
raw_response=cleaned_response_text[:500]) # ログに一部応答を含める
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AIからのOpenAPIスキーマ応答が期待されるJSON形式ではありませんでした。",
headers={"X-AI-Response-Text": cleaned_response_text[:1000]}) # ヘッダーにも一部応答を含める
@router_ai_generators.post("/playbook_instructions", response_model=Dict[str, str])
async def generate_playbook_instructions_endpoint(payload: PlaybookInstructionsRequest):
log_text(INFO, "Playbook Instructions生成リクエスト受信。", index_name=payload.elasticsearch_index_name,
model=payload.gemini_model_name)
try:
json_examples_string = json.dumps(payload.elasticsearch_document_examples, indent=2, ensure_ascii=False)
except TypeError as te:
log_text(ERROR, "ドキュメント例のJSONシリアライズ失敗 (Playbook)。", error=str(te))
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail=f"ドキュメント例をJSONに変換できませんでした: {te}")
schema_fields_md = generate_schema_markdown_from_examples(payload.elasticsearch_document_examples)
tool_name_base = payload.tool_name_in_playbook
if not tool_name_base:
pascal_index_name = to_pascal_case(payload.elasticsearch_index_name)
tool_name_base = f"{pascal_index_name}SearchTool" # デフォルトのツール名生成
tool_name_placeholder_for_prompt = f"${{TOOL:{tool_name_base}}}"
final_prompt = PROMPT_FOR_PLAYBOOK_INSTRUCTIONS.format(index_name=payload.elasticsearch_index_name,
json_examples_string=json_examples_string,
schema_fields_markdown=schema_fields_md,
tool_name_placeholder=tool_name_placeholder_for_prompt)
raw_response_text = await _call_gemini_api_with_prompt(payload.gemini_model_name, final_prompt,
context="Playbook Instructions")
cleaned_markdown = _clean_ai_response_markdown_block(raw_response_text, block_type="markdown")
log_text(INFO, "Playbook Instructionsを正常に生成。", index_name=payload.elasticsearch_index_name)
return {"playbook_instructions_markdown": cleaned_markdown}
@router_dialogflow.post("/create_tool")
async def create_dialogflow_tool_endpoint(payload: DialogflowToolCreateRequest):
log_text(INFO, "Dialogflow CX Tool作成リクエスト受信。", tool_name=payload.tool_display_name,
project=payload.project_id, agent=payload.agent_id)
if dialogflowcx_v3beta1 is None:
log_text(ERROR, "Dialogflow CX SDKがサーバーで利用できません。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK is not available on the server. Please check server logs.")
try:
created_tool = _create_dialogflow_api_tool_internal(
project_id=payload.project_id, location_id=payload.location_id, agent_id=payload.agent_id,
tool_display_name=payload.tool_display_name, tool_description=payload.tool_description,
openapi_yaml_string=payload.openapi_schema_string
)
if created_tool:
tool_type_name = "UNSPECIFIED"
if created_tool.tool_type:
try:
tool_type_name = dialogflowcx_v3beta1.types.Tool.ToolType(created_tool.tool_type).name
except ValueError: # enum値が存在しない場合
log_text(WARNING, f"Unknown tool_type enum value: {created_tool.tool_type}")
tool_type_name = f"UNKNOWN_ENUM_VALUE_{created_tool.tool_type}"
return JSONResponse(
content={"message": "Dialogflow CX Tool created successfully.", "tool_name": created_tool.name,
"display_name": created_tool.display_name, "tool_type": tool_type_name,
"tool_id": created_tool.name.split('/')[-1]}, # tool_id を返す
status_code=status.HTTP_201_CREATED
)
else:
# _create_dialogflow_api_tool_internal が None を返すが例外を発生させないケース (理論上は起こりにくい)
log_text(ERROR, "Dialogflow CX Tool作成に失敗しましたが、例外がキャッチされませんでした。")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create Dialogflow tool for an unknown reason.")
except HTTPException: # 既にHTTPExceptionならそのままraise
raise
except Exception as e:
log_text(ERROR, "Dialogflow Tool作成エンドポイントで予期せぬエラー。", error=str(e),
traceback=traceback.format_exc())
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An unexpected error occurred: {str(e)}")
@router_dialogflow.post("/create_playbook")
async def create_dialogflow_playbook_endpoint(payload: DialogflowPlaybookCreateRequest):
log_text(INFO, "Dialogflow CX Playbook作成リクエスト受信。", playbook_name=payload.playbook_display_name,
project=payload.project_id, agent=payload.agent_id)
if dialogflowcx_v3beta1 is None:
log_text(ERROR, "Dialogflow CX SDKがサーバーで利用できません。")
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Dialogflow CX SDK is not available on the server. Please check server logs.")
try:
created_playbook = _create_dialogflow_playbook_internal(
project_id=payload.project_id,
location_id=payload.location_id,
agent_id=payload.agent_id,
playbook_display_name=payload.playbook_display_name,
playbook_goal=payload.playbook_goal,
playbook_instruction_text=payload.playbook_instruction_text,
tool_ids=payload.tool_ids
)
if created_playbook:
return JSONResponse(
content={
"message": "Dialogflow CX Playbook created successfully.",
"playbook_name_resource": created_playbook.name,
"display_name": created_playbook.display_name,
"goal": created_playbook.goal,
"instruction_steps_count": len(
created_playbook.instruction.steps) if created_playbook.instruction and created_playbook.instruction.steps else 0,
"referenced_tools_count": len(
created_playbook.referenced_tools) if created_playbook.referenced_tools else 0,
"playbook_id": created_playbook.name.split('/')[-1] # playbook_id を返す
},
status_code=status.HTTP_201_CREATED
)
else:
log_text(ERROR, "Dialogflow CX Playbook作成に失敗しましたが、例外がキャッチされませんでした。")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create Dialogflow playbook for an unknown reason.")
except HTTPException:
raise
except Exception as e:
log_text(ERROR, "Dialogflow Playbook作成エンドポイントで予期せぬエラー。", error=str(e),
traceback=traceback.format_exc())
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An unexpected error occurred while creating playbook: {str(e)}")
@router_dialogflow.post("/get_playbook_instructions", response_model=DialogflowPlaybookGetInstructionsResponse)
async def get_dialogflow_playbook_instructions_endpoint(payload: DialogflowPlaybookGetInstructionsRequest):
log_text(INFO, "Dialogflow CX Playbook Instructions取得リクエスト受信。",
project=payload.project_id, agent=payload.agent_id, playbook_id=payload.playbook_id)
try:
result = _get_dialogflow_playbook_instructions_internal(
project_id=payload.project_id,
location_id=payload.location_id,
agent_id=payload.agent_id,
playbook_id=payload.playbook_id
)
return result
except HTTPException:
raise
except Exception as e:
log_text(ERROR, "Playbook Instructions取得エンドポイントで予期せぬエラー。", error=str(e),
traceback=traceback.format_exc())
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An unexpected error occurred: {str(e)}")
@router_dialogflow.post("/update_playbook_instructions", response_model=DialogflowPlaybookUpdateInstructionsResponse)
async def update_dialogflow_playbook_instructions_endpoint(payload: DialogflowPlaybookUpdateInstructionsRequest):
log_text(INFO, "Dialogflow CX Playbook Instructions更新リクエスト受信。",
project=payload.project_id, agent=payload.agent_id, playbook_id=payload.playbook_id)
try:
updated_playbook = _update_dialogflow_playbook_instructions_internal(
project_id=payload.project_id,
location_id=payload.location_id,
agent_id=payload.agent_id,
playbook_id=payload.playbook_id,
new_instructions_text=payload.new_instructions_text
)
return DialogflowPlaybookUpdateInstructionsResponse(
message="Dialogflow CX Playbook Instructions updated successfully.",
updated_playbook_name_resource=updated_playbook.name,
new_instructions_preview=payload.new_instructions_text[:200] + "..." if len(
payload.new_instructions_text) > 200 else payload.new_instructions_text
)
except HTTPException:
raise
except Exception as e:
log_text(ERROR, "Playbook Instructions更新エンドポイントで予期せぬエラー。", error=str(e),
traceback=traceback.format_exc())
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An unexpected error occurred: {str(e)}")
# ルーターをアプリケーションに含める
app.include_router(router_bq_es)
app.include_router(router_ai_generators)
app.include_router(router_dialogflow)
# --- グローバル例外ハンドラ ---
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
if isinstance(exc, HTTPException): raise exc # FastAPI自身のHTTPExceptionは再raise
log_text(ERROR, "グローバル例外ハンドラで未処理の例外をキャッチ。", exception_type=type(exc).__name__,
error_message=str(exc), path=request.url.path, traceback_info=traceback.format_exc())
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": "予期せぬ内部エラーが発生しました。",
"detail": "サーバー側で問題が発生しました。詳細はサーバーログを確認してください。"})
if __name__ == "__main__":
import uvicorn
server_port = int(os.getenv("PORT", "8080")) # PORT環境変数からポート番号を取得
log_text(INFO, f"開発サーバーを http://0.0.0.0:{server_port} で起動します...")
uvicorn.run(app, host="0.0.0.0", port=server_port)