- Notifications
You must be signed in to change notification settings - Fork928
feat: add wsproxy implementation for key fetching#14917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
38574ff
ea5ec77
e6612bd
558ad30
9061a1c
0fef6b0
a801d0c
2ce4876
5eedbf8
6c2be2c
a1d4b46
f6cc1cd
b33616e
ee77d8e
42eda0b
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
package wsproxy | ||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
"golang.org/x/xerrors" | ||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/coderd/cryptokeys" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/quartz" | ||
) | ||
const ( | ||
// latestSequence is a special sequence number that represents the latest key. | ||
latestSequence = -1 | ||
// refreshInterval is the interval at which the key cache will refresh. | ||
refreshInterval = time.Minute * 10 | ||
) | ||
type Fetcher interface { | ||
Fetch(ctx context.Context) ([]codersdk.CryptoKey, error) | ||
} | ||
type CryptoKeyCache struct { | ||
Clock quartz.Clock | ||
refreshCtx context.Context | ||
refreshCancel context.CancelFunc | ||
fetcher Fetcher | ||
logger slog.Logger | ||
mu sync.Mutex | ||
keys map[int32]codersdk.CryptoKey | ||
lastFetch time.Time | ||
refresher *quartz.Timer | ||
fetching bool | ||
closed bool | ||
cond *sync.Cond | ||
} | ||
func NewCryptoKeyCache(ctx context.Context, log slog.Logger, client Fetcher, opts ...func(*CryptoKeyCache)) (*CryptoKeyCache, error) { | ||
cache := &CryptoKeyCache{ | ||
Clock: quartz.NewReal(), | ||
logger: log, | ||
fetcher: client, | ||
} | ||
for _, opt := range opts { | ||
opt(cache) | ||
} | ||
cache.cond = sync.NewCond(&cache.mu) | ||
cache.refreshCtx, cache.refreshCancel = context.WithCancel(ctx) | ||
cache.refresher = cache.Clock.AfterFunc(refreshInterval, cache.refresh) | ||
keys, err := cache.cryptoKeys(ctx) | ||
if err != nil { | ||
cache.refreshCancel() | ||
return nil, xerrors.Errorf("initial fetch: %w", err) | ||
} | ||
cache.keys = keys | ||
return cache, nil | ||
} | ||
func (k *CryptoKeyCache) Signing(ctx context.Context) (codersdk.CryptoKey, error) { | ||
return k.cryptoKey(ctx, latestSequence) | ||
} | ||
func (k *CryptoKeyCache) Verifying(ctx context.Context, sequence int32) (codersdk.CryptoKey, error) { | ||
return k.cryptoKey(ctx, sequence) | ||
} | ||
func (k *CryptoKeyCache) cryptoKey(ctx context.Context, sequence int32) (codersdk.CryptoKey, error) { | ||
k.mu.Lock() | ||
defer k.mu.Unlock() | ||
if k.closed { | ||
return codersdk.CryptoKey{}, cryptokeys.ErrClosed | ||
} | ||
var key codersdk.CryptoKey | ||
var ok bool | ||
for key, ok = k.key(sequence); !ok && k.fetching && !k.closed; { | ||
k.cond.Wait() | ||
} | ||
if k.closed { | ||
return codersdk.CryptoKey{}, cryptokeys.ErrClosed | ||
} | ||
if ok { | ||
return checkKey(key, sequence, k.Clock.Now()) | ||
} | ||
k.fetching = true | ||
k.mu.Unlock() | ||
keys, err := k.cryptoKeys(ctx) | ||
if err != nil { | ||
return codersdk.CryptoKey{}, xerrors.Errorf("get keys: %w", err) | ||
} | ||
k.mu.Lock() | ||
k.lastFetch = k.Clock.Now() | ||
k.refresher.Reset(refreshInterval) | ||
k.keys = keys | ||
k.fetching = false | ||
k.cond.Broadcast() | ||
key, ok = k.key(sequence) | ||
if !ok { | ||
return codersdk.CryptoKey{}, cryptokeys.ErrKeyNotFound | ||
} | ||
return checkKey(key, sequence, k.Clock.Now()) | ||
} | ||
func (k *CryptoKeyCache) key(sequence int32) (codersdk.CryptoKey, bool) { | ||
if sequence == latestSequence { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. if you're going to use a special int32 to represent latest, you might as well go whole hog and just insert the crypto key into the map under that special value, and drop | ||
return k.keys[latestSequence], k.keys[latestSequence].CanSign(k.Clock.Now()) | ||
} | ||
key, ok := k.keys[sequence] | ||
return key, ok | ||
} | ||
func checkKey(key codersdk.CryptoKey, sequence int32, now time.Time) (codersdk.CryptoKey, error) { | ||
if sequence == latestSequence { | ||
if !key.CanSign(now) { | ||
return codersdk.CryptoKey{}, cryptokeys.ErrKeyInvalid | ||
} | ||
return key, nil | ||
} | ||
if !key.CanVerify(now) { | ||
return codersdk.CryptoKey{}, cryptokeys.ErrKeyInvalid | ||
} | ||
return key, nil | ||
} | ||
// refresh fetches the keys from the control plane and updates the cache. | ||
func (k *CryptoKeyCache) refresh() { | ||
now := k.Clock.Now("CryptoKeyCache", "refresh") | ||
k.mu.Lock() | ||
if k.closed { | ||
k.mu.Unlock() | ||
return | ||
} | ||
// If something's already fetching, we don't need to do anything. | ||
if k.fetching { | ||
k.mu.Unlock() | ||
return | ||
} | ||
// There's a window we must account for where the timer fires while a fetch | ||
// is ongoing but prior to the timer getting reset. In this case we want to | ||
// avoid double fetching. | ||
if now.Sub(k.lastFetch) < refreshInterval { | ||
k.mu.Unlock() | ||
return | ||
} | ||
k.fetching = true | ||
k.mu.Unlock() | ||
keys, err := k.cryptoKeys(k.refreshCtx) | ||
if err != nil { | ||
k.logger.Error(k.refreshCtx, "fetch crypto keys", slog.Error(err)) | ||
return | ||
} | ||
k.mu.Lock() | ||
defer k.mu.Unlock() | ||
k.lastFetch = k.Clock.Now() | ||
k.refresher.Reset(refreshInterval) | ||
k.keys = keys | ||
k.fetching = false | ||
k.cond.Broadcast() | ||
} | ||
// cryptoKeys queries the control plane for the crypto keys. | ||
// Outside of initialization, this should only be called by fetch. | ||
func (k *CryptoKeyCache) cryptoKeys(ctx context.Context) (map[int32]codersdk.CryptoKey, error) { | ||
keys, err := k.fetcher.Fetch(ctx) | ||
if err != nil { | ||
return nil, xerrors.Errorf("crypto keys: %w", err) | ||
} | ||
cache := toKeyMap(keys, k.Clock.Now()) | ||
return cache, nil | ||
} | ||
func toKeyMap(keys []codersdk.CryptoKey, now time.Time) map[int32]codersdk.CryptoKey { | ||
m := make(map[int32]codersdk.CryptoKey) | ||
var latest codersdk.CryptoKey | ||
for _, key := range keys { | ||
m[key.Sequence] = key | ||
if key.Sequence > latest.Sequence && key.CanSign(now) { | ||
m[latestSequence] = key | ||
} | ||
} | ||
return m | ||
} | ||
func (k *CryptoKeyCache) Close() { | ||
k.mu.Lock() | ||
defer k.mu.Unlock() | ||
if k.closed { | ||
return | ||
} | ||
k.closed = true | ||
k.refreshCancel() | ||
k.refresher.Stop() | ||
k.cond.Broadcast() | ||
} |
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.