From cd61d9217fc102df6f95e6984b7e7ae0df4c3b75 Mon Sep 17 00:00:00 2001 From: Elis Lulja Date: Mon, 6 Jun 2022 16:33:00 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=A9=B9=20Fix=20issue=20not=20sending=20de?= =?UTF-8?q?lete=20events=20on=20etcd=20(#46)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An issue that was preventing the reader from sending `DELETE` service events when an endpoint was deleted is now fixed. Signed-off-by: Elis Lulja --- pkg/cmd/watch/etcd/watcher.go | 43 ++++++++++++++++++++++++++++-- pkg/cmd/watch/etcd/watcher_test.go | 12 +-------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/pkg/cmd/watch/etcd/watcher.go b/pkg/cmd/watch/etcd/watcher.go index b36c4bb..99efcbc 100644 --- a/pkg/cmd/watch/etcd/watcher.go +++ b/pkg/cmd/watch/etcd/watcher.go @@ -19,10 +19,14 @@ package etcd import ( + "bytes" "context" + "errors" "fmt" + "github.com/CloudNativeSDWAN/cnwan-operator/pkg/servregistry" opsr "github.com/CloudNativeSDWAN/cnwan-operator/pkg/servregistry" + "github.com/CloudNativeSDWAN/cnwan-operator/pkg/servregistry/etcd" opetcd "github.com/CloudNativeSDWAN/cnwan-operator/pkg/servregistry/etcd" "github.com/CloudNativeSDWAN/cnwan-reader/pkg/openapi" "github.com/CloudNativeSDWAN/cnwan-reader/pkg/queue" @@ -102,8 +106,17 @@ func (e *etcdWatcher) parseEndpointAndCreateEvent(kvpair *mvccpb.KeyValue, event srv, err := e.servreg.GetServ(endp.NsName, endp.ServName) if err != nil { - l.Err(err).Msg("error while trying to get parent service: skipping endpoint...") - return nil, err + if errors.Is(err, servregistry.ErrNotFound) { + fmt.Println("getting before the delete") + // If not found, then we'll try to get the previous version + srv, err = e.getServiceBeforeDelete(etcd.KeyFromNames(endp.NsName, endp.ServName).String()) + } + + // Still an error? + if err != nil { + l.Err(err).Msg("error while trying to get parent service: skipping endpoint...") + return nil, err + } } if !mapContainsKeys(srv.Metadata, e.options.targetKeys) { @@ -115,6 +128,32 @@ func (e *etcdWatcher) parseEndpointAndCreateEvent(kvpair *mvccpb.KeyValue, event return event, nil } +func (e *etcdWatcher) getServiceBeforeDelete(name string) (*opsr.Service, error) { + // First, we need to get the revision (WithPrevKV does not work here) + resp, err := e.kv.Get(context.Background(), name, clientv3.WithCountOnly()) + if err != nil { + return nil, fmt.Errorf("error while getting key last revision: %w", err) + } + + // Now we get the object with the previous revision + resp, err = e.kv.Get(context.Background(), name, clientv3.WithRev(resp.Header.Revision-1)) + if err != nil { + return nil, fmt.Errorf("error while getting service with previous revision: %w", err) + } + + if resp.Count == 0 { + return nil, servregistry.ErrNotFound + } + + var serv opsr.Service + if err := yaml.NewDecoder(bytes.NewReader(resp.Kvs[0].Value)). + Decode(&serv); err != nil { + return nil, fmt.Errorf("could not unmarshal service: %w", err) + } + + return &serv, nil +} + func (e *etcdWatcher) parseEndpointChange(now, prev *mvccpb.KeyValue) (*openapi.Event, error) { l := log.With().Str("key", string(now.Key)).Str("event", "update").Logger() var parsedPrev *opsr.Endpoint diff --git a/pkg/cmd/watch/etcd/watcher_test.go b/pkg/cmd/watch/etcd/watcher_test.go index dd33574..f842c32 100644 --- a/pkg/cmd/watch/etcd/watcher_test.go +++ b/pkg/cmd/watch/etcd/watcher_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Cisco +// Copyright © 2021, 2022 Cisco // // SPDX-License-Identifier: Apache-2.0 // @@ -210,16 +210,6 @@ func TestParseEndpointAndCreateEvent(t *testing.T) { }, expErr: opsr.ErrNsNameNotProvided, }, - { - kv: &mvccpb.KeyValue{ - Key: []byte(okEndpKey.String()), - Value: okEndpVal, - }, - getServ: func(nsName, servName string) (*opsr.Service, error) { - return nil, opsr.ErrNotFound - }, - expErr: opsr.ErrNotFound, - }, { kv: &mvccpb.KeyValue{ Key: []byte(okEndpKey.String()),