References:
- https://blog.sessionstack.com/how-javascript-works-the-publisher-subscriber-pattern-9edc62ef1a68
- https://refactoring.guru/design-patterns/observer/typescript/example
- Broker Demo
The broker demo illustrates the pub/sub broker. To run it:
    yarn install
    yarn develop
Then open a browser at localhost:8080.
First, let’s take a look at the Observer Pattern.
Conceptual example on Observer Pattern
    /**
     * The Subject interface declares a set of methods for managing subscribers.
     */
    interface Subject {
        // Attach an observer to the subject.
        attach(observer: Observer): void;

        // Detach an observer from the subject.
        detach(observer: Observer): void;

        // Notify all observers about an event.
        notify(): void;
    }

    /**
     * The Subject owns some important state and notifies observers when the state
     * changes.
     */
    class ConcreteSubject implements Subject {
        /**
         * @type {number} For the sake of simplicity, the Subject's state, essential
         * to all subscribers, is stored in this variable.
         */
        public state: number;

        /**
         * @type {Observer[]} List of subscribers. In real life, the list of
         * subscribers can be stored more comprehensively (categorized by event
         * type, etc.).
         */
        private observers: Observer[] = [];

        /**
         * The subscription management methods.
         */
        public attach(observer: Observer): void {
            const isExist = this.observers.includes(observer);
            if (isExist) {
                return console.log('Subject: Observer has been attached already.');
            }

            console.log('Subject: Attached an observer.');
            this.observers.push(observer);
        }

        public detach(observer: Observer): void {
            const observerIndex = this.observers.indexOf(observer);
            if (observerIndex === -1) {
                return console.log('Subject: Nonexistent observer.');
            }

            this.observers.splice(observerIndex, 1);
            console.log('Subject: Detached an observer.');
        }

        /**
         * Trigger an update in each subscriber.
         */
        public notify(): void {
            console.log('Subject: Notifying observers...');
            for (const observer of this.observers) {
                observer.update(this);
            }
        }

        /**
         * Usually, the subscription logic is only a fraction of what a Subject can
         * really do. Subjects commonly hold some important business logic, that
         * triggers a notification method whenever something important is about to
         * happen (or after it).
         */
        public someBusinessLogic(): void {
            console.log('\nSubject: I\'m doing something important.');
            this.state = Math.floor(Math.random() * (10 + 1));

            console.log(`Subject: My state has just changed to: ${this.state}`);
            this.notify();
        }
    }

    /**
     * The Observer interface declares the update method, used by subjects.
     */
    interface Observer {
        // Receive update from subject.
        update(subject: Subject): void;
    }

    /**
     * Concrete Observers react to the updates issued by the Subject they had been
     * attached to.
     */
    class ConcreteObserverA implements Observer {
        public update(subject: Subject): void {
            if (subject instanceof ConcreteSubject && subject.state < 3) {
                console.log('ConcreteObserverA: Reacted to the event.');
            }
        }
    }

    class ConcreteObserverB implements Observer {
        public update(subject: Subject): void {
            if (subject instanceof ConcreteSubject && (subject.state === 0 || subject.state >= 2)) {
                console.log('ConcreteObserverB: Reacted to the event.');
            }
        }
    }

    /**
     * The client code.
     */
    const subject = new ConcreteSubject();

    const observer1 = new ConcreteObserverA();
    subject.attach(observer1);

    const observer2 = new ConcreteObserverB();
    subject.attach(observer2);

    subject.someBusinessLogic();
    subject.someBusinessLogic();

    subject.detach(observer2);

    subject.someBusinessLogic();
In the observer pattern, observers are attached to a subject. When the subject's state changes, it calls notify, which in turn calls update on every attached observer. That is how all the observers are notified.
In other words, with the observer pattern, each observer must be attached directly to the subject in order to get a notification. The pub/sub pattern does not require this direct link.
Difference between pub/sub and Observer
The pub/sub pattern uses a middleware layer that sits between the objects firing events (the publishers) and the objects receiving them (the subscribers).
This middleware is also referred to as the broker.
The pub/sub broker handles all interaction between publishers and subscribers. Internally it keeps a data structure that maps each topic to the subscribers interested in it.
Publishers publish content to the broker, and the broker delivers that content to the appropriate subscribers.
Because publishers and subscribers only know about the broker, they are loosely coupled, and the broker supports many-to-many relationships between them.
So unlike the observer pattern, where every observer attaches directly to one subject, the pub/sub pattern lets multiple publishers and multiple subscribers communicate without knowing about each other.
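To make the contrast concrete, here is a minimal sketch of the idea, not the broker built later in this post. `TinyBroker`, the `"scores"` topic, and the two publisher functions are hypothetical names chosen for illustration. The point it tries to show is that publishers and subscribers only ever reference the broker and a topic string, never each other.

```typescript
// Hypothetical minimal broker, for illustration only.
type Handler = (message: string) => void;

class TinyBroker {
  private topics = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const handlers = this.topics.get(topic) ?? [];
    handlers.push(handler);
    this.topics.set(topic, handlers);
  }

  publish(topic: string, message: string): void {
    for (const handler of this.topics.get(topic) ?? []) {
      handler(message);
    }
  }
}

const broker = new TinyBroker();
const received: string[] = [];

// Two independent subscribers on the same topic.
broker.subscribe("scores", (m) => received.push(`board:${m}`));
broker.subscribe("scores", (m) => received.push(`logger:${m}`));

// Two independent publishers; neither holds a reference to any subscriber.
const publisherA = (b: TinyBroker) => b.publish("scores", "A scored");
const publisherB = (b: TinyBroker) => b.publish("scores", "B scored");
publisherA(broker);
publisherB(broker);

console.log(received);
```

In the observer example, by comparison, the client code has to hand each observer to the subject with attach, so the two sides are wired together directly.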
Example – React app that subscribes to certain topics and receives the data in the UI component
Let’s see how it is used.
In our case the subscribers are clients such as a React app, whose UI components need to receive data telling them what to change and update on certain pages.
The React code might look like this:
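Subscribing on the UI
    import React, { useEffect } from "react";
    import PSBroker from "PubsubBroker";
    // racePageTopic and racePageViewCB are assumed to be in scope
    // (imported from wherever the app defines them).

    const RacePageView: React.FC = () => {
      // A React client may use our broker in a UI component to subscribe to messages.
      // It gets a PSBroker object from, say, a PSBroker npm package and subscribes
      // to the topic string it is interested in.
      useEffect(() => {
        const subscription = PSBroker.subscription(racePageTopic, racePageViewCB);
        subscription.subscribe();
        // Stop receiving data when the component unmounts.
        return () => subscription.unsubscribe();
      }, []);

      return null; // markup omitted
    };

    export default RacePageView;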
Callback to receive published data
The callback starts receiving data once an entity publishes to racePageTopic.
    export const racePageViewCB = (param: [string, string]) => {
      const [data, type] = param;
      if (type === 'race') {
        console.log(`race page view will receive data: ${data}`);
      }
    };
We can also use the broker in other components to subscribe to different topics to receive different data.
Again, the callback would receive the data and update the UI accordingly:
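    import React, { useEffect } from "react";
    import PSBroker from "PubSubBroker";
    // investmentViewTopic and investmentPageViewCB are assumed to be in scope.

    const InvestmentPageView: React.FC = () => {
      // ...
      useEffect(() => {
        const subscription = PSBroker.subscription(investmentViewTopic, investmentPageViewCB);
        subscription.subscribe();
        return () => subscription.unsubscribe();
      }, []);
      // ...
    };

    export default InvestmentPageView;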
The callback starts receiving data once an entity publishes to investmentViewTopic.
    export const investmentPageViewCB = (param: [string, string]) => {
      const [data, type] = param;
      if (type === 'investment') {
        console.log(`investment page view will receive data: ${data}`);
      }
    };
The Broker
The broker holds a Map whose keys are strings and whose values are arrays of functions.
Each key is a topic that clients can subscribe to, and the array stored under that key keeps track of the topic's subscribers.
The subscription method returns an object with two functions, subscribe and unsubscribe. Both use a closure to access the topic name and the subscriber they were created for.
When a client subscribes, we store the topic name as a key in the Map and append the client's subscriber function to that key's array.
    // A subscriber is any callback; the broker does not care what it does with the data.
    type Subscriber = (...args: any[]) => void;

    class PubSubBroker {
      private events: Map<string, Subscriber[]>;

      constructor() {
        this.events = new Map<string, Subscriber[]>();
      }

      subscription(topicName: string, subscriber: Subscriber) {
        return {
          subscribe: () => {
            // client subscribes to this topic
          },
          unsubscribe: () => {
            // client unsubscribes
          }
        };
      }

      publish(topicName: string, ...args: any[]) {
        // publisher publishes according to topic
        const subscribers = this.events.get(topicName);
        // other code
      }
    }
Publishers publish data by topic name, so every subscriber to that topic receives the data.
Subscribe
We first check whether the Map already has an entry for this topic. If it does, we push the subscriber onto the existing array. If not, we create the entry with the subscriber as its first element.
    subscribe: () => {
      // Look the topic up at call time so the check reflects the Map's current state.
      const subscribersArr = this.events.get(topicName);
      if (subscribersArr) {
        subscribersArr.push(subscriber); // topic exists: add the subscriber
      } else {
        this.events.set(topicName, [subscriber]); // first subscriber: create the entry
      }
      console.log(`${subscriber.name} subscribed to ${topicName}`);
    }
Unsubscribe
To unsubscribe a client, we iterate over the topic's array until we find its subscriber function, then remove it.
    unsubscribe: () => {
      // Look the topic up at call time; the entry may have been created after
      // this subscription object was returned.
      const subscribersArr = this.events.get(topicName);
      if (Array.isArray(subscribersArr) && subscribersArr.length > 0) {
        for (let i = 0; i < subscribersArr.length; i++) {
          // Compare by reference: two different functions can share a name,
          // and anonymous functions all have an empty name.
          if (subscribersArr[i] === subscriber) {
            subscribersArr.splice(i, 1);
            console.log(`${subscriber.name} unsubscribed from ${topicName}`);
            return;
          }
        }
      }
    }
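The subscribe and unsubscribe steps can also be tried in isolation. The following is a minimal sketch that assumes the pair is pulled out into a standalone function; `createSubscription`, `onRace`, and the `"race"` topic are hypothetical names, and subscribers are matched by function identity, which is an assumption of this sketch rather than something the post prescribes.

```typescript
// Hypothetical standalone version of the broker's subscription handle.
type Subscriber = (...args: unknown[]) => void;

function createSubscription(
  events: Map<string, Subscriber[]>,
  topicName: string,
  subscriber: Subscriber,
) {
  return {
    subscribe: (): void => {
      const list = events.get(topicName);
      if (list) {
        list.push(subscriber);
      } else {
        events.set(topicName, [subscriber]);
      }
    },
    unsubscribe: (): void => {
      // Read the list at call time; it may not have existed when the handle was created.
      const list = events.get(topicName);
      if (!list) return;
      const index = list.indexOf(subscriber); // identity match, not name match
      if (index !== -1) list.splice(index, 1);
    },
  };
}

const events = new Map<string, Subscriber[]>();
const onRace: Subscriber = () => {};
const handle = createSubscription(events, "race", onRace);

handle.subscribe();
const countAfterSubscribe = events.get("race")?.length ?? 0;
handle.unsubscribe();
const countAfterUnsubscribe = events.get("race")?.length ?? 0;
handle.unsubscribe(); // a second call finds nothing to remove and does nothing

console.log(countAfterSubscribe, countAfterUnsubscribe);
```

Both closures read the Map when they are called rather than when the handle is created, so unsubscribe still works for the very first subscriber of a topic.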
Publish
Backend apps can act as publishers and push data like so. All a publisher needs to know is which topic to push to.
Given a topic name, the broker looks up that topic, sees which subscribers are stored in its array, and then calls each subscriber with the published data as arguments.
    publish(topicName: string, ...args: any[]) {
      const subscribers = this.events.get(topicName);
      if (Array.isArray(subscribers)) {
        subscribers.forEach((subscriber) => {
          subscriber.apply(null, args);
        });
      }
    }
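How the arguments reach a subscriber depends on how the publisher passes them. The sketch below is a hedged illustration using a hypothetical helper, `deliver`, that mirrors the body of publish; `tupleStyle` and `spreadStyle` are made-up subscriber names. It shows that publishing one array hands the callback a single array parameter, while publishing separate values hands it separate parameters.

```typescript
type Subscriber = (...args: unknown[]) => void;

// Hypothetical helper mirroring the body of publish().
function deliver(subscribers: Subscriber[], ...args: unknown[]): void {
  subscribers.forEach((subscriber) => subscriber.apply(null, args));
}

const seen: string[] = [];

// Receives one array parameter and destructures it, like the page callbacks.
const tupleStyle: Subscriber = (param) => {
  const [data, type] = param as [string, string];
  seen.push(`${type}:${data}`);
};

// Receives the values as separate parameters.
const spreadStyle: Subscriber = (data, type) => {
  seen.push(`${String(type)}:${String(data)}`);
};

deliver([tupleStyle], ["Voided", "race"]); // args is [["Voided", "race"]]
deliver([spreadStyle], "Voided", "race"); // args is ["Voided", "race"]

console.log(seen);
```

The page callbacks in this post use the first style, so publishers need to pass the data and type together as one array.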
Publishers
A Node server may act as a publisher. It imports PSBroker, then uses it to publish data to all the subscribers of a given topic.
    import PSBroker from "../PubsubBroker";

    PSBroker.publish(racePageTopic, ["Voided", "race"]);
    PSBroker.publish(racePageTopic, ["Resulted", "race"]);
    PSBroker.publish(investmentViewTopic, ["1024 bucks", "investment"]);
    PSBroker.publish(investmentViewTopic, ["520 bucks", "investment"]);
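Putting the pieces together, the whole flow can be run in a single file. This is a sketch under stated assumptions rather than the demo's actual source: the topic values `"race-page"` and `"investment-view"` are made up because the post never shows what the topic constants contain, the callbacks push to a `log` array instead of touching a UI, and the broker, subscribers, and publisher all live in one process.

```typescript
type Subscriber = (...args: any[]) => void;

class PubSubBroker {
  private events = new Map<string, Subscriber[]>();

  subscription(topicName: string, subscriber: Subscriber) {
    return {
      subscribe: (): void => {
        const list = this.events.get(topicName);
        if (list) list.push(subscriber);
        else this.events.set(topicName, [subscriber]);
      },
      unsubscribe: (): void => {
        const list = this.events.get(topicName);
        const index = list ? list.indexOf(subscriber) : -1;
        if (list && index !== -1) list.splice(index, 1);
      },
    };
  }

  publish(topicName: string, ...args: unknown[]): void {
    (this.events.get(topicName) ?? []).forEach((s) => s(...args));
  }
}

// Assumed topic values; the post does not show the real ones.
const racePageTopic = "race-page";
const investmentViewTopic = "investment-view";

const PSBroker = new PubSubBroker();
const log: string[] = [];

const racePageViewCB = ([data, type]: [string, string]) => {
  if (type === "race") log.push(`race page: ${data}`);
};
const investmentPageViewCB = ([data, type]: [string, string]) => {
  if (type === "investment") log.push(`investment page: ${data}`);
};

const raceSub = PSBroker.subscription(racePageTopic, racePageViewCB);
raceSub.subscribe();
PSBroker.subscription(investmentViewTopic, investmentPageViewCB).subscribe();

PSBroker.publish(racePageTopic, ["Voided", "race"]);
PSBroker.publish(investmentViewTopic, ["1024 bucks", "investment"]);
raceSub.unsubscribe();
PSBroker.publish(racePageTopic, ["Resulted", "race"]); // no subscriber left on this topic

console.log(log);
```

Each message reaches only the subscribers of its own topic, and nothing is delivered to the race page callback after it unsubscribes.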