Skip to content

Commit

Permalink
Merge pull request #1633 from memphisdev/RND-322-check-all-authentica…
Browse files Browse the repository at this point in the history
…tion-types-in-kafka-connector-and-support-ssl-in-conf

RND-322-check-all-authentication-types-in-kafka-connector-and-support-ssl-in-conf
  • Loading branch information
daniel-davidd authored Jan 14, 2024
2 parents 97c01ae + 9f929ea commit 51a4daa
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 116 deletions.
12 changes: 6 additions & 6 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3354,7 +3354,7 @@ func CountActiveConsumersInCG(consumersGroup string, stationId int) (int64, erro
return 0, err
}
defer conn.Release()
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true AND type = 'application'`
query := `SELECT COUNT(*) FROM consumers WHERE station_id = $1 AND consumers_group = $2 AND is_active = true`
stmt, err := conn.Conn().Prepare(ctx, "count_active_consumers_in_cg", query)
if err != nil {
return 0, err
Expand Down Expand Up @@ -8119,7 +8119,7 @@ func UpdatePermissions(tenantName, username string, readPermissions, writePermis
return nil
}

func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error) {
func CheckUserStationPermissions(rolesId []int, stationName, operation string) (bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

Expand All @@ -8131,18 +8131,18 @@ func CheckUserStationPermissions(rolesId []int, stationName string) (bool, error
query := `SELECT COUNT(*)
FROM permissions
WHERE role_id = ANY($1)
AND type = 'write'
AND type = $2
AND restriction_type = 'allow'
AND (
(position('*' in pattern) > 0 AND $2 ~ pattern) OR
(position('*' in pattern) = 0 AND $2 = pattern)
(position('*' in pattern) > 0 AND $3 ~ pattern) OR
(position('*' in pattern) = 0 AND $3 = pattern)
);`
stmt, err := conn.Conn().Prepare(ctx, "check_user_station_permissions", query)
if err != nil {
return false, err
}
var count int
err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, stationName).Scan(&count)
err = conn.Conn().QueryRow(ctx, stmt.Name, rolesId, operation, stationName).Scan(&count)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ require (
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/memphisdev/memphis.go v1.3.2-beta.1
github.com/memphisdev/memphis.go v1.3.1
github.com/moby/spdystream v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/memphisdev/memphis.go v1.3.1 h1:WbJ2iinvxtOZPZ6rLYFGIlHW27jU30nYqovBlS1md1Y=
github.com/memphisdev/memphis.go v1.3.1/go.mod h1:KurLqbBBZ5PMabJuOh3JX9VpSykRsog1QQKcwW5b9bU=
github.com/memphisdev/memphis.go v1.3.2-beta.1 h1:KsYWa3AcKHuvjc9AbSOAM19pIcjttmIYqrhJ+i8D1iA=
github.com/memphisdev/memphis.go v1.3.2-beta.1/go.mod h1:jZKJ82lyQHr01QissJUdDrnj0KT5wXjh2irhb+j0JH8=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func CreateDefaultStation(tenantName string, s *Server, sn StationName, user mod
return models.Station{}, false, err
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, sn.Ext(), user.TenantName, "write")
if err != nil {
return models.Station{}, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
serv.Warnf("[tenant: %v]createConsumerDirectCommon at CreateDefaultStation: Consumer %v at station %v : %v", tenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName)
allowed, _, err := ValidateStationPermissions(user.Roles, cStationName, user.TenantName, "read")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createConsumerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, cStationName, err.Error())
return []int{}, err
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
serv.Warnf("[tenant: %v]createProducerDirectCommon : Producer %v at station %v : %v", user.TenantName, pName, pStationName, err.Error())
return false, false, err, models.Station{}
}
allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName)
allowed, _, err := ValidateStationPermissions(user.Roles, pStationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createProducerDirectCommon at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, pStationName.Ext(), err.Error())
return false, false, err, models.Station{}
Expand Down
4 changes: 2 additions & 2 deletions server/memphis_handlers_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
)

// the function returns a bool for is allowd to create and a bool for if a reload is needed
func ValidateStationPermissions(rolesId []int, stationName, tenantName string) (bool, bool, error) {
func ValidateStationPermissions(rolesId []int, stationName, tenantName, operation string) (bool, bool, error) {
// if the user dosent have a role len rolesId is 0 then he allowd to create
// TODO: add check if denied when we allow to deny
if len(rolesId) == 0 {
Expand All @@ -24,7 +24,7 @@ func ValidateStationPermissions(rolesId []int, stationName, tenantName string) (
}
return true, neededReload, nil
} else {
allowd, err := db.CheckUserStationPermissions(rolesId, stationName)
allowd, err := db.CheckUserStationPermissions(rolesId, stationName, operation)
if err != nil {
return false, false, err
}
Expand Down
6 changes: 3 additions & 3 deletions server/memphis_handlers_stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *Server) createStationDirectIntern(c *client,
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), csr.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user:%v]createStationDirect at ValidateStationPermissions: Station %v: %v", csr.TenantName, csr.Username, csr.StationName, err.Error())
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
Expand Down Expand Up @@ -939,7 +939,7 @@ func (sh StationsHandler) CreateStation(c *gin.Context) {
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, body.Name, err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s *Server) removeStationDirectIntern(c *client,
return
}

allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName)
allowed, ReloadNeeded, err := ValidateStationPermissions(user.Roles, stationName.Ext(), user.TenantName, "write")
if err != nil {
serv.Errorf("[tenant: %v][user: %v]CreateStation at ValidateStationPermissions: Station %v: %v", user.TenantName, user.Username, stationName.Ext(), err.Error())
respondWithErr(s.MemphisGlobalAccountString(), s, reply, err)
Expand Down
24 changes: 12 additions & 12 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,41 +716,43 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
} else {
consumerName = consumer.Name
}

consumerName = getInternalConsumerName(consumerName)

var maxAckTimeMs int64
if consumer.MaxAckTimeMs <= 0 {
maxAckTimeMs = 30000 // 30 sec
} else {
maxAckTimeMs = consumer.MaxAckTimeMs
}

var MaxMsgDeliveries int
if consumer.MaxMsgDeliveries <= 0 || consumer.MaxMsgDeliveries > 10 {
MaxMsgDeliveries = 10
} else {
MaxMsgDeliveries = consumer.MaxMsgDeliveries
}

stationName, err := StationNameFromStr(station.Name)
if err != nil {
return err
}

if len(partitionsList) > len(station.PartitionsList) {
partitionsList = station.PartitionsList
}

var deliveryPolicy DeliverPolicy
var optStartSeq uint64
// This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 )
if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 0 {
if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 1 {
deliveryPolicy = DeliverNew
} else if consumer.LastMessages > 0 {
streamInfo, err := serv.memphisStreamInfo(tenantName, stationName.Intern())
if err != nil {
return err
var streamInfo *StreamInfo
if len(partitionsList) == 1 {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1.final")
if err != nil {
return err
}
} else {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+".final")
if err != nil {
return err
}
}
lastSeq := streamInfo.State.LastSeq
lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1
Expand Down Expand Up @@ -779,7 +781,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}

if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
Expand All @@ -800,7 +801,6 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}

if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
Expand Down
Loading

0 comments on commit 51a4daa

Please sign in to comment.