Skip to content

Commit

Permalink
Merge pull request #4 from bricks-cloud/v0.0.6
Browse files Browse the repository at this point in the history
[v0.0.6] Fix log format issue in production mode
  • Loading branch information
spikelu2016 authored Sep 4, 2023
2 parents 2b32efd + fa7cda6 commit e93c91c
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 105 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
## 0.0.6 - 2023-09-03
### Added
- Added support of cost estimation for fined tuned models
- Added logging for openAI error response

### Fixed
- Fixed incorrect logging format in production environment

## 0.0.5 - 2023-09-03
### Added
- Add logging for admin and proxy web servers
- Added logging for admin and proxy web servers

## 0.0.4 - 2023-08-30
### Fixed
Expand Down
39 changes: 19 additions & 20 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,48 +30,47 @@ func main() {

flag.Parse()

lg := zap.NewLogger(*modePtr)
log := zap.NewZapLogger(*modePtr)

gin.SetMode(gin.ReleaseMode)

cfg, err := config.ParseEnvVariables()
if err != nil {
lg.Fatalf("cannot parse environment variables: %v", err)
log.Sugar().Fatalf("cannot parse environment variables: %v", err)
}

store, err := postgresql.NewStore(
fmt.Sprintf("postgresql:///%s?sslmode=%s&user=%s&password=%s&host=%s&port=%s", cfg.PostgresqlDbName, cfg.PostgresqlSslMode, cfg.PostgresqlUsername, cfg.PostgresqlPassword, cfg.PostgresqlHosts, cfg.PostgresqlPort),
lg,
cfg.PostgresqlWriteTimeout,
cfg.PostgresqlReadTimeout,
)

if err != nil {
lg.Fatalf("cannot connect to postgresql: %v", err)
log.Sugar().Fatalf("cannot connect to postgresql: %v", err)
}

err = store.CreateKeysTable()
if err != nil {
lg.Fatalf("error creating keys table: %v", err)
log.Sugar().Fatalf("error creating keys table: %v", err)
}

memStore, err := memdb.NewMemDb(store, lg, cfg.InMemoryDbUpdateInterval)
memStore, err := memdb.NewMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
lg.Fatalf("cannot initialize memdb: %v", err)
log.Sugar().Fatalf("cannot initialize memdb: %v", err)
}

memStore.Listen()

e := encrypter.NewEncrypter()
m := manager.NewManager(store, e)
as, err := web.NewAdminServer(lg, *modePtr, m)
as, err := web.NewAdminServer(log, *modePtr, m)
if err != nil {
lg.Fatalf("error creating admin http server: %v", err)
log.Sugar().Fatalf("error creating admin http server: %v", err)
}

tc, err := openai.NewTokenCounter()
if err != nil {
lg.Fatalf("error creating token counter: %v", err)
log.Sugar().Fatalf("error creating token counter: %v", err)
}

as.Run()
Expand All @@ -84,7 +83,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := rateLimitRedisCache.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to rate limit redis cache: %v", err)
log.Sugar().Fatalf("error connecting to rate limit redis cache: %v", err)
}

costLimitRedisCache := redis.NewClient(&redis.Options{
Expand All @@ -96,7 +95,7 @@ func main() {
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := costLimitRedisCache.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to cost limit redis cache: %v", err)
log.Sugar().Fatalf("error connecting to cost limit redis cache: %v", err)
}

costLimitRedisStorage := redis.NewClient(&redis.Options{
Expand All @@ -108,7 +107,7 @@ func main() {
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := costLimitRedisStorage.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to cost limit redis storage: %v", err)
log.Sugar().Fatalf("error connecting to cost limit redis storage: %v", err)
}

rateLimitCache := redisStorage.NewCache(rateLimitRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
Expand All @@ -120,9 +119,9 @@ func main() {
rec := recorder.NewRecorder(costLimitStorage, costLimitCache, ce)
rlm := manager.NewRateLimitManager(rateLimitCache)

ps, err := web.NewProxyServer(lg, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
ps, err := web.NewProxyServer(log, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
if err != nil {
lg.Fatalf("error creating proxy http server: %v", err)
log.Sugar().Fatalf("error creating proxy http server: %v", err)
}

ps.Run()
Expand All @@ -133,24 +132,24 @@ func main() {

memStore.Stop()

lg.Infof("shutting down server...")
log.Sugar().Infof("shutting down server...")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := as.Shutdown(ctx); err != nil {
lg.Debugf("admin server shutdown: %v", err)
log.Sugar().Debugf("admin server shutdown: %v", err)
}

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ps.Shutdown(ctx); err != nil {
lg.Debugf("proxy server shutdown: %v", err)
log.Sugar().Debugf("proxy server shutdown: %v", err)
}

select {
case <-ctx.Done():
lg.Infof("timeout of 5 seconds")
log.Info("timeout of 5 seconds")
}

lg.Infof("server exited")
log.Info("server exited")
}
43 changes: 21 additions & 22 deletions cmd/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/bricks-cloud/bricksllm/internal/config"
"github.com/bricks-cloud/bricksllm/internal/encrypter"
"github.com/bricks-cloud/bricksllm/internal/logger/zap"
logger "github.com/bricks-cloud/bricksllm/internal/logger/zap"
"github.com/bricks-cloud/bricksllm/internal/manager"
"github.com/bricks-cloud/bricksllm/internal/provider/openai"
"github.com/bricks-cloud/bricksllm/internal/recorder"
Expand All @@ -29,48 +29,47 @@ func main() {
privacyPtr := flag.String("p", "strict", "select the privacy mode that bricksllm runs in")
flag.Parse()

lg := zap.NewLogger(*modePtr)
log := logger.NewZapLogger(*modePtr)

gin.SetMode(gin.ReleaseMode)

cfg, err := config.ParseEnvVariables()
if err != nil {
lg.Fatalf("cannot parse environment variables: %v", err)
log.Sugar().Fatalf("cannot parse environment variables: %v", err)
}

store, err := postgresql.NewStore(
fmt.Sprintf("postgresql:///%s?sslmode=%s&user=%s&password=%s&host=%s&port=%s", cfg.PostgresqlDbName, cfg.PostgresqlSslMode, cfg.PostgresqlUsername, cfg.PostgresqlPassword, cfg.PostgresqlHosts, cfg.PostgresqlPort),
lg,
cfg.PostgresqlWriteTimeout,
cfg.PostgresqlReadTimeout,
)

if err != nil {
lg.Fatalf("cannot connect to postgresql: %v", err)
log.Sugar().Fatalf("cannot connect to postgresql: %v", err)
}

err = store.CreateKeysTable()
if err != nil {
lg.Fatalf("error creating keys table: %v", err)
log.Sugar().Fatalf("error creating keys table: %v", err)
}

memStore, err := memdb.NewMemDb(store, lg, cfg.InMemoryDbUpdateInterval)
memStore, err := memdb.NewMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
lg.Fatalf("cannot initialize memdb: %v", err)
log.Sugar().Fatalf("cannot initialize memdb: %v", err)
}

memStore.Listen()

e := encrypter.NewEncrypter()
m := manager.NewManager(store, e)
as, err := web.NewAdminServer(lg, *modePtr, m)
as, err := web.NewAdminServer(log, *modePtr, m)
if err != nil {
lg.Fatalf("error creating admin http server: %v", err)
log.Sugar().Fatalf("error creating admin http server: %v", err)
}

tc, err := openai.NewTokenCounter()
if err != nil {
lg.Fatalf("error creating token counter: %v", err)
log.Sugar().Fatalf("error creating token counter: %v", err)
}

as.Run()
Expand All @@ -83,7 +82,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := rateLimitRedisCache.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to rate limit redis cache: %v", err)
log.Sugar().Fatalf("error connecting to rate limit redis cache: %v", err)
}

costLimitRedisCache := redis.NewClient(&redis.Options{
Expand All @@ -95,7 +94,7 @@ func main() {
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := costLimitRedisCache.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to cost limit redis cache: %v", err)
log.Sugar().Fatalf("error connecting to cost limit redis cache: %v", err)
}

costLimitRedisStorage := redis.NewClient(&redis.Options{
Expand All @@ -107,7 +106,7 @@ func main() {
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := costLimitRedisStorage.Ping(ctx).Err(); err != nil {
lg.Fatalf("error connecting to cost limit redis storage: %v", err)
log.Sugar().Fatalf("error connecting to cost limit redis storage: %v", err)
}

rateLimitCache := redisStorage.NewCache(rateLimitRedisCache, cfg.RedisWriteTimeout, cfg.RedisReadTimeout)
Expand All @@ -119,9 +118,9 @@ func main() {
rec := recorder.NewRecorder(costLimitStorage, costLimitCache, ce)
rlm := manager.NewRateLimitManager(rateLimitCache)

ps, err := web.NewProxyServer(lg, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
ps, err := web.NewProxyServer(log, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
if err != nil {
lg.Fatalf("error creating proxy http server: %v", err)
log.Sugar().Fatalf("error creating proxy http server: %v", err)
}

ps.Run()
Expand All @@ -132,29 +131,29 @@ func main() {

memStore.Stop()

lg.Infof("shutting down server...")
log.Sugar().Info("shutting down server...")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := as.Shutdown(ctx); err != nil {
lg.Debugf("admin server shutdown: %v", err)
log.Sugar().Debugf("admin server shutdown: %v", err)
}

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ps.Shutdown(ctx); err != nil {
lg.Debugf("proxy server shutdown: %v", err)
log.Sugar().Debugf("proxy server shutdown: %v", err)
}

select {
case <-ctx.Done():
lg.Infof("timeout of 5 seconds")
log.Sugar().Infof("timeout of 5 seconds")
}

err = store.DropKeysTable()
if err != nil {
lg.Fatalf("error dropping keys table: %v", err)
log.Sugar().Fatalf("error dropping keys table: %v", err)
}

lg.Infof("server exited")
log.Sugar().Infof("server exited")
}
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func parseRouteConfig(rc *RouteConfig, isOpenAiConfigured bool) error {
}

if !rc.OpenAiConfig.Model.Valid() {
return errors.New("open ai model must be of gpt-3.5-turbo, gpt-3.5-turbo-16k, gpt-3.5-turbo-0613, gpt-3.5-turbo-16k-0613, gpt-4, gpt-4-0613, gpt-4-32k and gpt-4-32k-0613")
return errors.New("openai model must be of gpt-3.5-turbo, gpt-3.5-turbo-16k, gpt-3.5-turbo-0613, gpt-3.5-turbo-16k-0613, gpt-4, gpt-4-0613, gpt-4-32k and gpt-4-32k-0613")
}

for _, prompt := range rc.OpenAiConfig.Prompts {
Expand Down
8 changes: 4 additions & 4 deletions internal/client/openai/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c OpenAiClient) Send(rc *config.OpenAiRouteConfig, prompts []*config.OpenA
bodyReader := bytes.NewReader(b)

if err != nil {
return nil, fmt.Errorf("error when marhsalling open ai json payload: %v", err)
return nil, fmt.Errorf("error when marhsalling openai json payload: %v", err)
}

req, err := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", bodyReader)
Expand Down Expand Up @@ -135,11 +135,11 @@ func (c OpenAiClient) Send(rc *config.OpenAiRouteConfig, prompts []*config.OpenA
openAiErr := &OpenAiErrorResponse{}
err = json.Unmarshal(b, openAiErr)
if err != nil {
return nil, fmt.Errorf("error unmarshaling open ai error response : %w", err)
return nil, fmt.Errorf("error unmarshaling openai error response : %w", err)
}

if openAiErr.Error == nil {
return nil, fmt.Errorf("cannot parse open ai error response : %w", err)
return nil, fmt.Errorf("cannot parse openai error response : %w", err)
}

return nil, NewOpenAiError(openAiErr.Error.Message, openAiErr.Error.Type, res.StatusCode)
Expand All @@ -150,7 +150,7 @@ func (c OpenAiClient) Send(rc *config.OpenAiRouteConfig, prompts []*config.OpenA
openaiRes := &OpenAiResponse{}
err = json.Unmarshal(b, openaiRes)
if err != nil {
return nil, fmt.Errorf("error unmarshaling open ai response : %w", err)
return nil, fmt.Errorf("error unmarshaling openai response : %w", err)
}

lm.SetEstimatedCost(EstimateCost(string(rc.Model), openaiRes.Usage.PromptTokens, openaiRes.Usage.CompletionTokens))
Expand Down
41 changes: 41 additions & 0 deletions internal/logger/zap/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,44 @@ func NewLogger(mode string) logger.Logger {

return zapLogger.Sugar()
}

func NewZapLogger(mode string) *zap.Logger {
rawJSON := []byte(`{
"level": "debug",
"encoding": "json",
"outputPaths": ["stdout"],
"errorOutputPaths": ["stderr"],
"encoderConfig": {
"messageKey": "message",
"levelKey": "level",
"levelEncoder": "lowercase"
}
}`)

var cfg zap.Config

if err := json.Unmarshal(rawJSON, &cfg); err != nil {
panic(err)
}

if mode == "production" {
cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
return zap.Must(cfg.Build())
}

cfg.EncoderConfig.LevelKey = zapcore.OmitKey

enc := &prependEncoder{
Encoder: zapcore.NewConsoleEncoder(cfg.EncoderConfig),
pool: buffer.NewPool(),
cfg: cfg.EncoderConfig,
}

zapLogger := zap.New(zapcore.NewCore(
enc,
zapcore.AddSync(colorable.NewColorableStdout()),
zapcore.DebugLevel,
))

return zapLogger
}
Loading

0 comments on commit e93c91c

Please sign in to comment.