こんにちは。AIチームの干飯(@hosimesi11_)です。今回はAI Shiftの新規サービスであるAI Messanger SummaryのMLパイプラインの監視についてまとめたいと思います。
具体的にはExitHandlerで外部ツールとの連携をしつつ、詳細なメトリクスの監視はGoogle Cloud Managed Service for Prometheus + Cloud Monitoringで構成しました。
AI Messenger Summaryはコールセンターにおける、オペレーターと顧客の会話内容の要約サービスになります(プレスリリース)。コールセンターでは、会話内容をまとめるといったアフターコールワークに多くの時間が割かれるという課題があります。この状況を解決するため、AI Messenger Summaryはオペレーターと顧客の会話データや録音データを音声認識し、大規模言語モデル(LLM)で処理して要約し、導入企業に合わせたフォーマットで出力するようなサービスとなっています。
MLを扱う音声認識やLLMを使った要約処理をStepとみなし、全ての実行をワークフローツールで制御しています。Argo Workflowsをワークフローツールとして使用し、並列化やリトライを制御しています。1つのワークフローに対する処理のステップ数はワークフローによりますが、10~20ステップほどあります。
Argo Workflowsの構成は以下を参考にしてください。
https://www.ai-shift.co.jp/techblog/4380
AI Messenger Summaryのインフラ基盤にはGoogle Cloudを採用しています。アプリケーションはGKE上に構築しています。
立ち上げ期のプロダクトのため、開発を行うエンジニアは数人程度で、監視に関してはソフトウェアエンジニア 1人、MLエンジニア 1人で監視していました。
そのため監視項目はあまりリッチにはせず、最低限のJobの成否とサーバーエラーのみをモニタリングしていました。ここでは通常のCloud LoggingのアラートとArgo WorkflowsのExitHandlerというものを使い、ステータスの連携とSlackへの通知を実行していました。ExitHandlerは成功または失敗に関係なく、ワークフローの最後に常に実行されるテンプレートです。
ExitHandlerの使用例としては以下のようなものがあります。
具体的には以下のように使用し、Slack通知とPubSubへのステータス通知をしていました。
Slack通知用のWorkflowTemplateです。
apiVersion: argoproj.io/v1alpha1kind: WorkflowTemplatemetadata: name: slack-notification-step namespace: sample-namespacespec: serviceAccountName: sample-sa templates: - name: slack-notification nodeSelector: cloud.google.com/gke-nodepool: sumple-node-pool inputs: parameters: - name: status - name: title - name: env - name: url - name: workflow-id - name: duration - name: config - name: message script: command: - sh image: sample-image source: | FAILURE_MESSAGES=$(echo {{inputs.parameters.message}} | awk -F"[,:}]" '{for(i=1;i<=NF;i++){if($i~/'message'\042/){print $(i+1); exit}}}' | tr -d '"') PAYLOAD="{\"title\":\"{{inputs.parameters.title}}\",\"config\":\"{{inputs.parameters.config}}\",\"duration\":\"{{inputs.parameters.duration}}s\",\"id\":\"{{inputs.parameters.workflow-id}}\",\"status\":\"{{inputs.parameters.status}}\",\"env\":\"{{inputs.parameters.env}}\", \"url\":\"{{inputs.parameters.url}}\", \"message\":\"$FAILURE_MESSAGES\"}" curl -X POST -H 'Content-type: application/json' --data "$PAYLOAD" $SLACK_WEBHOOK_URL
Slack通知の実際の画面です。
slack_notification
しかしSlack通知のステップで落ちた場合、通知が上手くいかないといった課題がありました。ログの場所やアラートの探し方も属人化していたため、チームで運用できる状況になっていませんでした。
ワークフロー自体が動いていない時やArgo Serverの負荷が上がってエラーになった場合にも気付けないため、これらも監視できるようなシステムを作ることに決めました。またエラーではありませんが、特定のステップで時間がかかりすぎてユーザーの操作に影響をきたしている場合なども気付ける必要があります。
システム運用についてのゴールを以下と定めました。
監視システムを設計するために、まず必要な監視項目を洗い出しました。全てのメトリクスを監視すると、重要なデータが見落とされる可能性があるため、必要なメトリクスを厳選することが求められました。また、少人数での運用を実現するためにも、この選別作業は重要でした。そこで、ビジネスにおける重要度が高い項目を中心に、必須とオプションの2つに分けてメトリクスをリスト化しました。これにより、監視すべき重要な項目を明確にし、効率的なシステム運用を実現することを目指しました。
上記を満たすようなツール選定、モニタリング方法を調査しました。
ワークフローの監視ツール選択の基準としては以下の3つの観点で考えました。
この観点で調査すると候補は以下になりました。
モニタリング基盤自体に運用工数を割きたくなかったため、ログ取得部分はDatadog AgentかManaged Prometheusに絞りました。どちらもやりたいことを達成できますが、結果的にGoogle Cloudで完結してコストが安く、運用がしやすいGoogle Cloud Managed Service for Prometheus + Cloud Monitoringを使用しました。
GKEではManaged Prometheusは、以下のバージョンでデフォルトで有効になっています。
AI Shiftでは、ソフトウェアエンジニアが主にCloud Monitoringをシステム監視に利用しています。このため、MLエンジニアも同様にCloud Monitoringを使用することで、全員が使い慣れたツールで運用できることを目指しました。
Cloud Monitoringはアプリケーションとインフラストラクチャのパフォーマンス、可用性、健全性の可視化を提供します。取得したメトリクスに対して閾値を設けることでアラートを上げたり、外部ツールとの連携も可能になります。
Prometheusは、オープンソースのシステム監視とアラートツールです。Google Cloud Managed Service for PrometheusはフルマネージドなPrometheusでMonarchアーキテクチャ上に構築されています。料金も安く、100万サンプルで0.06ドル程度になっています。Argo Workflowsではデフォルトでworkflow-controllerの/metrics
からPrometheus形式のメトリクスを吐き出しています。
Argo WorkflowsがPrometheus形式で取れるメトリクスには以下の2種類があります。
コントローラメトリクス
コントローラメトリクスはコントローラの状態を通知するメトリクスであり、サーバの状態等を自動的に<host>:9090/metrics
に吐き出します。
現在取れるメトリクスは以下のとおりです。
cronworkflows_triggered_total
gauge
error_count
k8s_request_total
k8s_request_duration
is_leader
log_messages
operation_duration_seconds
pods_gauge
pod_missing
pod_pending_count
pods_total_count
queue_adds_count
queue_depth_gauge
queue_duration
queue_latency
queue_longest_running
queue_retries
queue_unfinished_work
total_count
version
workers_busy_count
workflow_condition
workflowtemplate_runtime
workflowtemplate_triggered_total
Prometheusの他、OpenTelemetryへのメトリクスの出力が可能です。
カスタムメトリクス
カスタムメトリクスはユーザー側で定義できる一連のワークフローの状態を通知するメトリクスです。ステップごとの実行時間などはこちらで定義することになります。
Argo WorkflowsのメトリクスをPrometheusに出力しようと思うと、PodMonitoringと呼ばれるCRDを定義する必要があります。Cluster全体のPodをモニタリングしたい場合はClusterPodMonitoringを使うことができます。
apiVersion: monitoring.googleapis.com/v1kind: PodMonitoringmetadata: name: argo-workflows-monitoring namespace: sample-namespacespec: selector: matchLabels: app: workflow-controller endpoints: - port: metrics interval: 30s timeout: 10s metricRelabeling: - action: keep regex: sample.* sourceLabels: [__name__]
設定としてはmetricsのエンドポイントに30秒ごとにメトリクスを取りに行ってます。ここで重要になるのはmetricRelabelingです。こちらを設定しない場合、全てのメトリクスを取得してPrometheusに送ってしまうので想定以上にコストがかかる可能性があります。こちらで取りたいメトリクスのみに絞って取ることをおすすめします。
そしてWorkflow ControllerのConfigMapにモニタリングに関する設定を追加します。
下記ではmetricsのTTL(Time To Live)を10分としています。
apiVersion: v1kind: ConfigMapmetadata: name: workflow-controller-configmap namespace: sample-namespacedata: metricsConfig: | enabled: true metricsTTL: "10m"
そして、カスタムメトリクスを取りたいため、各WorkflowTemplateにmetricsプロパティを追加します。こちらではステップごとの実行時間と実行の成否、リトライ数をメトリクスとして送っています。
apiVersion: argoproj.io/v1alpha1kind: WorkflowTemplatemetadata: name: sample-step namespace: sample-namespacespec: serviceAccountName: sample-sa templates: - name: sample-step nodeSelector: cloud.google.com/gke-nodepool: sample-node-pool inputs: parameters: - name: task-config container: image: image_uri command: ["python", "main.py"] metrics: prometheus: - name: step_duration help: duration of current step labels: - key: template_name value: sample-step gauge: value: "{{workflow.duration}}" - name: step_result help: result of step executed lastly labels: - key: template_name value: sample-step - key: status value: "{{workflow.status}}" gauge: value: "1" - name: step_retry_count help: retry count of step labels: - key: template_name value: sample-step gauge: value: "{{retries}}"
これらをapplyするだけで自動的にCloud Monitoring上からメトリクスを取れるようになるので、そこからはPromQL等で可視化していくことになります。
上記を設定後作成したダッシュボードは以下のようになりました。月2ドル前後で必要なメトリクスの監視ができているため、コスト的にも納得できるものになりました。
今回はArgo Workflowsで構築したLLMを使った要約サービスのMLパイプラインの監視にManaged PrometheusとCloud Monitoringを使用した事例を紹介しました。少人数でGKE上に構築したサービスを運用する場合、Managed Prometheus + Cloud Monitoringは有力な候補になりうると思いました。低コストで上手くマネージドサービスを使用しつつ、必要なサーバメトリクスを取れるメリットはかなり大きいと思います。今回はどちらかというとソフトウェア側の監視になり、LLMの出力自体の監視やMLの監視はまだまだ発展途上ですが、今後このあたりのML監視にも力を入れていきたいと思います。