2024.04.26
Developmentこんにちは。
AIチームの干飯(@hosimesi11_)です。今回はAI Shiftで取り組んでいる新規サービスであるAI Messenger Summaryの機械学習パイプラインと、Proof of Concept(PoC)から実際のプロダクトへと展開する過程についてご紹介します。
AI Messenger Summaryはコールセンター事業における、会話内容の要約サービスになります(プレスリリース)。コールセンターでは、顧客との会話内容をまとめるアフターコールワークに多くの時間が割かれるという課題があります。この状況を解決するため、オペレーターと顧客の会話データや録音データを音声認識し、大規模言語モデル(LLM)で処理して要約し、導入企業に合わせたフォーマットで出力するようなサービスとなっています。要約の他、応対品質評価やラベリングなど複数のタスクに対応しています。
AI Messenger Summaryでは、各企業ごとの要約が実現可能かどうかを判別するために、PoC(Proof of Concept)フェーズを設けていました。
初期ではGradioを使った一つのサーバ内で、音声認識や要約処理など全ての処理を同期的に行っていました。処理の例として以下のようなものがあります。
元々データサイエンティスト一人で環境を整備しており、PoC初期では十分な機能を果たせていましたが、導入企業が増えてくるにつれ徐々に課題感が出てきました。音声認識のパラメータの変更や、前処理の変更、複数のプロンプトの試行など個社ごとに試行錯誤する必要があります。また処理の特性上、一ファイルの一回の実行に数十分から数時間かかる場合もあります。PoCを行う企業が増えるにつれ、PoCの負担はさらに増すことが予想されました。
PoCの負担を下げつつ、素早くMVP(Minimum Viable Product)としてリリースできるようにするため、開発フェーズを以下の2つに定義し、段階的に達成するように進めることにしました。
Phase 1. PoCの負担削減と高速化
Phase 2. 実プロダクト実装
Phase 1では、PoCの負担削減と高速化にだけ観点を置き、リッチな構成は取らないようにすることで実装コストを大幅に下げました。具体的には、要約処理などの画面に関係のない処理の中心をCloud Run Jobsに載せて処理を分離しました。これによって下記のことが可能になりました。
処理のリトライや細かい部分での並列実行にworkflowツールを導入したかったのですが、スピード感とPhase 2の実プロダクト実装を踏まえて、スコープ外としました。また今のUIであるGradioはリプレイスされることが予期されたため、手をつけずにジョブの分離に注力しました。
PoCの高速化をしつつ、この段階から実プロダクトの設計も開始しました。こうすることで、大部分の実装を本番時にそのまま再利用でき、実プロダクトのリリースまでの時間を短縮することができました。
Phase 2は実プロダクトの実装になります。実プロダクトでは各企業のCTI(Computer Telephony Integration)やCRM(Customer Relationship Management)と自動連携することになります。さらにPoCの処理も全てこのシステムに載ってくることが予期されました。本番運用とPoCの処理の共通化を考慮するとリトライ処理なども完備したかったため、ワークフローツールを導入しました。インフラ基盤とセットでワークフローツールをいくつか検討しました。
ここでは以下のような方針を採用することを避けました。
これらの方針を避ける理由は次の通りです。
そのため、以下を選定の基準としました。
AI ShiftではクラウドサービスとしてGoogle Cloudを採用しており、ほとんどのアプリケーションがGKE上で動作しています。インフラ基盤の選定に関して、元々は素早く立ち上げるためにCloud Run Jobsを使用する予定でしたが、ワークフローツールとの兼ね合いや音声認識などをGPU上で動かしたい場面が想像できたため、既に動いているGKEを選定しました。他のものに比べて初期実装コストが少し高いですが、チームとしての運用の知見やアセットがある点も含めて選定しました。
ワークフローツールの選定に関しては社内の別プロダクトの方にもヒアリングしつつ、柔軟性や既存システムとの相性、運用ノウハウの点からArgo Workflowsを選定しました。AI ShiftがArgo CDをCDツールとして使用していることも、ソフトウェアエンジニアと一緒に運用する上で大きな利点となりました。
上記を含めて、Phase 2(プロダクト)のインフラ構成は以下になります。
AI Shiftの本番環境のGKEの管理にはKustomizeが使用されているため、検証が終わり次第デプロイするまでの差分を減らすため、ローカルでもKustomizeを使用しました。本番ではCTIなどと自動連携し、PubSubへのパブリッシュをトリガーにワークフローを動かしたかったため、Argo Eventsも使用しています。
Argo Workflowsとは
Kubernetes 上で並列ジョブを調整するためのオープンソースのワークフローツールです。柔軟なDAGを組むことができます。
Argo Eventsとは
Kubernetes 用のイベント駆動型ワークフロー実行ツールで、Webhook・MQなど様々なツールに対応しています。Sensor、EventSource、EventBusの3つのコンポーネントからなります。
まずは、以下のようにTerraformで必要なGoogle Cloudのリソースを作成しています。
# External IP for Argo Workflowsresource "google_compute_global_address" "argo_workflows_external_ip" { name = "argo-workflows-server-ip" address_type = "EXTERNAL" ip_version = "IPV4" project = var.project}# DNS Zonedata "google_dns_managed_zone" "dns_zone" { name = "dns-zone" project = var.project}# DNS Record for Argo Workflowsresource "google_dns_record_set" "argo_workflows" { project = var.dns_management_project name = "sample.${data.google_dns_managed_zone.dns_zone.dns_name}" type = "A" ttl = 300 managed_zone = data.google_dns_managed_zone.dns_zone.name rrdatas = [google_compute_global_address.argo_workflows_external_ip.address] lifecycle { ignore_changes = [rrdatas] }}# Access Policy for Argo Workflowsresource "google_compute_security_policy" "argo_workflows_policy" { name = "${var.env}-argo-workflows-policy" rule { action = "deny(403)" priority = "2147483647" match { versioned_expr = "SRC_IPS_V1" config { src_ip_ranges = ["*"] } } description = "Default" } rule { action = "allow" priority = "1001" match { versioned_expr = "SRC_IPS_V1" config { src_ip_ranges = [ "xxx" ] } } description = "Allow IP Ranges" }}# History DB for Argo Workflowsresource "google_sql_database" "argo_workflows_db" { name = "$argo_workflows_db" instance = var.db_instance_name charset = "utf8mb4" collation = "utf8mb4_general_ci" project = var.project}# Service Account for Argo Workflowsresource "google_service_account" "argo_workflows_sa" { account_id = "argo-workflows-sa" display_name = "argo workflows service account"}# Attach secret manager accessor to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_secret_manager_iam" { project = var.project role = "roles/secretmanager.secretAccessor" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach pub/sub publisher to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_pubsub_publisher_iam" { project = var.project role = "roles/pubsub.publisher" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach storage object admin to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_storage_iam" { project = var.project role = "roles/storage.objectAdmin" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach artifact registry writer to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_artifact_registry_iam" { project = var.project role = "roles/artifactregistry.writer" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach service account token creator to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_service_account_token_creator_iam" { project = var.project role = "roles/iam.serviceAccountTokenCreator" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach cloud sql client to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_cloudsql_client_iam" { project = var.project role = "roles/cloudsql.client" member = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach Workload Identity to Service Account for Argo Workflowsresource "google_service_account_iam_binding" "argo_workflows_workload_identity" { service_account_id = google_service_account.argo_workflows_sa.name role = "roles/iam.workloadIdentityUser" members = ["serviceAccount:${var.project}.svc.id.goog[argo-workflows/argo-workflows-sa]"]}# Bucket for Argo Workflows Logresource "google_storage_bucket" "argo_workflows_logs_bucket" { name = "argo-workflows-logs" location = var.region project = var.project force_destroy = true}
# Argo Eventsのservice accountresource "google_service_account" "argo_events_sa" { account_id = "argo-events-sa" display_name = "用のargo events用のservice account"}# Attach pub/sub subscriber to Service Account for Argo Eventsresource "google_project_iam_member" "pubsub_subscriber_iam" { project = var.project role = "roles/pubsub.subscriber" member = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach pub/sub viewer to Service Account for Argo Eventsresource "google_project_iam_member" "pubsub_viewer_iam" { project = var.project role = "roles/pubsub.viewer" member = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach service account token creator to Service Account for Argo Eventsresource "google_project_iam_member" "workflows_service_account_token_creator_iam" { project = var.project role = "roles/iam.serviceAccountTokenCreator" member = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach Workload Identity to Service Account for Argo Eventsresource "google_service_account_iam_binding" "argo_events_workload_identity" { service_account_id = google_service_account.argo_events_sa.id role = "roles/iam.workloadIdentityUser" members = ["serviceAccount:${var.project}.svc.id.goog[argo-events/argo-events-sa]"]}
Argo WorkflowsをGKEに構築していきます。(一部抜粋)
まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。
https://github.com/argoproj/argo-events/releases
ConfigMapを作成します。ここでは、ワークフローの履歴をMySQLに入れるための設定をしています。また、GCSにもログが上がるように設定しています。
ConfigMap
apiVersion: v1kind: ConfigMapmetadata: name: workflow-controller-configmap namespace: argo-workflowsdata: parallelism: "10" namespace: argo-workflows resourceRateLimit: | limit: 10 burst: 1 persistence: | archive: true archiveTTL: 10d skipMigration: false connectionPool: maxIdleConns: 100 maxOpenConns: 100 nodeStatusOffLoad: true mysql: host: xxxxx port: 3306 database: xxxx tableName: argo_workflows userNameSecret: name: argo-workflows-secret key: MYSQL_USER passwordSecret: name: argo-workflows-secret key: MYSQL_PASSWORD artifactRepository: | archiveLogs: true gcs: bucket: sample-log-bucket keyFormat: "argo\ /{{workflow.creationTimestamp.Y}}\ /{{workflow.creationTimestamp.m}}\ /{{workflow.creationTimestamp.d}}\ /{{workflow.name}}\ /{{pod.name}}"
ここで、Google Cloud側で作成済みのサービスアカウントとKubernetesのサービスアカウントを紐付けます。
apiVersion: v1kind: ServiceAccountmetadata: name: argo-workflows-sa namespace: argo-workflows annotations: iam.gke.io/gcp-service-account: sample-gcp-service-account
Deploymentの定義をします。ここでは--auth-modeはclient、server、ssoが対応しています。
apiVersion: apps/v1kind: Deploymentmetadata: name: argo-server namespace: argo-workflowsspec: strategy: type: RollingUpdate rollingUpdate: maxSurge: 1 maxUnavailable: 0 selector: matchLabels: app: argo-server template: metadata: labels: app: argo-server spec: serviceAccountName: argo-workflows-sa terminationGracePeriodSeconds: 60 nodeSelector: cloud.google.com/gke-nodepool: sample-node-pool containers: - name: argo-server args: ["server", "--secure=false", "--auth-mode=sso"] ports: - containerPort: 2746 livenessProbe: httpGet: path: / port: 2746 scheme: HTTP initialDelaySeconds: 3 periodSeconds: 30 timeoutSeconds: 1 successThreshold: 1 failureThreshold: 3 readinessProbe: httpGet: path: / port: 2746 scheme: HTTP initialDelaySeconds: 3 periodSeconds: 30 timeoutSeconds: 1 successThreshold: 1 failureThreshold: 3 lifecycle: preStop: exec: command: - /bin/sh - -c - sleep 10
apiVersion: v1kind: Servicemetadata: name: argo-server namespace: argo-workflows annotations: cloud.google.com/neg: '{"ingress":true}'spec: type: ClusterIP selector: app: argo-server ports: - name: argo-workflows-ui port: 80 targetPort: 2746 protocol: TCP
Ingressを作成します。ここでは先ほどTerraformで作成済みのExternal IPや、DNSレコードを設定します。
apiVersion: networking.k8s.io/v1kind: Ingressmetadata: name: argo-server-ingress namespace: argo-workflows annotations: kubernetes.io/ingress.global-static-ip-name: sample-ip-name networking.gke.io/managed-certificates: sample-certificatesspec: defaultBackend: service: name: argo-server port: number: 80 rules: - host: sample-domain http: paths: - path: / pathType: Prefix backend: service: name: argo-server port: number: 80
これらをapplyすることで、Argo Serverが起動し、Argo UIが表示されます。
Argo EventsをGKEに構築していきます。(一部抜粋)
まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。
https://github.com/argoproj/argo-events/releases
EventBusを定義します。
apiVersion: argoproj.io/v1alpha1kind: EventBusmetadata: name: argo-events-bus namespace: argo-eventsspec: jetstream: version: 2.9.12 replicas: 3 nodeSelector: cloud.google.com/gke-nodepool: sample-node-pool streamConfig: | maxAge: 24h settings: | max_file_store: 1GB startArgs: - "-D"
EventSourceを作成します。ここでは先ほどTerraformで作成した、ワークフローをキックする際に使用するPub/Subを指定します。
apiVersion: argoproj.io/v1alpha1kind: EventSourcemetadata: name: gcp-pubsub namespace: argo-eventsspec: eventBusName: argo-events-bus type: pubsub template: serviceAccountName: argo-events-sa spec: nodeSelector: cloud.google.com/gke-nodepool: sample-node-pool pubSub: workflow-trigger: jsonBody: true projectID: sample-project topic: sample-topic subscriptionID: sample-topic-sub
どのワークフローをキックするかを定義します。ここで、dependenciesはEventSourceと名前を合わせる必要があります。
apiVersion: argoproj.io/v1alpha1kind: Sensormetadata: name: pubsub-sensor namespace: argo-eventsspec: template: serviceAccountName: argo-events-sa eventBusName: argo-events-bus dependencies: - name: sample-trigger-dependency eventSourceName: gcp-pubsub eventName: workflow-trigger triggers: - template: name: trigger-template k8s: operation: create source: resource: apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: workflow- namespace: argo-workflows spec: serviceAccountName: argo-workflows-sa workflowTemplateRef: name: sample-workflow arguments: parameters: - name: config valueFrom: path: /event parameters: - src: dependencyName: sample-trigger-dependency dataKey: body dest: spec.arguments.parameters.0.value
これらをapplyすることで、Argo Events用のpodが作成されます。
Argo Events経由でワークフローをキックしたいため、今回はWorkflowTemplateを用いて機械学習のワークフローを定義します。
大まかな処理は以下のようになります。
はじめにentrypoint-jobを定義し、inputやファイルによってワークフローを切り替えるために判定用のjobを一つ入れています。
apiVersion: argoproj.io/v1alpha1kind: WorkflowTemplatemetadata: name: sample-workflow namespace: argo-workflowsspec: templates: - name: main-job nodeSelector: cloud.google.com/gke-nodepool: sample-node-pool inputs: parameters: - name: config steps: - - name: entrypoint-job templateRef: name: entrypoint-job template: entrypoint arguments: parameters: - name: config value: "{{inputs.parameters.config}}" - - name: main-job templateRef: name: main-job template: main-audio - name: exit-handler-job steps: - - name: slack-notification templateRef: name: slack-notification-job template: slack-notification when: "{{workflow.status}} != Succeeded" arguments: parameters: - name: title value: "{{workflow.name}}が失敗しました" :
実際の処理やロジックは割愛しますが、高速化のため複数ステップで並列化させています。また、処理の内容によってはGPUを使用したいため、nodeSelectorによってインスタンスを分けています。
今回構築したワークフローの全体像は以下のようになっています。音声を扱うのでMLロジックが盛りだくさんですが、柔軟なワークフローを組むことができます。
本記事では新規プロダクトであるAI Messenger Summaryのプロジェクトの進め方と、機械学習パイプラインについて紹介しました。AI Messenger Summaryでは機械学習パイプラインにArgo Workflowsを導入しました。すでにKubernetesを運用している方にとっては選択肢になりうると思います。
最後までお読みいただきありがとうございました!