with Node.js
Principal Developer Advocate at Heroku
Β
JSConf/NodeConf Colombia Organizer
Node.js Collaborator Emeritus
Β
Β
π¦ @julianduque.co
/in/juliandavidduque
XΒ @julian_duque
Event-driven architecture (EDA), also known as message-driven architecture, is a software architecture pattern promoting the production, detection, consumption of, and reaction to events.
Source: Wikipedia
Events β‘ Simple signals or messages that indicate something has happened, a state change, or a new request for action
Β
EDA β‘ A way to communicate between components without tight coupling
Β
Benefits β‘ EDA enables asynchronous, non-blocking interactions
Β
Β
JavaScript's non-blocking nature (Event loop)
Clicking a button on a website is an event. The browser listens to this event and sends a message to perform an action β this is essentially how EDA works at a small scale
βββββββββββββββββββββββββββββ
ββ>β timers β
β βββββββββββββββ¬ββββββββββββββ
β βββββββββββββββ΄ββββββββββββββ
β β pending callbacks β
β βββββββββββββββ¬ββββββββββββββ
β βββββββββββββββ΄ββββββββββββββ
β β idle, prepare β
β βββββββββββββββ¬ββββββββββββββ βββββββββββββββββ
β βββββββββββββββ΄ββββββββββββββ β incoming: β
β β poll β<ββββββ€ connections, β
β βββββββββββββββ¬ββββββββββββββ β data, etc. β
β βββββββββββββββ΄ββββββββββββββ βββββββββββββββββ
β β check β
β βββββββββββββββ¬ββββββββββββββ
β βββββββββββββββ΄ββββββββββββββ
ββββ€ close callbacks β
βββββββββββββββββββββββββββββ
Source: The Node.js Event Loop
import { EventEmitter } from "node:events";
const emitter = new EventEmitter();
emitter.on("start", () => {
console.log("started");
});
emitter.emit("start");
emitter.on("data", (data) => {
console.log("received", data);
});
emitter.emit("data", { message: "Hello Node Congress" });
emitter.on("error", handleError);
Handle events in Node.js
Emit events synchronously
Used widely in Node core and the ecosystem
http/https
fs
stream
dgram
process
import { EventEmitter } from "node:events";
const events = new EventEmitter();
events.on("emote", (data) => {
const { emote } = data;
sendToClient(emote);
});
events.on("votes", (data) => {
const { votes } = data;
sendToClient(votes);
});
events.emit("emote", { "emote": "dolphin" });
events.emit("votes", { "dolphin": 10, "applause": 5 });
Synchronous communication
Β
Fire & forget
Β
Fan-out only
Β
Non-persistent
import Redis from "redis";
const pub = new Redis();
const sub = new Redis();
sub.subscribe("emotes", (err, count) => {});
sub.on('message', (channel, message) => {
if (channel === "emotes") {
const data = JSON.parse(message);
// process event
}
});
setInterval(() => {
pub.publish("emotes", JSON.stringify({
emote: "dolphin"
}));
}, 1000);
$ npm install redis
Brokers
The instances running Kafka and managing streams of events in a cluster.
βProducers + Consumers
Clients that write to or read from a Kafka cluster.
βTopics
Streams of events that are replicated across the brokers. Configured with time based retention or log compaction.
βPartitions
Discrete subsets of topics, and important tuning points for parallelism and ordering.
BROKER
TOPIC
PARTITION
PRODUCERS
CONSUMERS
100% implemented in JavaScript (no native dependencies)
Β
Production-ready
Β
Transactions, compression, batches
import { EventEmitter } from 'node:events';
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'emotes-api',
brokers: ['host1:port', 'host2:port', 'host3:port'],
});
const producer = kafka.producer();
const consumer = kafka.consumer({
groupId: KAFKA_PREFIX + 'emotes'
});
const topic = KAFKA_PREFIX + 'emotes';
const events = new EventEmitter();
await consumer.subscribe({ topic });
await consumer.run({
eachMessage: async ({
topic: _topic,
partition,
message
}) => {
try {
const { event, data } =
JSON.parse(message.value.toString());
events.emit(event, data);
} catch (error) {
}
},
});
await producer.connect();
$ npm install kafkajs
Unidirectional stream of events from the server to the browser
Β
Content-Type: text/event-stream
id: 42
event: hitchhike
data: { "planet": "Betelgeuse V" }
Β
Β
// server.js
import FastifySSEPlugin from 'fastify-sse-v2';
// Register SSE plugin
fastify.register(FastifySSEPlugin);
// routes/index.js
import { EventIterator } from 'event-iterator';
// Route /event/:id
fastify.get('/events/:id', async (request, reply) => {
// fastify-cors doesn't seem to work with fastify-sse-v2
// so we need to add this header to this route manually
reply.raw.setHeader('Access-Control-Allow-Origin', '*');
const id = request.params.id;
const eventIterator = new EventIterator(({ push }) => {
// Get events from Kafka and send them through SSE
events.on(`emote:${id}`, push);
events.on(`votes:${id}`, push);
// Cleanup event listeners to prevent memory leaks
request.raw.on('close', () => cleanup(id, push));
return () => cleanup(id, push);
});
reply.sse(eventIterator);
});
Server
$ npm install fastify-sse-v2 event-iterator
Unidirectional stream of events from the server to the browser
Β
Content-Type: text/event-stream
id: 42
event: hitchhike
data: { "planet": "Betelgeuse V" }
Β
EventSourceΒ is part of the Web API
const events = new EventSource("/api/events/nodecongress");
// Handle any message
events.onmessage = (message) => {};
// Handle named events
events.addEventListener("votes", (event) => {
// Update votes on interface
});
events.addEventListener("emote", (event) => {
// Trigger animation
});
Client
π¬
Thank you!