Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: avoid terraform state concurrent access, remove global mutex#5273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
deansheather merged 1 commit intomainfromdean/fix-terraform-corruption
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletionscli/server.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -112,6 +112,14 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
notifyCtx,notifyStop:=signal.NotifyContext(ctx,InterruptSignals...)
defernotifyStop()

// Ensure we have a unique cache directory for this process.
cacheDir:=filepath.Join(cfg.CacheDirectory.Value,uuid.NewString())
err=os.MkdirAll(cacheDir,0o700)
iferr!=nil {
returnxerrors.Errorf("create cache directory: %w",err)
}
deferos.RemoveAll(cacheDir)

// Clean up idle connections at the end, e.g.
// embedded-postgres can leave an idle connection
// which is caught by goleaks.
Expand DownExpand Up@@ -355,7 +363,7 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
Database:databasefake.New(),
DERPMap:derpMap,
Pubsub:database.NewPubsubInMemory(),
CacheDir:cfg.CacheDirectory.Value,
CacheDir:cacheDir,
GoogleTokenValidator:googleTokenValidator,
GitAuthConfigs:gitAuthConfigs,
RealIPConfig:realIPConfig,
Expand DownExpand Up@@ -632,7 +640,8 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
}()
provisionerdMetrics:=provisionerd.NewMetrics(options.PrometheusRegistry)
fori:=0;i<cfg.Provisioner.Daemons.Value;i++ {
daemon,err:=newProvisionerDaemon(ctx,coderAPI,provisionerdMetrics,logger,cfg,errCh,false)
daemonCacheDir:=filepath.Join(cacheDir,fmt.Sprintf("provisioner-%d",i))
daemon,err:=newProvisionerDaemon(ctx,coderAPI,provisionerdMetrics,logger,cfg,daemonCacheDir,errCh,false)
iferr!=nil {
returnxerrors.Errorf("create provisioner daemon: %w",err)
}
Expand DownExpand Up@@ -902,6 +911,7 @@ func newProvisionerDaemon(
metrics provisionerd.Metrics,
logger slog.Logger,
cfg*codersdk.DeploymentConfig,
cacheDirstring,
errChchanerror,
devbool,
) (srv*provisionerd.Server,errerror) {
Expand All@@ -912,9 +922,9 @@ func newProvisionerDaemon(
}
}()

err=os.MkdirAll(cfg.CacheDirectory.Value,0o700)
err=os.MkdirAll(cacheDir,0o700)
iferr!=nil {
returnnil,xerrors.Errorf("mkdir %q: %w",cfg.CacheDirectory.Value,err)
returnnil,xerrors.Errorf("mkdir %q: %w",cacheDir,err)
}

terraformClient,terraformServer:=provisionersdk.MemTransportPipe()
Expand All@@ -930,7 +940,7 @@ func newProvisionerDaemon(
ServeOptions:&provisionersdk.ServeOptions{
Listener:terraformServer,
},
CachePath:cfg.CacheDirectory.Value,
CachePath:cacheDir,
Logger:logger,
})
iferr!=nil&&!xerrors.Is(err,context.Canceled) {
Expand Down
75 changes: 34 additions & 41 deletionsprovisioner/terraform/executor.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -23,27 +23,15 @@ import (
"github.com/coder/coder/provisionersdk/proto"
)

// initMut is a global mutex that protects the Terraform cache directory from
// concurrent usage by path. Only `terraform init` commands are guarded by this
// mutex.
//
// When cache path is set, we must protect against multiple calls to
// `terraform init`.
//
// From the Terraform documentation:
//
//Note: The plugin cache directory is not guaranteed to be concurrency
//safe. The provider installer's behavior in environments with multiple
//terraform init calls is undefined.
var initMut = &sync.Mutex{}

type executor struct {
mut *sync.Mutex
binaryPath string
cachePath string
workdir string
// cachePath and workdir must not be used by multiple processes at once.
cachePath string
workdir string
}

func (e executor) basicEnv() []string {
func (e*executor) basicEnv() []string {
// Required for "terraform init" to find "git" to
// clone Terraform modules.
env := safeEnviron()
Expand All@@ -55,7 +43,8 @@ func (e executor) basicEnv() []string {
return env
}

func (e executor) execWriteOutput(ctx, killCtx context.Context, args, env []string, stdOutWriter, stdErrWriter io.WriteCloser) (err error) {
// execWriteOutput must only be called while the lock is held.
func (e *executor) execWriteOutput(ctx, killCtx context.Context, args, env []string, stdOutWriter, stdErrWriter io.WriteCloser) (err error) {
defer func() {
closeErr := stdOutWriter.Close()
if err == nil && closeErr != nil {
Expand DownExpand Up@@ -98,7 +87,8 @@ func (e executor) execWriteOutput(ctx, killCtx context.Context, args, env []stri
return cmd.Wait()
}

func (e executor) execParseJSON(ctx, killCtx context.Context, args, env []string, v interface{}) error {
// execParseJSON must only be called while the lock is held.
func (e *executor) execParseJSON(ctx, killCtx context.Context, args, env []string, v interface{}) error {
if ctx.Err() != nil {
return ctx.Err()
}
Expand DownExpand Up@@ -133,7 +123,7 @@ func (e executor) execParseJSON(ctx, killCtx context.Context, args, env []string
return nil
}

func (e executor) checkMinVersion(ctx context.Context) error {
func (e*executor) checkMinVersion(ctx context.Context) error {
v, err := e.version(ctx)
if err != nil {
return err
Expand All@@ -147,7 +137,8 @@ func (e executor) checkMinVersion(ctx context.Context) error {
return nil
}

func (e executor) version(ctx context.Context) (*version.Version, error) {
// version doesn't need the lock because it doesn't read or write to any state.
func (e *executor) version(ctx context.Context) (*version.Version, error) {
return versionFromBinaryPath(ctx, e.binaryPath)
}

Expand DownExpand Up@@ -177,7 +168,10 @@ func versionFromBinaryPath(ctx context.Context, binaryPath string) (*version.Ver
return version.NewVersion(vj.Version)
}

func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
func (e *executor) init(ctx, killCtx context.Context, logr logSink) error {
e.mut.Lock()
defer e.mut.Unlock()

outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() {
Expand All@@ -193,23 +187,14 @@ func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
"-input=false",
}

// When cache path is set, we must protect against multiple calls
// to `terraform init`.
//
// From the Terraform documentation:
// Note: The plugin cache directory is not guaranteed to be
// concurrency safe. The provider installer's behavior in
// environments with multiple terraform init calls is undefined.
if e.cachePath != "" {
initMut.Lock()
defer initMut.Unlock()
}

return e.execWriteOutput(ctx, killCtx, args, e.basicEnv(), outWriter, errWriter)
}

// revive:disable-next-line:flag-parameter
func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
func (e *executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
e.mut.Lock()
defer e.mut.Unlock()

planfilePath := filepath.Join(e.workdir, "terraform.tfplan")
args := []string{
"plan",
Expand DownExpand Up@@ -257,7 +242,8 @@ func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr lo
}, nil
}

func (e executor) planResources(ctx, killCtx context.Context, planfilePath string) ([]*proto.Resource, error) {
// planResources must only be called while the lock is held.
func (e *executor) planResources(ctx, killCtx context.Context, planfilePath string) ([]*proto.Resource, error) {
plan, err := e.showPlan(ctx, killCtx, planfilePath)
if err != nil {
return nil, xerrors.Errorf("show terraform plan file: %w", err)
Expand All@@ -270,14 +256,16 @@ func (e executor) planResources(ctx, killCtx context.Context, planfilePath strin
return ConvertResources(plan.PlannedValues.RootModule, rawGraph)
}

func (e executor) showPlan(ctx, killCtx context.Context, planfilePath string) (*tfjson.Plan, error) {
// showPlan must only be called while the lock is held.
func (e *executor) showPlan(ctx, killCtx context.Context, planfilePath string) (*tfjson.Plan, error) {
args := []string{"show", "-json", "-no-color", planfilePath}
p := new(tfjson.Plan)
err := e.execParseJSON(ctx, killCtx, args, e.basicEnv(), p)
return p, err
}

func (e executor) graph(ctx, killCtx context.Context) (string, error) {
// graph must only be called while the lock is held.
func (e *executor) graph(ctx, killCtx context.Context) (string, error) {
if ctx.Err() != nil {
return "", ctx.Err()
}
Expand All@@ -302,9 +290,12 @@ func (e executor) graph(ctx, killCtx context.Context) (string, error) {
}

// revive:disable-next-line:flag-parameter
func (e executor) apply(
func (e*executor) apply(
ctx, killCtx context.Context, plan []byte, env []string, logr logSink,
) (*proto.Provision_Response, error) {
e.mut.Lock()
defer e.mut.Unlock()

planFile, err := ioutil.TempFile("", "coder-terrafrom-plan")
if err != nil {
return nil, xerrors.Errorf("create plan file: %w", err)
Expand DownExpand Up@@ -356,7 +347,8 @@ func (e executor) apply(
}, nil
}

func (e executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resource, error) {
// stateResources must only be called while the lock is held.
func (e *executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resource, error) {
state, err := e.state(ctx, killCtx)
if err != nil {
return nil, err
Expand All@@ -375,7 +367,8 @@ func (e executor) stateResources(ctx, killCtx context.Context) ([]*proto.Resourc
return resources, nil
}

func (e executor) state(ctx, killCtx context.Context) (*tfjson.State, error) {
// state must only be called while the lock is held.
func (e *executor) state(ctx, killCtx context.Context) (*tfjson.State, error) {
args := []string{"show", "-json", "-no-color"}
state := &tfjson.State{}
err := e.execParseJSON(ctx, killCtx, args, e.basicEnv(), state)
Expand Down
6 changes: 3 additions & 3 deletionsprovisioner/terraform/provision_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -77,7 +77,7 @@ func readProvisionLog(t *testing.T, response proto.DRPCProvisioner_ProvisionClie

iflog:=msg.GetLog();log!=nil {
t.Log(log.Level.String(),log.Output)
logBuf.WriteString(log.Output)
_,_=logBuf.WriteString(log.Output)
}
ifc=msg.GetComplete();c!=nil {
require.Empty(t,c.Error)
Expand DownExpand Up@@ -190,8 +190,6 @@ func TestProvision_Cancel(t *testing.T) {
funcTestProvision(t*testing.T) {
t.Parallel()

ctx,api:=setupProvisioner(t,nil)

testCases:= []struct {
Namestring
Filesmap[string]string
Expand DownExpand Up@@ -329,6 +327,8 @@ func TestProvision(t *testing.T) {
t.Run(testCase.Name,func(t*testing.T) {
t.Parallel()

ctx,api:=setupProvisioner(t,nil)

directory:=t.TempDir()
forpath,content:=rangetestCase.Files {
err:=os.WriteFile(filepath.Join(directory,path), []byte(content),0o600)
Expand Down
13 changes: 9 additions & 4 deletionsprovisioner/terraform/serve.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,6 +3,7 @@ package terraform
import (
"context"
"path/filepath"
"sync"
"time"

"github.com/cli/safeexec"
Expand All@@ -22,8 +23,9 @@ type ServeOptions struct {
// BinaryPath specifies the "terraform" binary to use.
// If omitted, the $PATH will attempt to find it.
BinaryPath string
CachePath string
Logger slog.Logger
// CachePath must not be used by multiple processes at once.
CachePath string
Logger slog.Logger

// ExitTimeout defines how long we will wait for a running Terraform
// command to exit (cleanly) if the provision was stopped. This only
Expand DownExpand Up@@ -91,6 +93,7 @@ func Serve(ctx context.Context, options *ServeOptions) error {
options.ExitTimeout = defaultExitTimeout
}
return provisionersdk.Serve(ctx, &server{
execMut: &sync.Mutex{},
binaryPath: options.BinaryPath,
cachePath: options.CachePath,
logger: options.Logger,
Expand All@@ -99,14 +102,16 @@ func Serve(ctx context.Context, options *ServeOptions) error {
}

type server struct {
execMut *sync.Mutex
binaryPath string
cachePath string
logger slog.Logger
exitTimeout time.Duration
}

func (s *server) executor(workdir string) executor {
return executor{
func (s *server) executor(workdir string) *executor {
return &executor{
mut: s.execMut,
binaryPath: s.binaryPath,
cachePath: s.cachePath,
workdir: workdir,
Expand Down
3 changes: 2 additions & 1 deletionprovisionerd/provisionerd.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -57,7 +57,8 @@ type Options struct {
JobPollJitter time.Duration
JobPollDebounce time.Duration
Provisioners Provisioners
WorkDirectory string
// WorkDirectory must not be used by multiple processes at once.
WorkDirectory string
}

// New creates and starts a provisioner daemon.
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp