Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new write protocol implement #207

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
"context"
"crypto/tls"
"log/slog"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/openGemini/opengemini-client-go/proto"
)

const (
Expand All @@ -33,6 +36,7 @@ const (
type Codec string

type ContentType string

type CompressMethod string

const (
Expand Down Expand Up @@ -63,6 +67,9 @@ type Client interface {
WriteBatchPoints(ctx context.Context, database string, bp []*Point) error
// WriteBatchPointsWithRp write batch points with retention policy
WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error
// WriteByGrpc write batch record to assigned database.retention_policy by gRPC.
// You'd better use NewWriteRequestBuilder to build req.
WriteByGrpc(ctx context.Context, req *proto.WriteRequest) error

// CreateDatabase Create database
CreateDatabase(database string) error
Expand Down Expand Up @@ -160,6 +167,8 @@ type Config struct {
CustomMetricsLabels map[string]string
// Logger structured logger for logging operations
Logger *slog.Logger
// GrpcConfig configuration information for write service by gRPC
GrpcConfig *GrpcConfig
}

// Address configuration for providing service.
Expand All @@ -170,6 +179,10 @@ type Address struct {
Port int
}

func (a *Address) String() string {
return a.Host + ":" + strconv.Itoa(a.Port)
}

// AuthType type of identity authentication.
type AuthType int

Expand Down Expand Up @@ -206,6 +219,21 @@ type RpConfig struct {
IndexDuration string
}

// GrpcConfig represents the configuration information for write service by gRPC.
type GrpcConfig struct {
// Addresses Configure the service endpoints for the openGemini grpc write service.
// This parameter is required.
Addresses []Address
// AuthConfig configuration information for authentication.
AuthConfig *AuthConfig
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// CompressMethod determines the compress method used for data transmission.
CompressMethod CompressMethod
// Timeout default 30s
Timeout time.Duration
}

// NewClient Creates a openGemini client instance
func NewClient(config *Config) (Client, error) {
return newClient(config)
Expand Down
11 changes: 11 additions & 0 deletions opengemini/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type client struct {
prevIdx atomic.Int32
dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB]
metrics *metrics
rpcClient *writerClient

batchContext context.Context
batchContextCancel context.CancelFunc
Expand Down Expand Up @@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) {
} else {
dbClient.logger = slog.Default()
}
if c.GrpcConfig != nil {
rc, err := newWriterClient(c.GrpcConfig)
if err != nil {
return nil, errors.New("failed to create rpc client: " + err.Error())
}
dbClient.rpcClient = rc
}
dbClient.prevIdx.Store(-1)
if len(c.Addresses) > 1 {
// if there are multiple addresses, start the health check
Expand All @@ -106,6 +114,9 @@ func (c *client) Close() error {
c.dataChanMap.Delete(key)
return true
})
if c.rpcClient != nil {
_ = c.rpcClient.Close()
}
c.cli.CloseIdleConnections()
return nil
}
Expand Down
1 change: 1 addition & 0 deletions opengemini/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
ErrNoAddress = errors.New("must have at least one address")
ErrRetentionPolicy = errors.New("empty retention policy")
ErrUnsupportedFieldValueType = errors.New("unsupported field value type")
ErrEmptyRecord = errors.New("empty record")
)

// checkDatabaseName checks if the database name is empty and returns an error if it is.
Expand Down
3 changes: 2 additions & 1 deletion opengemini/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"net/http"
"time"

compressionPool "github.com/openGemini/opengemini-client-go/lib/pool"
"github.com/vmihailenco/msgpack/v5"

compressionPool "github.com/openGemini/opengemini-client-go/lib/pool"
)

const (
Expand Down
1 change: 1 addition & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) {
assert.Equal(t, length, getTimestampLength(v))
}
}

func TestQueryWithMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []Address{{
Expand Down
256 changes: 256 additions & 0 deletions opengemini/record_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opengemini

import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/openGemini/opengemini-client-go/lib/record"
"github.com/openGemini/opengemini-client-go/proto"
)

var (
_ WriteRequestBuilder = (*writeRequestBuilderImpl)(nil)
random = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// RecordLine define an abstract record line structure.
type RecordLine any

// RecordBuilder build record line, it is not thread safe
type RecordBuilder interface {
// NewLine start a new line, otherwise the added attributes will be in the default row
NewLine() RecordBuilder
// AddTag add a tag to the record.
// If the key exists, it will be overwritten.
// If the key is `time`, it will cause an error.
// If the key is empty or the value is empty, it will be ignored.
AddTag(key string, value string) RecordBuilder
// AddTags add multiple tags to the record.
// Each entry in the map represents a tag where the key is the tag name and the value is the tag value.
AddTags(tags map[string]string) RecordBuilder
// AddField add a field to the record.
// If the key is empty, it will be ignored.
// If the key is `time`, it will cause an error.
// If the key already exists, its value will be overwritten.
AddField(key string, value interface{}) RecordBuilder
// AddFields add multiple fields to the record.
// Each entry in the map represents a field where the key is the field name and the value is the field value.
AddFields(fields map[string]interface{}) RecordBuilder
// CompressMethod set compress method for request data.
CompressMethod(method CompressMethod) RecordBuilder
// Build specifies the time of the record.
// If the time is not specified or zero value, the current time will be used.
Build(timestamp int64) RecordLine
}

type WriteRequestBuilder interface {
// Authenticate configuration write request information for authentication.
Authenticate(username, password string) WriteRequestBuilder
// AddRecord append Record for WriteRequest, you'd better use NewRecordBuilder to build RecordLine.
AddRecord(rlb ...RecordLine) WriteRequestBuilder
// Build generate WriteRequest.
Build() (*proto.WriteRequest, error)
}

type fieldTuple struct {
record.Field
value interface{}
}

type writeRequestBuilderImpl struct {
database string
retentionPolicy string
username string
password string
transform transform
err error
}

func (r *writeRequestBuilderImpl) reset() {
r.transform.reset()
}

func (r *writeRequestBuilderImpl) Authenticate(username, password string) WriteRequestBuilder {
r.username = username
r.password = password
return r
}

func NewWriteRequestBuilder(database, retentionPolicy string) (WriteRequestBuilder, error) {
if err := checkDatabaseName(database); err != nil {
return nil, err
}
return &writeRequestBuilderImpl{database: database, retentionPolicy: retentionPolicy, transform: make(transform)}, nil
}

func (r *writeRequestBuilderImpl) AddRecord(rlb ...RecordLine) WriteRequestBuilder {
for _, lineBuilder := range rlb {
lb, ok := lineBuilder.(*recordLineBuilderImpl)
if !ok {
continue
}
if lb.err != nil {
r.err = errors.Join(r.err, lb.err)
continue
}
err := r.transform.AppendRecord(lb)
if err != nil {
r.err = errors.Join(r.err, err)
continue
}
}
return r
}

func (r *writeRequestBuilderImpl) Build() (*proto.WriteRequest, error) {
defer r.reset()

if r.err != nil {
return nil, r.err
}

if r.database == "" {
return nil, ErrEmptyDatabaseName
}

if r.retentionPolicy == "" {
r.retentionPolicy = "autogen"
}

var req = &proto.WriteRequest{
Database: r.database,
RetentionPolicy: r.retentionPolicy,
Username: r.username,
Password: r.password,
}

for mst, rawRecord := range r.transform {
rec, err := rawRecord.toSrvRecords()
if err != nil {
return nil, fmt.Errorf("failed to convert records: %v", err)
}
var buff []byte
buff, err = rec.Marshal(buff)
if err != nil {
return nil, fmt.Errorf("failed to marshal record: %v", err)
}

req.Records = append(req.Records, &proto.Record{
Measurement: mst,
MinTime: rawRecord.MinTime,
MaxTime: rawRecord.MaxTime,
Block: buff,
})
}

return req, nil
}

type recordLineBuilderImpl struct {
measurement string
tags []*fieldTuple
fields []*fieldTuple
timestamp int64
compressMethod CompressMethod
err error
}

func (r *recordLineBuilderImpl) NewLine() RecordBuilder {
return &recordLineBuilderImpl{measurement: r.measurement}
}

func NewRecordBuilder(measurement string) (RecordBuilder, error) {
if err := checkMeasurementName(measurement); err != nil {
return nil, err
}
return &recordLineBuilderImpl{measurement: measurement}, nil
}

func (r *recordLineBuilderImpl) CompressMethod(method CompressMethod) RecordBuilder {
r.compressMethod = method
return r
}

func (r *recordLineBuilderImpl) AddTag(key string, value string) RecordBuilder {
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
r.tags = append(r.tags, &fieldTuple{
Field: record.Field{
Name: key,
Type: record.FieldTypeTag,
},
value: value,
})
return r
}

func (r *recordLineBuilderImpl) AddTags(tags map[string]string) RecordBuilder {
for key, value := range tags {
r.AddTag(key, value)
}
return r
}

func (r *recordLineBuilderImpl) AddField(key string, value interface{}) RecordBuilder {
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
typ := record.FieldTypeUnknown
switch value.(type) {
case string:
typ = record.FieldTypeString
case float32, float64:
typ = record.FieldTypeFloat
case bool:
typ = record.FieldTypeBoolean
case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int:
typ = record.FieldTypeInt
}
r.fields = append(r.fields, &fieldTuple{
Field: record.Field{
Name: key,
Type: typ,
},
value: value,
})
return r
}

func (r *recordLineBuilderImpl) AddFields(fields map[string]interface{}) RecordBuilder {
for key, value := range fields {
r.AddField(key, value)
}
return r
}

func (r *recordLineBuilderImpl) Build(t int64) RecordLine {
r.timestamp = t
return r
}
Loading