-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathconnection.go
executable file
·156 lines (131 loc) · 4.36 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package dsc
import (
"log"
"time"
)
//AbstractConnection represents an abstract connection
type AbstractConnection struct {
Connection
lastUsed *time.Time
config *Config
connectionPool chan Connection
}
//Config returns a datastore config
func (ac *AbstractConnection) Config() *Config {
return ac.config
}
//ConnectionPool returns a connection channel
func (ac *AbstractConnection) ConnectionPool() chan Connection {
return ac.connectionPool
}
//LastUsed returns a last used time
func (ac *AbstractConnection) LastUsed() *time.Time {
return ac.lastUsed
}
//SetLastUsed sets last used time
func (ac *AbstractConnection) SetLastUsed(ts *time.Time) {
ac.lastUsed = ts
}
//Close closes connection if pool is full or send it back to the pool
func (ac *AbstractConnection) Close() error {
channel := ac.Connection.ConnectionPool()
config := ac.config
if len(ac.Connection.ConnectionPool()) < config.MaxPoolSize {
var connection = ac.Connection
channel <- connection
var ts = time.Now()
connection.SetLastUsed(&ts)
} else {
return ac.Connection.CloseNow()
}
return nil
}
//Begin starts a transaction - this method is an abstract method
func (ac *AbstractConnection) Begin() error { return nil }
//Commit finishes current transaction - this method is an abstract method
func (ac *AbstractConnection) Commit() error { return nil }
//Rollback - discards transaction - this method is an abstract method
func (ac *AbstractConnection) Rollback() error { return nil }
//NewAbstractConnection create a new abstract connection
func NewAbstractConnection(config *Config, connectionPool chan Connection, connection Connection) *AbstractConnection {
return &AbstractConnection{config: config, connectionPool: connectionPool, Connection: connection}
}
//AbstractConnectionProvider represents an abstract/superclass ConnectionProvider
type AbstractConnectionProvider struct {
ConnectionProvider
config *Config
connectionPool chan Connection
}
//Config returns a datastore config,
func (cp *AbstractConnectionProvider) Config() *Config {
return cp.config
}
//ConnectionPool returns a ConnectionPool
func (cp *AbstractConnectionProvider) ConnectionPool() chan Connection {
return cp.connectionPool
}
//SpawnConnectionIfNeeded creates a new connection if connection pool has not reached size controlled by Config.PoolSize
func (cp *AbstractConnectionProvider) SpawnConnectionIfNeeded() {
config := cp.ConnectionProvider.Config()
if config.PoolSize == 0 {
config.PoolSize = 1
}
connectionPool := cp.ConnectionProvider.ConnectionPool()
for i := len(connectionPool); i < config.PoolSize; i++ {
connection, err := cp.ConnectionProvider.NewConnection()
if err != nil {
log.Printf("failed to create connection %v\n", err)
break
}
select {
case <-time.After(1 * time.Second):
log.Fatalf("failed to add connection to queue (size: %v, cap:%v)", len(connectionPool), cap(connectionPool))
case connectionPool <- connection:
}
}
}
//Close closes a datastore connection or returns it to the pool (Config.PoolSize and Config.MaxPoolSize).
func (cp *AbstractConnectionProvider) Close() error {
poolsize := len(cp.connectionPool)
for i := 0; i < poolsize; i++ {
var connection Connection
select {
case <-time.After(1 * time.Second):
case connection = <-cp.connectionPool:
err := connection.CloseNow()
if err != nil {
return err
}
}
}
// 防止池子中再回收进新连接
if len(cp.connectionPool) > 0 {
cp.Close()
}
return nil
}
//Get returns a new datastore connection or error.
func (cp *AbstractConnectionProvider) Get() (Connection, error) {
cp.ConnectionProvider.SpawnConnectionIfNeeded()
connectionPool := cp.ConnectionProvider.ConnectionPool()
var result Connection
select {
case <-time.After(100 * time.Millisecond):
{
Logf("unable to acquire connection from pool, creating new connection ...")
}
case result = <-connectionPool:
}
if result == nil {
var err error
result, err = cp.ConnectionProvider.NewConnection()
if err != nil {
return nil, err
}
}
return result, nil
}
//NewAbstractConnectionProvider create a new AbstractConnectionProvider
func NewAbstractConnectionProvider(config *Config, connectionPool chan Connection, connectionProvider ConnectionProvider) *AbstractConnectionProvider {
return &AbstractConnectionProvider{config: config, connectionPool: connectionPool, ConnectionProvider: connectionProvider}
}