Skip to content
This repository has been archived by the owner on Nov 24, 2019. It is now read-only.

DNS resolver #3

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
language: go
go:
- 1.12.x
- 1.13.x

script:
- go get -d ./...
- go vet ./...
- go test -v -race ./...
- go vet ./...
5 changes: 2 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
FROM golang:1.12
FROM golang:1.13
WORKDIR /src/unload
COPY . .
RUN GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" .

FROM alpine:3.10
RUN [ ! -e /etc/nsswitch.conf ] && echo 'hosts: files dns' > /etc/nsswitch.conf
RUN apk --no-cache add ca-certificates
COPY --from=0 /src/unload/unload /usr/local/bin
EXPOSE 50051
CMD ["unload"]
ENTRYPOINT ["unload"]
72 changes: 0 additions & 72 deletions api.go

This file was deleted.

53 changes: 22 additions & 31 deletions aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
"k8s.io/klog"
"github.com/google/logger"
)

var (
Expand All @@ -18,22 +18,21 @@ var (
)

var (
watchNlbList = make(map[string]struct{})
errSetupLbv2 = fmt.Errorf("lbv2 is not setup")
)

func setupLbv2() error {
setupLbv2Once.Do(func() {
cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
klog.Errorln(err)
logger.Errorln(err)
return
}
// work out aws current region
meta := ec2metadata.New(cfg)
cfg.Region, err = meta.Region()
if err != nil {
klog.Errorln(err)
logger.Errorln(err)
return
}
lbv2 = elasticloadbalancingv2.New(cfg)
Expand All @@ -46,60 +45,59 @@ func setupLbv2() error {
return nil
}

func regPod(targetGroupArn string, ip string, port int64) {
func regPod(arn *string, ip string, port int64) {
if err := setupLbv2(); err != nil {
klog.Warningln(err)
logger.Warningln(err)
return
}
req := lbv2.RegisterTargetsRequest(&elasticloadbalancingv2.RegisterTargetsInput{
TargetGroupArn: &targetGroupArn,
TargetGroupArn: arn,
Targets: []elasticloadbalancingv2.TargetDescription{{
Id: &ip,
Port: &port,
}},
})
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
_, err := req.Send(ctx)
if err != nil {
klog.Errorln(err)
logger.Errorln(err)
}
}

func deregPod(targetGroupArn string, ip string, port int64) {
func deregPod(arn *string, ip string, port int64) {
if err := setupLbv2(); err != nil {
klog.Warningln(err)
logger.Warningln(err)
return
}
req := lbv2.DeregisterTargetsRequest(&elasticloadbalancingv2.DeregisterTargetsInput{
TargetGroupArn: &targetGroupArn,
TargetGroupArn: arn,
Targets: []elasticloadbalancingv2.TargetDescription{{
Id: &ip,
Port: &port,
}},
})
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_, err := req.Send(ctx)
if err != nil {
klog.Errorln(err)
if _, err := req.Send(ctx); err != nil {
logger.Errorln(err)
}
}

// this will remove out-of-synced unhealthy targets
func reconcile(targetGroupArn string) {
func reconcile(arn *string) {
if err := setupLbv2(); err != nil {
klog.Warningln(err)
logger.Warningln(err)
return
}
des := lbv2.DescribeTargetHealthRequest(&elasticloadbalancingv2.DescribeTargetHealthInput{
TargetGroupArn: &targetGroupArn,
TargetGroupArn: arn,
})
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
res, err := des.Send(ctx)
if err != nil {
klog.Errorln(err)
logger.Errorln(err)
return
}
var targets []elasticloadbalancingv2.TargetDescription
Expand All @@ -110,24 +108,17 @@ func reconcile(targetGroupArn string) {
}
if len(targets) > 0 {
dereg := lbv2.DeregisterTargetsRequest(&elasticloadbalancingv2.DeregisterTargetsInput{
TargetGroupArn: &targetGroupArn,
TargetGroupArn: arn,
Targets: targets,
})
_, err := dereg.Send(ctx)
if err != nil {
klog.Errorln(err)
if _, err := dereg.Send(ctx); err != nil {
logger.Errorln(err)
}
}
}

func addWatchLbv2(targetGroupArn string) {
watchNlbList[targetGroupArn] = struct{}{}
}

func watchLbv2() {
for range time.Tick(20 * time.Second) {
for arn := range watchNlbList {
reconcile(arn)
}
for range time.Tick(3 * time.Minute) {
reconcile(targetGroupArn)
}
}
39 changes: 15 additions & 24 deletions ctl_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"k8s.io/klog"
"github.com/google/logger"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -17,9 +17,8 @@ import (
)

const (
unloadPodPort = 50051
unloadIngressHostname = "unload.ingress.k8s.io/grpc-hostname"
unloadNlbTargetGroupArn = "unload.lb.k8s.io/aws-nlb-target-group-arn"
unloadPodPort = 50051
unloadNlbIPTarget = "unload.lb.k8s.io/aws-nlb-ip-target"
)

type controller struct {
Expand Down Expand Up @@ -57,11 +56,11 @@ func (c *controller) processNextItem() bool {
func (c *controller) updateLb(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
logger.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
klog.Infof("Pod %s does not exist anymore\n", key)
logger.Infof("Pod %s does not exist anymore\n", key)
return nil
}
// Note that you also have to check the uid if you have a local controlled resource, which
Expand All @@ -74,13 +73,8 @@ func (c *controller) updateLb(key string) error {
return nil
}
annotations := pod.GetAnnotations()
if host, ok := annotations[unloadIngressHostname]; ok {
// todo add pod port in
addDst(newConf(host, pod.Status.PodIP))
}
if arn, ok := annotations[unloadNlbTargetGroupArn]; ok {
regPod(arn, pod.Status.PodIP, unloadPodPort)
addWatchLbv2(arn)
if _, ok := annotations[unloadNlbIPTarget]; ok {
regPod(targetGroupArn, pod.Status.PodIP, unloadPodPort)
}
return nil
}
Expand All @@ -97,7 +91,7 @@ func (c *controller) handleErr(err error, key interface{}) {

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
logger.Infof("Error syncing pod %v: %v", key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
Expand All @@ -108,15 +102,15 @@ func (c *controller) handleErr(err error, key interface{}) {
c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
logger.Infof("Dropping pod %q out of the queue: %v", key, err)
}

func (c *controller) run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()

// Let the workers stop when we are done
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")
logger.Info("Starting Pod controller")

go c.informer.Run(stopCh)

Expand All @@ -131,7 +125,7 @@ func (c *controller) run(threadiness int, stopCh chan struct{}) {
}

<-stopCh
klog.Info("Stopping Pod controller")
logger.Info("Stopping Pod controller")
}

func (c *controller) runWorker() {
Expand All @@ -142,11 +136,11 @@ func (c *controller) runWorker() {
func startCtl() {
config, err := rest.InClusterConfig()
if err != nil {
klog.Fatal(err)
logger.Fatal(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
logger.Fatal(err)
}
// create the pod watcher
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
Expand Down Expand Up @@ -179,11 +173,8 @@ func startCtl() {
return
}
annotations := pod.GetAnnotations()
if host, ok := annotations[unloadIngressHostname]; ok {
rmDst(newConf(host, pod.Status.PodIP))
}
if arn, ok := annotations[unloadNlbTargetGroupArn]; ok {
deregPod(arn, pod.Status.PodIP, unloadPodPort)
if _, ok := annotations[unloadNlbIPTarget]; ok {
deregPod(targetGroupArn, pod.Status.PodIP, unloadPodPort)
}
},
}, cache.Indexers{})
Expand Down
2 changes: 0 additions & 2 deletions examples/fortune-teller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ spec:
metadata:
labels:
app: fortune-teller-app
annotations:
unload.ingress.k8s.io/grpc-hostname: teller.local
spec:
containers:
- name: fortune-teller-app
Expand Down
25 changes: 18 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
module github.com/owlwalks/unload

go 1.12
go 1.13

require (
github.com/aws/aws-sdk-go-v2 v0.10.0
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
k8s.io/klog v0.3.1
github.com/aws/aws-sdk-go-v2 v0.12.0
github.com/gogo/protobuf v1.3.0 // indirect
github.com/google/logger v1.0.1
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/json-iterator/go v1.1.7 // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/api v0.0.0-20190816222004-e3a6b8045b0b
k8s.io/apimachinery v0.0.0-20190816221834-a9f1d8a9c101
k8s.io/client-go v11.0.1-0.20190918222721-c0e3722d5cf0+incompatible
k8s.io/klog v1.0.0 // indirect
k8s.io/utils v0.0.0-20190923111123-69764acb6e8e // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)
Loading