Skip to content

Commit

Permalink
Add e2e test for stream attach and ignore expected errors
Browse files Browse the repository at this point in the history
context.Cancelled and io.EOF are treated as expected error because
they are the only way to cancel stream
  • Loading branch information
tomekjarosik committed Nov 30, 2024
1 parent d462238 commit d1202f2
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 103 deletions.
20 changes: 17 additions & 3 deletions pkg/fugaci_test/provider_e2e_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"net/http"
"os"
"path/filepath"
"testing"
"time"
)

const testNamespace = "jenkins"
const testNamespace = "default"

func createPodFromYAML(clientset *kubernetes.Clientset, fileName string) (*v1.Pod, error) {
podYAML, err := os.ReadFile(filepath.Join("testdata", fileName))
Expand Down Expand Up @@ -126,14 +127,27 @@ func execCommandInPod(t *testing.T, clientset *kubernetes.Clientset, config *res
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
require.NoError(t, err, "error creating executor")
var stdout, stderr bytes.Buffer

err = exec.Stream(remotecommand.StreamOptions{
ctx := context.Background()
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})
require.NoError(t, err, "error executing command: %v", command)
return stdout.String(), stderr.String()
}
func attachStreamToPod(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, namespace, podName string,
attachOptions *v1.PodAttachOptions, ctx context.Context, streamOptions remotecommand.StreamOptions) error {
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("attach").
VersionedParams(attachOptions, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL())
require.NoError(t, err)
return exec.StreamWithContext(ctx, streamOptions)
}

func generateRandomName(baseName string) string {
b := make([]byte, 4) // 4 bytes will give us 8 hex characters
Expand Down
110 changes: 107 additions & 3 deletions pkg/fugaci_test/provider_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ package fugaci_test
*/

import (
"bufio"
"bytes"
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"os"
"strings"
"testing"
"time"

Expand All @@ -33,7 +39,11 @@ type PodTestCase struct {
}

func TestProviderE2E(t *testing.T) {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
kubeconfigPath := os.Getenv("KUBECONFIG")
if kubeconfigPath == "" {
kubeconfigPath = clientcmd.RecommendedHomeFile
}
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
t.Fatalf("Failed to build kubeconfig: %v", err)
}
Expand Down Expand Up @@ -81,7 +91,7 @@ func TestProviderE2E(t *testing.T) {
assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) {
p := pods[0]
logs := getContainerLogs(t, clientset, testNamespace, p.Name, "curie3")
assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.6")
assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7")
assert.Contains(t, logs, "action=pulling")
},
},
Expand All @@ -108,7 +118,7 @@ func TestProviderE2E(t *testing.T) {

assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.6")
assert.Contains(t, logs, "action=pulling")
assert.Contains(t, logs, "imageID=sha256:08bbe35549962a2aef9e79631f93c43a245aa662674cb88298be518aabbaed32")
assert.Contains(t, logs, "imageID=sha256:5e21ef1cd7e667ba8581f2df7bb292b7db23bc62df7137d3a1fa5790a57d3260")
assert.Contains(t, logs, "state=created")
assert.Contains(t, logs, "state=SSHReady")
assert.Contains(t, logs, "action=stop")
Expand All @@ -133,6 +143,100 @@ func TestProviderE2E(t *testing.T) {
assert.Empty(t, stderr, "Stderr should be empty")
},
},
{
name: "TestAttachWithStdoutAndStderr",
podFiles: []string{"pod6-attach-stdout.yaml"},
postCreate: waitForPodConditionReady,
assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) {
pod := pods[0]
ctx, cancel := context.WithCancel(context.Background())
stdoutReader, stdoutWriter := io.Pipe()
defer stdoutWriter.Close()
outputChan := make(chan string, 100)
go func() {
defer close(outputChan)
scanner := bufio.NewScanner(stdoutReader)
for scanner.Scan() {
line := scanner.Text()
outputChan <- line
if strings.Contains(line, "counter-5") {
cancel() // Cancel the context once the line is found
return
}
}
if err := scanner.Err(); err != nil {
t.Errorf("Error scanning output: %v", err)
}
}()
// This blocks until stream is completed - it's when context is cancelled
err = attachStreamToPod(t, clientset, config, testNamespace, pod.Name,
&v1.PodAttachOptions{
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
},
ctx,
remotecommand.StreamOptions{
Stdout: stdoutWriter,
Stderr: stdoutWriter,
Tty: false,
})
// Collect and validate output
var capturedOutput []string
for line := range outputChan {
capturedOutput = append(capturedOutput, line)
}

// Assertions for stdout
assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-1")
assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-2")
assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-3")
assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-4")
assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-5")
},
},
{
name: "TestAttachWithStdinAndTTY",
podFiles: []string{"pod7-attach-stdin-auto.yaml"},
postCreate: waitForPodConditionReady,
assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) {
pod := pods[0]
// Use io.Pipe for stdin to control when to close
stdinReader, stdinWriter := io.Pipe()
stdout := &bytes.Buffer{}
ctx := context.Background()

// Prepare input
input := "Hello, pod!"
go func() {
defer stdinWriter.Close()
_, err := stdinWriter.Write([]byte(input))
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
// Close stdinWriter to signal EOF to the remote process
}()

// This blocks until stream is completed. In this case when stdin is closed
err = attachStreamToPod(t, clientset, config, testNamespace, pod.Name,
&v1.PodAttachOptions{
Stdin: true,
Stdout: true,
Stderr: false, // Must be false when TTY is true
TTY: true,
},
ctx,
remotecommand.StreamOptions{
Stdin: stdinReader,
Stdout: stdout,
Tty: true,
})
require.NoError(t, err)

output := stdout.String()
assert.Contains(t, output, input, "The output should contain the input sent via stdin")
},
},
//{
// // TODO: Figure out a way to delete test image
// name: "TestPodPullStrategy_Always",
Expand Down
2 changes: 1 addition & 1 deletion pkg/fugaci_test/testdata/pod6-attach-stdout.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "$i"; sleep 0.1; done' ]
command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "counter-$i"; sleep 0.1; done' ]
name: curie3
envFrom:
- secretRef:
Expand Down
22 changes: 22 additions & 0 deletions pkg/fugaci_test/testdata/pod7-attach-stdin-auto.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: v1
kind: Pod
metadata:
name: testpod1
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: ["sh", "-c", "cat"]
name: curie3
stdin: true
tty: true
envFrom:
- secretRef:
name: fugaci-ssh-secret
nodeSelector:
kubernetes.io/os: darwin
tolerations:
- key: "fugaci.macvm.io"
operator: "Equal"
value: "true"
effect: "NoSchedule"
17 changes: 17 additions & 0 deletions pkg/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/macvmio/fugaci/pkg/ctxio"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"golang.org/x/sync/errgroup"
"io"
Expand Down Expand Up @@ -109,6 +110,12 @@ func (f *FilesBasedStreams) Close() error {
}

func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, loggerPrintf func(format string, v ...any)) error {
if attach.Stdin() != nil && f.stdinReader == nil {
return errdefs.InvalidInput("stdin streaming is disabled")
}
if attach.TTY() && !f.allocateTTY {
return errdefs.InvalidInput("TTY is disabled")
}
// Create an errgroup with the provided context
eg, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -142,6 +149,12 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log
// TODO: This blocks until if stdin has no data, even if context is cancelled
_, err := io.Copy(f.stdinWriter, attach.Stdin())
loggerPrintf("stdin copy completed: %v", err)
eg.Go(func() error {
if err == nil {
return io.EOF
}
return err
})
}()
}

Expand Down Expand Up @@ -172,6 +185,10 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log

loggerPrintf("waiting for Stream() to finish")
err := eg.Wait()
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
loggerPrintf("Stream() ignoring expected error: %v", err)
err = nil
}
loggerPrintf("Stream() has completed")
return err
}
Expand Down
Loading

0 comments on commit d1202f2

Please sign in to comment.