Skip to content

Commit

Permalink
Add support for port forwarding
Browse files Browse the repository at this point in the history
Use like like that:
 kubectl port-forward testpod1 5900:5900

To get access to UI via VNC
  • Loading branch information
tomekjarosik committed Nov 24, 2024
1 parent 39fff5a commit 35f942f
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 11 deletions.
17 changes: 12 additions & 5 deletions pkg/fugaci/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/macvmio/fugaci/pkg/curie"
"github.com/macvmio/fugaci/pkg/portforwarder"
"github.com/macvmio/fugaci/pkg/sshrunner"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/node-cli/manager"
Expand Down Expand Up @@ -60,7 +61,9 @@ func (s *Provider) allocateVM(pod *v1.Pod) (*VM, error) {
if s.vms[i] != nil {
continue
}
vm, err := NewVM(s.appContext, s.virt, s.puller, sshrunner.NewRunner(), pod, 0)
vm, err := NewVM(s.appContext, s.virt, s.puller,
sshrunner.NewRunner(), portforwarder.NewPortForwarder(),
pod, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -217,6 +220,14 @@ func (s *Provider) RunInContainer(ctx context.Context, namespace, podName, conta
return vm.RunCommand(ctx, cmd, sshrunner.WithAttachIO(attach), sshrunner.WithEnv(vm.GetEnvVars()))
}

func (s *Provider) PortForward(ctx context.Context, namespace, podName string, port int32, stream io.ReadWriteCloser) error {
vm, err := s.findVMByNames(namespace, podName, "")
if err != nil {
return fmt.Errorf("failed to find VM for pod %s/%s: %w", namespace, podName, err)
}
return vm.PortForward(ctx, port, stream)
}

func (s *Provider) AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error {
return ErrNotImplemented
}
Expand All @@ -228,7 +239,3 @@ func (s *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary,
func (s *Provider) GetMetricsResource(ctx context.Context) ([]*io_prometheus_client.MetricFamily, error) {
return nil, ErrNotImplemented
}

func (s *Provider) PortForward(ctx context.Context, namespace, pod string, port int32, stream io.ReadWriteCloser) error {
return ErrNotImplemented
}
19 changes: 15 additions & 4 deletions pkg/fugaci/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
Expand Down Expand Up @@ -32,6 +33,10 @@ type SSHRunner interface {
Run(ctx context.Context, dialInfo sshrunner.DialInfo, cmd []string, opts ...sshrunner.Option) error
}

type PortForwarder interface {
PortForward(ctx context.Context, address string, stream io.ReadWriteCloser) error
}

type VirtualizationLifecycle interface {
Create(ctx context.Context, pod v1.Pod, containerIndex int) (containerID string, err error)
Start(ctx context.Context, containerID string) (runCommand *exec.Cmd, err error)
Expand All @@ -53,6 +58,7 @@ type VM struct {
virt Virtualization
puller Puller
sshRunner SSHRunner
portForwarder PortForwarder
pod *v1.Pod
containerIndex atomic.Int32

Expand All @@ -73,7 +79,7 @@ type VM struct {
storyLine *storyline.StoryLine
}

func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SSHRunner, pod *v1.Pod, containerIndex int) (*VM, error) {
func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SSHRunner, portForwarder PortForwarder, pod *v1.Pod, containerIndex int) (*VM, error) {
if containerIndex < 0 || containerIndex >= len(pod.Spec.Containers) {
return nil, errors.New("invalid container index")
}
Expand All @@ -100,9 +106,10 @@ func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SS
}

vm := &VM{
virt: virt,
puller: puller,
sshRunner: sshRunner,
virt: virt,
puller: puller,
sshRunner: sshRunner,
portForwarder: portForwarder,

pod: pod,
vmLifetimeCtx: lifetimeCtx,
Expand Down Expand Up @@ -498,6 +505,10 @@ func (s *VM) RunCommand(ctx context.Context, cmd []string, opts ...sshrunner.Opt
return s.sshRunner.Run(ctx, s.sshDialInfo, cmd, extOpts...)
}

func (s *VM) PortForward(ctx context.Context, port int32, stream io.ReadWriteCloser) error {
return s.portForwarder.PortForward(ctx, fmt.Sprintf("%s:%d", s.safeGetPod().Status.PodIP, port), stream)
}

// Below are functions which are safe to call in multiple goroutines

func (s *VM) IsReady() bool {
Expand Down
13 changes: 12 additions & 1 deletion pkg/fugaci/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"io"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net"
Expand Down Expand Up @@ -75,6 +76,15 @@ func (m *MockSSHRunner) Run(ctx context.Context, dialInfo sshrunner.DialInfo, cm
return args.Error(0)
}

type MockPortForwarder struct {
mock.Mock
}

func (m *MockPortForwarder) PortForward(ctx context.Context, address string, stream io.ReadWriteCloser) error {
args := m.Called(ctx, address, stream)
return args.Error(0)
}

func noPodOverride(pod *v1.Pod) {
}

Expand Down Expand Up @@ -108,6 +118,7 @@ func setupCommonTestVM(t *testing.T, podOverride func(*v1.Pod)) (*VM, *MockVirtu
mockPuller := new(MockPuller)
mockVirt := new(MockVirtualization)
mockSSHRunner := new(MockSSHRunner)
mockPortFortwarder := new(MockPortForwarder)

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -131,7 +142,7 @@ func setupCommonTestVM(t *testing.T, podOverride func(*v1.Pod)) (*VM, *MockVirtu
}
podOverride(pod)

vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, pod, 0)
vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, mockPortFortwarder, pod, 0)
require.NoError(t, err)
require.NotNil(t, vm)

Expand Down
2 changes: 1 addition & 1 deletion pkg/fugaci_test/testdata/pod1-basic-running.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: testpod1
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.6
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "sleep", "1000000" ]
name: curie3
Expand Down
67 changes: 67 additions & 0 deletions pkg/portforwarder/portforwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package portforwarder

import (
"context"
"fmt"
"io"
"net"
"sync"
"time"
)

type PortForwarder struct {
}

func NewPortForwarder() *PortForwarder {
return &PortForwarder{}
}

func (s *PortForwarder) PortForward(ctx context.Context, address string, stream io.ReadWriteCloser) error {
// Step 1: Establish a Connection to the VM
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
if err != nil {
return fmt.Errorf("failed to connect to %s: %w", address, err)
}
defer conn.Close()

// Step 2: Forward Data Between the Streams
// Use a WaitGroup to wait for both directions to finish
var wg sync.WaitGroup
wg.Add(2)

// Context to handle cancellation
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Function to copy data and handle errors
copyFunc := func(dst io.Writer, src io.Reader) {
defer wg.Done()
_, err := io.Copy(dst, src)
if err != nil && err != io.EOF {
// Log the error, but don't terminate immediately
fmt.Printf("Data copy error: %v\n", err)
}
// Cancel the context to stop the other copy
cancel()
}

// Start copying data in both directions
go copyFunc(conn, stream) // from Kubernetes client to VM
go copyFunc(stream, conn) // from VM to Kubernetes client

// Step 3: Wait for Completion or Context Cancellation
doneChan := make(chan struct{})
go func() {
wg.Wait()
close(doneChan)
}()

select {
case <-ctx.Done():
// Context cancelled or timeout
return ctx.Err()
case <-doneChan:
// Port forwarding completed successfully
}
return nil
}

0 comments on commit 35f942f

Please sign in to comment.