Skip to content

Commit

Permalink
Allow SCRAM nonce as prefix for older buggy versions of librdkafka (#326
Browse files Browse the repository at this point in the history
)
  • Loading branch information
purplefox authored Dec 3, 2024
1 parent c4f8eab commit d902ed3
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 30 deletions.
3 changes: 2 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func NewAgentWithFactories(cfg Conf, objStore objstore.Client, connectionFactory
}
agent.compactionWorkersService = lsm.NewCompactionWorkerService(cfg.CompactionWorkersConf, objStore,
clFactory, true)
scramManager, err := auth.NewScramManager(auth.ScramAuthTypeSHA512, agent.controlClientCache, getter.get)
scramManager, err := auth.NewScramManager(auth.ScramAuthTypeSHA512, agent.controlClientCache, getter.get,
cfg.AllowScramNonceAsPrefix)
if err != nil {
return nil, err
}
Expand Down
52 changes: 51 additions & 1 deletion agent/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,15 @@ func TestKafkaAuthSaslPlain(t *testing.T) {
require.Error(t, err)
}

func TestKafkaAuthSaslScram(t *testing.T) {
func TestKafkaAuthSaslScramDontAllowNonceAsPrefix(t *testing.T) {
testKafkaAuthSaslScram(t, false)
}

func TestKafkaAuthSaslScramAllowNonceAsPrefix(t *testing.T) {
testKafkaAuthSaslScram(t, true)
}

func testKafkaAuthSaslScram(t *testing.T, allowNonceAsPrefix bool) {

cfg := NewConf()
cfg.AuthType = kafkaserver2.AuthenticationTypeSaslScram512
Expand All @@ -139,6 +147,10 @@ func TestKafkaAuthSaslScram(t *testing.T) {
ServerPrivateKeyFile: serverKeyPath,
ServerCertFile: serverCertPath,
}
if allowNonceAsPrefix {
cfg.AddJunkOnScramNonce = true
cfg.AllowScramNonceAsPrefix = true
}
agents, tearDown := setupAgents(t, cfg, 1, func(i int) string {
return "az1"
})
Expand Down Expand Up @@ -195,6 +207,44 @@ func TestKafkaAuthSaslScram(t *testing.T) {
require.Error(t, err)
}

func TestKafkaAuthSaslScramFailIfNonceAsPrefixNotAllowe3d(t *testing.T) {
cfg := NewConf()
cfg.AuthType = kafkaserver2.AuthenticationTypeSaslScram512
cfg.KafkaListenerConfig.TLSConfig = conf.TlsConf{
Enabled: true,
ServerPrivateKeyFile: serverKeyPath,
ServerCertFile: serverCertPath,
}
cfg.AddJunkOnScramNonce = true
cfg.AllowScramNonceAsPrefix = false

agents, tearDown := setupAgents(t, cfg, 1, func(i int) string {
return "az1"
})
defer tearDown(t)
agent := agents[0]

clientTLSConfig := conf.ClientTlsConf{
Enabled: true,
ServerCertFile: serverCertPath,
}

username1 := "some-user1"
password1 := "some-password1"

scramType := auth.AuthenticationSaslScramSha512

putUserCred(t, agent, username1, password1, scramType)

mechProvider := func(t *testing.T, username string, password string) sasl.Mechanism {
mechanism, err := scram.Mechanism(scram.SHA512, username, password)
require.NoError(t, err)
return mechanism
}

tryConnect(t, username1, password1, false, agent, clientTLSConfig, mechProvider)
}

type saslMechanismProvider func(t *testing.T, username string, password string) sasl.Mechanism

func tryConnect(t *testing.T, username string, password string, shouldSucceeed bool, agent *Agent,
Expand Down
27 changes: 18 additions & 9 deletions agent/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ type CommandConf struct {
MembershipUpdateIntervalMs int `help:"interval between updating cluster membership in ms" default:"5000"`
MembershipEvictionIntervalMs int `help:"interval after which member will be evicted from the cluster" default:"20000"`
ConsumerGroupInitialJoinDelayMs int `name:"consumer-group-initial-join-delay-ms" help:"initial delay to wait for more consumers to join a new consumer group before performing the first rebalance, in ms" default:"3000"`
AuthenticationType string `help:"type of authentication. one of sasl/plain, sasl/scram-sha-512, mtls, none" default:"none"`

AuthenticationType string `help:"type of authentication. one of sasl/plain, sasl/scram-sha-512, mtls, none" default:"none"`
AllowScramNonceAsPrefix bool

TopicName string `name:"topic-name" help:"name of the topic"`
}

var authTypeMapping = map[string]kafkaserver.AuthenticationType{
"none": kafkaserver.AuthenticationTypeNone,
"sasl/plain": kafkaserver.AuthenticationTypeSaslPlain,
"none": kafkaserver.AuthenticationTypeNone,
"sasl/plain": kafkaserver.AuthenticationTypeSaslPlain,
"sasl/scram-sha-512": kafkaserver.AuthenticationTypeSaslScram512,
"mtls": kafkaserver.AuthenticationTypeMTls,
"mtls": kafkaserver.AuthenticationTypeMTls,
}

const (
Expand Down Expand Up @@ -135,6 +135,13 @@ func CreateConfFromCommandConf(commandConf CommandConf) (Conf, error) {
return Conf{}, errors.Errorf("invalid authentication-type: %s", commandConf.AuthenticationType)
}
cfg.AuthType = authType
cfg.AllowScramNonceAsPrefix = commandConf.AllowScramNonceAsPrefix
if cfg.AllowScramNonceAsPrefix {
log.Warnf("allow-scram-nonce-as-prefix is set to true to allow SCRAM handshakes to pass with older" +
" versions of librdkafka which have a bug where the nonce sent in the second SCRAM handshake request is a" +
" prefix of the required nonce. It is recommended to upgrade clients to later versions of librdkafka where possible" +
" and not to enable this setting.")
}
return cfg, nil
}

Expand Down Expand Up @@ -197,6 +204,8 @@ type Conf struct {
MaxControllerClients int
MaxConnectionsPerAddress int
AuthType kafkaserver.AuthenticationType
AllowScramNonceAsPrefix bool
AddJunkOnScramNonce bool
}

func NewConf() Conf {
Expand All @@ -210,7 +219,7 @@ func NewConf() Conf {
GroupCoordinatorConf: group.NewConf(),
MaxControllerClients: DefaultMaxControllerClients,
MaxConnectionsPerAddress: DefaultMaxConnectionsPerAddress,
AuthType: kafkaserver.AuthenticationTypeNone,
AuthType: kafkaserver.AuthenticationTypeNone,
}
}

Expand Down Expand Up @@ -251,9 +260,9 @@ func (c *Conf) Validate() error {
}

type ListenerConfig struct {
Address string
AdvertisedAddress string
TLSConfig conf.TlsConf
Address string
AdvertisedAddress string
TLSConfig conf.TlsConf
}

func (l *ListenerConfig) Validate() error {
Expand Down
29 changes: 28 additions & 1 deletion agent/kafka_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/spirit-labs/tektite/common"
"github.com/spirit-labs/tektite/kafkaprotocol"
"github.com/spirit-labs/tektite/kafkaserver2"
log "github.com/spirit-labs/tektite/logger"
"strings"
)

func (a *Agent) newKafkaHandler(ctx kafkaserver2.ConnectionContext) kafkaprotocol.RequestHandler {
Expand Down Expand Up @@ -134,7 +136,13 @@ func (k *kafkaHandler) HandleSaslAuthenticateRequest(_ *kafkaprotocol.RequestHea
msg := "SaslAuthenticateRequest without a preceding SaslAuthenticateRequest"
resp.ErrorMessage = &msg
} else {
saslRespBytes, complete, failed := conv.Process(req.AuthBytes)
reqBytes := req.AuthBytes
sc, isSCram := conv.(*auth.ScramConversation)
if isSCram && k.agent.cfg.AddJunkOnScramNonce && sc.Step() == 1 {
log.Warnf("Testing: Adding Junk to SCRAM nonce")
reqBytes = addJunkToScramNonce(reqBytes)
}
saslRespBytes, complete, failed := conv.Process(reqBytes)
if failed {
resp.ErrorCode = kafkaprotocol.ErrorCodeSaslAuthenticationFailed
} else {
Expand All @@ -149,6 +157,25 @@ func (k *kafkaHandler) HandleSaslAuthenticateRequest(_ *kafkaprotocol.RequestHea
return completionFunc(&resp)
}

func addJunkToScramNonce(reqBytes []byte) []byte {
// Used in testing only. We add some junk to the nonce
sRequest := string(reqBytes)
fields := strings.Split(sRequest, ",")
var newRequest strings.Builder
for i, field := range fields {
if i == 1 {
nonce := strings.TrimPrefix(field, "r=")
newRequest.WriteString("r=" + nonce + "-some-junk")
} else {
newRequest.WriteString(field)
}
if i != len(fields)-1 {
newRequest.WriteRune(',')
}
}
return []byte(newRequest.String())
}

func (k *kafkaHandler) HandleSaslHandshakeRequest(_ *kafkaprotocol.RequestHeader,
req *kafkaprotocol.SaslHandshakeRequest,
completionFunc func(resp *kafkaprotocol.SaslHandshakeResponse) error) error {
Expand Down
54 changes: 53 additions & 1 deletion auth2/scram.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spirit-labs/tektite/sst"
"github.com/xdg-go/pbkdf2"
"github.com/xdg-go/scram"
"strings"
"sync"
)

Expand All @@ -24,7 +25,8 @@ const (
NumIters = 4096
)

func NewScramManager(authType ScramAuthType, controlClientCache *control.ClientCache, tableGetter sst.TableGetter) (*ScramManager, error) {
func NewScramManager(authType ScramAuthType, controlClientCache *control.ClientCache, tableGetter sst.TableGetter,
allowNonceAsPrefix bool) (*ScramManager, error) {
partHash, err := parthash.CreateHash([]byte("user.creds"))
if err != nil {
return nil, err
Expand All @@ -42,6 +44,7 @@ func NewScramManager(authType ScramAuthType, controlClientCache *control.ClientC
tableGetter: tableGetter,
partHash: partHash,
credsSequenceLocal: common.NewGRLocal(),
allowNonceAsPrefix: allowNonceAsPrefix,
}
scramServer, err := hashGenFunc.NewServer(sm.lookupCredential)
if err != nil {
Expand All @@ -59,6 +62,7 @@ type ScramManager struct {
tableGetter sst.TableGetter
hashGenFunc scram.HashGeneratorFcn
credsSequenceLocal common.GRLocal
allowNonceAsPrefix bool
}

// AuthenticateWithUserPwd is used e.g. with SASL/PLAIN, when we need to auth on the server with a username and
Expand Down Expand Up @@ -187,6 +191,11 @@ type ScramConversation struct {
lock sync.Mutex
credsSequence int
step int
returnedNonce string
}

func (s *ScramConversation) Step() int {
return s.step
}

func (s *ScramConversation) Principal() string {
Expand All @@ -204,13 +213,18 @@ func (s *ScramConversation) CredentialsSequence() int {
func (s *ScramConversation) Process(request []byte) (resp []byte, complete bool, failed bool) {
s.lock.Lock()
defer s.lock.Unlock()
request = s.maybeTrimNonce(request)
r, err := s.conv.Step(string(request))
if err != nil {
// Log auth failures at info
log.Infof("Kafka API SASL SCRAM authentication failure: %v", err)
return nil, false, true
}
if s.step == 0 {
if s.mgr.allowNonceAsPrefix {
// extract the nonce
s.returnedNonce = extractNonce(r, 0)
}
// GRLocal for sequence is set in the credentials lookup which occurs in the first step
// The credentials sequence number should have been set using a GR local
credsSequence, ok := s.mgr.credsSequenceLocal.Get()
Expand All @@ -228,6 +242,44 @@ func (s *ScramConversation) Process(request []byte) (resp []byte, complete bool,
return []byte(r), s.conv.Valid(), false
}

func extractNonce(resp string, index int) string {
fields := strings.Split(resp, ",")
nonceField := fields[index]
if !strings.HasPrefix(nonceField, "r=") {
panic("invalid scram field")
}
return strings.TrimPrefix(nonceField, "r=")
}

func (s *ScramConversation) maybeTrimNonce(request []byte) []byte {
if s.mgr.allowNonceAsPrefix && s.step == 1 {
nonce := extractNonce(string(request), 1)
if strings.HasPrefix(nonce, s.returnedNonce) {
// The nonce provided by the client is a prefix of the actual nonce returned by the server in step 1
// According to the SCRAM RFC it should be exactly equal, but earlier versions of librdkafka had
// a bug whereby the nonce sent back in step 2 had the server nonce as a prefix burt was not exactly equal.
// To allow compatibility with older librdkafka versions we don't fail the handshake if the supplied nonce
// has a prefix. By default we don't allow this but it can be configured on the agent if necessary.
sRequest := string(request)
fields := strings.Split(sRequest, ",")
var newRequest strings.Builder
for i, field := range fields {
if i == 1 {
// substitute the exact nonce so handshake will pass
newRequest.WriteString("r=" + s.returnedNonce)
} else {
newRequest.WriteString(field)
}
if i != len(fields)-1 {
newRequest.WriteRune(',')
}
}
request = []byte(newRequest.String())
}
}
return request
}

func AlgoForAuthType(authType string) scram.HashGeneratorFcn {
var algo scram.HashGeneratorFcn
if authType == AuthenticationSaslScramSha256 {
Expand Down
2 changes: 1 addition & 1 deletion auth2/scram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// TestScram - this just runs a client/server conv using xdg-go as a sanity check, it does not test our end-end scram
// that is done in the integration tests
// that is done in other tests
func TestScram(t *testing.T) {
username := "some_user"
password := "some_password"
Expand Down
3 changes: 2 additions & 1 deletion cl_test/produce_to_az_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func testProduceWithAz(t *testing.T, clientAZ string, numAgents int, azSetter fu
totBatches += int(batchCount)
}
}
require.Equal(t, numMessages, totBatches)
// Likely to be more but possible they could be combined into a single batch due to timing
require.GreaterOrEqual(t, totBatches, 1)
}

func startAgents(t *testing.T, numAgents int, azPicker func(int) string) ([]*agent.Agent, func(t *testing.T)) {
Expand Down
Loading

0 comments on commit d902ed3

Please sign in to comment.