From 1833460992168e9e380c4673164d5c54e7815c38 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Fri, 6 Sep 2024 13:50:41 +0000 Subject: [PATCH] add dns caching Change-Id: If3dae1963ca002dd95b8c121799cd8c034c3d93d --- cmd/kindnetd/dnscache.go | 593 +++++++++++++++++++++++++++++++++++++++ cmd/kindnetd/main.go | 38 ++- cmd/kindnetd/masq.go | 2 +- install-kindnet.yaml | 5 +- 4 files changed, 627 insertions(+), 11 deletions(-) create mode 100644 cmd/kindnetd/dnscache.go diff --git a/cmd/kindnetd/dnscache.go b/cmd/kindnetd/dnscache.go new file mode 100644 index 0000000..70f5ce4 --- /dev/null +++ b/cmd/kindnetd/dnscache.go @@ -0,0 +1,593 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "net" + "os" + "strings" + "sync" + "time" + + "golang.org/x/net/dns/dnsmessage" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + utilio "k8s.io/utils/io" + "k8s.io/utils/ptr" + "sigs.k8s.io/knftables" +) + +const ( + maxResolvConfLength = 10 * 1 << 20 // 10MB + expireTimeout = 30 * time.Second // same as CoreDNS? +) + +// NewDNSCacheAgent caches all DNS traffic from Pods with network based on the PodCIDR of the node they are running. +// Cache logic is very specific to Kubernetes, +func NewDNSCacheAgent(nodeName string, nodeInformer coreinformers.NodeInformer) (*DNSCacheAgent, error) { + klog.V(2).Info("Initializing nftables") + nft, err := knftables.New(knftables.InetFamily, "kindnet-dnscache") + if err != nil { + return nil, err + } + + d := &DNSCacheAgent{ + nft: nft, + nodeName: nodeName, + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + interval: 5 * time.Minute, + cache: newIPCache(), + } + + return d, nil +} + +// DNSCacheAgent caches all DNS traffic from Pods with network based on the PodCIDR of the node they are running. +// Cache logic is very specific to Kubernetes, +type DNSCacheAgent struct { + nft knftables.Interface + nodeName string + nodeLister v1.NodeLister + nodesSynced cache.InformerSynced + interval time.Duration + + podCIDRv4 string + podCIDRv6 string + flushed bool + + localAddr string // UDP server listener address + resolver *net.Resolver + cache *ipCache +} + +type ipEntry struct { + ts time.Time + ips []net.IP +} + +type ipCache struct { + mu sync.RWMutex + cacheV4Address map[string]ipEntry + cacheV6Address map[string]ipEntry +} + +func (i *ipCache) add(network string, host string, ips []net.IP) { + i.mu.Lock() + defer i.mu.Unlock() + now := time.Now() + entry := ipEntry{ + ts: now, + ips: ips, + } + if network == "ip6" { + i.cacheV6Address[host] = entry + } + if network == "ip4" { + i.cacheV4Address[host] = entry + } +} + +func (i *ipCache) get(network string, host string, ttl time.Duration) []net.IP { + var entry ipEntry + var ok bool + + i.mu.RLock() + if network == "ip6" { + entry, ok = i.cacheV6Address[host] + } + if network == "ip4" { + entry, ok = i.cacheV4Address[host] + } + i.mu.RUnlock() + if !ok { + return nil + } + // check if the entry is still valid + if entry.ts.Add(ttl).Before(time.Now()) { + i.delete(network, host) + return nil + } + return entry.ips +} +func (i *ipCache) delete(network string, host string) { + i.mu.Lock() + defer i.mu.Unlock() + if network == "ip6" { + delete(i.cacheV6Address, host) + } + if network == "ip4" { + delete(i.cacheV4Address, host) + } +} + +func newIPCache() *ipCache { + return &ipCache{ + cacheV4Address: map[string]ipEntry{}, + cacheV6Address: map[string]ipEntry{}, + } +} + +// Run syncs dns cache intercept rules +func (d *DNSCacheAgent) Run(ctx context.Context) error { + if !cache.WaitForNamedCacheSync("kindnet-dnscache", ctx.Done(), d.nodesSynced) { + return fmt.Errorf("error syncing cache") + } + klog.Info("Waiting for node parameters") + err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(context.Context) (bool, error) { + node, err := d.nodeLister.Get(d.nodeName) + if err != nil { + return false, nil + } + podCIDRsv4, podCIDRsv6 := splitCIDRslice(node.Spec.PodCIDRs) + klog.V(7).Infof("Got %v and %v from node %s", podCIDRsv4, podCIDRsv6, node.Name) + if len(podCIDRsv4) > 0 { + d.podCIDRv4 = podCIDRsv4[0] + } + if len(podCIDRsv6) > 0 { + d.podCIDRv6 = podCIDRsv6[0] + } + return true, nil + }) + if err != nil { + return err + } + // start listener + conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}) + if err != nil { + return err + } + defer conn.Close() + + d.localAddr = conn.LocalAddr().String() + go func() { + for { + buf := make([]byte, 1024) + n, addr, err := conn.ReadFromUDP(buf) + if err != nil { + klog.Infof("error on UDP connection: %v", err) + continue + } + go d.serveDNS(conn, addr, buf[:n]) + } + }() + + // kindnet is using hostNetwork and dnsPolicy: ClusterFirstWithHostNet + // so its resolv.conf will have the configuration from the network Pods + klog.Info("Configuring upstream DNS resolver") + hostDNS, hostSearch, hostOptions, err := parseResolvConf() + if err != nil { + err := fmt.Errorf("encountered error while parsing resolv conf file. Error: %w", err) + klog.ErrorS(err, "Could not parse resolv conf file.") + return err + } + + klog.V(2).Infof("Parsed resolv.conf: nameservers: %v search: %v options: %v", hostDNS, hostSearch, hostOptions) + + d.resolver = &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + // TODO check multiple nameservers + return net.Dial(network, hostDNS[0]) + }, + } + + klog.Info("Syncing nftables rules") + errs := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := d.SyncRules(ctx); err != nil { + errs++ + if errs > 3 { + return fmt.Errorf("can't synchronize rules after 3 attempts: %v", err) + } + } else { + errs = 0 + } + time.Sleep(d.interval) + } +} + +func (d *DNSCacheAgent) serveDNS(conn *net.UDPConn, addr *net.UDPAddr, data []byte) { + _, err := conn.WriteTo(d.dnsPacketRoundTrip(data), addr) + if err != nil { + klog.Infof("error writing DNS answer: %v", err) + } +} + +// SyncRules syncs ip masquerade rules +func (d *DNSCacheAgent) SyncRules(ctx context.Context) error { + table := &knftables.Table{ + Comment: knftables.PtrTo("rules for kindnet dnscache"), + } + tx := d.nft.NewTransaction() + // do it once to delete the existing table + if !d.flushed { + tx.Add(table) + tx.Delete(table) + d.flushed = true + } + tx.Add(table) + + hook := knftables.PreroutingHook + chainName := string(hook) + tx.Add(&knftables.Chain{ + Name: chainName, + Type: knftables.PtrTo(knftables.FilterType), + Hook: knftables.PtrTo(hook), + Priority: knftables.PtrTo(knftables.ManglePriority + "-5"), + }) + tx.Flush(&knftables.Chain{ + Name: chainName, + }) + // process coming from Pods destined to the DNS server + // https://www.netfilter.org/projects/nftables/manpage.html + // TODO: obtain the DNS server for the Pods from the kubelet config + // Port 10250/configz ?? + if d.podCIDRv4 != "" { + tx.Add(&knftables.Rule{ + Chain: chainName, + Rule: knftables.Concat( + "ip saddr", d.podCIDRv4, "meta l4proto udp th dport 53", "tproxy ip to", d.localAddr, "accept", + ), + Comment: ptr.To("DNS IPv4 pod originated traffic"), + }) + } + + if d.podCIDRv6 != "" { + tx.Add(&knftables.Rule{ + Chain: chainName, + Rule: knftables.Concat( + "ip saddr", d.podCIDRv6, "meta l4proto udp th dport 53", "tproxy ip to", d.localAddr, "accept", + ), + Comment: ptr.To("DNS IPv4 pod originated traffic"), + }) + } + + if err := d.nft.Run(ctx, tx); err != nil { + klog.Infof("error syncing nftables rules %v", err) + return err + } + return nil +} + +func (d *DNSCacheAgent) CleanRules() { + tx := d.nft.NewTransaction() + // Add+Delete is idempotent and won't return an error if the table doesn't already + // exist. + tx.Add(&knftables.Table{}) + tx.Delete(&knftables.Table{}) + + if err := d.nft.Run(context.TODO(), tx); err != nil { + klog.Infof("error deleting nftables rules %v", err) + } +} + +const ttl = 300 + +func (d *DNSCacheAgent) dnsPacketRoundTrip(b []byte) []byte { + var p dnsmessage.Parser + hdr, err := p.Start(b) + if err != nil { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + // RFC1035 max 512 bytes for UDP + if len(b) > 512 { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + + // Only support 1 question, ref: + // https://cs.opensource.google/go/x/net/+/e898025e:dns/dnsmessage/message.go + // Multiple questions are valid according to the spec, + // but servers don't actually support them. There will + // be at most one question here. + questions, err := p.AllQuestions() + if err != nil { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + if len(questions) > 1 { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeNotImplemented, dnsmessage.Question{}) + } else if len(questions) == 0 { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + + answer, delegate := d.processDNSRequest(hdr.ID, questions[0]) + // pass it through + if delegate { + // the dialer overrides the parameters with the upstream dns resolver + conn, err := d.resolver.Dial(context.Background(), "network", "address") + if err != nil { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + defer conn.Close() + _, err = conn.Write(b) + if err != nil { + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + buf := make([]byte, 1500) + n, err := conn.Read(buf) + if err != nil { + klog.Infof("error on UDP connection: %v", err) + return dnsErrorMessage(hdr.ID, dnsmessage.RCodeFormatError, dnsmessage.Question{}) + } + answer = buf[:n] + } + + // Return a truncated packet if the answer is too big + if len(answer) > 512 { + answer = dnsTruncatedMessage(hdr.ID, questions[0]) + } + + return answer +} + +// dnsErrorMessage return an encoded dns error message +func dnsErrorMessage(id uint16, rcode dnsmessage.RCode, q dnsmessage.Question) []byte { + msg := dnsmessage.Message{ + Header: dnsmessage.Header{ + ID: id, + Response: true, + Authoritative: true, + RCode: rcode, + }, + Questions: []dnsmessage.Question{q}, + } + buf, err := msg.Pack() + if err != nil { + panic(err) + } + return buf +} + +func dnsTruncatedMessage(id uint16, q dnsmessage.Question) []byte { + msg := dnsmessage.Message{ + Header: dnsmessage.Header{ + ID: id, + Response: true, + Authoritative: true, + Truncated: true, + }, + Questions: []dnsmessage.Question{q}, + } + buf, err := msg.Pack() + if err != nil { + panic(err) + } + return buf +} + +// processDNSRequest implements dnsHandlerFunc so it can be used in a DNSCache +// transforming a DNS request to the corresponding Golang Lookup functions. +// If is not able to process the request it delegates to the caller the request. +func (d *DNSCacheAgent) processDNSRequest(id uint16, q dnsmessage.Question) ([]byte, bool) { + // DNS packet length is encoded in 2 bytes + buf := []byte{} + answer := dnsmessage.NewBuilder(buf, + dnsmessage.Header{ + ID: id, + Response: true, + Authoritative: true, + }) + answer.EnableCompression() + err := answer.StartQuestions() + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + answer.Question(q) // nolint: errcheck + err = answer.StartAnswers() + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + switch q.Type { + case dnsmessage.TypeA: + addrs, err := d.lookupIP(context.Background(), "ip4", q.Name.String()) + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + for _, ip := range addrs { + a := ip.To4() + if a == nil { + continue + } + err = answer.AResource( + dnsmessage.ResourceHeader{ + Name: q.Name, + Class: q.Class, + TTL: ttl, + }, + dnsmessage.AResource{ + A: [4]byte{a[0], a[1], a[2], a[3]}, + }, + ) + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + } + case dnsmessage.TypeAAAA: + addrs, err := d.lookupIP(context.Background(), "ip6", q.Name.String()) + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + for _, ip := range addrs { + if ip.To16() == nil || ip.To4() != nil { + continue + } + var aaaa [16]byte + copy(aaaa[:], ip.To16()) + err = answer.AAAAResource( + dnsmessage.ResourceHeader{ + Name: q.Name, + Class: q.Class, + TTL: ttl, + }, + dnsmessage.AAAAResource{ + AAAA: aaaa, + }, + ) + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + } + case dnsmessage.TypePTR: + return nil, true + case dnsmessage.TypeSRV: + return nil, true + case dnsmessage.TypeNS: + return nil, true + case dnsmessage.TypeCNAME: + return nil, true + case dnsmessage.TypeSOA: + return nil, true + case dnsmessage.TypeMX: + return nil, true + case dnsmessage.TypeTXT: + return nil, true + default: + return nil, true + } + buf, err = answer.Finish() + if err != nil { + return dnsErrorMessage(id, dnsmessage.RCodeServerFailure, q), false + } + return buf, false +} + +func (d *DNSCacheAgent) lookupIP(ctx context.Context, network, host string) ([]net.IP, error) { + ips := d.cache.get(network, host, expireTimeout) + if len(ips) > 0 { + klog.V(4).Infof("Cached entries for %s %s : %v", network, host, ips) + return ips, nil + } + ips, err := d.resolver.LookupIP(ctx, network, host) + if err != nil { + return nil, err + } + d.cache.add(network, host, ips) + klog.V(4).Infof("Caching new entries for %s %s : %v", network, host, ips) + return ips, nil +} + +// https://github.com/kubernetes/kubernetes/blob/2108e54f5249c6b3b0c9f824314cb5f33c01e3f4/pkg/kubelet/network/dns/dns.go#L176 +// parseResolvConf reads a resolv.conf file from the given reader, and parses +// it into nameservers, searches and options, possibly returning an error. +func parseResolvConf() (nameservers []string, searches []string, options []string, err error) { + f, err := os.Open("/etc/resolv.conf") + if err != nil { + klog.ErrorS(err, "Could not open resolv conf file.") + return nil, nil, nil, err + } + defer f.Close() + + file, err := utilio.ReadAtMost(f, maxResolvConfLength) + if err != nil { + return nil, nil, nil, err + } + + // Lines of the form "nameserver 1.2.3.4" accumulate. + nameservers = []string{} + + // Lines of the form "search example.com" overrule - last one wins. + searches = []string{} + + // Lines of the form "option ndots:5 attempts:2" overrule - last one wins. + // Each option is recorded as an element in the array. + options = []string{} + + var allErrors []error + lines := strings.Split(string(file), "\n") + for l := range lines { + trimmed := strings.TrimSpace(lines[l]) + if strings.HasPrefix(trimmed, "#") { + continue + } + fields := strings.Fields(trimmed) + if len(fields) == 0 { + continue + } + if fields[0] == "nameserver" { + if len(fields) >= 2 { + nameservers = append(nameservers, fields[1]) + } else { + allErrors = append(allErrors, fmt.Errorf("nameserver list is empty ")) + } + } + if fields[0] == "search" { + // Normalise search fields so the same domain with and without trailing dot will only count once, to avoid hitting search validation limits. + searches = []string{} + for _, s := range fields[1:] { + if s != "." { + searches = append(searches, strings.TrimSuffix(s, ".")) + } + } + } + if fields[0] == "options" { + options = appendOptions(options, fields[1:]...) + } + } + + return nameservers, searches, options, utilerrors.NewAggregate(allErrors) +} + +// appendOptions appends options to the given list, but does not add duplicates. +// append option will overwrite the previous one either in new line or in the same line. +func appendOptions(options []string, newOption ...string) []string { + var optionMap = make(map[string]string) + for _, option := range options { + optName := strings.Split(option, ":")[0] + optionMap[optName] = option + } + for _, option := range newOption { + optName := strings.Split(option, ":")[0] + optionMap[optName] = option + } + + options = []string{} + for _, v := range optionMap { + options = append(options, v) + } + return options +} diff --git a/cmd/kindnetd/main.go b/cmd/kindnetd/main.go index e0c9673..09128e5 100644 --- a/cmd/kindnetd/main.go +++ b/cmd/kindnetd/main.go @@ -64,6 +64,7 @@ const ( var ( useBridge bool networkpolicies bool + dnsCaching bool hostnameOverride string masquerading bool noMasqueradeCIDRs string @@ -72,6 +73,7 @@ var ( func init() { flag.BoolVar(&useBridge, "cni-bridge", false, "If set, enable the CNI bridge plugin (default is the ptp plugin)") flag.BoolVar(&networkpolicies, "network-policy", true, "If set, enable Network Policies (default true)") + flag.BoolVar(&dnsCaching, "dns-caching", true, "If set, enable Kubernetes DNS caching (default true)") flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will be used as the name of the Node that kube-network-policies is running on. If unset, the node name is assumed to be the same as the node's hostname.") flag.BoolVar(&masquerading, "masquerading", true, "masquerade with the Node IP the cluster to external traffic (default true)") flag.StringVar(&noMasqueradeCIDRs, "no-masquerade-cidr", "", "Comma seperated list of CIDRs that will not be masqueraded.") @@ -90,6 +92,16 @@ func main() { flag.VisitAll(func(flag *flag.Flag) { klog.Infof("FLAG: --%s=%q", flag.Name, flag.Value) }) + + var err error + nodeName := hostnameOverride + if nodeName == "" { + nodeName, err = os.Hostname() + if err != nil { + klog.Fatalf("couldn't determine hostname: %v", err) + } + } + // create a Kubernetes client config, err := rest.InClusterConfig() if err != nil { @@ -182,19 +194,29 @@ func main() { klog.Info("Skipping ipMasqAgent") } + // create a dnsCacheAgent + if dnsCaching { + klog.Infof("caching DNS cluster traffic") + dnsCacheAgent, err := NewDNSCacheAgent(nodeName, nodeInformer) + if err != nil { + klog.Fatalf("error creating dnsCacheAgent agent: %v", err) + } + + go func() { + defer dnsCacheAgent.CleanRules() + if err := dnsCacheAgent.Run(ctx); err != nil { + klog.Infof("error running dnsCacheAgent agent: %v", err) + } + }() + } else { + klog.Info("Skipping dnsCacheAgent") + } + // setup nodes reconcile function, closes over arguments reconcileNodes := makeNodesReconciler(cniConfigWriter, hostIP) // network policies if networkpolicies { - nodeName := hostnameOverride - if nodeName == "" { - nodeName, err = os.Hostname() - if err != nil { - klog.Fatalf("couldn't determine hostname: %v", err) - } - } - cfg := networkpolicy.Config{ FailOpen: true, QueueID: 100, diff --git a/cmd/kindnetd/masq.go b/cmd/kindnetd/masq.go index 843f2dd..a664eb1 100644 --- a/cmd/kindnetd/masq.go +++ b/cmd/kindnetd/masq.go @@ -35,7 +35,7 @@ import ( // but allows to masquerade the cluster to external traffic. func NewIPMasqAgent(nodeInformer coreinformers.NodeInformer, noMasqueradeCIDRs string) (*IPMasqAgent, error) { klog.V(2).Info("Initializing nftables") - nft, err := knftables.New(knftables.InetFamily, "kindnet") + nft, err := knftables.New(knftables.InetFamily, "kindnet-ipmasq") if err != nil { return nil, err } diff --git a/install-kindnet.yaml b/install-kindnet.yaml index c7d6909..338cc7b 100644 --- a/install-kindnet.yaml +++ b/install-kindnet.yaml @@ -74,20 +74,21 @@ spec: k8s-app: kindnet spec: hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet tolerations: - operator: Exists effect: NoSchedule serviceAccountName: kindnet initContainers: - name: install-cni-bin - image: ghcr.io/aojea/kindnetd:v1.5.0 + image: ghcr.io/aojea/kindnetd:v1.6.0 command: ['sh', '-c', 'cd /opt/cni/bin; for i in * ; do cat $i > /cni/$i ; chmod +x /cni/$i ; done'] volumeMounts: - name: cni-bin mountPath: /cni containers: - name: kindnet-cni - image: ghcr.io/aojea/kindnetd:v1.5.0 + image: ghcr.io/aojea/kindnetd:v1.6.0 args: - /bin/kindnetd - --v=2