diff --git a/internal/pkg/docker/dockerengine/dockerengine.go b/internal/pkg/docker/dockerengine/dockerengine.go index 62d4805316c..4e5f0268e91 100644 --- a/internal/pkg/docker/dockerengine/dockerengine.go +++ b/internal/pkg/docker/dockerengine/dockerengine.go @@ -87,6 +87,7 @@ type RunOptions struct { ContainerNetwork string // Optional. Network mode for the container. LogOptions RunLogOptions // Optional. Configure logging for output from the container AddLinuxCapabilities []string // Optional. Adds linux capabilities to the container. + Init bool // Optional. Adds an init process as an entrypoint. } // RunLogOptions holds the logging configuration for Run(). @@ -274,6 +275,10 @@ func (in *RunOptions) generateRunArguments() []string { args = append(args, "--cap-add", cap) } + if in.Init { + args = append(args, "--init") + } + args = append(args, in.ImageURI) if in.Command != nil && len(in.Command) > 0 { diff --git a/internal/pkg/docker/orchestrator/Pause-Dockerfile b/internal/pkg/docker/orchestrator/Pause-Dockerfile index 0d64cd66ae2..83c31c9ed32 100644 --- a/internal/pkg/docker/orchestrator/Pause-Dockerfile +++ b/internal/pkg/docker/orchestrator/Pause-Dockerfile @@ -1,4 +1,7 @@ FROM public.ecr.aws/amazonlinux/amazonlinux:2023 RUN dnf install -y aws-cli iptables -RUN dnf install -y https://s3.amazonaws.com/session-manager-downloads/plugin/latest/linux_64bit/session-manager-plugin.rpm \ No newline at end of file + +ARG ARCH=64bit +# url from https://docs.aws.amazon.com/systems-manager/latest/userguide/install-plugin-linux.html +RUN dnf install -y https://s3.amazonaws.com/session-manager-downloads/plugin/latest/linux_${ARCH}/session-manager-plugin.rpm \ No newline at end of file diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 31bff7f4dff..f1e9e97c52e 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -12,7 +12,9 @@ import ( "maps" "net" "os" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -283,11 +285,6 @@ func (o *Orchestrator) setupProxyConnections(ctx context.Context, pauseContainer return fmt.Errorf("modify iptables: %w", err) } - err = o.docker.Exec(ctx, pauseContainer, io.Discard, "iptables-save") - if err != nil { - return fmt.Errorf("save iptables: %w", err) - } - err = o.docker.Exec(ctx, pauseContainer, io.Discard, "/bin/bash", "-c", fmt.Sprintf(`echo %s %s >> /etc/hosts`, ip.String(), host.Name)) if err != nil { @@ -338,10 +335,18 @@ func ipv4Increment(ip net.IP, network *net.IPNet) (net.IP, error) { } func (o *Orchestrator) buildPauseContainer(ctx context.Context) error { + arch := "64bit" + if strings.Contains(runtime.GOARCH, "arm") { + arch = "arm64" + } + return o.docker.Build(ctx, &dockerengine.BuildArguments{ URI: pauseCtrURI, Tags: []string{pauseCtrTag}, DockerfileContent: pauseDockerfile, + Args: map[string]string{ + "ARCH": arch, + }, }, os.Stderr) } @@ -351,6 +356,7 @@ func (o *Orchestrator) buildPauseContainer(ctx context.Context) error { func (o *Orchestrator) Stop() { o.stopOnce.Do(func() { close(o.stopped) + fmt.Printf("\nStopping task...\n") o.actions <- &stopAction{} }) } @@ -368,9 +374,11 @@ func (a *stopAction) Do(o *Orchestrator) error { } // stop pause container + fmt.Printf("Stopping %q\n", "pause") if err := o.docker.Stop(context.Background(), o.containerID("pause")); err != nil { errs = append(errs, fmt.Errorf("stop %q: %w", "pause", err)) } + fmt.Printf("Stopped %q\n", "pause") return errors.Join(errs...) } @@ -386,10 +394,12 @@ func (o *Orchestrator) stopTask(ctx context.Context, task Task) error { for name := range task.Containers { name := name go func() { + fmt.Printf("Stopping %q\n", name) if err := o.docker.Stop(ctx, o.containerID(name)); err != nil { errCh <- fmt.Errorf("stop %q: %w", name, err) return } + fmt.Printf("Stopped %q\n", name) errCh <- nil }() } @@ -456,6 +466,7 @@ func (o *Orchestrator) pauseRunOptions(t Task) dockerengine.RunOptions { ContainerPorts: make(map[string]string), Secrets: t.PauseSecrets, AddLinuxCapabilities: []string{"NET_ADMIN"}, + Init: true, } for _, ctr := range t.Containers { @@ -486,18 +497,22 @@ func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) { o.wg.Add(1) go func() { defer o.wg.Done() + err := o.docker.Run(context.Background(), &opts) - if err := o.docker.Run(context.Background(), &opts); err != nil { - curTaskID := o.curTaskID.Load() - if curTaskID == orchestratorStoppedTaskID { - return - } + // if the orchestrator has already stopped, + // we don't want to report the error + curTaskID := o.curTaskID.Load() + if curTaskID == orchestratorStoppedTaskID { + return + } - // the error is from the pause container - // or from the currently running task - if taskID == pauseCtrTaskID || taskID == curTaskID { - o.runErrs <- fmt.Errorf("run %q: %w", opts.ContainerName, err) + // the error is from the pause container + // or from the currently running task + if taskID == pauseCtrTaskID || taskID == curTaskID { + if err == nil { + err = errors.New("container stopped unexpectedly") } + o.runErrs <- fmt.Errorf("run %q: %w", opts.ContainerName, err) } }() } diff --git a/internal/pkg/docker/orchestrator/orchestrator_test.go b/internal/pkg/docker/orchestrator/orchestrator_test.go index 3c06596a0e5..6e14133089b 100644 --- a/internal/pkg/docker/orchestrator/orchestrator_test.go +++ b/internal/pkg/docker/orchestrator/orchestrator_test.go @@ -37,19 +37,20 @@ func TestOrchestrator(t *testing.T) { type test func(*testing.T, *Orchestrator) tests := map[string]struct { - logOptions logOptionsFunc - test func(t *testing.T) (test, DockerEngine) - stopAfterNErrs int + logOptions logOptionsFunc + test func(t *testing.T) (test, *dockerenginetest.Double) + runUntilStopped bool - errs []string + stopAfterNErrs int + errs []string }{ "stop and start": { - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { return func(t *testing.T, o *Orchestrator) {}, &dockerenginetest.Double{} }, }, "error if fail to build pause container": { - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ BuildFn: func(ctx context.Context, ba *dockerengine.BuildArguments, w io.Writer) error { return errors.New("some error") @@ -64,7 +65,8 @@ func TestOrchestrator(t *testing.T) { }, }, "error if unable to check if pause container is running": { - test: func(t *testing.T) (test, DockerEngine) { + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return false, errors.New("some error") @@ -79,8 +81,9 @@ func TestOrchestrator(t *testing.T) { }, }, "error stopping task": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -109,8 +112,9 @@ func TestOrchestrator(t *testing.T) { }, }, "error restarting new task due to pause changes": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -142,8 +146,9 @@ func TestOrchestrator(t *testing.T) { }, }, "success with a task": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -161,6 +166,9 @@ func TestOrchestrator(t *testing.T) { } return nil }, + StopFn: func(ctx context.Context, name string) error { + return nil + }, } return func(t *testing.T, o *Orchestrator) { o.RunTask(Task{ @@ -182,70 +190,87 @@ func TestOrchestrator(t *testing.T) { }) }, de }, - errs: []string{}, }, - "proxy setup, connection returns error": { + "container run stops early with error": { logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { + stopPause := make(chan struct{}) de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil }, - ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { - if cmd == "aws" { + RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { + if opts.ContainerName == "prefix-foo" { return errors.New("some error") + } else { + // block pause container until Stop(pause) + <-stopPause + } + return nil + }, + StopFn: func(ctx context.Context, s string) error { + if s == "prefix-pause" { + stopPause <- struct{}{} } return nil }, } return func(t *testing.T, o *Orchestrator) { - _, ipNet, err := net.ParseCIDR("172.20.0.0/16") - require.NoError(t, err) - - o.RunTask(Task{}, RunTaskWithProxy("ecs:cluster_task_ctr", *ipNet, Host{ - Name: "remote-foo", - Port: 80, - })) + o.RunTask(Task{ + Containers: map[string]ContainerDefinition{ + "foo": {}, + }, + }) }, de }, stopAfterNErrs: 1, - errs: []string{`proxy to remote-foo:80: some error`}, + errs: []string{`run "prefix-foo": some error`}, }, - "proxy setup, ip increment error": { + "container run stops early with nil error": { logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { + stopPause := make(chan struct{}) de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil }, - ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { - if cmd == "aws" { - fmt.Fprintf(w, "Port 61972 opened for sessionId mySessionId\n") + RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { + if opts.ContainerName == "prefix-foo" { + return nil + } else { + // block pause container until Stop(pause) + <-stopPause + } + return nil + }, + StopFn: func(ctx context.Context, s string) error { + if s == "prefix-pause" { + stopPause <- struct{}{} } return nil }, } return func(t *testing.T, o *Orchestrator) { - _, ipNet, err := net.ParseCIDR("255.255.255.254/31") // 255.255.255.254 - 255.255.255.255 - require.NoError(t, err) - - o.RunTask(Task{}, RunTaskWithProxy("ecs:cluster_task_ctr", *ipNet, generateHosts(3)...)) + o.RunTask(Task{ + Containers: map[string]ContainerDefinition{ + "foo": {}, + }, + }) }, de }, stopAfterNErrs: 1, - errs: []string{`setup proxy connections: increment ip: max ipv4 address`}, + errs: []string{`run "prefix-foo": container stopped unexpectedly`}, }, - "proxy setup, ip tables error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + "proxy setup, connection returns error": { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil }, ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { if cmd == "aws" { - fmt.Fprintf(w, "Port 61972 opened for sessionId mySessionId\n") - } else if cmd == "iptables" { return errors.New("some error") } return nil @@ -262,11 +287,12 @@ func TestOrchestrator(t *testing.T) { }, de }, stopAfterNErrs: 1, - errs: []string{`setup proxy connections: modify iptables: some error`}, + errs: []string{`proxy to remote-foo:80: some error`}, }, - "proxy setup, ip tables save error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + "proxy setup, ip increment error": { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -274,7 +300,31 @@ func TestOrchestrator(t *testing.T) { ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { if cmd == "aws" { fmt.Fprintf(w, "Port 61972 opened for sessionId mySessionId\n") - } else if cmd == "iptables-save" { + } + return nil + }, + } + return func(t *testing.T, o *Orchestrator) { + _, ipNet, err := net.ParseCIDR("255.255.255.254/31") // 255.255.255.254 - 255.255.255.255 + require.NoError(t, err) + + o.RunTask(Task{}, RunTaskWithProxy("ecs:cluster_task_ctr", *ipNet, generateHosts(3)...)) + }, de + }, + errs: []string{`setup proxy connections: increment ip: max ipv4 address`}, + }, + "proxy setup, ip tables error": { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { + de := &dockerenginetest.Double{ + IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { + return true, nil + }, + ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { + if cmd == "aws" { + fmt.Fprintf(w, "Port 61972 opened for sessionId mySessionId\n") + } else if cmd == "iptables" { return errors.New("some error") } return nil @@ -290,12 +340,12 @@ func TestOrchestrator(t *testing.T) { })) }, de }, - stopAfterNErrs: 1, - errs: []string{`setup proxy connections: save iptables: some error`}, + errs: []string{`setup proxy connections: modify iptables: some error`}, }, "proxy setup, /etc/hosts error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -319,13 +369,12 @@ func TestOrchestrator(t *testing.T) { })) }, de }, - stopAfterNErrs: 1, - errs: []string{`setup proxy connections: update /etc/hosts: some error`}, + errs: []string{`setup proxy connections: update /etc/hosts: some error`}, }, "proxy success": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { - waitUntilRun := make(chan struct{}) + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -336,12 +385,6 @@ func TestOrchestrator(t *testing.T) { } return nil }, - RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { - if opts.ContainerName == "prefix-foo" { - close(waitUntilRun) - } - return nil - }, } return func(t *testing.T, o *Orchestrator) { _, ipNet, err := net.ParseCIDR("172.20.0.0/16") @@ -355,8 +398,6 @@ func TestOrchestrator(t *testing.T) { Name: "remote-foo", Port: 80, })) - - <-waitUntilRun }, de }, }, @@ -365,6 +406,41 @@ func TestOrchestrator(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { test, dockerEngine := tc.test(t) + + if tc.runUntilStopped { + // make Run(x) not return until until Stop(x) is called + stopChs := make(map[string]chan struct{}) + mu := &sync.Mutex{} + getCh := func(name string) chan struct{} { + mu.Lock() + defer mu.Unlock() + + _, ok := stopChs[name] + if !ok { + stopChs[name] = make(chan struct{}) + } + return stopChs[name] + } + + ogRun := dockerEngine.RunFn + dockerEngine.RunFn = func(ctx context.Context, ro *dockerengine.RunOptions) error { + <-getCh(ro.ContainerName) + if ogRun != nil { + return ogRun(ctx, ro) + } + return nil + } + + ogStop := dockerEngine.StopFn + dockerEngine.StopFn = func(ctx context.Context, name string) error { + getCh(name) <- struct{}{} + if ogStop != nil { + return ogStop(ctx, name) + } + return nil + } + } + o := New(dockerEngine, "prefix-", tc.logOptions) wg := &sync.WaitGroup{}