Skip to content

Commit

Permalink
Add support for NodePort services (#559)
Browse files Browse the repository at this point in the history
* First stab at NodePort support. Testing incomplete

* Fix up the unit tests

* Remove some deadcode in the unittests

* gather node ips once and add support for srv records

* Make sure we match gofmt simple

* Move the nodes to the testcase and add a test for clusters that only have internal ip addresses

* Somehow forgot about the weight field in the records

* Add SRV as a supported record type
  • Loading branch information
grimmy authored and linki committed Jun 14, 2018
1 parent 49f36ea commit 2ee4b2e
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 10 deletions.
2 changes: 2 additions & 0 deletions endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
RecordTypeCNAME = "CNAME"
// RecordTypeTXT is a RecordType enum value
RecordTypeTXT = "TXT"
// RecordTypeSRV is a RecordType enum value
RecordTypeSRV = "SRV"
)

// TTL is a structure defining the TTL of a DNS record
Expand Down
4 changes: 2 additions & 2 deletions provider/recordfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package provider

// supportedRecordType returns true only for supported record types.
// Currently only A, CNAME and TXT record types are supported.
// Currently A, CNAME, SRV, and TXT record types are supported.
func supportedRecordType(recordType string) bool {
switch recordType {
case "A", "CNAME", "TXT":
case "A", "CNAME", "SRV", "TXT":
return true
default:
return false
Expand Down
91 changes: 83 additions & 8 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
return nil, err
}

// get the ip addresses of all the nodes and cache them for this run
nodeTargets, err := sc.extractNodeTargets()
if err != nil {
return nil, err
}

endpoints := []*endpoint.Endpoint{}

for _, svc := range services.Items {
Expand All @@ -101,7 +107,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}

svcEndpoints := sc.endpoints(&svc)
svcEndpoints := sc.endpoints(&svc, nodeTargets)

// process legacy annotations if no endpoints were returned and compatibility mode is enabled.
if len(svcEndpoints) == 0 && sc.compatibility != "" {
Expand All @@ -110,7 +116,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {

// apply template if none of the above is found
if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
sEndpoints, err := sc.endpointsFromTemplate(&svc)
sEndpoints, err := sc.endpointsFromTemplate(&svc, nodeTargets)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,7 +175,8 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri

return endpoints
}
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {

func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endpoint.Targets) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint

// Process the whole template string
Expand All @@ -181,19 +188,19 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.End

hostnameList := strings.Split(strings.Replace(buf.String(), " ", "", -1), ",")
for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets)...)
}

return endpoints, nil
}

// endpointsFromService extracts the endpoints from a service object
func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

hostnameList := getHostnamesFromAnnotations(svc.Annotations)
for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets)...)
}

return endpoints
Expand Down Expand Up @@ -236,7 +243,7 @@ func (sc *serviceSource) setResourceLabel(service v1.Service, endpoints []*endpo
}
}

func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []*endpoint.Endpoint {
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nodeTargets endpoint.Targets) []*endpoint.Endpoint {
hostname = strings.TrimSuffix(hostname, ".")
ttl, err := getTTLFromAnnotations(svc.Annotations)
if err != nil {
Expand Down Expand Up @@ -272,7 +279,10 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []*
if svc.Spec.ClusterIP == v1.ClusterIPNone {
endpoints = append(endpoints, sc.extractHeadlessEndpoints(svc, hostname, ttl)...)
}

case v1.ServiceTypeNodePort:
// add the nodeTargets and extract an SRV endpoint
targets = append(targets, nodeTargets...)
endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, nodeTargets, hostname, ttl)...)
}

for _, t := range targets {
Expand Down Expand Up @@ -316,3 +326,68 @@ func extractLoadBalancerTargets(svc *v1.Service) endpoint.Targets {

return targets
}

func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
var (
internalIPs endpoint.Targets
externalIPs endpoint.Targets
)

nodes, err := sc.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return nil, err
}

for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
switch address.Type {
case v1.NodeExternalIP:
externalIPs = append(externalIPs, address.Address)
case v1.NodeInternalIP:
internalIPs = append(internalIPs, address.Address)
}
}
}

if len(externalIPs) > 0 {
return externalIPs, nil
}

return internalIPs, nil
}

func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets endpoint.Targets, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint

for _, port := range svc.Spec.Ports {
if port.NodePort > 0 {
// build a target with a priority of 0, weight of 0, and pointing the given port on the given host
target := fmt.Sprintf("0 50 %d %s", port.NodePort, hostname)

// figure out the portname
portName := port.Name
if portName == "" {
portName = fmt.Sprintf("%d", port.NodePort)
}

// figure out the protocol
protocol := strings.ToLower(string(port.Protocol))
if protocol == "" {
protocol = "tcp"
}

recordName := fmt.Sprintf("_%s._%s.%s", portName, protocol, hostname)

var ep *endpoint.Endpoint
if ttl.IsConfigured() {
ep = endpoint.NewEndpointWithTTL(recordName, endpoint.RecordTypeSRV, ttl, target)
} else {
ep = endpoint.NewEndpoint(recordName, endpoint.RecordTypeSRV, target)
}

endpoints = append(endpoints, ep)
}
}

return endpoints
}
195 changes: 195 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,201 @@ func TestClusterIpServices(t *testing.T) {
}
}

// testNodePortServices tests that various services generate the correct endpoints.
func TestNodePortServices(t *testing.T) {
for _, tc := range []struct {
title string
targetNamespace string
annotationFilter string
svcNamespace string
svcName string
svcType v1.ServiceType
compatibility string
fqdnTemplate string
labels map[string]string
annotations map[string]string
lbs []string
expected []*endpoint.Endpoint
expectError bool
nodes []*v1.Node
}{
{
"annotated NodePort services return an endpoint with IP addresses of the cluster's nodes",
"",
"",
"testing",
"foo",
v1.ServiceTypeNodePort,
"",
"",
map[string]string{},
map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
nil,
[]*endpoint.Endpoint{
{DNSName: "_30192._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1", "54.10.11.2"}, RecordType: endpoint.RecordTypeA},
},
false,
[]*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
},
{
"non-annotated NodePort services with set fqdnTemplate return an endpoint with target IP",
"",
"",
"testing",
"foo",
v1.ServiceTypeNodePort,
"",
"{{.Name}}.bar.example.com",
map[string]string{},
map[string]string{},
nil,
[]*endpoint.Endpoint{
{DNSName: "_30192._tcp.foo.bar.example.com", Targets: endpoint.Targets{"0 50 30192 foo.bar.example.com"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.bar.example.com", Targets: endpoint.Targets{"54.10.11.1", "54.10.11.2"}, RecordType: endpoint.RecordTypeA},
},
false,
[]*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
},
{
"annotated NodePort services return an endpoint with IP addresses of the private cluster's nodes",
"",
"",
"testing",
"foo",
v1.ServiceTypeNodePort,
"",
"",
map[string]string{},
map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
nil,
[]*endpoint.Endpoint{
{DNSName: "_30192._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"10.0.1.1", "10.0.1.2"}, RecordType: endpoint.RecordTypeA},
},
false,
[]*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
},
} {
t.Run(tc.title, func(t *testing.T) {
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()

// Create the nodes
for _, node := range tc.nodes {
if _, err := kubernetes.Core().Nodes().Create(node); err != nil {
t.Fatal(err)
}
}

// Create a service to test against
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
Ports: []v1.ServicePort{
{
NodePort: 30192,
},
},
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
Annotations: tc.annotations,
},
}

_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(t, err)

// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
kubernetes,
tc.targetNamespace,
tc.annotationFilter,
tc.fqdnTemplate,
false,
tc.compatibility,
true,
)
require.NoError(t, err)

endpoints, err := client.Endpoints()
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}

// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
})
}
}

// TestHeadlessServices tests that headless services generate the correct endpoints.
func TestHeadlessServices(t *testing.T) {
for _, tc := range []struct {
Expand Down

0 comments on commit 2ee4b2e

Please sign in to comment.