Skip to content

Commit

Permalink
updated connector pool
Browse files Browse the repository at this point in the history
  • Loading branch information
klarysz committed Jul 27, 2022
1 parent 0f4633c commit f91bba3
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 52 deletions.
1 change: 0 additions & 1 deletion cmd/relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (s *serverBuilder) buildRelations(ctx context.Context, meta *metadata.Servi
return err
}

defer db.Close()
if err := meta.Info(context.Background(), db, info.KindPrimaryKeys, &pk, option.NewArgs("", s.options.Connector.DbName, s.options.Table)); err == nil && len(pk) > 0 {
for _, rel := range s.options.Relations {
if !strings.Contains(rel, ":") {
Expand Down
3 changes: 2 additions & 1 deletion reader/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func (s *Service) afterRead(session *Session, collector *view.Collector, start *
session.View.Logger.ReadTime(viewName, start, &end, err)
//TODO add to metrics record read
elapsed := end.Sub(*start)
session.Metrics = append(session.Metrics, &Metric{View: viewName, ElapsedMs: int(elapsed.Milliseconds()), Elapsed: elapsed.String(), Rows: collector.Len()})

session.AddMetric(&Metric{View: viewName, ElapsedMs: int(elapsed.Milliseconds()), Elapsed: elapsed.String(), Rows: collector.Len()})
if err != nil {
session.View.Counter.IncrementValue(Error)
} else {
Expand Down
8 changes: 8 additions & 0 deletions reader/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"fmt"
"github.com/viant/datly/view"
"reflect"
"sync"
)

//Session groups view required to Read view
type (
Session struct {
mux sync.Mutex
Dest interface{} //slice
View *view.View
Selectors *view.Selectors
Expand Down Expand Up @@ -49,6 +51,12 @@ func (s *Session) AddCriteria(aView *view.View, criteria string, placeholders ..
sel.Placeholders = placeholders
}

func (s *Session) AddMetric(m *Metric) {
s.mux.Lock()
s.Metrics = append(s.Metrics, m)
s.mux.Unlock()
}

//NewSession creates a session
func NewSession(dest interface{}, aView *view.View) *Session {
return &Session{
Expand Down
36 changes: 10 additions & 26 deletions view/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/viant/scy/auth/gcp"
"github.com/viant/scy/auth/gcp/client"
"google.golang.org/api/option"
"sync"
"time"
)

Expand All @@ -23,9 +24,10 @@ type (
DSN string `json:",omitempty"`

//TODO add secure password storage
db *sql.DB
db func() (*sql.DB, error)
initialized bool
DBConfig
mux sync.Mutex
}

DBConfig struct {
Expand Down Expand Up @@ -77,9 +79,7 @@ func (c *Connector) Init(ctx context.Context, connectors Connectors) error {
//It is important to not close the DB since the connection is shared.
func (c *Connector) DB(ctx context.Context) (*sql.DB, error) {
if c.db != nil {
if err := c.db.PingContext(ctx); err == nil {
return c.db, nil
}
return c.db()
}

var err error
Expand All @@ -97,8 +97,12 @@ func (c *Connector) DB(ctx context.Context) (*sql.DB, error) {
c.setDriverOptions(secret)
}

c.db, err = aDbPool.DB(ctx, c.Driver, dsn, &c.DBConfig)
return c.db, err
c.mux.Lock()
c.db = aDbPool.DB(ctx, c.Driver, dsn, &c.DBConfig)
aDB, err := c.db()
c.mux.Unlock()

return aDB, err
}

//Validate check if connector was configured properly.
Expand Down Expand Up @@ -140,26 +144,6 @@ func (c *Connector) inherit(connector *Connector) {
}
}

//Reset reset connector
func (c *Connector) Reset() {
if c.db == nil {
return
}

_ = c.db.Close()
c.db = nil
}

//Close closes connector
func (c *Connector) Close() error {
if c.db == nil {
return nil
}
err := c.db.Close()
c.db = nil
return err
}

func (c *Connector) setDriverOptions(secret *scy.Secret) {
if secret == nil || c.initialized {
return
Expand Down
122 changes: 98 additions & 24 deletions view/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package view
import (
"context"
"database/sql"
"fmt"
as "github.com/aerospike/aerospike-client-go"
"strconv"
"strings"
"sync"
"time"
)

type (
Expand All @@ -16,8 +18,11 @@ type (
}

db struct {
mutex sync.Mutex
actual *sql.DB
mutex sync.Mutex
actual *sql.DB
ctx context.Context
cancelFunc context.CancelFunc
initialized bool
}

aerospikePool struct {
Expand Down Expand Up @@ -56,34 +61,43 @@ func newClientPool() *aerospikePool {
return &aerospikePool{index: map[string]*aerospikeClient{}}
}

func (d *db) connectWithLock(ctx context.Context, driver string, dsn string, config *DBConfig) (*sql.DB, error) {
func (d *db) initWithLock(ctx context.Context, driver string, dsn string, config *DBConfig) error {
d.mutex.Lock()
aDb, err := d.connect(ctx, driver, dsn, config)
err := d.initDatabase(ctx, driver, dsn, config)
//d.keepConnectionAlive(driver, dsn, config)
d.mutex.Unlock()

if err == nil && d.actual != aDb {
return err
}

d.actual = aDb
func (d *db) initDatabase(ctx context.Context, driver string, dsn string, config *DBConfig) error {
if d.initialized {
return nil
}

d.mutex.Unlock()
return aDb, err
d.initialized = true
var err error
d.actual, err = sql.Open(driver, dsn)
if d.actual != nil {
d.configureDB(config, d.actual)
}

return err
}

func (d *db) connect(ctx context.Context, driver string, dsn string, c *DBConfig) (*sql.DB, error) {
if d.actual != nil {
if err := d.actual.PingContext(ctx); err != nil {
d.actual = nil
return d.connect(ctx, driver, dsn, c)
}
func (d *db) connect() (*sql.DB, error) {
d.mutex.Lock()
aDb := d.actual
d.mutex.Unlock()

return d.actual, nil
if aDb == nil {
return nil, fmt.Errorf("no connection with database is available")
}

aDb, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}
return aDb, nil
}

func (d *db) configureDB(c *DBConfig, aDb *sql.DB) {
if c.MaxIdleConns != 0 {
aDb.SetMaxIdleConns(c.MaxIdleConns)
}
Expand All @@ -99,11 +113,62 @@ func (d *db) connect(ctx context.Context, driver string, dsn string, c *DBConfig
if c.ConnMaxLifetimeMs != 0 {
aDb.SetConnMaxLifetime(c.ConnMaxLifetime())
}
}

func (d *db) keepConnectionAlive(driver string, dsn string, config *DBConfig) {
if d.cancelFunc != nil {
return
}

newCtx := context.Background()
cancel, cancelFunc := context.WithCancel(newCtx)

d.ctx = cancel
d.cancelFunc = cancelFunc

go func(driver, dsn string, config *DBConfig) {
for {
time.Sleep(time.Second * time.Duration(10))

select {
case <-cancel.Done():
return
default:
d.mutex.Lock()
aDb := d.actual
d.mutex.Unlock()

var err error
if aDb != nil {
err = aDb.PingContext(d.ctx)
}

if err != nil || aDb == nil {
newDb, err := sql.Open(driver, dsn)
d.mutex.Lock()
d.actual = newDb
if newDb != nil {
d.configureDB(config, newDb)
}
d.mutex.Unlock()

err = newDb.PingContext(d.ctxWithTimeout(time.Duration(5) * time.Second))
if err != nil {
fmt.Printf("[INFO] couldn't connect to one of %v database \n", driver)
}
}
}
}
}(driver, dsn, config)
}

return aDb, err
func (d *db) ctxWithTimeout(duration time.Duration) context.Context {
background := context.Background()
ctxWithTimeout, _ := context.WithTimeout(background, duration)
return ctxWithTimeout
}

func (p *dbPool) DB(ctx context.Context, driver, dsn string, config *DBConfig) (*sql.DB, error) {
func (p *dbPool) DB(ctx context.Context, driver, dsn string, config *DBConfig) func() (*sql.DB, error) {
builder := &strings.Builder{}
builder.WriteString(strconv.Itoa(config.ConnMaxLifetimeMs))
builder.WriteByte('#')
Expand All @@ -118,16 +183,21 @@ func (p *dbPool) DB(ctx context.Context, driver, dsn string, config *DBConfig) (
builder.WriteString(dsn)

actualKey := builder.String()
dbConn := p.getItem(actualKey)
dbConn := p.getItem(ctx, actualKey, driver, dsn, config)

return dbConn.connectWithLock(ctx, driver, dsn, config)
return dbConn.connect
}

func (p *dbPool) getItem(key string) *db {
func (p *dbPool) getItem(ctx context.Context, key string, driver string, dsn string, config *DBConfig) *db {
p.mutex.Lock()
item, ok := p.index[key]
if !ok {
item = &db{}
err := item.initWithLock(ctx, driver, dsn, config)
if err != nil {
fmt.Printf("error occured while initializing db %v\n", err.Error())
}

p.index[key] = item
}

Expand All @@ -136,6 +206,10 @@ func (p *dbPool) getItem(key string) *db {
}

func ResetDBPool() {
for _, dbItem := range aDbPool.index {
dbItem.cancelFunc()
}

aDbPool = newPool()
}

Expand Down

0 comments on commit f91bba3

Please sign in to comment.