Skip to content

Commit

Permalink
topic auto-create (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Dec 26, 2024
1 parent 47b0acc commit a34a4c1
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 16 deletions.
57 changes: 44 additions & 13 deletions agent/cluster_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,56 @@ func (a *Agent) handleMetadataRequest(authContext *auth.Context, hdr *kafkaproto
return err
}
if !exists {
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeUnknownTopicOrPartition
} else {
authorised := true
if authContext != nil {
authorised, err = authContext.Authorize(acls.ResourceTypeTopic, topicName, acls.OperationDescribe)
if err != nil {
return err
if req.AllowAutoTopicCreation && a.cfg.EnableTopicAutoCreate && hdr.RequestApiVersion >= 4 {
// auto create topic
if authContext != nil {
authorised, err := authContext.Authorize(acls.ResourceTypeTopic, topicName, acls.OperationCreate)
if err != nil {
return err
}
if !authorised {
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeTopicAuthorizationFailed
continue
}
}
if !authorised {
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeTopicAuthorizationFailed
if err := client.CreateOrUpdateTopic(topicmeta.TopicInfo{
Name: topicName,
PartitionCount: a.cfg.DefaultPartitionCount,
RetentionTime: a.cfg.DefaultTopicRetentionTime,
UseServerTimestamp: a.cfg.DefaultUseServerTimestamp,
}, true); err != nil {
return err
}
}
if authorised {
top, err := a.populateTopicMetadata(&topicInfo, agents)
topicInfo, _, exists, err = client.GetTopicInfo(topicName)
if err != nil {
return err
}
resp.Topics[i] = *top
if !exists {
log.Warnf("topic does not exist after auto creation!")
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeUnknownTopicOrPartition
continue
}
} else {
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeUnknownTopicOrPartition
continue
}
}
authorised := true
if authContext != nil {
authorised, err = authContext.Authorize(acls.ResourceTypeTopic, topicName, acls.OperationDescribe)
if err != nil {
return err
}
if !authorised {
resp.Topics[i].ErrorCode = kafkaprotocol.ErrorCodeTopicAuthorizationFailed
}
}
if authorised {
top, err := a.populateTopicMetadata(&topicInfo, agents)
if err != nil {
return err
}
resp.Topics[i] = *top
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions agent/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type CommandConf struct {
AllowScramNonceAsPrefix bool
UserAuthCacheTimeout time.Duration `help:"maximum time for which a user authorisation is cached" default:"5m"`
UseServerTimestampForRecords bool `help:"whether to use server timestamp for incoming produced records. if 'false' then producer timestamp is preserved" default:"false"`
EnableTopicAutoCreate bool `help:"if 'true' then enables topic auto-creation for topics that do not already exist"`
AutoCreateNumPartitions int `help:"the number of partitions for auto-created topics" default:"1"`
}

var authTypeMapping = map[string]kafkaserver.AuthenticationType{
Expand Down Expand Up @@ -144,6 +146,8 @@ func CreateConfFromCommandConf(commandConf CommandConf) (Conf, error) {
}
cfg.UserAuthCacheTimeout = commandConf.UserAuthCacheTimeout
cfg.DefaultUseServerTimestamp = commandConf.UseServerTimestampForRecords
cfg.EnableTopicAutoCreate = commandConf.EnableTopicAutoCreate
cfg.DefaultPartitionCount = commandConf.AutoCreateNumPartitions
return cfg, nil
}

Expand Down Expand Up @@ -212,6 +216,8 @@ type Conf struct {
DefaultUseServerTimestamp bool
ClusterName string
UserAuthCacheTimeout time.Duration
EnableTopicAutoCreate bool
DefaultPartitionCount int
}

func NewConf() Conf {
Expand All @@ -229,6 +235,7 @@ func NewConf() Conf {
AuthType: kafkaserver.AuthenticationTypeNone,
DefaultTopicRetentionTime: DefaultDefaultTopicRetentionTime,
UserAuthCacheTimeout: DefaultUserAuthCacheTimeout,
DefaultPartitionCount: DefaultDefaultPartitionCount,
}
}

Expand All @@ -238,6 +245,7 @@ const (
DefaultMaxControllerClients = 10
DefaultMaxConnectionsPerAddress = 10
DefaultUserAuthCacheTimeout = 5 * time.Minute
DefaultDefaultPartitionCount = 1
)

func (c *Conf) Validate() error {
Expand Down
35 changes: 34 additions & 1 deletion agent/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,35 @@ func TestGetAzFromClientID(t *testing.T) {
require.Equal(t, "az-2", getAZFromClientID("ws_az=az-2"))
}

func TestMetadataAutoCreateTopic(t *testing.T) {
cfg := NewConf()
cfg.EnableTopicAutoCreate = true
cfg.DefaultPartitionCount = 23
numAgents := 5
agents, tearDown := setupAgents(t, cfg, numAgents, func(i int) string {
return "az1"
})
defer tearDown(t)
req := &kafkaprotocol.MetadataRequest{
AllowAutoTopicCreation: true,
Topics: []kafkaprotocol.MetadataRequestMetadataRequestTopic{
{
Name: common.StrPtr("unknown"),
},
},
}
req.Topics = []kafkaprotocol.MetadataRequestMetadataRequestTopic{
{
Name: common.StrPtr("create-me-topic"),
},
}
resp := sendMetadataRequestWithVersion(t, agents[0], req, "", 4)
verifyBrokers(t, agents, resp)
require.Equal(t, 1, len(resp.Topics))
require.Equal(t, 23, len(resp.Topics[0].Partitions))
require.Equal(t, kafkaprotocol.ErrorCodeNone, int(resp.Topics[0].ErrorCode))
}

func setupAgents(t *testing.T, cfg Conf, numAgents int, azPicker func(int) string) ([]*Agent, func(t *testing.T)) {
objStore := dev.NewInMemStore(0)
inMemMemberships := NewInMemClusterMemberships()
Expand Down Expand Up @@ -267,6 +296,10 @@ func setupNumTopics(t *testing.T, numTopics int, agent *Agent) {
}

func sendMetadataRequest(t *testing.T, agent *Agent, req *kafkaprotocol.MetadataRequest, clientID string) *kafkaprotocol.MetadataResponse {
return sendMetadataRequestWithVersion(t, agent, req, clientID, 1)
}

func sendMetadataRequestWithVersion(t *testing.T, agent *Agent, req *kafkaprotocol.MetadataRequest, clientID string, apiVersion int16) *kafkaprotocol.MetadataResponse {
cl, err := apiclient.NewKafkaApiClientWithClientID(clientID)
require.NoError(t, err)
conn, err := cl.NewConnection(agent.Conf().KafkaListenerConfig.Address)
Expand All @@ -276,7 +309,7 @@ func sendMetadataRequest(t *testing.T, agent *Agent, req *kafkaprotocol.Metadata
require.NoError(t, err)
}()
resp := &kafkaprotocol.MetadataResponse{}
r, err := conn.SendRequest(req, kafkaprotocol.APIKeyMetadata, 1, resp)
r, err := conn.SendRequest(req, kafkaprotocol.APIKeyMetadata, apiVersion, resp)
require.NoError(t, err)
return r.(*kafkaprotocol.MetadataResponse)
}
Expand Down
2 changes: 2 additions & 0 deletions integ/command_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Flags:
--user-auth-cache-timeout=5m maximum time for which a user authorisation is cached
--use-server-timestamp-for-records whether to use server timestamp for incoming produced records. if 'false' then producer timestamp
is preserved
--enable-topic-auto-create if 'true' then enables topic auto-creation for topics that do not already exist
--auto-create-num-partitions=1 the number of partitions for auto-created topics
--log-format="console" format to write log lines in - one of: console, json
--log-level="info" lowest log level that will be emitted - one of: debug, info, warn, error`

Expand Down
2 changes: 1 addition & 1 deletion kafkaprotocol/metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,5 +353,5 @@ func (m *MetadataRequest) HeaderVersions(version int16) (int16, int16) {
}

func (m *MetadataRequest) SupportedApiVersions() (int16, int16) {
return 1, 1
return 1, 4
}
2 changes: 1 addition & 1 deletion kafkaprotocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ var SupportedAPIVersions = []ApiVersionsResponseApiVersion{
{ApiKey: APIKeyProduce, MinVersion: 3, MaxVersion: 3},
{ApiKey: APIKeyFetch, MinVersion: 2, MaxVersion: 4},
{ApiKey: APIKeyAPIVersions, MinVersion: 0, MaxVersion: 4},
{ApiKey: APIKeyMetadata, MinVersion: 1, MaxVersion: 1},
{ApiKey: APIKeyMetadata, MinVersion: 1, MaxVersion: 4},
{ApiKey: APIKeyFindCoordinator, MinVersion: 0, MaxVersion: 1},
{ApiKey: ApiKeyJoinGroup, MinVersion: 0, MaxVersion: 0},
{ApiKey: ApiKeySyncGroup, MinVersion: 0, MaxVersion: 0},
Expand Down

0 comments on commit a34a4c1

Please sign in to comment.