Skip to content

babex-group/babex-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Babex-kafka

Kafka adapter for babex

Examples

Adapter has two mode of consuming. In the default mode messages of multiple topics and partitions are all passed to the single channel:

package main

import (
    "github.com/babex-group/babex-kafka"
)

func main() {
    adapter, err := kafka.NewAdapter(kafka.Options{
        Name:   "collector",
        Addrs:  cfg.Brokers,
        Topics: cfg.Topics,
    })
    if err != nil {
        return nil, err
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    s := babex.NewService(adapter)

    msgs, _ := s.GetMessages()

    defer s.Close()

    for {
        select {
        case msg, ok := <- msgs:
            if !ok {
                return
            }

            msg.Ack()
        case <-signals:
            return
        }
    }
}

Also you can use multi mode:

package main

import (
    "github.com/babex-group/babex-kafka"
)

func main() {
    adapter, err := kafka.NewAdapter(kafka.Options{
        Name:   "collector",
        Addrs:  cfg.Brokers,
        Topics: cfg.Topics,
        Mode:   kafka.ModeMulti
    })
    if err != nil {
        return nil, err
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    s := babex.NewService(adapter)

    defer s.Close()

    for {
        select {
        case ch, ok := <- s.GetChannels():
            if !ok {
                return
            }

            go func(ch *babex.Channel) {
                for msg := range ch.GetMessages() {
                    msg.Ack()
                }
            }(ch)
        case <- signals:
            return
        }
    }
}

About

Kafka adapter for babex

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages