@@ -86,6 +86,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer
86
86
router .Handle ("POST" ,"/channel/fixconfirmed" ,http_api .Decorate (s .doFixChannelConfirmed ,log ,http_api .V1 ))
87
87
router .Handle ("POST" ,"/channel/finishmemdelayed" ,http_api .Decorate (s .doFinishMemDelayed ,log ,http_api .V1 ))
88
88
router .Handle ("POST" ,"/channel/emptydelayed" ,http_api .Decorate (s .doEmptyChannelDelayed ,log ,http_api .V1 ))
89
+ router .Handle ("POST" ,"/channel/requeuedelayed" ,http_api .Decorate (s .doRequeueChannelDelayed ,log ,http_api .V1 ))
89
90
router .Handle ("POST" ,"/channel/setoffset" ,http_api .Decorate (s .doSetChannelOffset ,log ,http_api .V1 ))
90
91
router .Handle ("POST" ,"/channel/setorder" ,http_api .Decorate (s .doSetChannelOrder ,log ,http_api .V1 ))
91
92
router .Handle ("POST" ,"/channel/setclientlimit" ,http_api .Decorate (s .doSetChannelClientLimit ,log ,http_api .V1 ))
@@ -224,6 +225,30 @@ func (s *httpServer) getExistingTopicChannelFromQuery(req *http.Request) (url.Va
224
225
return reqParams ,topic ,channelName ,err
225
226
}
226
227
228
+ func (s * httpServer )getTimeOffsetFromQuery (req * http.Request ) (int ,error ) {
229
+ reqParams ,err := url .ParseQuery (req .URL .RawQuery )
230
+
231
+ if err != nil {
232
+ nsqd .NsqLogger ().LogErrorf ("failed to parse request params - %s" ,err )
233
+ return 1 , http_api.Err {400 ,"INVALID_REQUEST" }
234
+ }
235
+
236
+ offsetStr := reqParams .Get ("offset" )
237
+
238
+ if offsetStr == "" {
239
+ nsqd .NsqLogger ().LogErrorf ("The value of offset does not exist. Set the default value to 1" )
240
+ return 1 ,nil
241
+ }
242
+ offset ,err := strconv .Atoi (offsetStr )
243
+
244
+ if err != nil {
245
+ nsqd .NsqLogger ().LogErrorf ("offset invalid - %s" ,err )
246
+ return 1 , http_api.Err {400 ,"INVALID_REQUEST" }
247
+ }
248
+
249
+ return offset ,err
250
+ }
251
+
227
252
//TODO: will be refactored for further extension
228
253
func getTag (reqParams url.Values )string {
229
254
return reqParams .Get ("tag" )
@@ -718,6 +743,33 @@ func (s *httpServer) doEmptyChannelDelayed(w http.ResponseWriter, req *http.Requ
718
743
return nil ,nil
719
744
}
720
745
746
+ func (s * httpServer )doRequeueChannelDelayed (w http.ResponseWriter ,req * http.Request ,ps httprouter.Params ) (interface {},error ) {
747
+ _ ,topic ,channelName ,err := s .getExistingTopicChannelFromQuery (req )
748
+ if err != nil {
749
+ return nil ,err
750
+ }
751
+
752
+ offset ,err := s .getTimeOffsetFromQuery (req )
753
+
754
+ channel ,err := topic .GetExistingChannel (channelName )
755
+ if err != nil {
756
+ return nil , http_api.Err {404 ,"CHANNEL_NOT_FOUND" }
757
+ }
758
+
759
+ if s .ctx .checkConsumeForMasterWrite (topic .GetTopicName (),topic .GetTopicPart ()) {
760
+ _ ,_ ,err := channel .PeekAndReqDelayedMessages (time .Now ().Add (time .Hour * time .Duration (offset )).UnixNano ())
761
+ if err != nil {
762
+ nsqd .NsqLogger ().Logf ("failed to requeue the channel %v delayed data: %v, by client:%v" ,
763
+ channelName ,err ,req .RemoteAddr )
764
+ }
765
+ }else {
766
+ nsqd .NsqLogger ().LogDebugf ("should request to master: %v, from %v" ,
767
+ topic .GetFullName (),req .RemoteAddr )
768
+ return nil , http_api.Err {400 ,FailedOnNotLeader }
769
+ }
770
+ return nil ,nil
771
+ }
772
+
721
773
func (s * httpServer )doFixChannelConfirmed (w http.ResponseWriter ,req * http.Request ,ps httprouter.Params ) (interface {},error ) {
722
774
_ ,topic ,channelName ,err := s .getExistingTopicChannelFromQuery (req )
723
775
if err != nil {