Skip to content

Commit

Permalink
CLC Improvements (hazelcast#967)
Browse files Browse the repository at this point in the history
* Fixed compact serializers

* More compact fixes

* Fixed java date serializer

* Fixed JavaDate test

* Added ServerVersion method to the Connecion type

* Make the page non-pointer in the SQL driver

* Introduced persisten non-retryable error and used it with the REST methods.

* cb.NonRetryableError is unwrappable

* Added ListenerBinder to hazelcastinternaltest API

* Added ExperimentalAPIBaseURL configuration item

* updated default Hz version in RC
  • Loading branch information
yuce authored Aug 15, 2023
1 parent 7ef6939 commit e359966
Show file tree
Hide file tree
Showing 20 changed files with 637 additions and 191 deletions.
6 changes: 5 additions & 1 deletion client_internals_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// +build hazelcastinternal,hazelcastinternaltest

/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,6 +58,10 @@ func (ci *ClientInternal) Proxies() map[string]interface{} {
return ci.client.proxyManager.Proxies()
}

func (ci *ClientInternal) ListenerBinder() *cluster.ConnectionListenerBinder {
return ci.client.proxyManager.serviceBundle.ListenerBinder
}

func (ci *ClientInternal) NewNearCacheManager(reconInterval, maxMiss int) *inearcache.Manager {
return inearcache.NewManager(ci.client.ic, reconInterval, maxMiss)
}
Expand Down
6 changes: 5 additions & 1 deletion cluster/cloud_config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,10 @@ type CloudConfig struct {
Token string `json:",omitempty"`
// Enabled enables Hazelcast Cloud integration.
Enabled bool `json:",omitempty"`
// ExperimentalAPIBaseURL sets the Viridian API base URL.
// You generally should leave its value unset.
// Note that this configuration may be modified or removed anytime.
ExperimentalAPIBaseURL string `json:",omitempty"`
}

func (h CloudConfig) Clone() CloudConfig {
Expand Down
2 changes: 1 addition & 1 deletion examples/cloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
clusterName = "PUT-YOUR-CLUSTER-NAME-HERE!"
token = "PUT-YOUR-VIRIIDAN-HERE!"
token = "PUT-YOUR-VIRIDIAN-TOKEN-HERE!"
caFilePath = "/PATH/ca.pem"
certFilePath = "/PATH/cert.pem"
keyFilePath = "/PATH/key.pem"
Expand Down
6 changes: 4 additions & 2 deletions internal/cb/circuitbreaker.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -113,7 +113,9 @@ loop:
break loop
}
if errors.As(err, &nonRetryableErr) {
err = nonRetryableErr.Err
if !nonRetryableErr.Persistent {
err = nonRetryableErr.Err
}
break loop
}
if attempt < cb.MaxRetries {
Expand Down
21 changes: 18 additions & 3 deletions internal/cb/errors.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand All @@ -22,21 +22,36 @@ var ErrCircuitOpen = errors.New("circuit open")
var ErrDeadlineExceeded = errors.New("deadline exceeded")

type NonRetryableError struct {
// Err is the wrapper error
Err error
// Persistent disables unwrapping the error in the CB if true
// This is useful to make the error non-retryable in chain of Trys
Persistent bool
}

func WrapNonRetryableError(err error) *NonRetryableError {
return &NonRetryableError{err}
return &NonRetryableError{Err: err}
}

func WrapNonRetryableErrorPersistent(err error) *NonRetryableError {
return &NonRetryableError{
Err: err,
Persistent: true,
}
}

func (n NonRetryableError) Error() string {
return n.Err.Error()
}

func (e NonRetryableError) Is(target error) bool {
func (n NonRetryableError) Is(target error) bool {
if _, ok := target.(*NonRetryableError); ok {
return true
}
_, ok := target.(NonRetryableError)
return ok
}

func (n NonRetryableError) Unwrap() error {
return n.Err
}
27 changes: 17 additions & 10 deletions internal/cloud/cloud_discovery.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,25 +34,32 @@ type DiscoveryClient struct {
logger logger.LogAdaptor
httpClient *rest.HTTPClient
token string
baseURL string
}

func NewDiscoveryClient(config *cluster.CloudConfig, logger logger.LogAdaptor) *DiscoveryClient {
url := config.ExperimentalAPIBaseURL
if url == "" {
url = defaultBaseAPIURL()
}
url = strings.TrimRight(url, "/")
return &DiscoveryClient{
token: config.Token,
httpClient: rest.NewHTTPClient(),
logger: logger,
baseURL: url,
}
}

func (c *DiscoveryClient) DiscoverNodes(ctx context.Context) ([]Address, error) {
url := makeCoordinatorURL(c.token)
if j, err := c.httpClient.GetJSONArray(ctx, url); err != nil {
url := makeCoordinatorURL(c.baseURL, c.token)
j, err := c.httpClient.GetJSONArray(ctx, url)
if err != nil {
return nil, err
} else {
addrs := extractAddresses(j)
c.logger.Trace(func() string { return fmt.Sprintf("cloud addresses: %v", addrs) })
return addrs, nil
}
addrs := extractAddresses(j)
c.logger.Trace(func() string { return fmt.Sprintf("cloud addresses: %v", addrs) })
return addrs, nil
}

func extractAddresses(j interface{}) []Address {
Expand All @@ -69,11 +76,11 @@ func extractAddresses(j interface{}) []Address {
return r
}

func makeCoordinatorURL(token string) string {
return fmt.Sprintf("%s/cluster/discovery?token=%s", baseURL(), token)
func makeCoordinatorURL(baseURL, token string) string {
return fmt.Sprintf("%s/cluster/discovery?token=%s", baseURL, token)
}

func baseURL() string {
func defaultBaseAPIURL() string {
url := os.Getenv(envCoordinatorBaseURL)
if url == "" {
return "https://api.viridian.hazelcast.com"
Expand Down
11 changes: 3 additions & 8 deletions internal/cloud/cloud_discovery_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@ package cloud
import (
"encoding/json"
"fmt"
"os"
"strconv"
"testing"

Expand Down Expand Up @@ -96,14 +95,10 @@ func TestTranslateAddrs(t *testing.T) {
}

func TestMakeCoordinatorURL(t *testing.T) {
os.Setenv(envCoordinatorBaseURL, "")
url := makeCoordinatorURL("TOK")
url := makeCoordinatorURL("https://api.viridian.hazelcast.com", "TOK")
target := "https://api.viridian.hazelcast.com/cluster/discovery?token=TOK"
assert.Equal(t, target, url)
if err := os.Setenv(envCoordinatorBaseURL, "http://test.dev"); err != nil {
t.Fatal(err)
}
url = makeCoordinatorURL("TOK")
url = makeCoordinatorURL("http://test.dev", "TOK")
target = "http://test.dev/cluster/discovery?token=TOK"
assert.Equal(t, target, url)
}
4 changes: 4 additions & 0 deletions internal/cluster/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (c *Connection) SetEndpoint(addr pubcluster.Address) {
c.endpoint.Store(addr)
}

func (c *Connection) ServerVersion() string {
return c.connectedServerVersionStr
}

func (c *Connection) MemberUUID() types.UUID {
v := c.memberUUID.Load()
if v == nil {
Expand Down
33 changes: 21 additions & 12 deletions internal/rest/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"io"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -65,45 +65,54 @@ func (c *HTTPClient) Get(ctx context.Context, uri string, headers ...HTTPHeader)
req.Header.Add(h.Name, h.Value)
}
i, err := c.cb.TryContext(ctx, func(ctx context.Context, attempt int) (interface{}, error) {
if resp, err := c.httpClient.Do(req); err != nil {
resp, err := c.httpClient.Do(req)
if err != nil {
var e *url.Error
if errors.As(err, &e) {
e.URL = ""
}
return nil, err
} else if resp.StatusCode < 300 {
}
if resp.StatusCode < 300 {
return resp, nil
} else if resp.StatusCode >= 500 {
}
if resp.StatusCode >= 500 {
return nil, NewErrorFromResponse(resp)
} else {
return nil, cb.WrapNonRetryableError(NewErrorFromResponse(resp))
}
return nil, cb.WrapNonRetryableErrorPersistent(NewErrorFromResponse(resp))
})
if err != nil {
return nil, err
}
resp := i.(*http.Response)
b, err := ioutil.ReadAll(resp.Body)
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// error is unhandled
resp.Body.Close()
return b, err
return b, nil
}

func (c *HTTPClient) GetJSONObject(ctx context.Context, url string, headers ...HTTPHeader) (map[string]interface{}, error) {
r := map[string]interface{}{}
if b, err := c.Get(ctx, url, headers...); err != nil {
b, err := c.Get(ctx, url, headers...)
if err != nil {
return nil, err
} else if err := json.Unmarshal(b, &r); err != nil {
}
if err := json.Unmarshal(b, &r); err != nil {
return nil, err
}
return r, nil
}

func (c *HTTPClient) GetJSONArray(ctx context.Context, url string, headers ...HTTPHeader) ([]interface{}, error) {
r := []interface{}{}
if b, err := c.Get(ctx, url, headers...); err != nil {
b, err := c.Get(ctx, url, headers...)
if err != nil {
return nil, err
} else if err := json.Unmarshal(b, &r); err != nil {
}
if err := json.Unmarshal(b, &r); err != nil {
return nil, err
}
return r, nil
Expand Down
24 changes: 16 additions & 8 deletions internal/rest/http_error.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
Expand All @@ -18,25 +18,33 @@ package rest

import (
"fmt"
"io/ioutil"
"io"
"net/http"
)

type Error struct {
Text string
Code int
text string
code int
}

func NewError(code int, text string) *Error {
return &Error{
Code: code,
Text: text,
code: code,
text: text,
}
}

func (e Error) Text() string {
return e.text
}

func (e Error) Code() int {
return e.code
}

func NewErrorFromResponse(resp *http.Response) *Error {
code := resp.StatusCode
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return NewError(code, "(cannot read error message)")
}
Expand All @@ -47,5 +55,5 @@ func NewErrorFromResponse(resp *http.Response) *Error {
}

func (e Error) Error() string {
return fmt.Sprintf("HTTP error: %d, %s", e.Code, e.Text)
return fmt.Sprintf("HTTP error: %d, %s", e.code, e.text)
}
Loading

0 comments on commit e359966

Please sign in to comment.