Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3ff19ab

Browse files
authored
feat: implement delivery retry schedule (#532)
* feat: implement delivery retry schedule* docs: generate config* chore: typo
1 parent7bb22b9 commit3ff19ab

File tree

8 files changed

+234
-21
lines changed

8 files changed

+234
-21
lines changed

‎docs/pages/references/configuration.mdx‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
7878
|`LOG_LEVEL`| Defines the verbosity of application logs. Common values: 'trace', 'debug', 'info', 'warn', 'error'.|`info`| No|
7979
|`LOG_MAX_CONCURRENCY`| Maximum number of log writing operations to process concurrently.|`1`| No|
8080
|`MAX_DESTINATIONS_PER_TENANT`| Maximum number of destinations allowed per tenant/organization.|`20`| No|
81-
|`MAX_RETRY_LIMIT`| Maximum number of retry attempts for a single event delivery before giving up.|`10`| No|
81+
|`MAX_RETRY_LIMIT`| Maximum number of retry attempts for a single event delivery before giving up.Ignored if retry_schedule is provided.|`10`| No|
8282
|`ORGANIZATION_NAME`| Name of the organization, used for display purposes and potentially in user agent strings.|`nil`| No|
8383
|`OTEL_EXPORTER`| Specifies the OTLP exporter to use for this telemetry type (e.g., 'otlp'). Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_ENDPOINT.|`nil`| Conditional|
8484
|`OTEL_PROTOCOL`| Specifies the OTLP protocol ('grpc' or 'http') for this telemetry type. Typically used with environment variables like OTEL_EXPORTER_OTLP_TRACES_PROTOCOL.|`nil`| Conditional|
@@ -120,7 +120,8 @@ Global configurations are provided through env variables or a YAML file. ConfigM
120120
|`REDIS_PASSWORD`| Password for Redis authentication, if required by the server.|`nil`| Yes|
121121
|`REDIS_PORT`| Port number for the Redis server.|`6379`| Yes|
122122
|`REDIS_TLS_ENABLED`| Enable TLS encryption for Redis connection.|`false`| No|
123-
|`RETRY_INTERVAL_SECONDS`| Interval in seconds between delivery retry attempts for failed webhooks.|`30`| No|
123+
|`RETRY_INTERVAL_SECONDS`| Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided.|`30`| No|
124+
|`RETRY_SCHEDULE`| Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h.|`[]`| No|
124125
|`SERVICE`| Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services).|`nil`| No|
125126
|`TELEMETRY_BATCH_INTERVAL`| Maximum time in seconds to wait before sending a batch of telemetry events if batch size is not reached.|`5`| No|
126127
|`TELEMETRY_BATCH_SIZE`| Maximum number of telemetry events to batch before sending.|`100`| No|
@@ -552,12 +553,15 @@ redis:
552553
tls_enabled:false
553554

554555

555-
# Interval in secondsbetween deliveryretryattempts for failed webhooks.
556+
# Interval in secondsfor exponential backoffretrystrategy (base 2). Ignored if retry_schedule is provided.
556557
retry_interval_seconds:30
557558

558-
# Maximum number of retry attempts for a single event delivery before giving up.
559+
# Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided.
559560
retry_max_limit:10
560561

562+
# Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h.
563+
retry_schedule:[]
564+
561565
# Specifies the service type to run. Valid values: 'api', 'log', 'delivery', or empty/all for singular mode (runs all services).
562566
service:""
563567

‎internal/backoff/backoff.go‎

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "time"
44

55
typeBackoffinterface {
66
// Duration returns the duration to wait before retrying the operation.
7-
// Duration accepts thenumeber of times the operation has been retried.
7+
// Duration accepts thenumber of times the operation has been retried.
88
// If the operation has never been retried, the number should be 0.
99
Duration(int) time.Duration
1010
}
@@ -45,3 +45,22 @@ var _ Backoff = &ConstantBackoff{}
4545
func (b*ConstantBackoff)Duration(retriesint) time.Duration {
4646
returnb.Interval
4747
}
48+
49+
// ScheduledBackoff uses a predefined schedule of delays for each retry attempt.
50+
// If the retry attempt exceeds the schedule length, it returns the last value.
51+
typeScheduledBackoffstruct {
52+
Schedule []time.Duration
53+
}
54+
55+
var_Backoff=&ScheduledBackoff{}
56+
57+
func (b*ScheduledBackoff)Duration(retriesint) time.Duration {
58+
iflen(b.Schedule)==0 {
59+
return0
60+
}
61+
ifretries>=len(b.Schedule) {
62+
// Return last value for attempts beyond schedule
63+
returnb.Schedule[len(b.Schedule)-1]
64+
}
65+
returnb.Schedule[retries]
66+
}

‎internal/backoff/backoff_test.go‎

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,53 @@ func TestBackoff_Constant(t *testing.T) {
8787
}
8888
testBackoff(t,"ConstantBackoff{Interval:30*time.Second}",bo,testCases)
8989
}
90+
91+
funcTestBackoff_Scheduled(t*testing.T) {
92+
t.Parallel()
93+
94+
t.Run("CustomSchedule",func(t*testing.T) {
95+
bo:=&backoff.ScheduledBackoff{
96+
Schedule: []time.Duration{
97+
5*time.Second,
98+
1*time.Minute,
99+
10*time.Minute,
100+
1*time.Hour,
101+
2*time.Hour,
102+
},
103+
}
104+
testCases:= []testCase{
105+
{0,5*time.Second},
106+
{1,1*time.Minute},
107+
{2,10*time.Minute},
108+
{3,1*time.Hour},
109+
{4,2*time.Hour},
110+
{5,2*time.Hour},// Beyond schedule, returns last value
111+
{10,2*time.Hour},// Beyond schedule, returns last value
112+
}
113+
testBackoff(t,"ScheduledBackoff{Custom}",bo,testCases)
114+
})
115+
116+
t.Run("EmptySchedule",func(t*testing.T) {
117+
bo:=&backoff.ScheduledBackoff{
118+
Schedule: []time.Duration{},
119+
}
120+
testCases:= []testCase{
121+
{0,0},
122+
{1,0},
123+
{5,0},
124+
}
125+
testBackoff(t,"ScheduledBackoff{Empty}",bo,testCases)
126+
})
127+
128+
t.Run("SingleElement",func(t*testing.T) {
129+
bo:=&backoff.ScheduledBackoff{
130+
Schedule: []time.Duration{1*time.Minute},
131+
}
132+
testCases:= []testCase{
133+
{0,1*time.Minute},
134+
{1,1*time.Minute},// Beyond schedule, returns last value
135+
{5,1*time.Minute},// Beyond schedule, returns last value
136+
}
137+
testBackoff(t,"ScheduledBackoff{Single}",bo,testCases)
138+
})
139+
}

‎internal/config/config.go‎

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"time"
89

910
"github.com/caarlos0/env/v9"
11+
"github.com/hookdeck/outpost/internal/backoff"
1012
"github.com/hookdeck/outpost/internal/migrator"
1113
"github.com/hookdeck/outpost/internal/redis"
1214
"github.com/hookdeck/outpost/internal/telemetry"
@@ -73,8 +75,9 @@ type Config struct {
7375
LogMaxConcurrencyint`yaml:"log_max_concurrency" env:"LOG_MAX_CONCURRENCY" desc:"Maximum number of log writing operations to process concurrently." required:"N"`
7476

7577
// Delivery Retry
76-
RetryIntervalSecondsint`yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds between delivery retry attempts for failed webhooks." required:"N"`
77-
RetryMaxLimitint`yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up." required:"N"`
78+
RetrySchedule []int`yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"`
79+
RetryIntervalSecondsint`yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"`
80+
RetryMaxLimitint`yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"`
7881

7982
// Event Delivery
8083
MaxDestinationsPerTenantint`yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`
@@ -103,14 +106,14 @@ type Config struct {
103106
}
104107

105108
var (
106-
ErrMismatchedServiceType=errors.New("config validation error: service type mismatch")
107-
ErrInvalidServiceType=errors.New("config validation error: invalid service type")
108-
ErrMissingRedis=errors.New("config validation error: redis configuration is required")
109-
ErrMissingLogStorage=errors.New("config validation error: log storage must be provided")
110-
ErrMissingMQs=errors.New("config validation error: message queue configuration is required")
111-
ErrMissingAESSecret=errors.New("config validation error: AES encryption secret is required")
112-
ErrInvalidPortalProxyURL=errors.New("config validation error: invalid portal proxy url")
113-
ErrInvalidDeploymentID=errors.New("config validation error: deployment_id must contain only alphanumeric characters, hyphens, and underscores (max 64 characters)")
109+
ErrMismatchedServiceType=errors.New("config validation error: service type mismatch")
110+
ErrInvalidServiceType=errors.New("config validation error: invalid service type")
111+
ErrMissingRedis=errors.New("config validation error: redis configuration is required")
112+
ErrMissingLogStorage=errors.New("config validation error: log storage must be provided")
113+
ErrMissingMQs=errors.New("config validation error: message queue configuration is required")
114+
ErrMissingAESSecret=errors.New("config validation error: AES encryption secret is required")
115+
ErrInvalidPortalProxyURL=errors.New("config validation error: invalid portal proxy url")
116+
ErrInvalidDeploymentID=errors.New("config validation error: deployment_id must contain only alphanumeric characters, hyphens, and underscores (max 64 characters)")
114117
)
115118

116119
func (c*Config)InitDefaults() {
@@ -152,6 +155,7 @@ func (c *Config) InitDefaults() {
152155
c.PublishMaxConcurrency=1
153156
c.DeliveryMaxConcurrency=1
154157
c.LogMaxConcurrency=1
158+
c.RetrySchedule= []int{}// Empty by default, falls back to exponential backoff
155159
c.RetryIntervalSeconds=30
156160
c.RetryMaxLimit=10
157161
c.MaxDestinationsPerTenant=20
@@ -373,6 +377,23 @@ func (c *Config) ConfigFilePath() string {
373377
returnc.configPath
374378
}
375379

380+
// GetRetryBackoff returns the configured backoff strategy based on retry configuration
381+
func (c*Config)GetRetryBackoff() (backoff.Backoff,int) {
382+
iflen(c.RetrySchedule)>0 {
383+
// Use scheduled backoff if retry_schedule is provided
384+
schedule:=make([]time.Duration,len(c.RetrySchedule))
385+
fori,seconds:=rangec.RetrySchedule {
386+
schedule[i]=time.Duration(seconds)*time.Second
387+
}
388+
return&backoff.ScheduledBackoff{Schedule:schedule},c.RetryMaxLimit
389+
}
390+
// Fall back to exponential backoff
391+
return&backoff.ExponentialBackoff{
392+
Interval:time.Duration(c.RetryIntervalSeconds)*time.Second,
393+
Base:2,
394+
},c.RetryMaxLimit
395+
}
396+
376397
typeTelemetryConfigstruct {
377398
Disabledbool`yaml:"disabled" env:"DISABLE_TELEMETRY" desc:"Disables telemetry within the 'telemetry' block (Hookdeck usage stats and Sentry). Can be overridden by the global 'disable_telemetry' flag at the root of the configuration." required:"N"`
378399
BatchSizeint`yaml:"batch_size" env:"TELEMETRY_BATCH_SIZE" desc:"Maximum number of telemetry events to batch before sending." required:"N"`
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package config_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/hookdeck/outpost/internal/config"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
funcTestRetrySchedule(t*testing.T) {
11+
tests:= []struct {
12+
namestring
13+
filesmap[string][]byte
14+
envVarsmap[string]string
15+
wantSchedule []int
16+
wantIntervalint
17+
wantMaxLimitint
18+
}{
19+
{
20+
name:"default empty retry schedule",
21+
files:map[string][]byte{},
22+
envVars:map[string]string{},
23+
wantSchedule: []int{},
24+
wantInterval:30,// default exponential backoff interval
25+
wantMaxLimit:10,// default max limit
26+
},
27+
{
28+
name:"yaml retry schedule overrides max limit",
29+
files:map[string][]byte{
30+
"config.yaml": []byte(`
31+
retry_schedule: [5, 300, 1800, 7200, 18000, 36000, 36000]
32+
`),
33+
},
34+
envVars:map[string]string{
35+
"CONFIG":"config.yaml",
36+
},
37+
wantSchedule: []int{5,300,1800,7200,18000,36000,36000},
38+
wantInterval:30,// still have default even though not used
39+
wantMaxLimit:7,// overridden to schedule length
40+
},
41+
{
42+
name:"env retry schedule overrides yaml and max limit",
43+
files:map[string][]byte{
44+
"config.yaml": []byte(`
45+
retry_schedule: [10, 20, 30]
46+
`),
47+
},
48+
envVars:map[string]string{
49+
"CONFIG":"config.yaml",
50+
"RETRY_SCHEDULE":"5,300,1800",
51+
},
52+
wantSchedule: []int{5,300,1800},
53+
wantInterval:30,
54+
wantMaxLimit:3,// overridden to env schedule length
55+
},
56+
{
57+
name:"retry_interval_seconds without retry_schedule",
58+
files:map[string][]byte{
59+
"config.yaml": []byte(`
60+
retry_interval_seconds: 60
61+
`),
62+
},
63+
envVars:map[string]string{
64+
"CONFIG":"config.yaml",
65+
},
66+
wantSchedule: []int{},
67+
wantInterval:60,
68+
wantMaxLimit:10,// default max limit used
69+
},
70+
{
71+
name:"both retry_schedule and retry_interval_seconds set",
72+
files:map[string][]byte{
73+
"config.yaml": []byte(`
74+
retry_schedule: [5, 300, 1800]
75+
retry_interval_seconds: 60
76+
`),
77+
},
78+
envVars:map[string]string{
79+
"CONFIG":"config.yaml",
80+
},
81+
wantSchedule: []int{5,300,1800},
82+
wantInterval:60,// both present, schedule takes precedence
83+
wantMaxLimit:3,// overridden to schedule length
84+
},
85+
}
86+
87+
for_,tt:=rangetests {
88+
t.Run(tt.name,func(t*testing.T) {
89+
mockOS:=&mockOS{
90+
files:tt.files,
91+
envVars:tt.envVars,
92+
}
93+
94+
mockOS.envVars["API_KEY"]="test-key"
95+
mockOS.envVars["API_JWT_SECRET"]="test-jwt-secret"
96+
mockOS.envVars["AES_ENCRYPTION_SECRET"]="test-aes-secret-16b"
97+
mockOS.envVars["POSTGRES_URL"]="postgres://localhost:5432/test"
98+
mockOS.envVars["RABBITMQ_SERVER_URL"]="amqp://localhost:5672"
99+
100+
cfg,err:=config.ParseWithOS(config.Flags{},mockOS)
101+
assert.NoError(t,err)
102+
assert.Equal(t,tt.wantSchedule,cfg.RetrySchedule)
103+
assert.Equal(t,tt.wantInterval,cfg.RetryIntervalSeconds)
104+
assert.Equal(t,tt.wantMaxLimit,cfg.RetryMaxLimit)
105+
})
106+
}
107+
}

‎internal/config/config_test.go‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func TestDefaultValues(t *testing.T) {
6363
assert.Equal(t,1,cfg.PublishMaxConcurrency)
6464
assert.Equal(t,1,cfg.DeliveryMaxConcurrency)
6565
assert.Equal(t,1,cfg.LogMaxConcurrency)
66+
assert.Equal(t, []int{},cfg.RetrySchedule)
6667
assert.Equal(t,30,cfg.RetryIntervalSeconds)
6768
assert.Equal(t,10,cfg.RetryMaxLimit)
6869
assert.Equal(t,20,cfg.MaxDestinationsPerTenant)

‎internal/config/validation.go‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func (c *Config) Validate(flags Flags) error {
4545
returnerr
4646
}
4747

48+
iferr:=c.validateRetryConfiguration();err!=nil {
49+
returnerr
50+
}
51+
4852
// Mark as validated if we get here
4953
c.validated=true
5054
returnnil
@@ -167,3 +171,12 @@ func (c *Config) validateDeploymentID() error {
167171

168172
returnnil
169173
}
174+
175+
// validateRetryConfiguration validates and adjusts the retry configuration
176+
func (c*Config)validateRetryConfiguration()error {
177+
// If retry_schedule is provided, override retry_max_limit to match schedule length
178+
iflen(c.RetrySchedule)>0 {
179+
c.RetryMaxLimit=len(c.RetrySchedule)
180+
}
181+
returnnil
182+
}

‎internal/services/delivery/delivery.go‎

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/hookdeck/outpost/internal/alert"
10-
"github.com/hookdeck/outpost/internal/backoff"
1110
"github.com/hookdeck/outpost/internal/config"
1211
"github.com/hookdeck/outpost/internal/consumer"
1312
"github.com/hookdeck/outpost/internal/deliverymq"
@@ -140,6 +139,8 @@ func NewService(ctx context.Context,
140139
alert.WithDeploymentID(cfg.DeploymentID),
141140
)
142141

142+
retryBackoff,retryMaxLimit:=cfg.GetRetryBackoff()
143+
143144
handler=deliverymq.NewMessageHandler(
144145
logger,
145146
redisClient,
@@ -149,11 +150,8 @@ func NewService(ctx context.Context,
149150
registry,
150151
eventTracer,
151152
retryScheduler,
152-
&backoff.ExponentialBackoff{
153-
Interval:time.Duration(cfg.RetryIntervalSeconds)*time.Second,
154-
Base:2,
155-
},
156-
cfg.RetryMaxLimit,
153+
retryBackoff,
154+
retryMaxLimit,
157155
alertMonitor,
158156
)
159157
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp