Skip to content

Commit

Permalink
i
Browse files Browse the repository at this point in the history
  • Loading branch information
zjwshisb committed Jan 7, 2025
1 parent 81be659 commit 1ee1675
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 51 deletions.
21 changes: 16 additions & 5 deletions internal/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cron

import (
"context"
"gf-chat/internal/dao"
"gf-chat/internal/model/do"
"gf-chat/internal/service"
"github.com/gogf/gf/v2/frame/g"
Expand All @@ -21,10 +20,22 @@ func Run() {
for _, admin := range admins {
invalidIds := service.ChatRelation().GetInvalidUsers(ctx, admin.Id)
if len(invalidIds) > 0 {
dao.CustomerChatSessions.Ctx(ctx).Where(g.Map{
"user_id": invalidIds,
"admin_id": admin.Id,
})
sessions, err := service.ChatSession().All(ctx, g.Map{
"user_id": invalidIds,
"admin_id": admin.Id,
"broken_at is null": nil,
}, nil, nil)
if err != nil {
g.Log().Errorf(ctx, "%+v", err)
}
for _, session := range sessions {
if session.BrokenAt == nil {
err := service.ChatSession().Close(ctx, session, false, false)
if err != nil {
g.Log().Errorf(ctx, "%+v", err)
}
}
}
}
}
})
Expand Down
20 changes: 2 additions & 18 deletions internal/logic/chat/admin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (m *adminManager) deliveryMessage(ctx context.Context, msg *model.CustomerC
}
}

func (m *adminManager) sendWaiting(admin *model.CustomerAdmin, user iChatUser) {

}

func (m *adminManager) sendOffline(admin *model.CustomerAdmin, msg *model.CustomerChatMessage) {

}
Expand Down Expand Up @@ -100,8 +96,8 @@ func (m *adminManager) handleMessage(ctx context.Context, arg eventArg) error {
conn.deliver(action.newErrorMessage("该用户已失效,无法发送消息"))
return gerror.NewCode(gcode.CodeValidationFailed, "该用户已失效,无法发送消息")
}
session, _ := service.ChatSession().FirstActive(ctx, msg.UserId, conn.getUserId(), nil)
if session == nil {
session, err := service.ChatSession().FirstActive(ctx, msg.UserId, conn.getUserId(), nil)
if err != nil {
conn.deliver(action.newErrorMessage("无效的用户"))
return gerror.NewCode(gcode.CodeValidationFailed, "无效的用户")
}
Expand Down Expand Up @@ -269,18 +265,6 @@ func (m *adminManager) noticeLocalUserOnline(ctx context.Context, uid uint, plat
}
}

//func (m *adminManager) noticeRepeatConnect(admin iChatUser) {
// m.noticeLocalRepeatConnect(admin)
//}
//
//
//func (m *adminManager) noticeLocalRepeatConnect(admin iChatUser) {
// conn, exist := m.getConn(admin.getCustomerId(), admin.getPrimaryKey())
// if exist && conn.getUuid() != m.GetUserUuid(admin) {
// m.SendAction(NewOtherLogin(), conn)
// }
//}

func (m *adminManager) noticeUserTransfer(ctx context.Context, customerId, adminId uint) error {
return m.noticeLocalUserTransfer(ctx, customerId, adminId)
}
Expand Down
21 changes: 13 additions & 8 deletions internal/logic/chat/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type iWsConn interface {
getUuid() string
getPlatform() string
getCustomerId() uint
getLastActive() *gtime.Time
createTime() *gtime.Time
}

Expand All @@ -38,12 +39,13 @@ type client struct {
closeSignal chan interface{} // 连接断开后的广播通道,用于中断readMsg,sendMsg goroutine
send chan *v1.ChatAction // 发送的消息chan
sync.Once
manager connManager
user iChatUser
uuid string
created *gtime.Time
limiter *rate.Limiter
platform string
manager connManager
user iChatUser
uuid string
created *gtime.Time
limiter *rate.Limiter
lastActive *gtime.Time
platform string
}

func newClient(conn *websocket.Conn, user iChatUser, platform string) *client {
Expand All @@ -57,9 +59,12 @@ func newClient(conn *websocket.Conn, user iChatUser, platform string) *client {
created: gtime.Now(),
limiter: rate.NewLimiter(5, 10),
platform: platform,
lastActive: gtime.Now(),
}
}

func (c *client) getLastActive() *gtime.Time {
return c.lastActive
}
func (c *client) createTime() *gtime.Time {
return c.created
}
Expand Down Expand Up @@ -208,8 +213,8 @@ func (c *client) readMsg() {
Msg: msg,
Conn: c,
})
c.lastActive = gtime.Now()
}

}
}

Expand Down
13 changes: 11 additions & 2 deletions internal/logic/chat/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/duke-git/lancet/v2/slice"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
"sync"
"time"

Expand Down Expand Up @@ -263,7 +264,7 @@ func (m *manager) noticeRead(customerId, adminId uint, msgIds []uint) {
func (m *manager) ping() {
duration := m.pingDuration
if duration == 0 {
duration = time.Second * 10
duration = time.Second * 60
}
ticker := time.NewTicker(duration)
for {
Expand All @@ -272,7 +273,15 @@ func (m *manager) ping() {
ping := action.newPing()
for _, s := range m.shard {
conns := s.getAll()
m.SendAction(ping, conns...)
for _, conn := range conns {
// 如果连接超过60分钟没有活动,则关闭连接
duration := gtime.Now().Second() - conn.getLastActive().Second()
if duration > 60*60 {
conn.close()
} else {
m.SendAction(ping, conn)
}
}
}
}
}
Expand Down
12 changes: 1 addition & 11 deletions internal/logic/chat/user_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,17 +346,7 @@ func (s *userManager) addToManual(ctx context.Context, user iChatUser) (session
if err != nil {
return nil, err
}
// 没有客服在线则发送公众号消息
go func() {
if onlineServerCount == 0 {
admins, _ := service.Admin().All(ctx, do.CustomerAdmins{
CustomerId: user.getCustomerId(),
}, nil, nil)
for _, admin := range admins {
adminM.sendWaiting(admin, user)
}
}
}()

return
}

Expand Down
10 changes: 5 additions & 5 deletions internal/logic/chatsession/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

func init() {
service.RegisterChatSession(new())
service.RegisterChatSession(newSChatSession())
}

func new() *sChatSession {
func newSChatSession() *sChatSession {
return &sChatSession{
trait.Curd[model.CustomerChatSession]{
Dao: &dao.CustomerChatSessions,
Expand Down Expand Up @@ -56,12 +56,12 @@ func (s *sChatSession) Cancel(ctx context.Context, session *model.CustomerChatSe

// Close 关闭会话
func (s *sChatSession) Close(ctx context.Context, session *model.CustomerChatSession, isRemoveUser bool, updateTime bool) (err error) {
if session.BrokenAt != nil {
return gerror.NewCode(gcode.CodeBusinessValidationFailed, "会话关闭,请勿重复操作")
}
if session.AcceptedAt == nil {
return gerror.NewCode(gcode.CodeBusinessValidationFailed, "未接入会话无法断开")
}
if session.BrokenAt != nil {
return gerror.NewCode(gcode.CodeBusinessValidationFailed, "会话关闭,请勿重复操作")
}
_, err = s.UpdatePri(ctx, session.Id, do.CustomerChatSessions{
BrokenAt: gtime.Now(),
})
Expand Down
1 change: 1 addition & 0 deletions internal/logic/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type sPlatform struct {
}

// GetPlatform 获取用户的平台
// todo
func (p sPlatform) GetPlatform(ctx context.Context) string {
request := g.RequestFromCtx(ctx)
_ = request.GetHeader("aaa")
Expand Down
2 changes: 0 additions & 2 deletions internal/logic/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"gf-chat/internal/model/entity"
"gf-chat/internal/service"
_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
)

Expand Down Expand Up @@ -138,7 +137,6 @@ func (s *sSetup) Setup(ctx gctx.Ctx, customerId uint) {
}
if !exists {
rule.CustomerId = customerId
g.Dump(rule)
_, err = service.AutoRule().Save(ctx, rule)
if err != nil {
panic(err)
Expand Down

0 comments on commit 1ee1675

Please sign in to comment.