Go to list of users who liked
Share on X(Twitter)
Share on Facebook
More than 5 years have passed since last update.
実運用に向けたLINE BOTサーバの実装例
概要
実運用可能なLINE BOTサーバを考えると、次の参考URLでyoichiro6642さんが書いているように非同期の処理が必要になります。
参考URL:大量メッセージが来ても安心なLINE BOTサーバのアーキテクチャ
小中規模の環境で、ある程度大量メッセージが来ても耐えられることを目的として、上記に沿ったLINE BOTサーバ(のスケルトン)を書いてみました。
最後の「APIの呼び出し回数を減らす」(メッセージ送信で複数MIDを指定してPIの呼び出し回数を減らす)ところは実装してません。
利用した環境は以下です。
- OS : CentOS 7.2.1511 x86_64
- BOT Server : Node.js v6.2.0
- Queue : MongoDB v3.2.6
- Dispatcher & jobWorker: Python 2.7.5
Amazon API Gateway+Lambda+DynamoDBという選択肢もありましたが、Node.js+MongoDB+Pythonでオーバーヘッドの少ない軽量Dispatcher&jobWorkerを実装できるのではないかと考えました。
Queueは、RabbitMQ, memcached, Redisなども考えられましたが、次の理由からMongoDBを利用しました。
- ポーリングではなくQueueに追加されたことを契機に処理をキックできるトリガーが欲しい。
- MongoDBは、シングルで(勿論シングルでなくても良い)マスターとして使えばoplogが利用でき、oplog監視することでトリガーとして使える。
- 受け付けたMIDごとの情報の格納や参照に、所詮高速なDBが必要。
前提知識
- CentOS 7
- Node.js
- MongoDB, MongoDB.oplog
- Python
実装例
MongoDBの準備
特に指定したのはreplication, oplogSizeMBくらいです。
systemLog: destination: file logAppend: true path: /var/log/mongodb/mongod.line_bot.logstorage: dbPath: /var/lib/mongo/line_bot journal: enabled: trueprocessManagement: fork: false # fork and run in background pidFilePath: /var/run/mongodb/mongod.line_bot.pid # location of pidfilenet: port: 27017 bindIp: 127.0.0.1 # Listen to local interface only, comment to listen on all interfaces.replication: oplogSizeMB: 3072Mongodの起動
マスターモードで起動します。
$ mongod --master -f mongod.line_bot.confコレクションの作成
コレクションはcapped collectionにして、容量増加を気にしなくてもいいようにしてます。
# !/bin/bach -vmongo --port=27017 <<EOFuse line_bot;db.createCollection("recvq", { capped: true, size: 1048576000 // 1GB});EOFBOT Server(Node.js)
frontDesk.jsでは、LINE Serverからのメッセージを受け取り、即座に応答を返します。
// Settings of the this programvarhttpsPort=443;varallowPath="/callback";varhttpsOpt={"caKey":"/etc/letsencrypt/live/xxx/privkey.pem","caCert":"/etc/letsencrypt/live/xxx/fullchain.pem","caCa":"/etc/letsencrypt/live/xxx/chain.pem"};local={};local['channelSecret']="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";// Settings of the MongoDBvarmongoHost="127.0.0.1";varmongoPort=27017;varmongoDb="line_bot";varmongoCol="recvq";varexpress=require('express'),bodyParser=require('body-parser'),log4js=require('log4js'),https=require('https'),fs=require('fs'),mongo=require('mongodb'),path=require('path');varaccept=require(__dirname+'/accept');varapp=express();app.use(bodyParser.json());app.use(bodyParser.urlencoded({extended:true}));// MongoDBvarMongoClient=require('mongodb').MongoClient,assert=require('assert');varmongoUrl='mongodb://'+mongoHost+":"+mongoPort+"/"+mongoDb;set_col(local,mongoUrl,function(rc,local,mongoUrl){if(!rc){console.log("set_col.rc:"+rc);local.db.close();process.exit(1);}console.log("Connected succesfully to"+mongoUrl);});// handle a requestapp.post(allowPath,function(req,res,next){local['acceptTime']=newDate().getTime();// record accept time(ms)// response ASAPres.status(200).send("OK");res.end();accept.post_callback(req,res,next);// Handle the request});// server certificate authorityvarhttpsOpt={key:fs.readFileSync(httpsOpt.caKey),cert:fs.readFileSync(httpsOpt.caCert),ca:fs.readFileSync(httpsOpt.caCa)};// listen portvarhttpsServer=https.createServer(httpsOpt,app);httpsServer.listen(httpsPort,function(){console.log('Listening on port'+httpsPort+'...');}).on('error',function(err){if(err.errno==='EADDRINUSE'){console.log('This program is already running.');}else{console.log(err);}process.exit(1);});functionset_col(local,url,callback){// Use connect method to connect to the MongoServerMongoClient.connect(url,function(err,db){if(err){console.log("MongoDB connection error.");console.log(err);process.exit(1);}local['db']=db;local.db.collection(mongoCol,function(err,collection){if(err){console.log("MongoDB collection error.");console.log(err);process.exit(1);}local.db['collection']=collection;callback(true,local,url);});});}その後、accept.jsで、署名の検証、MongoDBへの登録を行います。
varcrypto=require('crypto');varassert=require('assert');exports.post_callback=function(req,res){// signatureの有無をチェックif((!req.headers)||(!req.headers["x-line-channelsignature"])){console.log("400. Bad Request. The request does not have a x-line-channelsignature");return;}// requestのresultの有無をチェックif((!req.body)||(!req.body['result'])){console.log("400. Bad Request. The request does not have result");return;}varresult_num=req.body.result.length;// HTTP bodyをchannelSecretでsha256暗号化, base64ダイジェストを求める.varbody_str=newBuffer(JSON.stringify(req.body),'utf8');computedSignature=crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");// signatureを比較し、正当性を確認if(req.headers["x-line-channelsignature"]!=computedSignature){console.log("400. Bad Request. The x-line-channelsignature is wrong.");return;}// 受け付けた時刻を入れておくfor(vari=0;i<Object.keys(req.body.result).length;i++){req.body.result[i]['acceptTime']=local['acceptTime'];}// メッセージをMongoDBに登録local.db.collection.insertMany(req.body.result,function(err,r){assert.equal(null,err);assert.equal(result_num,r.insertedCount);toQueueTime=newDate().getTime()-local['acceptTime'];console.log("necessary time to store to queue:"+toQueueTime+" ms");return;});}Dispatcher & jobWorker
Pythonのマルチスレッドで実装しました。
jobWorkerスレッドは、生成されたらwait()でthreading.Event()を待ちます。
トリガースレッドは、oplogをtsで監視することで、Queueへの追加を契機に処理を開始します。
読み込んだQueueの内容を空きjobWorkerスレッドに割り当て、EventをsetしてjobWorkerに処理を開始させます。
リスト、変数に対するスレッドの参照、更新は意識しているので、ロックはしていません...
の予定だったのですが、LINE APIサーバにマルチスレッドでアクセスしてしまうと、同時接続数のエラーが発生しました。なので、jobWorkerからLINE APIサーバへのアクセスにはacquire()で排他ロックを使用してます。ドキュメントにその辺の事は書かれてないので、1多重、アクセス間隔100msにしています。
なお、私はPythonのマルチスレッドは初心者なので、誤りがあればご指摘ください。
# !/usr/bin/env python# -*- coding: utf-8 -*-# Settings of the this programNumOfThread=20searchInterval=100000# uSecmainSleepInterval=60# Sec# Settings of the MongoDBmongoHost="127.0.0.1";mongoPort=27017;mongoDb="line_bot";mongoCol="recvq";importos,os.pathimportsysimportthreadingimporttimeimportjsonimportpymongofrompymongo.cursorimportCursorTypefromdatetimeimportdatetimeimportdatetimeimportjobWorkerusleep=lambdax:time.sleep(x/1000000.0)# マイクロ秒スリープ##### workerスレッドdefworkerThread(tt):tno=str(tt[0])whileTrue:tt[2].clear()# Eventをクリアし、Evant発生まで待機tt[3]='w'tt[2].wait()ifverbose:# 待機終了。処理開始print'\nworker['+tno+']: wake up'# ここで実際の処理関数を呼び出すjobWorker.jobWorker(verbose,tno,tt[4]['o'])##### MongoDBトリガースレッドdefTriggerMongo(t,tchain,last,searchInterval,host,port,db,col):dbCol=db+'.'+colc=pymongo.MongoClient(host,port)# Uncomment this for master/slave.oplog=c.local.oplog['$main']# Uncomment this for replica sets.#oplog = c.local.oplog.rsfirst=next(oplog.find().sort('$natural',pymongo.DESCENDING).limit(-1))ts=first['ts']whileTrue:cursor=oplog.find({'ts':{'$gt':ts}},cursor_type=CursorType.TAILABLE_AWAIT,oplog_replay=True)whilecursor.alive:fordocincursor:# 定期的に {h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'} が返る# が op:'n' は単なるインフォメーション。無視する。ifdoc['ns']==dbColanddoc['op']!='n':# 空きスレッドを探すi=tchain[last]whilet[i][3]!='w':i=tchain[i]ifi==tchain[last]:# 1周探したらusleep(searchInterval)t[i][4]=doc# 空きスレッドのt[n][4]にデータを格納t[i][3]='r't[i][2].set()# t[n]に処理開始指示last=i# Work with doc herets=doc['ts']print"got out of a while corsor.alive loop"######################################################################## Check of the parameterverbose=Falseiflen(sys.argv)==2andsys.argv[1]=='-v':verbose=Trueeliflen(sys.argv)!=1:print"Usage: %s [-v]"%(sys.argv[0],)quit()# workerスレッド管理データ作成 & workerスレッド生成# [ThreadNo, ThreadObj ,EvantObj, status, スレッドに渡すデータ]# (status ' ':準備中, 'w':待機中・空き, 'r':実行中)# :t=[[0foriinrange(5)]foriinrange(NumOfThread)]foriinrange(NumOfThread):t[i][0]=i# Thread No.t[i][2]=threading.Event()# Evantオブジェクト生成t[i][3]=''# is_running# workerスレッド生成t[i][1]=threading.Thread(name='worker['+str(i)+']',target=workerThread,args=(t[i],))t[i][1].setDaemon(True)# Thread list of circulationtc=[0foriinrange(NumOfThread)]# 値は次のスレッドNo.foriinrange(NumOfThread):tc[i]=i+1tc[i]=0# make a list of circulationlastThread=i# 最後に使ったスレッド. 次はtc[lastThread]番目のスレッドを使う.# workerスレッド起動foriinrange(NumOfThread):t[i][1].start()# workerスレッド起動後wait状態待ちyetAllThread=TruewhileyetAllThread:foriinrange(NumOfThread):ift[i][3]=='':breakelse:usleep(100)# 監視間隔は0.1ミリ秒ifi==NumOfThread-1:yetAllThread=Falseelse:usleep(100)# 監視間隔は0.1ミリ秒# MongoDBトリガースレッド生成t_mongo=threading.Thread(name='t_mongo',target=TriggerMongo,args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))t_mongo.setDaemon(True)t_mongo.start()# 開始# mainスレッドwhileTrue:time.sleep(mainSleepInterval)jobWorker.pyが実際の処理を行うスレッドです。ここは送信コンテンツ種別に応じてオウム返しするだけのサンプルです。
MID(from)の取り方がopTypeによって異なりますので注意してください。
# !/usr/bin/env python# -*- coding: utf-8 -*-# Note of caution:# - This program is one of the threads.# - Must not do exit(),quit()# - Please use only return()# Settings of the LINE API ServerlineApiHost="trialbot-api_line_me"accessIntervalMS=100# msgetProfilesUrl="https://trialbot-api.line.me/v1/profiles"postUrl="https://trialbot-api.line.me/v1/events"getContentUrl="https://trialbot-api.line.me/v1/bot/message/$messageId/content"header={"Content-Type":"application/json; charser=UTF-8","X-Line-ChannelID":"xxxxxxxxxx","X-Line-ChannelSecret":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","X-Line-Trusted-User-With-ACL":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}postBodyTemplate={"to":[],"toChannel":1383378250,"eventType":"138311608800106203","content":{}}importthreadingimporttimeimportdatetimeimportjsonusleep=lambdax:time.sleep(x/1000000.0)# マイクロ秒スリープ# LINE APIサーバに複数jobWorkerスレッドからの同時アクセスを回避するためのロック関連globalLock={}# 接続先ごとのロックglobalLastAccessTime={}# 接続先ごとの最終アクセスタイムloadTime=int(time.time()*1000)defjobWorker(verbose,mynum,recvBody):globalglobalLockglobalglobalLastAccessTime# 接続先ごとのロックの初期設定ifnotglobalLock.has_key(lineApiHost):globalLock[lineApiHost]=threading.Lock()# 接続先ごとの最終アクセスタイムの初期設定ifnotglobalLastAccessTime.has_key(lineApiHost):globalLastAccessTime[lineApiHost]=loadTimeifverbose:recvBody['_id']='ObjectId("'+str(recvBody['_id'])+'")'print'worker['+mynum+'] recvBody:'+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'printrecvBodyopType=recvBody['content'].get('opType')# blocked from userifopType==8:# ユーザ管理からブロックユーザのMID(recvBody['content']['params'][0])を削除print'please delete user"'+recvBody['content']['params'][0]+'" from management data.'return# POSTのBody部をコピーpostBody={}postBody['to']=''postBody['toChannel']=postBodyTemplate['toChannel']postBody['eventType']=postBodyTemplate['eventType']postBody['content']={}# メッセージ返信先ifopType==4:# New userpostBody['to']=[recvBody['content']['params'][0]]else:postBody['to']=[recvBody['content']['from']]# New userifopType==4:# ユーザプロフィールを取得result=apiServer(verbose,mynum,'get',lineApiHost,getProfilesUrl,header,postBody['to'][0],accessIntervalMS)userProfile=json.loads(result.text)resText='ようこそ!'# ユーザ管理DBにユーザのMIDでプロフィールを追加すべきprint'please add'+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'printjson.dumps(userProfile,sort_keys=True,indent=4)# メッセージに応じた処理contentType=recvBody['content'].get('contentType')resText=''ifcontentType==1:# TextresText=u'はい、'+recvBody['content']['text']+u'、ですね。'elifcontentType==2:# ImageresText=u'写真ですね...'elifcontentType==3:# VideoresText=u'動画ですね...'elifcontentType==4:# AudioresText=u'ボイスメッセージですね...'elifcontentType==7:# LocationresText=u'位置情報ですね...'ifverbose:printrecvBody['content']['text'].encode('utf-8')printrecvBody['content']['location']['title'].encode('utf-8')printrecvBody['content']['location']['address'].encode('utf-8')elifcontentType==8:# StickerresText=u'スタンプですね'ifverbose:printrecvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')elifcontentType==10:# Contact# Contact(contentType==10)ならcontentMetadataのmidプロフィールを取得resText=recvBody['content']['contentMetadata']['displayName']+u'さんの連絡先ですね'result=apiServer(verbose,mynum,'get',lineApiHost,getProfilesUrl,header,recvBody['content']['contentMetadata']['mid'],accessIntervalMS)contactProfile=json.loads(result.text)ifverbose:print'\ncontactProfile:'+str(contactProfile)# 応答メッセージ送信ifresText:# ユーザプロフィールを取得(本来はユーザ登録時にDBに登録、必要に応じて取得)result=apiServer(verbose,mynum,'get',lineApiHost,getProfilesUrl,header,postBody['to'][0],accessIntervalMS)userProfile=json.loads(result.text)resText=userProfile['contacts'][0]['displayName']+u'さん、'+resTextifverbose:print'\nprofile:'+str(userProfile)# 送信メッセージはtext(ContentType=1). その他にImage,Video,Audio,Location,Sticker,multiple messages,rich messagesが送れるpostBody['content']={'contentType':1,'toType':1,'text':resText}ifverbose:print'\nworker['+mynum+']'+postUrlprint'worker['+mynum+'] postHeader:'+json.dumps(header,sort_keys=True,indent=4)print'worker['+mynum+'] postBody:'+json.dumps(postBody,sort_keys=True,indent=4)# メッセージ送信r=apiServer(verbose,mynum,'post',lineApiHost,postUrl,header,postBody,accessIntervalMS)return# LINE APIサーバアクセスdefapiServer(verbose,mynum,method,host,url,header,body,accessIntervalMS):importrequestsglobalglobalLockglobalglobalLastAccessTimeglobalLock[host].acquire()# Lock# LINE APIサーバへのアクセスに一定時間空ける場合、あとどのくらい待てば良いかcurrentTime=int(time.time()*1000)remain=accessIntervalMS-(currentTime-globalLastAccessTime[host])ifverbose:print'worker['+mynum+'] time since last access(ms):'+str(currentTime-globalLastAccessTime[host])print'worker['+mynum+'] remain='+str(remain)+' ms'# wait accessIntervalMS from last accessifremain>0:usleep(remain*1000)ifmethod=='get':ifbody:payload={'mids':body}r=requests.get(url,params=payload,headers=header)else:ifverbose:printurl,headerr=requests.get(url,headers=header)else:r=requests.post(url,data=json.dumps(body),headers=header)ifverboseandr.status_code!=200:print'worker['+mynum+'] HTTP status code:'+str(r.status_code)print'worker['+mynum+'] response:'+r.textglobalLastAccessTime[host]=int(time.time()*1000)globalLock[host].release()# releasereturnrまとめ
大量メッセージが来ても使える軽量Queue機構、Dispatcher&jowWorkerのスケルトンを実装できたと思います。
64bit CentOS 7の初期状態だとシステム全体でスレッド数の上限は30118なのですが..5000スレッドだと生成に失敗します。(...ってそんなにいらないけど)
このような機構はBOTサーバだけでなく、複数のSMTPサーバを使って大量メールを効率良く配信する時にも必要になります。
jobWorker側を別言語アプリにするなら、マイクロサービス化するか別プロセスでpipeで通信するように変更すれば使えると思います。
この仕組みで負荷分散させるなら、MongoDBを別サーバやShardingにしたり、「4.consurring message」以降を別マシンに持っていきます。それ以上にjobWorkerを分散させたい時はマイクロサービス化するか別のリクエストブローカーの仕組みにした方が良いです。
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme
