From 4415f6ebda1da87d5af1e22c5dbcca908ad38fc1 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Fri, 31 Jan 2025 04:00:21 +0530 Subject: [PATCH] [bug][storage] Enhanced the idempotency of ES-Rollover Signed-off-by: Manik2708 --- cmd/es-rollover/app/init/action.go | 30 ++------- cmd/es-rollover/app/init/action_test.go | 67 ++++++++----------- pkg/es/client/client.go | 11 +++ pkg/es/client/index_client.go | 15 +++++ pkg/es/client/index_client_test.go | 44 ++++++++++++ pkg/es/client/interfaces.go | 1 + pkg/es/client/mocks/IndexAPI.go | 28 ++++++++ .../integration/es_index_rollover_test.go | 16 +++++ 8 files changed, 151 insertions(+), 61 deletions(-) diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index 4651aba3ece..9b29d1bcb40 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -4,11 +4,8 @@ package init import ( - "encoding/json" "errors" "fmt" - "net/http" - "strings" "github.com/jaegertracing/jaeger/cmd/es-rollover/app" "github.com/jaegertracing/jaeger/pkg/es" @@ -69,29 +66,16 @@ func (c Action) Do() error { } func createIndexIfNotExist(c client.IndexAPI, index string) error { - err := c.CreateIndex(index) + exists, err := c.IndexExists(index) if err != nil { - var esErr client.ResponseError - if errors.As(err, &esErr) { - if esErr.StatusCode != http.StatusBadRequest || esErr.Body == nil { - return esErr.Err - } - // check for the reason of the error - jsonError := map[string]any{} - err := json.Unmarshal(esErr.Body, &jsonError) - if err != nil { - // return unmarshal error - return err - } - errorMap := jsonError["error"].(map[string]any) - // check for reason, ignore already exist error - if strings.Contains(errorMap["type"].(string), "resource_already_exists_exception") { - return nil - } - } - // Return any other error unrelated to the response return err } + if !exists { + err := c.CreateIndex(index) + if err != nil { + return err + } + } return nil } diff --git a/cmd/es-rollover/app/init/action_test.go b/cmd/es-rollover/app/init/action_test.go index 0f87f579ea2..342847141a0 100644 --- a/cmd/es-rollover/app/init/action_test.go +++ b/cmd/es-rollover/app/init/action_test.go @@ -5,7 +5,6 @@ package init import ( "errors" - "net/http" "testing" "github.com/stretchr/testify/assert" @@ -18,59 +17,47 @@ import ( ) func TestIndexCreateIfNotExist(t *testing.T) { - const esErrResponse = `{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"]"}],"type":"resource_already_exists_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}` - tests := []struct { - name string - returnErr error - expectedErr error - containsError string + name string + exists bool + indexExistsReturnError error + indexExistsExpectedError error + createIndexReturnErr error + createIndexExpectedError error }{ { - name: "success", + name: "success", + exists: false, }, { - name: "generic error", - returnErr: errors.New("may be an http error?"), - expectedErr: errors.New("may be an http error?"), + name: "generic error from index exists", + exists: false, + indexExistsReturnError: errors.New("may be an http error from index exists"), + indexExistsExpectedError: errors.New("may be an http error from index exists"), }, { - name: "response error", - returnErr: client.ResponseError{ - Err: errors.New("x"), - StatusCode: http.StatusForbidden, - }, - expectedErr: errors.New("x"), + name: "generic error from create index", + exists: false, + createIndexReturnErr: errors.New("may be an http error from create index"), + createIndexExpectedError: errors.New("may be an http error from create index"), }, { - name: "unmarshal error", - returnErr: client.ResponseError{ - Err: errors.New("x"), - StatusCode: http.StatusBadRequest, - Body: []byte("blablabla"), - }, - containsError: "invalid character", - }, - { - name: "existing error", - returnErr: client.ResponseError{ - Err: errors.New("x"), - StatusCode: http.StatusBadRequest, - Body: []byte(esErrResponse), - }, - expectedErr: nil, + name: "index already exists", + exists: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { indexClient := &mocks.IndexAPI{} - indexClient.On("CreateIndex", "jaeger-span").Return(test.returnErr) + indexClient.On("IndexExists", "jaeger-span").Return(test.exists, test.indexExistsReturnError) + indexClient.On("CreateIndex", "jaeger-span").Return(test.createIndexReturnErr) err := createIndexIfNotExist(indexClient, "jaeger-span") - if test.containsError != "" { - assert.ErrorContains(t, err, test.containsError) - } else { - assert.Equal(t, test.expectedErr, err) + if test.indexExistsExpectedError != nil { + assert.Equal(t, test.indexExistsExpectedError, err) + } + if test.createIndexExpectedError != nil { + assert.Equal(t, test.createIndexExpectedError, err) } }) } @@ -157,6 +144,7 @@ func TestRolloverAction(t *testing.T) { name: "fail to get jaeger indices", setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) { clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil) indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, errors.New("error getting jaeger indices")) @@ -173,6 +161,7 @@ func TestRolloverAction(t *testing.T) { name: "fail to create alias", setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) { clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil) indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) @@ -193,6 +182,7 @@ func TestRolloverAction(t *testing.T) { name: "create rollover index", setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) { clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil) indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) @@ -213,6 +203,7 @@ func TestRolloverAction(t *testing.T) { name: "create rollover index with ilm", setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, ilmClient *mocks.IndexManagementLifecycleAPI) { clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil) indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) diff --git a/pkg/es/client/client.go b/pkg/es/client/client.go index 3db076b9113..26b5b15bf25 100644 --- a/pkg/es/client/client.go +++ b/pkg/es/client/client.go @@ -10,6 +10,14 @@ import ( "net/http" ) +type notFoundError struct { + err error +} + +func (e notFoundError) Error() string { + return e.err.Error() +} + // ResponseError holds information about a request error type ResponseError struct { // Error returned by the http client @@ -79,6 +87,9 @@ func (c *Client) request(esRequest elasticRequest) ([]byte, error) { } defer res.Body.Close() + if res.StatusCode == http.StatusNotFound { + return []byte{}, notFoundError{err: fmt.Errorf("%s doesn't exists", esRequest.endpoint)} + } if res.StatusCode != http.StatusOK { return []byte{}, c.handleFailedRequest(res) } diff --git a/pkg/es/client/index_client.go b/pkg/es/client/index_client.go index 522bb45b5a4..58bd90d8926 100644 --- a/pkg/es/client/index_client.go +++ b/pkg/es/client/index_client.go @@ -186,6 +186,21 @@ func (i *IndicesClient) DeleteAlias(aliases []Alias) error { return nil } +// IndexExists check whether an index exists or not +func (i *IndicesClient) IndexExists(index string) (bool, error) { + _, err := i.request(elasticRequest{ + endpoint: index, + method: http.MethodHead, + }) + if err != nil { + if errors.As(err, ¬FoundError{}) { + return false, nil + } + return false, fmt.Errorf("failed to check if index exists: %w", err) + } + return true, nil +} + func (*IndicesClient) aliasesString(aliases []Alias) string { concatAliases := "" for _, alias := range aliases { diff --git a/pkg/es/client/index_client_test.go b/pkg/es/client/index_client_test.go index cefa3c1e7cc..20a10ac7c87 100644 --- a/pkg/es/client/index_client_test.go +++ b/pkg/es/client/index_client_test.go @@ -280,6 +280,50 @@ func TestClientDeleteIndices(t *testing.T) { } } +func TestClientIndexExists(t *testing.T) { + maxURLPathLength := 4000 + tests := []struct { + name string + exists bool + responseCode int + }{ + { + name: "exists", + responseCode: http.StatusOK, + exists: true, + }, + { + name: "not exists", + responseCode: http.StatusNotFound, + exists: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + apiTriggered := false + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + apiTriggered = true + assert.Equal(t, http.MethodHead, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + assert.LessOrEqual(t, len(req.URL.Path), maxURLPathLength) + res.WriteHeader(test.responseCode) + })) + defer testServer.Close() + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + exists, err := c.IndexExists("jaeger-span") + require.NoError(t, err) + assert.True(t, apiTriggered) + assert.Equal(t, test.exists, exists) + }) + } +} + func TestClientRequestError(t *testing.T) { c := &IndicesClient{ Client: Client{ diff --git a/pkg/es/client/interfaces.go b/pkg/es/client/interfaces.go index 8ede983a38c..20e8e6f6a6f 100644 --- a/pkg/es/client/interfaces.go +++ b/pkg/es/client/interfaces.go @@ -5,6 +5,7 @@ package client type IndexAPI interface { GetJaegerIndices(prefix string) ([]Index, error) + IndexExists(index string) (bool, error) DeleteIndices(indices []Index) error CreateIndex(index string) error CreateAlias(aliases []Alias) error diff --git a/pkg/es/client/mocks/IndexAPI.go b/pkg/es/client/mocks/IndexAPI.go index d1f6d135a67..2d4493b34fe 100644 --- a/pkg/es/client/mocks/IndexAPI.go +++ b/pkg/es/client/mocks/IndexAPI.go @@ -137,6 +137,34 @@ func (_m *IndexAPI) GetJaegerIndices(prefix string) ([]client.Index, error) { return r0, r1 } +// IndexExists provides a mock function with given fields: index +func (_m *IndexAPI) IndexExists(index string) (bool, error) { + ret := _m.Called(index) + + if len(ret) == 0 { + panic("no return value specified for IndexExists") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string) (bool, error)); ok { + return rf(index) + } + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(index) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(index) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Rollover provides a mock function with given fields: rolloverTarget, conditions func (_m *IndexAPI) Rollover(rolloverTarget string, conditions map[string]any) error { ret := _m.Called(rolloverTarget, conditions) diff --git a/plugin/storage/integration/es_index_rollover_test.go b/plugin/storage/integration/es_index_rollover_test.go index aa7a6694e4a..679d29afac6 100644 --- a/plugin/storage/integration/es_index_rollover_test.go +++ b/plugin/storage/integration/es_index_rollover_test.go @@ -38,6 +38,22 @@ func TestIndexRollover_FailIfILMNotPresent(t *testing.T) { assert.Empty(t, indices) } +func TestIndexRollover_Idempotency(t *testing.T) { + SkipUnlessEnv(t, "elasticsearch", "opensearch") + t.Cleanup(func() { + testutils.VerifyGoLeaksOnceForES(t) + }) + client, err := createESClient(t, getESHttpClient(t)) + require.NoError(t, err) + // Make sure that es is clean before the test! + cleanES(t, client, defaultILMPolicyName) + err = runEsRollover("init", []string{}, false) + require.NoError(t, err) + err = runEsRollover("init", []string{}, false) + require.NoError(t, err) + cleanES(t, client, defaultILMPolicyName) +} + func TestIndexRollover_CreateIndicesWithILM(t *testing.T) { SkipUnlessEnv(t, "elasticsearch", "opensearch") t.Cleanup(func() {