Skip to content

Commit

Permalink
Add a utility function for SASL/SCRAM setup inside Sarama's Config. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rhstr authored Nov 10, 2022
1 parent d58f993 commit 47d77eb
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/msales/pkg/v4 v4.4.0
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.5
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
)

Expand All @@ -29,7 +30,9 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down
40 changes: 40 additions & 0 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/xdg/scram"
"golang.org/x/xerrors"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -92,6 +93,45 @@ func (c *SourceConfig) ModifyConfig() {
}
}

// Compile-time check.
var _ sarama.SCRAMClient = (*SCRAMClient)(nil)

// SCRAMClient represents a SASL/SCRAM client.
type SCRAMClient struct {
*scram.Client
*scram.ClientConversation

hashGenerator scram.HashGeneratorFcn
}

// NewSCRAMClientGeneratorFn is a SCRAM client generator function for sarama.Config.
func NewSCRAMClientGeneratorFn(hashFn scram.HashGeneratorFcn) func() sarama.SCRAMClient {
return func() sarama.SCRAMClient {
return &SCRAMClient{hashGenerator: hashFn}
}
}

func (x *SCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.hashGenerator.NewClient(userName, password, authzID)
if err != nil {
return err
}

x.ClientConversation = x.Client.NewConversation()

return nil
}

func (x *SCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)

return
}

func (x *SCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

// Metadata represents an the kafka topic metadata.
type Metadata []*PartitionOffset

Expand Down

0 comments on commit 47d77eb

Please sign in to comment.