Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added support for configcenter listening and logger hot-reloading #647

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions configcenter/configclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package configcenter

import "github.com/apache/dubbo-go-pixiu/pkg/model"

type (
ConfigClient interface {
LoadConfig(properties map[string]interface{}) (string, error)

ListenConfig(properties map[string]interface{}) (err error)
}

ListenConfig func(data string)
// ViewConfig returns the current remote configuration.
ViewConfig() *model.Bootstrap
}
)
28 changes: 21 additions & 7 deletions configcenter/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

import (
"github.com/ghodss/yaml"
"gopkg.in/yaml.v3"
)

import (
Expand All @@ -42,6 +42,7 @@ var Parsers = map[string]func(data []byte, v interface{}) error{
type (
Load interface {
LoadConfigs(boot *model.Bootstrap, opts ...Option) (v *model.Bootstrap, err error)
ViewRemoteConfig() *model.Bootstrap
}

Option func(opt *Options)
Expand All @@ -61,12 +62,14 @@ type DefaultConfigLoad struct {
}

func NewConfigLoad(bootConfig *model.Bootstrap) *DefaultConfigLoad {

var configClient ConfigClient

var err error
// config center load
if strings.EqualFold(bootConfig.Config.Type, KEY_CONFIG_TYPE_NACOS) {
configClient, _ = NewNacosConfig(bootConfig)
configClient, err = NewNacosConfig(bootConfig)
if err != nil {
logger.Errorf("Get new nacos config failed,err: %v", err)
}
}

if configClient == nil {
Expand All @@ -81,7 +84,6 @@ func NewConfigLoad(bootConfig *model.Bootstrap) *DefaultConfigLoad {
}

func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (v *model.Bootstrap, err error) {

var opt Options
for _, o := range opts {
o(&opt)
Expand All @@ -104,7 +106,6 @@ func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (
}

data, err := d.configClient.LoadConfig(m)

if err != nil {
return nil, err
}
Expand All @@ -114,11 +115,24 @@ func (d *DefaultConfigLoad) LoadConfigs(boot *model.Bootstrap, opts ...Option) (
return boot, err
}

err = Parsers[".yml"]([]byte(data), boot)
if err = Parsers[".yml"]([]byte(data), boot); err != nil {
logger.Errorf("failed to parse the configuration loaded from the remote,err: %v", err)
return boot, err
}

if err = d.configClient.ListenConfig(m); err != nil {
logger.Errorf("failed to listen the remote configcenter config,err: %v", err)
return boot, err
}

return boot, err
}

// ViewRemoteConfig returns the current remote configuration.
func (d *DefaultConfigLoad) ViewRemoteConfig() *model.Bootstrap {
return d.configClient.ViewConfig()
}

func ParseYamlBytes(content []byte, v interface{}) error {
return yaml.Unmarshal(content, v)
}
7 changes: 3 additions & 4 deletions configcenter/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
)

func getBootstrap() *model.Bootstrap {

return &model.Bootstrap{
Config: &model.ConfigCenter{
Type: "nacos",
Expand Down Expand Up @@ -69,7 +68,7 @@ func TestDefaultConfigLoad_LoadConfigs(t *testing.T) {
boot *model.Bootstrap
opts []Option
}
var tests = []struct {
tests := []struct {
name string
fields fields
args args
Expand Down Expand Up @@ -139,8 +138,8 @@ func TestDefaultConfigLoad_LoadConfigs(t *testing.T) {
t.Errorf("LoadConfigs() error = %v, wantErr %v", err, tt.wantErr)
return
}
//assert.True(t, gotV.Nacos.DataId == DataId, "load config by nacos config center error!")
//assert.True(t, len(gotV.StaticResources.Listeners) > 0, "load config by nacos config center error!")
// assert.True(t, gotV.Nacos.DataId == DataId, "load config by nacos config center error!")
// assert.True(t, len(gotV.StaticResources.Listeners) > 0, "load config by nacos config center error!")
conf, _ := json.Marshal(gotV)
logger.Infof("config of Bootstrap load by nacos : %v", string(conf))
})
Expand Down
121 changes: 70 additions & 51 deletions configcenter/nacos_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
package configcenter

import (
"sync"

"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)

// Constants for configuration keys.
const (
KeyDataId = "dataId"
KeyGroup = "group"
Expand All @@ -39,6 +40,7 @@ const (
KeyTenant = "tenant"
)

// Constants for Nacos configuration.
const (
DataId = "pixiu.yaml"
Group = "DEFAULT_GROUP"
Expand All @@ -50,29 +52,41 @@ const (
Scheme = "http"
)

type (
NacosConfig struct {
client config_client.IConfigClient
// NacosConfig represents the Nacos configuration client and its state.
type NacosConfig struct {
client config_client.IConfigClient
remoteConfig *model.Bootstrap
mu sync.Mutex
}

// todo not support now
listenConfigCallback ListenConfig
// NewNacosConfig creates a new NacosConfig instance.
// It returns an error if no Nacos server is configured or if the client cannot be created.
func NewNacosConfig(boot *model.Bootstrap) (ConfigClient, error) {
if len(boot.Nacos.ServerConfigs) == 0 {
return nil, errors.New("no Nacos server configured")
}
)

func NewNacosConfig(boot *model.Bootstrap) (configClient ConfigClient, err error) {

var sc []constant.ServerConfig
if len(boot.Nacos.ServerConfigs) == 0 {
return nil, errors.New("no nacos server be setted")
nacosClient, err := getNacosConfigClient(boot)
if err != nil {
return nil, err
}
for _, serveConfig := range boot.Nacos.ServerConfigs {
sc = append(sc, constant.ServerConfig{
Port: serveConfig.Port,
IpAddr: serveConfig.IpAddr,

return &NacosConfig{
client: nacosClient,
}, nil
}

// getNacosConfigClient initializes and returns a Nacos config client.
func getNacosConfigClient(boot *model.Bootstrap) (config_client.IConfigClient, error) {
var serverConfigs []constant.ServerConfig
for _, serverConfig := range boot.Nacos.ServerConfigs {
serverConfigs = append(serverConfigs, constant.ServerConfig{
Port: serverConfig.Port,
IpAddr: serverConfig.IpAddr,
})
}

cc := constant.ClientConfig{
clientConfig := constant.ClientConfig{
NamespaceId: boot.Nacos.ClientConfig.NamespaceId,
TimeoutMs: boot.Nacos.ClientConfig.TimeoutMs,
NotLoadCacheAtStart: boot.Nacos.ClientConfig.NotLoadCacheAtStart,
Expand All @@ -81,56 +95,61 @@ func NewNacosConfig(boot *model.Bootstrap) (configClient ConfigClient, err error
LogLevel: boot.Nacos.ClientConfig.LogLevel,
}

pa := vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
}
nacos, err := clients.NewConfigClient(pa)
if err != nil {
return nil, err
}
configClient = &NacosConfig{
client: nacos,
clientParam := vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfigs,
}

return configClient, nil
return clients.NewConfigClient(clientParam)
}

// LoadConfig retrieves the configuration from Nacos based on the provided parameters.
func (n *NacosConfig) LoadConfig(param map[string]interface{}) (string, error) {
return n.client.GetConfig(vo.ConfigParam{
DataId: getOrDefault(param[KeyDataId].(string), DataId),
Group: getOrDefault(param[KeyGroup].(string), Group),
})
}

func getOrDefault(target string, quiet string) string {
// getOrDefault returns the target value if it is not empty; otherwise, it returns the fallback value.
func getOrDefault(target, fallback string) string {
if len(target) == 0 {
target = quiet
return fallback
}
return target
}

func (n *NacosConfig) ListenConfig(param map[string]interface{}) (err error) {
// todo noop, not support
if true {
return nil
}
listen := n.listen(getOrDefault(param[KeyDataId].(string), DataId), getOrDefault(param[KeyGroup].(string), Group))
return listen()
// ListenConfig listens for configuration changes in Nacos.
func (n *NacosConfig) ListenConfig(param map[string]interface{}) error {
return n.client.ListenConfig(vo.ConfigParam{
DataId: getOrDefault(param[KeyDataId].(string), DataId),
Group: getOrDefault(param[KeyGroup].(string), Group),
OnChange: n.onChange,
})
}

func (n *NacosConfig) listen(dataId, group string) func() error {
return func() error {
return n.client.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
OnChange: func(namespace, group, dataId, data string) {
if len(data) == 0 {
logger.Errorf("nacos listen callback data nil error , namespace : %s,group : %s , dataId : %s , data : %s")
return
}
n.listenConfigCallback(data)
},
})
// onChange is the callback function triggered when the configuration changes in Nacos.
func (n *NacosConfig) onChange(namespace, group, dataId, data string) {
n.mu.Lock()
defer n.mu.Unlock()

if len(data) == 0 {
mutezebra marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf("Nacos listen callback data is nil. Namespace: %s, Group: %s, DataId: %s", namespace, group, dataId)
return
}

boot := new(model.Bootstrap)
mutezebra marked this conversation as resolved.
Show resolved Hide resolved
if err := Parsers[".yml"]([]byte(data), boot); err != nil {
logger.Errorf("Failed to parse the configuration loaded from the remote. Error: %v", err)
return
}

n.remoteConfig = boot
}

// ViewConfig returns the current remote configuration.
func (n *NacosConfig) ViewConfig() *model.Bootstrap {
n.mu.Lock()
defer n.mu.Unlock()
return n.remoteConfig
}
Loading
Loading