https://www.npmjs.com/package/kafka-node
kafka-node-hello-world
We first require kafkajs module and extract the class. We then create a topic and group ID.
It is simply to identify the group that will be consuming, and the name of a topic that we’ll be producing.
1 2 3 |
const { Kafka } = require('kafkajs') const topicName = "test-topic22" const GroupID = "test-group22" |
Then we instantiate it with a unique client id. For brokers let’s use localhost:9092 because that is what we’re using in our Kafka settings file.
1 2 3 4 |
const kafka = new Kafka({ clientId: 'my8app', brokers: ['localhost:9092', 'localhost:9092'] }) |
get a producer
1 |
const producer = kafka.producer() |
get a consumer
1 |
const consumer = kafka.consumer({ groupId: GroupID }) |
Consumers Subscribe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
server.route({ method: 'GET', path: '/consumer-subscribe', handler: async (request, h) => { console.log("consumer connect") await consumer.connect() console.log("consumer subscribe") await consumer.subscribe({ topic: topicName, fromBeginning: true }) return true } }); |
Consumer Run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
server.route({ method: 'GET', path: '/consumer-run', handler: async (request, h) => { console.log("consumer run") await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }) }, }) return true } }); |
Consumer Unsubscribe
1 2 3 4 5 6 7 8 9 |
server.route({ method: 'GET', path: '/consumer-unsubscribe', handler: async (request, h) => { console.log("consumer disconnect") await consumer.disconnect({ topic: topicName }) return true } }); |
Producer Start
1 2 3 4 5 6 7 8 9 |
server.route({ method: 'GET', path: '/producer-start', handler: async (request, h) => { console.log('producer connected') await producer.connect(); return true; } }); |
Producer Send Data
1 2 3 4 5 6 7 8 9 10 11 12 13 |
server.route({ method: 'GET', path: '/producer-send', handler: async (request, h) => { console.log('producer send') return await producer.send({ topic: topicName, messages: [ { value: '8 - Herro KafkaJS user!' }, ], }) } }); |
Producer Stop
1 2 3 4 5 6 7 8 9 |
server.route({ method: 'GET', path: '/producer-stop', handler: async (request, h) => { console.log('producer stop') await producer.disconnect() return true } }); |