Skip to content

Commit

Permalink
workloads informers
Browse files Browse the repository at this point in the history
  • Loading branch information
juozasg authored and Kingdon Barrett committed Jul 20, 2023
1 parent 4c9a4d2 commit 4b9e39a
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 42 deletions.
2 changes: 2 additions & 0 deletions src/cli/kubernetes/apiResources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export async function loadAvailableResourceKinds() {

lines.map(line => {
let cols = line.split(/\s+/);


if(cols.length === 7) {
// delete optional SHORTNAMES column
cols = cols.slice(0, 1).concat(cols.slice(2));
Expand Down
56 changes: 30 additions & 26 deletions src/cli/kubernetes/kubectlProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { kubeConfig } from 'cli/kubernetes/kubernetesConfig';
import { shell } from 'cli/shell/exec';
import { createK8sClients, destroyK8sClients } from 'k8s/client';

let connected = false;
let kubectlProxyProcess: ChildProcess | undefined;
// let isConnecting = false;
let proxyProc: ChildProcess | undefined;

// tries to keep alive the `kubectl proxy` process
// if process dies or errors out it will be stopped
Expand All @@ -14,70 +14,74 @@ let kubectlProxyProcess: ChildProcess | undefined;
export function kubeProxyKeepAlive() {
// keep alive
setInterval(async () => {
if(!connected) {
await stopKubeProxy();
if(!proxyProc) {
destroyK8sClients();
await startKubeProxy();
}
}, 1000);
}, 2000);
}

async function startKubeProxy() {
if(kubectlProxyProcess) {
if(proxyProc) {
await stopKubeProxy();
}

kubectlProxyProcess = shell.execProc('kubectl proxy -p 0');
console.log('started kube proxy process');
proxyProc = shell.execProc('kubectl proxy -p 0');
console.log(`~proxy started ${proxyProc.pid}`);

procListen(kubectlProxyProcess);
procListen(proxyProc);
}

function procListen(p: ChildProcess) {
p.on('exit', async code => {
console.log('proxy exit', p, code);
stopKubeProxy();
console.log('~proxy exit', p.pid, code);
if(proxyProc?.pid === p.pid) {
stopKubeProxy();
}
});

p.on('error', err => {
console.log('proxy error', p, err);
stopKubeProxy();
console.log('~proxy error', p.pid, err);
p.kill();

});

p.stdout?.on('data', (data: string) => {
console.log(`proxy STDOUT: ${data}`);
console.log(`~proxy ${p.pid} STDOUT: ${data}`);
if(data.includes('Starting to serve on')) {
const port = parseInt(data.split(':')[1].trim());
const proxyKc = makeProxyConfig(port);
console.log('kubeproxy config ready');
connected = true;
// isConnecting = true;

createK8sClients(proxyKc);
}
});

p.stderr?.on('data', (data: string) => {
console.log(`proxy STDERR: ${data}`);
stopKubeProxy();
console.log(`~proxy ${p.pid} STDERR: ${data}`);
p.kill();
});
}


async function stopKubeProxy() {
if(kubectlProxyProcess) {
if(!kubectlProxyProcess.killed) {
kubectlProxyProcess.kill();
if(proxyProc) {
if(!proxyProc.killed) {
console.log(`~proxy.kill() ${proxyProc.pid}`);
proxyProc.kill();
}
kubectlProxyProcess = undefined;
}
proxyProc = undefined;

destroyK8sClients();
connected = false;
console.log('stopped kube proxy');
destroyK8sClients();
// isConnecting = false;
console.log('stopped kube proxy');
}

}

export async function restartKubeProxy() {
if(kubectlProxyProcess) {
if(proxyProc) {
await stopKubeProxy();
}
await startKubeProxy();
Expand Down
5 changes: 4 additions & 1 deletion src/cli/kubernetes/kubernetesConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ export async function loadKubeConfig(forceReloadResourceKinds = false) {
console.log('currentContext changed', kubeConfig.getCurrentContext());
vscodeOnCurrentContextChanged();
await restartKubeProxy();
refreshAllTreeViews();
// give proxy a chance to start
setTimeout(() => {
refreshAllTreeViews();
}, 100);
}
} else if(forceReloadResourceKinds) {
await loadAvailableResourceKinds();
Expand Down
8 changes: 6 additions & 2 deletions src/commands/refreshTreeViews.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import { refreshClustersTreeView, refreshSourcesTreeView, refreshTemplatesTreeVi

export async function refreshAllTreeViewsCommand() {
await loadKubeConfig(true);
refreshClustersTreeView();
refreshResourcesTreeViews();
// give proxy a chance to start
setTimeout(() => {
refreshAllTreeViews();
}, 100);
}

export async function refreshAllTreeViews() {
console.log('refreshAllTreeViews');

refreshClustersTreeView();
refreshResourcesTreeViews();
}
Expand Down
14 changes: 9 additions & 5 deletions src/k8s/client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import * as k8s from '@kubernetes/client-node';

// export let informer: k8s.Informer<GitRepository> & k8s.ObjectCache<GitRepository> | undefined;
import { createInformers, destroyInformers } from './informers';

export let k8sCoreApi: k8s.CoreV1Api | undefined;
export let k8sCustomApi: k8s.CustomObjectsApi | undefined;
export let k8sWatch: k8s.Watch | undefined;

export function createK8sClients(kc: k8s.KubeConfig) {
destroyK8sClients();
k8sCoreApi = kc.makeApiClient(k8s.CoreV1Api);
k8sWatch = new k8s.Watch(kc);
k8sCustomApi = kc.makeApiClient(k8s.CustomObjectsApi);

createInformers(kc);

}

export function destroyK8sClients() {
destroyInformers();

k8sCoreApi = undefined;
k8sWatch = undefined;
k8sCustomApi = undefined;
}


81 changes: 81 additions & 0 deletions src/k8s/informers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import * as k8s from '@kubernetes/client-node';
import { getAPIParams } from 'cli/kubernetes/apiResources';
import { GitRepository } from 'types/flux/gitRepository';
import { Kind, KubernetesListObject, KubernetesObject } from 'types/kubernetes/kubernetesTypes';
import { KubernetesObjectDataProvider } from 'ui/treeviews/dataProviders/kubernetesObjectDataProvider';
import { sourceDataProvider, workloadDataProvider } from 'ui/treeviews/treeViews';
import { k8sCustomApi } from './client';
import { FluxSourceKinds, FluxWorkloadKinds } from 'types/flux/object';


let informers: k8s.Informer<KubernetesObject>[] = [];

export function createInformers(kc: k8s.KubeConfig) {
FluxSourceKinds.forEach(kind => {
createInformer(kc, sourceDataProvider, kind);
});

FluxWorkloadKinds.forEach(kind => {
createInformer(kc, workloadDataProvider, kind);
});
}

export function destroyInformers() {
informers.forEach(informer => {
informer.stop();
});

informers = [];
}


async function createInformer(kc: k8s.KubeConfig, receiver: KubernetesObjectDataProvider, kind: Kind) {
const api = getAPIParams(kind);
if (!api) {
return;
}

const listFn = async () => {
const result = await k8sCustomApi!.listClusterCustomObject(api.group, api.version, api.plural);
const kbody = result.body as KubernetesListObject<GitRepository>;
return Promise.resolve({ response: result.response, body: kbody });
};

const informer = k8s.makeInformer(
kc,
`/apis/${api.group}/${api.version}/${api.plural}`,
listFn,
);

try {
await informer.start();
registerInformerEvents(informer, receiver);
informers.push(informer);
} catch (error) {
destroyInformers();
}
}

function registerInformerEvents(informer: k8s.Informer<KubernetesObject>, receiver: KubernetesObjectDataProvider) {
informer?.on('add', (obj: KubernetesObject) => {
// console.log('*- informer Add', obj);
receiver.add(obj);
});

informer?.on('update', (obj: KubernetesObject) => {
// console.log('*- informer Update', obj);
receiver.update(obj);
});

informer?.on('delete', (obj: KubernetesObject) => {
// console.log('*- informer Delete ', obj);
receiver.delete(obj);
});

informer?.on('error', (err: Error) => {
console.log('*- informer Error', err);
destroyInformers();
});

// console.log('*- informer listening for events...');
}
3 changes: 0 additions & 3 deletions src/k8s/list.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// kubectl auth can-i watch gitrepository
// kubectl auth can-i watch kustomizations --all-namespaces

import { getAPIParams } from 'cli/kubernetes/apiResources';
import { FluxObject } from 'types/flux/object';
import { Kind, KubernetesListObject, Namespace } from 'types/kubernetes/kubernetesTypes';
Expand Down
7 changes: 6 additions & 1 deletion src/types/flux/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ export type FluxSourceObject = GitRepository | OCIRepository | HelmRepository |
export type FluxWorkloadObject = Kustomization | HelmRelease;
export type FluxObject = FluxSourceObject | FluxWorkloadObject;

export const FluxSourceKinds: string[] = [
export const FluxSourceKinds: Kind[] = [
Kind.GitRepository,
Kind.OCIRepository,
Kind.HelmRepository,
Kind.Bucket,
];

export const FluxWorkloadKinds: Kind[] = [
Kind.Kustomization,
Kind.HelmRelease,
];

Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ export abstract class KubernetesObjectDataProvider extends DataProvider {
}

public update(object: KubernetesObject) {
console.log('update', object);
// console.log('update', object);
// console.log('treeitems', this.treeItems);


const namespaceNode = this.findParentNamespaceNode(object);
if(!namespaceNode) {
Expand Down
7 changes: 4 additions & 3 deletions src/ui/treeviews/dataProviders/workloadDataProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export class WorkloadDataProvider extends KubernetesObjectDataProvider {

setVSCodeContext(ContextId.LoadingWorkloads, true);

const [kustomizations, helmReleases, namespaces] = await Promise.all([
const [kustomizations, helmReleases, _] = await Promise.all([
// Fetch all workloads
getKustomizations(),
getHelmReleases(),
// Fetch namespaces to group the nodes
// Cache namespaces to group the nodes
getNamespaces(),
]);

Expand Down Expand Up @@ -123,7 +123,8 @@ export class WorkloadDataProvider extends KubernetesObjectDataProvider {
return [new TreeNode('Loading...')];
}
} else {
return await this.buildTree();
this.treeItems = await this.buildTree();
return this.treeItems;
}
}
}

0 comments on commit 4b9e39a

Please sign in to comment.