Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async callback to notify API about pods statuses #4

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/macvmio/fugaci
go 1.23.2

require (
github.com/fsnotify/fsnotify v1.7.0
github.com/google/go-containerregistry v0.20.2
github.com/macvmio/geranos v0.7.5
github.com/pkg/errors v0.9.1
Expand All @@ -11,9 +12,9 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/virtual-kubelet/node-cli v0.8.0
github.com/virtual-kubelet/virtual-kubelet v1.11.0
golang.org/x/crypto v0.24.0
golang.org/x/sync v0.8.0
k8s.io/api v0.29.1
k8s.io/apimachinery v0.29.1
k8s.io/client-go v0.29.1
Expand All @@ -38,7 +39,6 @@ require (
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down Expand Up @@ -108,7 +108,6 @@ require (
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
512 changes: 0 additions & 512 deletions go.sum

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pkg/fugaci/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ package fugaci
import (
"context"
io_prometheus_client "github.com/prometheus/client_model/go"
vknode "github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
"io"
v1 "k8s.io/api/core/v1"
"log"
)

var _ vknode.PodLifecycleHandler = (*LoggingProvider)(nil)

var _ vknode.PodNotifier = (*LoggingProvider)(nil)
var _ nodeutil.Provider = (*LoggingProvider)(nil)

// LoggingProvider wraps an existing Provider and logs the responses
type LoggingProvider struct {
UnderlyingProvider *Provider
Expand Down Expand Up @@ -151,3 +158,7 @@ func (lp *LoggingProvider) PortForward(ctx context.Context, namespace, pod strin
}
return err
}

func (lp *LoggingProvider) NotifyPods(ctx context.Context, cb func(*v1.Pod)) {
lp.UnderlyingProvider.NotifyPods(ctx, cb)
}
31 changes: 17 additions & 14 deletions pkg/fugaci/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/macvmio/fugaci/pkg/portforwarder"
"github.com/macvmio/fugaci/pkg/sshrunner"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/node-cli/manager"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
vknode "github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
Expand All @@ -24,21 +23,23 @@ import (
)

var _ vknode.PodLifecycleHandler = (*Provider)(nil)
var __ nodeutil.Provider = (*Provider)(nil)
var _ vknode.PodNotifier = (*Provider)(nil)
var _ nodeutil.Provider = (*Provider)(nil)

var ErrNotImplemented = errors.New("not implemented")

type Provider struct {
appContext context.Context
resourceManager *manager.ResourceManager
cfg Config
virt *curie.Virtualization
puller Puller
appContext context.Context
cfg Config
virt *curie.Virtualization
puller Puller

// Mutex to synchronize access to the in-memory store.
mu sync.Mutex
// In-memory store for Pods.
vms [2]*VM

notifyPodsCallback func(p *v1.Pod)
}

func NewProvider(appCtx context.Context, cfg Config) (*LoggingProvider, error) {
Expand All @@ -48,6 +49,9 @@ func NewProvider(appCtx context.Context, cfg Config) (*LoggingProvider, error) {
virt: curie.NewVirtualization(cfg.CurieVirtualization.BinaryPath, cfg.CurieVirtualization.DataRootPath),
cfg: cfg,
vms: [2]*VM{},
notifyPodsCallback: func(p *v1.Pod) {
log.Printf("notifyPodsCallback is not configured")
},
}), nil
}

Expand All @@ -65,7 +69,7 @@ func (s *Provider) allocateVM(pod *v1.Pod) (*VM, error) {
vm, err := NewVM(s.appContext, s.virt, s.puller,
sshrunner.NewRunner(), portforwarder.NewPortForwarder(),
s.cfg.ContainerLogsDirectory,
pod, 0)
pod, 0, s.notifyPodsCallback)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,7 +175,6 @@ func (s *Provider) DeletePod(ctx context.Context, pod *v1.Pod) error {
if err != nil {
return fmt.Errorf("cleanup of VM for pod '%s/%s' failed: %w", pod.Namespace, pod.Name, err)
}

return s.deallocateVM(vm)
}

Expand All @@ -192,16 +195,16 @@ func (s *Provider) GetPods(ctx context.Context) ([]*v1.Pod, error) {
return []*v1.Pod{}, nil
}

func (s *Provider) NotifyPods(ctx context.Context, cb func(*v1.Pod)) {
fmt.Printf("NotifyPods called\n")
s.notifyPodsCallback = cb
}

func (s *Provider) ConfigureNode(ctx context.Context, fugaciVersion string, node *v1.Node) error {
n := NewNode(fugaciVersion, s.cfg)
return n.Configure(node)
}

// TODO(tjarosik):
//func (s *Provider) NotifyPods(ctx context.Context, cb func(*v1.Pod)) {
// log.Printf("Notifying pods on node %s", s.nodeName)
//}

func (s *Provider) GetContainerLogs(ctx context.Context, namespace, podName, containerName string, opts api.ContainerLogOpts) (io.ReadCloser, error) {
// Return a simple static log line
vm, err := s.findVMByNames(namespace, podName, containerName)
Expand Down
24 changes: 18 additions & 6 deletions pkg/fugaci/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type VM struct {
streams *streams.FilesBasedStreams
logger *log.Logger
storyLine *storyline.StoryLine

onPodUpdated func(p *v1.Pod)
}

func NewVM(ctx context.Context,
Expand All @@ -89,7 +91,8 @@ func NewVM(ctx context.Context,
portForwarder PortForwarder,
containerLogsDirectory string,
pod *v1.Pod,
containerIndex int) (*VM, error) {
containerIndex int,
onPodUpdated func(p *v1.Pod)) (*VM, error) {
if containerIndex < 0 || containerIndex >= len(pod.Spec.Containers) {
return nil, errors.New("invalid container index")
}
Expand Down Expand Up @@ -143,6 +146,8 @@ func NewVM(ctx context.Context,
streams: fileStreams,
logger: customLogger,
storyLine: storyline.New(),

onPodUpdated: onPodUpdated,
}
vm.containerIndex.Store(int32(containerIndex))
return vm, nil
Expand Down Expand Up @@ -395,22 +400,26 @@ func (s *VM) Run() {
s.storyLine.Add("streamsClosingErr", err)
}
s.logger.Printf("container '%v' finished successfully: %v, exit code=%d\n", containerID, runCmd, runCmd.ProcessState.ExitCode())
s.safeUpdateState(v1.ContainerState{Terminated: &v1.ContainerStateTerminated{
terminatedContainerState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{
ExitCode: int32(runCmd.ProcessState.ExitCode()),
Reason: "exited successfully",
Reason: "Succeeded",
Message: runCmd.ProcessState.String(),
StartedAt: startedAt,
FinishedAt: metav1.Now(),
ContainerID: s.GetContainerStatus().ContainerID,
}})
}}
// Keep it a single call
s.safeUpdatePod(func(pod *v1.Pod) {
pod.Status.ContainerStatuses[s.containerIndex.Load()].State = terminatedContainerState
pod.Status.ContainerStatuses[s.containerIndex.Load()].Ready = false
pod.Status.Phase = v1.PodSucceeded
pod.Status.Conditions = make([]v1.PodCondition, 0)
})
s.vmCancelFunc(nil)
}

func (s *VM) startContainer(containerID string, initTime time.Time) (*exec.Cmd, metav1.Time, error) {
s.safeUpdateState(v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "starting"}})
s.safeUpdateState(v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Starting"}})
runCmd, err := s.virt.Start(s.vmLifetimeCtx, containerID)
if err != nil {
err2 := s.virt.Destroy(s.vmLifetimeCtx, containerID)
Expand Down Expand Up @@ -593,6 +602,9 @@ func (s *VM) safeGetPod() *v1.Pod {

func (s *VM) safeUpdatePod(update func(pod *v1.Pod)) {
s.mu.Lock()
defer s.mu.Unlock()
update(s.pod)
podCopy := s.pod.DeepCopy()
s.mu.Unlock()

go s.onPodUpdated(podCopy)
}
11 changes: 6 additions & 5 deletions pkg/fugaci/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ func setupCommonTestVM(t *testing.T, podOverride func(*v1.Pod)) (*VM, *MockVirtu
}
podOverride(pod)

vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, mockPortFortwarder, t.TempDir(), pod, 0)
callbackFunc := func(p *v1.Pod) {}
vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, mockPortFortwarder, t.TempDir(), pod, 0, callbackFunc)
require.NoError(t, err)
require.NotNil(t, vm)

Expand Down Expand Up @@ -275,7 +276,7 @@ func TestVM_Run_Successful_ifContainerIDProvided_mustRestart(t *testing.T) {
assert.Equal(t, "containerid-123", status.ContainerID)
assert.Equal(t, int32(0), status.State.Terminated.ExitCode)
assert.Equal(t, "exit status 0", status.State.Terminated.Message)
assert.Equal(t, "exited successfully", status.State.Terminated.Reason)
assert.Equal(t, "Succeeded", status.State.Terminated.Reason)
assert.NotEmpty(t, status.State.Terminated.StartedAt)
assert.NotEmpty(t, status.State.Terminated.FinishedAt)

Expand Down Expand Up @@ -314,7 +315,7 @@ func TestVM_Run_Successful_mustBeReadyWithIPAddress(t *testing.T) {
assert.Equal(t, "containerid-123", status.ContainerID)
assert.Equal(t, int32(0), status.State.Terminated.ExitCode)
assert.Equal(t, "exit status 0", status.State.Terminated.Message)
assert.Equal(t, "exited successfully", status.State.Terminated.Reason)
assert.Equal(t, "Succeeded", status.State.Terminated.Reason)
assert.NotEmpty(t, status.State.Terminated.StartedAt)
assert.NotEmpty(t, status.State.Terminated.FinishedAt)

Expand Down Expand Up @@ -365,7 +366,7 @@ func TestVM_Run_Successful_mustRunContainerCommandThroughSSH(t *testing.T) {
assert.Equal(t, "containerid-123", status.ContainerID)
assert.Equal(t, int32(0), status.State.Terminated.ExitCode)
assert.Equal(t, "exit status 0", status.State.Terminated.Message)
assert.Equal(t, "exited successfully", status.State.Terminated.Reason)
assert.Equal(t, "Succeeded", status.State.Terminated.Reason)
assert.NotEmpty(t, status.State.Terminated.StartedAt)
assert.NotEmpty(t, status.State.Terminated.FinishedAt)

Expand Down Expand Up @@ -466,7 +467,7 @@ func TestVM_Run_Successful_CleanupMustBeIdempotent(t *testing.T) {
assert.Equal(t, "containerid-123", status.ContainerID)
assert.Equal(t, int32(0), status.State.Terminated.ExitCode)
assert.Equal(t, "exit status 0", status.State.Terminated.Message)
assert.Equal(t, "exited successfully", status.State.Terminated.Reason)
assert.Equal(t, "Succeeded", status.State.Terminated.Reason)
assert.NotEmpty(t, status.State.Terminated.StartedAt)
assert.NotEmpty(t, status.State.Terminated.FinishedAt)

Expand Down
2 changes: 1 addition & 1 deletion pkg/fugaci_test/provider_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestProviderE2E(t *testing.T) {
assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) {
p := pods[0]
logs := getContainerLogs(t, clientset, testNamespace, p.Name, "curie3")
assert.Contains(t, logs, "err=\"pull image: GET https://ghcr.com/v2/invalid/image/manifests/123: unexpected status code 404 Not Found")
assert.Contains(t, logs, "err=\"pull image: GET https://ghcr.io/token?scope=repository%3Ainvalid%2Fimage%3Apull&service=ghcr.io: DENIED: denied")

containerStatus := p.Status.ContainerStatuses[0]
assert.False(t, containerStatus.Ready)
Expand Down
29 changes: 29 additions & 0 deletions pkg/fugaci_test/testdata/deployment-basic1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: testpod-deployment
spec:
replicas: 2
selector:
matchLabels:
app: testpod
template:
metadata:
labels:
app: testpod
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: ["sleep", "1000000"]
name: curie3
envFrom:
- secretRef:
name: fugaci-ssh-secret
nodeSelector:
kubernetes.io/os: darwin
tolerations:
- key: "fugaci.macvm.io"
operator: "Equal"
value: "true"
effect: "NoSchedule"
2 changes: 1 addition & 1 deletion pkg/fugaci_test/testdata/pod2-invalid-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: testpod2
spec:
containers:
- image: ghcr.com/invalid/image:123
- image: ghcr.io/invalid/image:123
imagePullPolicy: IfNotPresent
command: [ "sleep", "1000000" ]
name: curie3
Expand Down
Loading