Table of Contents

  1. Pitfalls of Connecting Node.js to Kafka
    1.1. Background
    1.2. Collecting Container Logs into Kafka
    1.3. Writing Code: Choosing a Client Library
      1.3.1. kafkajs
      1.3.2. kafka-node
      1.3.3. node-rdkafka

Pitfalls of Connecting Node.js to Kafka

Background

Many of the backend applications in our architecture team are written in Node.js, and we now need to ship all of their logs into the company's Kafka. The ops team allocated us only a single topic, so we have to distinguish applications by the structure of the messages themselves.
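
As a rough sketch (field names here are hypothetical, loosely modeled on the appId/appName tags that show up later in the docker-compose labels), a shared-topic envelope might look like:

const record = {
    appId: 'backend',        // which application produced the log line
    appName: 'test-app',
    level: 'info',
    ts: Date.now(),
    payload: { msg: 'hello from node' }
};
// the JSON-serialized record becomes the Kafka message value,
// and consumers fan records back out by appId/appName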

Requirements:

  1. All of our applications are deployed as containers, so ideally the container console logs go straight to Kafka with no code changes.
  2. If that isn't feasible without code changes, solve it by changing the code.

kafka-server:

Version: 3.2.0

Security protocol: SASL_PLAINTEXT

SASL mechanism: SCRAM-SHA-256
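
Whatever the client, those three facts translate into roughly the following connection settings, shown here as librdkafka-style properties (broker address is the placeholder used throughout this post). Note that SASL_PLAINTEXT means SASL authentication over an unencrypted connection:

// librdkafka-style client settings implied by the server setup above;
// this is the exact shape node-rdkafka accepts later in this post
const connectionConfig = {
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'security.protocol': 'SASL_PLAINTEXT',   // SASL auth over a plaintext channel
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'admin',
    'sasl.password': 'xxx'
};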

Collecting Container Logs into Kafka

I used Alibaba's open-source log-pilot to collect the container logs and forward them to Kafka.

docker-compose:

version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  pilot:
    image: hz-log-pilot:1.0.3
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /:/host
    privileged: true
    environment:
      - LOGGING_OUTPUT=kafka
      - KAFKA_BROKERS=test-kafka-idc-1.xxx.com:9092
      - KAFKA_VERSION=0.11.0
      - KAFKA_USERNAME=admin
      - KAFKA_PASSWORD=xxx
      - KAFKA_MECHANISM=SCRAM-SHA-256
      - KAFKA_SSL=true
      - KAFKA_PROTOCOL=PLAIN
    labels:
      aliyun.global: true

  hello-node:
    image: hello-node:1.0
    ports:
      - "9003:8888"
    environment:
      - aliyun_logs_topic-devops=stdout
      - aliyun_logs_topic-devops_format=json
      - aliyun_logs_topic-devops_tags=appId=backend,appName=test-app

Testing showed that against our company's Kafka server configuration this setup could not deliver any logs, even though it worked fine against a Kafka server I had set up myself.

After a lot of digging, it turned out that the filebeat that log-pilot uses under the hood does not support the SCRAM-SHA-256 mechanism. Infuriating.

Writing Code: Choosing a Client Library

A quick Bing search for: nodejs kafka

Most people seemed to recommend kafkajs for Node.js:

https://kafka.js.org/docs/getting-started

https://github.com/tulios/kafkajs

So I wrote a test:

kafkajs

kafka.test-spec.ts:

const { Kafka, CompressionTypes, logLevel } = require('kafkajs')

const kafka = new Kafka({
    // clientId: 'devops',
    logLevel: logLevel.DEBUG,
    brokers: ['10.105.141.164:49171'],
    authenticationTimeout: 1000,
    reauthenticationThreshold: 10000,
    sasl: {
        // kafkajs documents the mechanism name in lowercase
        mechanism: 'scram-sha-256',
        username: 'admin',
        password: 'xxxx'
    }
});

const topic = 'test1'
const producer = kafka.producer()


const sendMessage = async () => {
    await producer.connect()
    return producer
        .send({
            topic,
            // CompressionTypes is a top-level kafkajs export, not a property of Kafka
            compression: CompressionTypes.None,
            messages: [
                // kafkajs messages use { key?, value, partition? }
                { value: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 }
            ],
        })
        .then(console.log)
        .catch(e => console.error(`[example/producer] ${e.message}`, e))
}



describe('kafka tests', function () {
    describe('send Tests', function () {
        this.timeout(15000);
        // Mocha: an async test must not also declare a `done` callback
        it('should connect and send', async function () {
            await sendMessage();
        });
    });
});

Running it from WebStorm (which has an integrated Node.js test runner), the connection failed:

(screenshot: kafkajs connection failure)

The apparent cause: the Kafka server uses SASL_PLAINTEXT with SCRAM-SHA-256, and even though kafkajs's docs list SCRAM support, in my tests it only got through with the PLAIN mechanism, so the connection failed.

On top of that, our Kafka server runs version 2.3.0; kafkajs claims to support Kafka 0.11+, but possibly 2.x wasn't fully supported yet.
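
A debugging aid worth noting before moving on: a minimal sketch using kafkajs's documented logCreator hook (broker and credentials are placeholders) to surface the actual SASL handshake error instead of a bare timeout.

const { Kafka, logLevel } = require('kafkajs')

// Sketch only: route kafkajs's internal logs through console so the
// handshake failure is visible in the test output.
const debugKafka = new Kafka({
    brokers: ['test-kafka-idc-1.xxx.com:9092'],
    logLevel: logLevel.DEBUG,
    logCreator: () => ({ namespace, level, label, log }) => {
        console.log(`[${label}] ${namespace}: ${log.message}`)
    },
    sasl: { mechanism: 'scram-sha-256', username: 'admin', password: 'xxx' }
})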

Then I saw that kafka-node is also quite popular, so I wrote another test.

kafka-node

https://www.npmjs.com/package/kafka-node/v/5.0.0#producer

Kafka-node is a Node.js client for Apache Kafka 0.9 and later.

https://github.com/SOHU-Co/kafka-node

It's SOHU's library and claims to support Kafka 0.9 and later. The code:

var assert = require('assert');

var expect = require('chai').expect;

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.KafkaClient({
        kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
        connectTimeout: 3000,
        sasl: { mechanism: 'scram-sha-256', username: 'admin', password: 'xxx' }
    }),
    km = new KeyedMessage('key', 'message');

var producerOption = {
    requireAcks: 1,
    ackTimeoutMs: 100,
    partitionerType: 0 // 0 = default partitioner (always the first partition)
};

// create the producer once, with its options (the original created it twice)
var producer = new Producer(client, producerOption);


function getPayloads(){
    return [
        {topic:"test",messages:JSON.stringify({"name":"jack","age":"120"}),partition:0}
    ];
}

describe('kafka tests', function () {
    describe('send Tests', function () {
        this.timeout(15000);
        it('should connection success', function (done) {
            producer.on('ready', function () {
                producer.send(getPayloads(), function (err, data) {
                    if (err) {
                        console.log('[kafka-producer -> test1]: broker update failed');
                    } else {
                        console.log('[kafka-producer -> test1]: broker update success');
                    }
                    done();
                });
            });
            producer.on('error', function (err) {
                console.log('error: ' + err.toString());
                done(err); // fail the test when the client errors out
            });
        });
    });
});


// helper: synchronously block for `second` seconds (execSync is blocking;
// for an async variant see Node.js child_process)
function wait(second) {
    var execSync = require('child_process').execSync;
    execSync('sleep ' + second);
}

Still no connection; it failed with a connection timeout:

(screenshot: kafka-node connection timeout)

Its GitHub repo hadn't been updated in about two years, so it looks unmaintained; its README also only documents SASL/PLAIN, so most likely it simply doesn't support that mechanism, or there's a client/server version mismatch.
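
For reference, the only SASL shape kafka-node's README documents is PLAIN (host and credentials below are placeholders):

var kafka = require('kafka-node');

// kafka-node documents SASL/PLAIN only; SCRAM mechanisms are not listed,
// which would explain the timeout against a SCRAM-SHA-256 broker.
var client = new kafka.KafkaClient({
    kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
    sasl: { mechanism: 'plain', username: 'admin', password: 'xxx' }
});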

Time to check Kafka's official resources; the Apache wiki lists the supported clients:

https://cwiki.apache.org/confluence/display/KAFKA/Clients

There are Node.js libraries listed, and the first one is node-rdkafka:

(screenshot: Node.js clients listed on the Apache Kafka wiki)

kafka-node is listed second, but apparently it doesn't support our Kafka server's security configuration.

node-rdkafka

https://github.com/Blizzard/node-rdkafka
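
node-rdkafka is a binding over librdkafka, so SASL support depends on how the native library was built. A quick sanity check using the module's documented exports:

const Kafka = require('node-rdkafka');

// The exported metadata tells you what the compiled librdkafka supports;
// 'sasl_scram' must be present for SCRAM-SHA-256 to work.
console.log(Kafka.librdkafkaVersion);
console.log(Kafka.features);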

Same drill, another test, rd-kafka.test-spec.ts:

var assert = require('assert');

var expect = require('chai').expect;
const Kafka = require('node-rdkafka')
const ERR_TOPIC_ALREADY_EXISTS = 36;
const config = {
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'sasl.username': 'admin',
    'sasl.password': 'xxx',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256'
};

const topic = 'topic-devops';

describe('kafka tests', function () {
    describe('send Tests', function () {
        this.timeout(15000);
        // Mocha: an async test must not also declare a `done` callback
        it('should connect and produce', async function () {
            await produceExample();
        });
    });
});

function ensureTopicExists() {
    const adminClient = Kafka.AdminClient.create(config);

    return new Promise((resolve, reject) => {
        adminClient.createTopic({
            topic: topic,
            num_partitions: 1,
            replication_factor: 3
        }, (err) => {
            if (!err) {
                console.log(`Created topic ${topic}`);
                return resolve();
            }

            if (err.code === ERR_TOPIC_ALREADY_EXISTS) {
                return resolve();
            }

            return reject(err);
        });
    });
}

async function produceExample() {


    // await ensureTopicExists();

    const producer = await createProducer((err, report) => {
        if (err) {
            console.warn('Error producing', err)
        } else {
            const {topic, partition, value} = report;
            console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);
        }
    });

    for (let idx = 0; idx < 10; ++idx) {
        const key = 'alice';
        const value = Buffer.from(JSON.stringify({ name:key+idx,count: idx }));

        console.log(`Producing record ${key}\t${value}`);

        // partition -1 lets librdkafka's partitioner choose the partition
        producer.produce(topic, -1, value, key);
    }

    producer.flush(10000, () => {
        producer.disconnect();
    });
}

function createProducer(onDeliveryReport) {
    let proConfig = config;
    proConfig['dr_msg_cb'] = true; // include message payloads in delivery reports
    const producer = new Kafka.Producer(proConfig);

    return new Promise((resolve, reject) => {
        producer
            .on('ready', () => resolve(producer))
            .on('delivery-report', onDeliveryReport)
            .on('event.error', (err) => {
                console.warn('event.error', err);
                reject(err);
            });
        producer.connect();
    });
}

Run it in WebStorm:

(screenshot: node-rdkafka producing successfully)

The send succeeded, and checking on the Kafka side confirmed the messages had arrived. Excellent.
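
If you can't check on the server side, a minimal consumer sketch with node-rdkafka works as a spot-check (the group.id is made up; the rest mirrors the producer config above):

const Kafka = require('node-rdkafka');

// Spot-check sketch: read back what the producer wrote.
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'devops-verify',   // hypothetical consumer group
    'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': 'admin',
    'sasl.password': 'xxx'
}, { 'auto.offset.reset': 'earliest' });

consumer
    .on('ready', () => {
        consumer.subscribe(['topic-devops']);
        consumer.consume(); // flowing mode: emits a 'data' event per message
    })
    .on('data', (msg) => console.log(msg.value.toString()));

consumer.connect();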

This problem had been bugging me for almost a week, so I'm writing it down in the hope that it saves the next person some time.

https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/nodejs.html

https://blizzard.github.io/node-rdkafka/current/
