Skip to content

Commit

Permalink
Merge pull request #36 from SunSince90/shift-to-aws-tags
Browse files Browse the repository at this point in the history
Read AWS Tags
  • Loading branch information
ljakab authored Nov 23, 2021
2 parents 30dbb8b + 4d4d221 commit e40f298
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 5 deletions.
8 changes: 7 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ For more information about AWS credentials, you may take a look at aws' [documen

CN-WAN Reader can connect to your *etcd* nodes and watch the values that have been registered there, i.e. with `cnwan-reader watch etcd [FLAGS]` .

In order to work, you will need to provide the addresses of you etcd nodes with the `--endpoints` flag, optional `username` and `password` and a `prefix` in case your service registry on etcd contains one.
In order to work, you will need to provide the addresses of your etcd nodes with the `--endpoints` flag, optional `username` and `password` and a `prefix` in case your service registry on etcd contains one.
There is no need to insert all the nodes of your etcd cluster in `--endpoints` but just make sure you enter a few. As per `username` and `password` you can leave them empty if you don't need them to connect to etcd. Finally, `prefix` defaults to `/` in case you don't enter another value.

As a final note, make sure your etcd user has a role that enables it to at least *read* values in the provided prefix.
Expand Down Expand Up @@ -202,6 +202,12 @@ or just:
cnwan-reader --conf /path/to/configuration/file.yaml
```

#### Using tags

The default behavior of the CN-WAN Reader is to parse *attributes* on service instances. This behavior can be changed with `--with-tags`, which will tell the CN-WAN Reader to look for [tags](https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html) on services instead of attributes on instances.

This won't change how data is sent to the adaptor but only how it is searched and parsed on Cloud Map: if you store your metadata as attributes you may continue to use the *cloudmap* command as always; but if you register relevant metadata as *tags* -- i.e. if you register services with the CN-WAN Operator, then we recommend you to use `--with-tags`.

### With etcd

In the following example, the CN-WAN Reader watches changes in etcd with the following requirements:
Expand Down
90 changes: 90 additions & 0 deletions pkg/cmd/poll/cloudmap/cloudmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package cloudmap
import (
"context"
"fmt"
"path"
"strconv"
"sync"
"time"

"github.com/CloudNativeSDWAN/cnwan-reader/pkg/openapi"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/aws/aws-sdk-go/service/servicediscovery/servicediscoveryiface"
)
Expand All @@ -41,6 +43,94 @@ type awsCloudMap struct {
sd servicediscoveryiface.ServiceDiscoveryAPI
}

func (a *awsCloudMap) getServiceTags(ctx context.Context) (map[string]*openapi.Service, error) {
out, err := a.sd.ListServicesWithContext(ctx, &servicediscovery.ListServicesInput{})
if err != nil {
return nil, err
}

keysMap := map[string]bool{}
for _, key := range a.opts.keys {
keysMap[key] = true
}

servTags := map[string]*openapi.Service{}
for _, srv := range out.Services {
l := log.With().Str("service-name", aws.StringValue(srv.Name)).Logger()

metadata := func() map[string]string {
tagsCtx, tagsCanc := context.WithTimeout(ctx, 30*time.Second)
defer tagsCanc()

out, err := a.sd.ListTagsForResourceWithContext(tagsCtx, &servicediscovery.ListTagsForResourceInput{
ResourceARN: srv.Arn,
})
if err != nil {
l.Warn().Err(err).Msg("could not get tags for service: skipping...")
return map[string]string{}
}

tags := map[string]string{}
for _, tag := range out.Tags {
if _, exists := keysMap[aws.StringValue(tag.Key)]; exists {
tags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
}
}
return tags
}()

if len(metadata) == 0 {
continue
}

endps, err := func() ([]*openapi.Service, error) {
instCtx, instCanc := context.WithTimeout(ctx, 30*time.Second)
defer instCanc()

insts, err := a.sd.ListInstancesWithContext(instCtx, &servicediscovery.ListInstancesInput{
ServiceId: srv.Id,
})
if err != nil {
return nil, err
}

srvEp := []*openapi.Service{}
for _, inst := range insts.Instances {
srvEp = append(srvEp, &openapi.Service{
Name: aws.StringValue(inst.Id),
Address: aws.StringValue(inst.Attributes["AWS_INSTANCE_IPV4"]),
Port: func() int32 {
val, _ := strconv.ParseInt(aws.StringValue(inst.Attributes["AWS_INSTANCE_PORT"]), 10, 32)
return int32(val)
}(),
})
}
return srvEp, nil
}()
if err != nil {
l.Err(err).Msg("error while getting instances for service: skipping...")
continue
}

for _, endp := range endps {
name := path.Join(aws.StringValue(srv.Name), endp.Name)
servTags[name] = &openapi.Service{
Name: name,
Address: endp.Address,
Port: endp.Port,
Metadata: func() (met []openapi.Metadata) {
for k, v := range metadata {
met = append(met, openapi.Metadata{Key: k, Value: v})
}
return
}(),
}
}
}

return servTags, nil
}

func (a *awsCloudMap) getCurrentState(ctx context.Context) (map[string]*openapi.Service, error) {
srvCtx, srvCanc := context.WithTimeout(ctx, defaultTimeout)
srvIDs, err := a.getServicesIDs(srvCtx)
Expand Down
34 changes: 30 additions & 4 deletions pkg/cmd/poll/cloudmap/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os/signal"

"github.com/CloudNativeSDWAN/cnwan-reader/pkg/configuration"
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/openapi"
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/poller"
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/queue"
"github.com/CloudNativeSDWAN/cnwan-reader/pkg/services"
Expand All @@ -48,6 +49,7 @@ func init() {
// other programming pattern, maybe with a factory.
func GetCloudMapCommand() *cobra.Command {
var cm *awsCloudMap
var withTags bool

cmd := &cobra.Command{
Use: cmdUse,
Expand Down Expand Up @@ -82,20 +84,24 @@ func GetCloudMapCommand() *cobra.Command {
}
},
Run: func(cmd *cobra.Command, args []string) {
run(cm)
run(cm, withTags)
},
}

// Flags
cmd.Flags().String("region", "", "region to use")
cmd.Flags().String("credentials-path", "", "the path to the credentials file")
cmd.Flags().StringSlice("metadata-keys", []string{}, "the metadata keys to watch for")
cmd.Flags().BoolVar(&withTags, "with-tags", false, "whether to look for AWS tags rather than attributes")

return cmd
}

func run(cm *awsCloudMap) {
func run(cm *awsCloudMap, withTags bool) {
log.Info().Str("service-registry", "Cloud Map").Str("adaptor", cm.opts.adaptor).Msg("starting...")
if withTags {
log.Info().Msg("switching to tag parsing...")
}

ctx, canc := context.WithCancel(context.Background())

Expand All @@ -108,7 +114,17 @@ func run(cm *awsCloudMap) {

go func() {
log.Info().Msg("getting initial state...")
oaSrvs, err := cm.getCurrentState(ctx)
var (
oaSrvs map[string]*openapi.Service
err error
)

if !withTags {
oaSrvs, err = cm.getCurrentState(ctx)
} else {
oaSrvs, err = cm.getServiceTags(ctx)
}

if err != nil {
log.Fatal().Err(err).Msg("error while getting initial state of cloud map")
return
Expand All @@ -123,7 +139,17 @@ func run(cm *awsCloudMap) {
log.Info().Msg("observing changes...")
poll := poller.New(ctx, cm.opts.interval)
poll.SetPollFunction(func() {
oaSrvs, err := cm.getCurrentState(ctx)
var (
oaSrvs map[string]*openapi.Service
err error
)

if !withTags {
oaSrvs, err = cm.getCurrentState(ctx)
} else {
oaSrvs, err = cm.getServiceTags(ctx)
}

if err != nil {
log.Err(err).Msg("error while polling, skipping...")
return
Expand Down

0 comments on commit e40f298

Please sign in to comment.