Skip to content

💎 nodejs kafka connect connector for MySQL, Postgres, SQLite and MSSQL


Notifications You must be signed in to change notification settings


Folders and files

Last commit message
Last commit date

Latest commit



84 Commits

Repository files navigation


Node.js Kafka Connect connector for MySQL, Postgres, SQLite and MSSQL databases

Build Status

Coverage Status


npm install --save sequelize-kafka-connect

A note on native mode

If you are using the native mode (config: { noptions: {} }). You will have to manually install node-rdkafka alongside kafka-connect. (This requires a Node.js version between 9 and 12 and will not work with Node.js >= 13, last tested with 12.16.1)

On Mac OS High Sierra / Mojave: CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib yarn add --frozen-lockfile node-rdkafka@2.7.4

Otherwise: yarn add --frozen-lockfile node-rdkafka@2.7.4

(Please also note: Doing this with npm does not work, it will remove your deps, npm i -g yarn)

database -> kafka

const { runSourceConnector } = require("sequelize-kafka-connect");
runSourceConnector(config, [], onError).then(config => {
    //runs forever until: config.stop();

kafka -> database

const { runSinkConnector } = require("sequelize-kafka-connect");
runSinkConnector(config, [], onError).then(config => {
    //runs forever until: config.stop();

kafka -> database (with custom topic (no source-task topic))

const { runSinkConnector, ConverterFactory } = require("sequelize-kafka-connect");

const tableSchema = {
    "id": {
        "type": "integer",
        "allowNull": false,
        "primaryKey": true
    "name": {
        "type": "varchar(255)",
        "allowNull": true

const etlFunc = (messageValue, callback) => {

    //type is an example json format field
    if (messageValue.type === "publish") {
        return callback(null, {

    if (messageValue.type === "unpublish") {
        return callback(null, null); //null value will cause deletion

    callback(new Error("unknown messageValue.type"));

const converter = ConverterFactory.createSinkSchemaConverter(tableSchema, etlFunc);

runSinkConnector(config, [converter], onError).then(config => {
    //runs forever until: config.stop();

    this example would be able to store kafka message values
    that look like this (so completely unrelated to messages created by a default SourceTask)
        payload: {
            id: 123,
            name: "bla"
        type: "publish"


note: in BETA 🌱

npm install -g sequelize-kafka-connect
# run source etl: database -> kafka
nkc-sequelize-source --help
# run sink etl: kafka -> database
nkc-sequelize-sink --help


const config = {
    kafka: {
        //zkConStr: "localhost:2181/",
        kafkaHost: "localhost:9092",
        logger: null,
        groupId: "kc-sequelize-test",
        clientName: "kc-sequelize-test-name",
        workerPerPartition: 1,
        options: {
            sessionTimeout: 8000,
            protocol: ["roundrobin"],
            fromOffset: "earliest", //latest
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 10,
            heartbeatInterval: 250,
            retryMinTimeout: 250,
            requireAcks: 1,
            //ackTimeoutMs: 100,
            //partitionerType: 3
    topic: "sc_test_topic",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 2000,
    produceKeyed: true,
    produceCompressionType: 0,
    connector: {
        options: {
            host: "localhost",
            port: 5432,
            dialect: "sqlite",
            pool: {
                max: 5,
                min: 0,
                idle: 10000
            storage: path.join(__dirname, "test-db.sqlite")
        database: null,
        user: null,
        password: null,
        maxPollCount: 50,
        table: "accounts",
        incrementingColumnName: "id"
    http: {
        port: 3149,
        middlewares: []
    enableMetrics: true,
    batch: {
        batchSize: 100, 
        commitEveryNBatch: 1, 
        concurrency: 1,
        commitSync: true

Native Clients Config(uration)

const config = {
    kafka: {
        noptions: {
            "": "localhost:9092",
            "": "n-test-group",
            "": false,
            "debug": "all",
            "event_cb": true,
            "": "kcs-test"
        tconf: {
            "auto.offset.reset": "earliest",
            "request.required.acks": 1
    topic: "sc_test_topic",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 2000,
    produceKeyed: true,
    produceCompressionType: 0,
    connector: {
        options: {
            host: "localhost",
            port: 5432,
            dialect: "sqlite",
            pool: {
                max: 5,
                min: 0,
                idle: 10000
            storage: path.join(__dirname, "test-db.sqlite")
        database: null,
        user: null,
        password: null,
        maxPollCount: 50,
        table: "accounts",
        incrementingColumnName: "id"
    http: {
        port: 3149,
        middlewares: []
    enableMetrics: true


No releases published


No packages published