この広告は、90日以上更新していないブログに表示しています。
GAEにどんどん機能が追加されていく中、なかなか実装されないのが全文検索。品詞がとれるセグメンターだけでも提供してくれたら全然便利だと思うんだけどそんなアナウンスはまだ有りません。
なきゃ作ればいいじゃんという事で、全文検索もどきを実装してみました。ひとつ前のエントリー通りTriGramです。
以前、恵比寿のイケメン イアンさんと一緒に作ったmisopotetoというモジュールをベースにしています。
今回のポイントは、転置インデックスをredisサーバに送っているところ、GAE(とうかDB全般)は、インサートがめちゃくちゃ遅いので、Ngramでgram毎にエントリーIDをappendしていくというのは辛いです。Twitterの検索結果15個x100文字位をTriGramでインデックスを作ろうとすると、1500個くらいをgetしてappendして、putする必要があります。以前は、TaskQueueが無かったのでおれおれQueueを作って非同期でIndexを作成していましたが今はTaskQueueがあるので問題なく実装できると思っていました。がしかし、全然遅いし、なぞのリトライが多発して、30分くらいで2000+個のTaskQueueが積まれっぱなしで全然消化される気配がありませんでした。
最近メチャ熱な、redisを使うことでサクサクIndex作成をしてみました。この時点でGAE/Pだけじゃなくなってるんですが・・・時間かければGAEでも出来るのでまぁまぁまぁ。redisは、sakuraのVPSにubuntu入れて立ててます。Pythonのredisモジュールはsocketが必須でGAEで使えないので、VPS上にredisプロキシーなFlaskAppを作ってPOSTメソッドでredisのKVSを使えるようにしています。
ソースは、bitbucket.orgに上げました。まるごと上げたので問題有るかも知れないですが、問題あったら教えてください。
ソース:http://bitbucket.org/a2c/a2c-fts/overview
a2c-fts:http://a2c-fts.appspot.com/
(注意:ChromeかSafari以外で見ると悲しい見た目になります)
utilsモジュールにNgramSegmenterクラスを入れてます。これでNgramでぶつ切ります。デフォルトでBigramですが、ノイズがハンパないので今回はTrigramで使用しています。以下ソース
classNgramSegmenter: _word_delimiter_regex =u"[。、" + string.punctuation +" ]"def__init__(self, text, sp=2, word_delimiter_regex=None):ifnot word_delimiter_regex: word_delimiter_regex = self._word_delimiter_regex self.text = re.sub(word_delimiter_regex,r'', text) self.ngramArr = []for posinrange(len(self.text)-sp+1): self.ngramArr.append({'word_pos' : pos,'word_text' : self.text[pos:pos+sp]})defgetText(self):return self.textdefgetNgramArr(self):return self.ngramArrdefgetSegmenter(self): a = [x['word_text']for xin self.ngramArr]return a#return ' '.join(a)if __name__ =='__main__': LOG_FILENAME ='segmenter.log' logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG) text =u'やってみせ 言って聞かせて させて見せ ほめてやらねば 人は動かじ' ana2Str = NgramSegmenter(text ,2)print ana2Str.getSegmenter()
GAEからredisサーバを使用する為にシンプルなWebアプリ作りました。redisはappend操作もアトミックに動くのでとても楽ちんです。登録する時には、GETでもPOSTでも受け付けます。参照するときにはGETのみ。テスト用にルートにアクセスすると登録用のフォームもあります。abかけてみましたが、秒間数百行けてるです。
#!/usr/bin/env pythonimport osimport redisfrom flaskimport Flask, requestimport jsonapp = Flask(__name__)app.debug =True@app.route('/')defredis_input(): html =''' <!doctype html> <form action="/api/post" method="post"> Key:<input type=text name="key"><br> Val:<input type=text name="val"><br> <input type=submit value="save"> </form> '''return html# GET method ================================================@app.route('/api/set/<key>/<val>')defset_id(key, val): gram ='redis_' + key twit_id =str(val) r = redis.Redis(host='localhost', port=6379, db=0) r.rpush(gram, twit_id) cur_list = r.lrange(gram,0, -1)return json.dumps(cur_list, indent=2)@app.route('/api/get/<key>')defget_id(key): gram ='redis_' + key r = redis.Redis(host='localhost', port=6379, db=0) cur_list = r.lrange(gram,0, -1)return json.dumps(cur_list, indent=2)# POST method ================================================@app.route('/api/post', methods=['POST'])defset_post_id(): gram ='redis_' + request.form['key'] twit_id =str(request.form['val']) r = redis.Redis(host='localhost', port=6379, db=0) r.rpush(gram, twit_id) cur_list = r.lrange(gram,0, -1)return json.dumps(cur_list, indent=2)if __name__ =='__main__': app.run(host='0.0.0.0', port=8080)
GAE-jのグループにも聞いてしまったのですが、TaskQueueに登録するところではまりました。
TaskQueueで登録できるTaskは、自アプリの特定URLオンリーなので外部URLをTaskに登録することが出来ません。そこで、外部サーバーを叩くエンドポイントを作ってそこをTaskQueueで叩くようにしたのですが、正常終了しているはずなのにTaskが削除されずに延々リトライを続けて、GAEのCPUとバンドをドンドン食いつぶす病にかかってしまいました。
以下、駄目だったコード
@app.route('/api/send_redis', methods=['POST'])defsaveRedisTwitSearchIndex(): gram = request.form['gram'] twit_id =request.form['twit_id'] ext_url ='http://redis.hoge.com/api/post' form_fields = {"key": gram,"val": twit_id, } form_data = urllib.urlencode(form_fields) result = urlfetch.fetch(url=ext_url, payload=form_data, method=urlfetch.POST, )if result.status_code ==200:return1return1
ほぼサンプルからコピペした寄せ集めなので動くはずなんですが、全然だめで、原因もわかりません。
今も原因が分からないですが、どうやら if があるとダメぽいです。
if result.status_code ==200:return1
この部分をなくすとうまくいきました。動いている現在のコード
@app.route('/api/send_redis', methods=['POST'])defsaveRedisTwitSearchIndex(): gram = request.form['gram'] twit_id =request.form['twit_id'] ext_url ='http://redis.atusi.me/api/post' form_fields = {"key": gram,"val": twit_id, } form_data = urllib.urlencode(form_fields) result = urlfetch.fetch(url=ext_url, payload=form_data, method=urlfetch.POST, )''' if result.status_code == 200: #return result.status_code return json.loads(result.content) '''return"gram: %s<br>twitid: %s"%(gram, twit_id)
コメントアウトするだけで動きました。なんでやねん!
上記変更を加えてから、1000以上のTaskQueueも数分で消化できるようになりました。
検索結果は、GAEの上限の1000件まで返すようになってます。トップページは、いっぱい出ても意味が無いので100件に絞ってます。他にも色々つまずいた所とがあったような気がしたけど忘れた・・・
redisへのFlaskアプリのベンチーマーク結果
Document Path: /api/set/hoge1/1Document Length: 17 bytesConcurrency Level: 100Time taken for tests: 0.439 secondsComplete requests: 100Failed requests: 98 (Connect: 0, Receive: 0, Length: 98, Exceptions: 0)Write errors: 0Total transferred: 59828 bytesHTML transferred: 40740 bytesRequests per second: 227.68 [#/sec] (mean)Time per request: 439.219 [ms] (mean)Time per request: 4.392 [ms] (mean, across all concurrent requests)Transfer rate: 133.02 [Kbytes/sec] receivedConnection Times (ms) min mean[+/-sd] median maxConnect: 8 15 3.3 16 21Processing: 38 154 81.8 144 431Waiting: 38 154 81.6 144 428Total: 48 169 83.6 160 439
引用をストックしました
引用するにはまずログインしてください
引用をストックできませんでした。再度お試しください
限定公開記事のため引用できません。