Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
/nsqPublic

Commitb16e68e

Browse files
authored
Merge pull request#193 from CrazyHZM/feat/support_requeue_delay_msg
Support requeue delay msg
2 parents2ecb3f9 +aaad42b commitb16e68e

File tree

3 files changed

+55
-3
lines changed

3 files changed

+55
-3
lines changed

‎internal/version/binary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"runtime"
66
)
77

8-
constBinary="0.3.7-HA.1.12.9"
8+
constBinary="0.3.7-HA.1.13.0"
99

1010
var (
1111
Commit="unset"

‎nsqd/channel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2663,7 +2663,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) {
26632663

26642664
if!c.IsConsumeDisabled()&&!c.IsOrdered()&&
26652665
needPeekDelay&&clientNum>0 {
2666-
newAdded,cnt,err:=c.peekAndReqDelayedMessages(tnow)
2666+
newAdded,cnt,err:=c.PeekAndReqDelayedMessages(tnow)
26672667
iferr==nil {
26682668
ifnewAdded>0&&c.chLog.Level()>=levellogger.LOG_DEBUG {
26692669
c.chLog.LogDebugf("channel delayed waiting peeked %v added %v new : %v",
@@ -2709,7 +2709,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) {
27092709
returndirty,checkFast
27102710
}
27112711

2712-
func (c*Channel)peekAndReqDelayedMessages(tnowint64) (int,int,error) {
2712+
func (c*Channel)PeekAndReqDelayedMessages(tnowint64) (int,int,error) {
27132713
ifc.IsEphemeral() {
27142714
return0,0,nil
27152715
}

‎nsqdserver/http.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer
8686
router.Handle("POST","/channel/fixconfirmed",http_api.Decorate(s.doFixChannelConfirmed,log,http_api.V1))
8787
router.Handle("POST","/channel/finishmemdelayed",http_api.Decorate(s.doFinishMemDelayed,log,http_api.V1))
8888
router.Handle("POST","/channel/emptydelayed",http_api.Decorate(s.doEmptyChannelDelayed,log,http_api.V1))
89+
router.Handle("POST","/channel/requeuedelayed",http_api.Decorate(s.doRequeueChannelDelayed,log,http_api.V1))
8990
router.Handle("POST","/channel/setoffset",http_api.Decorate(s.doSetChannelOffset,log,http_api.V1))
9091
router.Handle("POST","/channel/setorder",http_api.Decorate(s.doSetChannelOrder,log,http_api.V1))
9192
router.Handle("POST","/channel/setclientlimit",http_api.Decorate(s.doSetChannelClientLimit,log,http_api.V1))
@@ -224,6 +225,30 @@ func (s *httpServer) getExistingTopicChannelFromQuery(req *http.Request) (url.Va
224225
returnreqParams,topic,channelName,err
225226
}
226227

228+
func (s*httpServer)getTimeOffsetFromQuery(req*http.Request) (int,error) {
229+
reqParams,err:=url.ParseQuery(req.URL.RawQuery)
230+
231+
iferr!=nil {
232+
nsqd.NsqLogger().LogErrorf("failed to parse request params - %s",err)
233+
return1, http_api.Err{400,"INVALID_REQUEST"}
234+
}
235+
236+
offsetStr:=reqParams.Get("offset")
237+
238+
ifoffsetStr=="" {
239+
nsqd.NsqLogger().LogErrorf("The value of offset does not exist. Set the default value to 1")
240+
return1,nil
241+
}
242+
offset,err:=strconv.Atoi(offsetStr)
243+
244+
iferr!=nil {
245+
nsqd.NsqLogger().LogErrorf("offset invalid - %s",err)
246+
return1, http_api.Err{400,"INVALID_REQUEST"}
247+
}
248+
249+
returnoffset,err
250+
}
251+
227252
//TODO: will be refactored for further extension
228253
funcgetTag(reqParams url.Values)string {
229254
returnreqParams.Get("tag")
@@ -718,6 +743,33 @@ func (s *httpServer) doEmptyChannelDelayed(w http.ResponseWriter, req *http.Requ
718743
returnnil,nil
719744
}
720745

746+
func (s*httpServer)doRequeueChannelDelayed(w http.ResponseWriter,req*http.Request,ps httprouter.Params) (interface{},error) {
747+
_,topic,channelName,err:=s.getExistingTopicChannelFromQuery(req)
748+
iferr!=nil {
749+
returnnil,err
750+
}
751+
752+
offset,err:=s.getTimeOffsetFromQuery(req)
753+
754+
channel,err:=topic.GetExistingChannel(channelName)
755+
iferr!=nil {
756+
returnnil, http_api.Err{404,"CHANNEL_NOT_FOUND"}
757+
}
758+
759+
ifs.ctx.checkConsumeForMasterWrite(topic.GetTopicName(),topic.GetTopicPart()) {
760+
_,_,err:=channel.PeekAndReqDelayedMessages(time.Now().Add(time.Hour*time.Duration(offset)).UnixNano())
761+
iferr!=nil {
762+
nsqd.NsqLogger().Logf("failed to requeue the channel %v delayed data: %v, by client:%v",
763+
channelName,err,req.RemoteAddr)
764+
}
765+
}else {
766+
nsqd.NsqLogger().LogDebugf("should request to master: %v, from %v",
767+
topic.GetFullName(),req.RemoteAddr)
768+
returnnil, http_api.Err{400,FailedOnNotLeader}
769+
}
770+
returnnil,nil
771+
}
772+
721773
func (s*httpServer)doFixChannelConfirmed(w http.ResponseWriter,req*http.Request,ps httprouter.Params) (interface{},error) {
722774
_,topic,channelName,err:=s.getExistingTopicChannelFromQuery(req)
723775
iferr!=nil {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp