Movatterモバイル変換


[0]ホーム

URL:


株式会社 AI Shift

MENU
CyberAgent
サーバーサイドエンジニアはこちらフロントエンドエンジニアはこちらコンサルティングセールス・PMはこちらカジュアル面談をご希望の方はこちら
  1. TOP
  2. TECH BLOG
  3. Argo WorkflowsとGKEで構築するLLMを使った要約サービスの機械学習パイプライン

Argo WorkflowsとGKEで構築するLLMを使った要約サービスの機械学習パイプライン

こんにちは。

AIチームの干飯(@hosimesi11_)です。今回はAI Shiftで取り組んでいる新規サービスであるAI Messenger Summaryの機械学習パイプラインと、Proof of Concept(PoC)から実際のプロダクトへと展開する過程についてご紹介します。

AI Messenger Summaryとは

AI Messenger Summary

AI Messenger Summaryはコールセンター事業における、会話内容の要約サービスになります(プレスリリース)。コールセンターでは、顧客との会話内容をまとめるアフターコールワークに多くの時間が割かれるという課題があります。この状況を解決するため、オペレーターと顧客の会話データや録音データを音声認識し、大規模言語モデル(LLM)で処理して要約し、導入企業に合わせたフォーマットで出力するようなサービスとなっています。要約の他、応対品質評価やラベリングなど複数のタスクに対応しています。

初期の課題感

AI Messenger Summaryでは、各企業ごとの要約が実現可能かどうかを判別するために、PoC(Proof of Concept)フェーズを設けていました。
初期ではGradioを使った一つのサーバ内で、音声認識や要約処理など全ての処理を同期的に行っていました。処理の例として以下のようなものがあります。

  1. 音声ファイルアップロード
  2. 前処理
  3. 音声認識
  4. 後処理
  5. 要約
Display of Gradio
初期のPoC用のGradioの画面

元々データサイエンティスト一人で環境を整備しており、PoC初期では十分な機能を果たせていましたが、導入企業が増えてくるにつれ徐々に課題感が出てきました。音声認識のパラメータの変更や、前処理の変更、複数のプロンプトの試行など個社ごとに試行錯誤する必要があります。また処理の特性上、一ファイルの一回の実行に数十分から数時間かかる場合もあります。PoCを行う企業が増えるにつれ、PoCの負担はさらに増すことが予想されました。
PoCの負担を下げつつ、素早くMVP(Minimum Viable Product)としてリリースできるようにするため、開発フェーズを以下の2つに定義し、段階的に達成するように進めることにしました。

Phase 1. PoCの負担削減と高速化
Phase 2. 実プロダクト実装

 

プロジェクトの進め方

Phase 1

Phase 1では、PoCの負担削減と高速化にだけ観点を置き、リッチな構成は取らないようにすることで実装コストを大幅に下げました。具体的には、要約処理などの画面に関係のない処理の中心をCloud Run Jobsに載せて処理を分離しました。これによって下記のことが可能になりました。

  1. 複数音声ファイルの処理を並列で同時実行
  2. 処理を分離し、画面から非同期で実行
  3. slackへの失敗通知

処理のリトライや細かい部分での並列実行にworkflowツールを導入したかったのですが、スピード感とPhase 2の実プロダクト実装を踏まえて、スコープ外としました。また今のUIであるGradioはリプレイスされることが予期されたため、手をつけずにジョブの分離に注力しました。

Old System Architecture
元のシステム構成図
Phase 1 System Architecture
Phase 1システム構成図

PoCの高速化をしつつ、この段階から実プロダクトの設計も開始しました。こうすることで、大部分の実装を本番時にそのまま再利用でき、実プロダクトのリリースまでの時間を短縮することができました。

Phase 2

Phase 2は実プロダクトの実装になります。実プロダクトでは各企業のCTI(Computer Telephony Integration)やCRM(Customer Relationship Management)と自動連携することになります。さらにPoCの処理も全てこのシステムに載ってくることが予期されました。本番運用とPoCの処理の共通化を考慮するとリトライ処理なども完備したかったため、ワークフローツールを導入しました。インフラ基盤とセットでワークフローツールをいくつか検討しました。

インフラ基盤とワークフローツールの選定

ここでは以下のような方針を採用することを避けました。

  • 流行っているという理由での技術選定
  • 必要以上にリッチな構成
  • ML独自のツールの採用

これらの方針を避ける理由は次の通りです。

  • プロダクト立ち上げなのでシンプルで管理しやすいシステムを目指したい
  • ML独自のツールでは運用もMLエンジニアが全て見る必要があり、ソフトウェアエンジニアと共同して運用できない

そのため、以下を選定の基準としました。

  • 達成したい目的を整理し、それを実現できる点
  • ソフトウェアエンジニアと共同で運用できる点

AI ShiftではクラウドサービスとしてGoogle Cloudを採用しており、ほとんどのアプリケーションがGKE上で動作しています。インフラ基盤の選定に関して、元々は素早く立ち上げるためにCloud Run Jobsを使用する予定でしたが、ワークフローツールとの兼ね合いや音声認識などをGPU上で動かしたい場面が想像できたため、既に動いているGKEを選定しました。他のものに比べて初期実装コストが少し高いですが、チームとしての運用の知見アセットがある点も含めて選定しました。

ワークフローツールの選定に関しては社内の別プロダクトの方にもヒアリングしつつ、柔軟性や既存システムとの相性、運用ノウハウの点からArgo Workflowsを選定しました。AI ShiftがArgo CDをCDツールとして使用していることも、ソフトウェアエンジニアと一緒に運用する上で大きな利点となりました。

上記を含めて、Phase 2(プロダクト)のインフラ構成は以下になります。

Phase 2 System Architecture
Phase 2構成図

機械学習パイプラインの構築

AI Shiftの本番環境のGKEの管理にはKustomizeが使用されているため、検証が終わり次第デプロイするまでの差分を減らすため、ローカルでもKustomizeを使用しました。本番ではCTIなどと自動連携し、PubSubへのパブリッシュをトリガーにワークフローを動かしたかったため、Argo Eventsも使用しています。

Argo Workflowsとは

Kubernetes 上で並列ジョブを調整するためのオープンソースのワークフローツールです。柔軟なDAGを組むことができます。

Argo Eventsとは

Kubernetes 用のイベント駆動型ワークフロー実行ツールで、Webhook・MQなど様々なツールに対応しています。Sensor、EventSource、EventBusの3つのコンポーネントからなります。

Argo Events System Architecture
イメージ図

Google Cloudリソースの作成

まずは、以下のようにTerraformで必要なGoogle Cloudのリソースを作成しています。

Argo Workflows

# External IP for Argo Workflowsresource "google_compute_global_address" "argo_workflows_external_ip" {  name         = "argo-workflows-server-ip"  address_type = "EXTERNAL"  ip_version   = "IPV4"  project      = var.project}# DNS Zonedata "google_dns_managed_zone" "dns_zone" {  name    = "dns-zone"  project = var.project}# DNS Record for Argo Workflowsresource "google_dns_record_set" "argo_workflows" {  project      = var.dns_management_project  name         = "sample.${data.google_dns_managed_zone.dns_zone.dns_name}"  type         = "A"  ttl          = 300  managed_zone = data.google_dns_managed_zone.dns_zone.name  rrdatas      = [google_compute_global_address.argo_workflows_external_ip.address]  lifecycle {    ignore_changes = [rrdatas]  }}# Access Policy for Argo Workflowsresource "google_compute_security_policy" "argo_workflows_policy" {  name = "${var.env}-argo-workflows-policy"  rule {    action   = "deny(403)"    priority = "2147483647"    match {      versioned_expr = "SRC_IPS_V1"      config {        src_ip_ranges = ["*"]      }    }    description = "Default"  }  rule {    action   = "allow"    priority = "1001"    match {      versioned_expr = "SRC_IPS_V1"      config {        src_ip_ranges = [ "xxx" ]      }    }    description = "Allow IP Ranges"  }}# History DB for Argo Workflowsresource "google_sql_database" "argo_workflows_db" {  name      = "$argo_workflows_db"  instance  = var.db_instance_name  charset   = "utf8mb4"  collation = "utf8mb4_general_ci"  project   = var.project}# Service Account for Argo Workflowsresource "google_service_account" "argo_workflows_sa" {  account_id   = "argo-workflows-sa"  display_name = "argo workflows service account"}# Attach secret manager accessor to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_secret_manager_iam" {  project = var.project  role    = "roles/secretmanager.secretAccessor"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach pub/sub publisher to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_pubsub_publisher_iam" {  project = var.project  role    = "roles/pubsub.publisher"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach storage object admin to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_storage_iam" {  project = var.project  role    = "roles/storage.objectAdmin"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach artifact registry writer to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_artifact_registry_iam" {  project = var.project  role    = "roles/artifactregistry.writer"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach service account token creator to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_service_account_token_creator_iam" {  project = var.project  role    = "roles/iam.serviceAccountTokenCreator"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach cloud sql client to Service Account for Argo Workflowsresource "google_project_iam_member" "workflows_cloudsql_client_iam" {  project = var.project  role    = "roles/cloudsql.client"  member  = "serviceAccount:${google_service_account.argo_workflows_sa.email}"}# Attach Workload Identity to Service Account for Argo Workflowsresource "google_service_account_iam_binding" "argo_workflows_workload_identity" {  service_account_id = google_service_account.argo_workflows_sa.name  role               = "roles/iam.workloadIdentityUser"  members            = ["serviceAccount:${var.project}.svc.id.goog[argo-workflows/argo-workflows-sa]"]}# Bucket for Argo Workflows Logresource "google_storage_bucket" "argo_workflows_logs_bucket" {  name          = "argo-workflows-logs"  location      = var.region  project       = var.project  force_destroy = true}

Argo Events

# Argo Eventsのservice accountresource "google_service_account" "argo_events_sa" {  account_id   = "argo-events-sa"  display_name = "用のargo events用のservice account"}# Attach pub/sub subscriber to Service Account for Argo Eventsresource "google_project_iam_member" "pubsub_subscriber_iam" {  project = var.project  role    = "roles/pubsub.subscriber"  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach pub/sub viewer to Service Account for Argo Eventsresource "google_project_iam_member" "pubsub_viewer_iam" {  project = var.project  role    = "roles/pubsub.viewer"  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach service account token creator to Service Account for Argo Eventsresource "google_project_iam_member" "workflows_service_account_token_creator_iam" {  project = var.project  role    = "roles/iam.serviceAccountTokenCreator"  member  = "serviceAccount:${google_service_account.argo_events_sa.email}"}# Attach Workload Identity to Service Account for Argo Eventsresource "google_service_account_iam_binding" "argo_events_workload_identity" {  service_account_id = google_service_account.argo_events_sa.id  role               = "roles/iam.workloadIdentityUser"  members            = ["serviceAccount:${var.project}.svc.id.goog[argo-events/argo-events-sa]"]}

 

Argo Workflowsのインストール

Argo WorkflowsをGKEに構築していきます。(一部抜粋)

まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。

https://github.com/argoproj/argo-events/releases

ConfigMapを作成します。ここでは、ワークフローの履歴をMySQLに入れるための設定をしています。また、GCSにもログが上がるように設定しています。

ConfigMap

apiVersion: v1kind: ConfigMapmetadata:  name: workflow-controller-configmap  namespace: argo-workflowsdata:  parallelism: "10"  namespace: argo-workflows  resourceRateLimit: |    limit: 10    burst: 1  persistence: |    archive: true    archiveTTL: 10d    skipMigration: false    connectionPool:      maxIdleConns: 100      maxOpenConns: 100    nodeStatusOffLoad: true    mysql:      host: xxxxx      port: 3306      database: xxxx      tableName: argo_workflows      userNameSecret:        name: argo-workflows-secret        key: MYSQL_USER      passwordSecret:        name: argo-workflows-secret        key: MYSQL_PASSWORD  artifactRepository: |    archiveLogs: true    gcs:      bucket: sample-log-bucket      keyFormat: "argo\        /{{workflow.creationTimestamp.Y}}\        /{{workflow.creationTimestamp.m}}\        /{{workflow.creationTimestamp.d}}\        /{{workflow.name}}\        /{{pod.name}}"

Service Account

ここで、Google Cloud側で作成済みのサービスアカウントとKubernetesのサービスアカウントを紐付けます。

apiVersion: v1kind: ServiceAccountmetadata:  name: argo-workflows-sa  namespace: argo-workflows  annotations:    iam.gke.io/gcp-service-account: sample-gcp-service-account

Deployment

Deploymentの定義をします。ここでは--auth-modeはclient、server、ssoが対応しています。

apiVersion: apps/v1kind: Deploymentmetadata:  name: argo-server  namespace: argo-workflowsspec:  strategy:    type: RollingUpdate    rollingUpdate:      maxSurge: 1      maxUnavailable: 0  selector:    matchLabels:      app: argo-server  template:    metadata:      labels:        app: argo-server    spec:      serviceAccountName: argo-workflows-sa      terminationGracePeriodSeconds: 60      nodeSelector:        cloud.google.com/gke-nodepool: sample-node-pool      containers:      - name: argo-server        args: ["server", "--secure=false", "--auth-mode=sso"]        ports:        - containerPort: 2746        livenessProbe:          httpGet:            path: /            port: 2746            scheme: HTTP          initialDelaySeconds: 3          periodSeconds: 30          timeoutSeconds: 1          successThreshold: 1          failureThreshold: 3        readinessProbe:          httpGet:            path: /            port: 2746            scheme: HTTP          initialDelaySeconds: 3          periodSeconds: 30          timeoutSeconds: 1          successThreshold: 1          failureThreshold: 3        lifecycle:          preStop:            exec:              command:                - /bin/sh                - -c                - sleep 10

Service

apiVersion: v1kind: Servicemetadata:  name: argo-server  namespace: argo-workflows  annotations:    cloud.google.com/neg: '{"ingress":true}'spec:  type: ClusterIP  selector:    app: argo-server  ports:  - name: argo-workflows-ui    port: 80    targetPort: 2746    protocol: TCP

Ingress

Ingressを作成します。ここでは先ほどTerraformで作成済みのExternal IPや、DNSレコードを設定します。

apiVersion: networking.k8s.io/v1kind: Ingressmetadata:  name: argo-server-ingress  namespace: argo-workflows  annotations:    kubernetes.io/ingress.global-static-ip-name: sample-ip-name    networking.gke.io/managed-certificates: sample-certificatesspec:  defaultBackend:    service:      name: argo-server      port:        number: 80  rules:    - host: sample-domain      http:        paths:          - path: /            pathType: Prefix            backend:              service:                name: argo-server                port:                  number: 80

これらをapplyすることで、Argo Serverが起動し、Argo UIが表示されます。

Argo Events

Argo EventsをGKEに構築していきます。(一部抜粋)

まず、以下のページからinstall.yamlをダウンロードし、kustomizeの配下におきます。

https://github.com/argoproj/argo-events/releases

EventBus

EventBusを定義します。

apiVersion: argoproj.io/v1alpha1kind: EventBusmetadata:  name: argo-events-bus  namespace: argo-eventsspec:  jetstream:    version: 2.9.12    replicas: 3    nodeSelector:      cloud.google.com/gke-nodepool: sample-node-pool    streamConfig: |      maxAge: 24h    settings: |      max_file_store: 1GB    startArgs:      - "-D"

EventSource

EventSourceを作成します。ここでは先ほどTerraformで作成した、ワークフローをキックする際に使用するPub/Subを指定します。

apiVersion: argoproj.io/v1alpha1kind: EventSourcemetadata:  name: gcp-pubsub  namespace: argo-eventsspec:  eventBusName: argo-events-bus  type: pubsub  template:    serviceAccountName: argo-events-sa    spec:      nodeSelector:        cloud.google.com/gke-nodepool: sample-node-pool  pubSub:    workflow-trigger:      jsonBody: true      projectID: sample-project      topic: sample-topic      subscriptionID: sample-topic-sub

Sensor

どのワークフローをキックするかを定義します。ここで、dependenciesはEventSourceと名前を合わせる必要があります。

apiVersion: argoproj.io/v1alpha1kind: Sensormetadata:  name: pubsub-sensor  namespace: argo-eventsspec:  template:    serviceAccountName: argo-events-sa  eventBusName: argo-events-bus  dependencies:    - name: sample-trigger-dependency      eventSourceName: gcp-pubsub      eventName: workflow-trigger  triggers:    - template:        name: trigger-template        k8s:          operation: create          source:            resource:              apiVersion: argoproj.io/v1alpha1              kind: Workflow              metadata:                generateName: workflow-                namespace: argo-workflows              spec:                serviceAccountName: argo-workflows-sa                workflowTemplateRef:                  name: sample-workflow                arguments:                  parameters:                  - name: config                    valueFrom:                      path: /event          parameters:            - src:                dependencyName: sample-trigger-dependency                dataKey: body              dest: spec.arguments.parameters.0.value

これらをapplyすることで、Argo Events用のpodが作成されます。

 

Workflowの本体のデプロイ

Argo Events経由でワークフローをキックしたいため、今回はWorkflowTemplateを用いて機械学習のワークフローを定義します。

大まかな処理は以下のようになります。

  1. 前処理
  2. 音声認識
  3. 後処理
  4. 要約
  5. 結果格納

はじめにentrypoint-jobを定義し、inputやファイルによってワークフローを切り替えるために判定用のjobを一つ入れています。

apiVersion: argoproj.io/v1alpha1kind: WorkflowTemplatemetadata:  name: sample-workflow  namespace: argo-workflowsspec:  templates:  - name: main-job    nodeSelector:      cloud.google.com/gke-nodepool: sample-node-pool    inputs:      parameters:      - name: config    steps:    - - name: entrypoint-job        templateRef:          name: entrypoint-job          template: entrypoint        arguments:          parameters:          - name: config            value: "{{inputs.parameters.config}}"    - - name: main-job        templateRef:          name: main-job          template: main-audio  - name: exit-handler-job    steps:    - - name: slack-notification        templateRef:          name: slack-notification-job          template: slack-notification        when: "{{workflow.status}} != Succeeded"        arguments:          parameters:          - name: title            value: "{{workflow.name}}が失敗しました"                    :

実際の処理やロジックは割愛しますが、高速化のため複数ステップで並列化させています。また、処理の内容によってはGPUを使用したいため、nodeSelectorによってインスタンスを分けています。

構築したワークフロー

今回構築したワークフローの全体像は以下のようになっています。音声を扱うのでMLロジックが盛りだくさんですが、柔軟なワークフローを組むことができます。

Argo Workflows Result
ワークフローの実行画面

おわりに

本記事では新規プロダクトであるAI Messenger Summaryのプロジェクトの進め方と、機械学習パイプラインについて紹介しました。AI Messenger Summaryでは機械学習パイプラインにArgo Workflowsを導入しました。すでにKubernetesを運用している方にとっては選択肢になりうると思います。
最後までお読みいただきありがとうございました!

Share on

  • Twitterでシェア
  • Facebookでシェア
  • LINEでシェア
  • noteでシェア
  • はてぶでシェア
PREVNEXTBACK TO LIST
  • Twitterでシェア
  • Facebookでシェア
  • LINEでシェア
  • noteでシェア
  • はてぶでシェア

CATEGORY

PICK UP

    TAG


    [8]ページ先頭

    ©2009-2025 Movatter.jp