Custom Transport

Nest provides TCP and Redis as built-in transport methods. This makes prototyping fast and easy, but sometimes you may want to use another type of transport, e.g. RabbitMQ messaging. Is it possible? Yes.

You can port any transport strategy to Nest. You only have to create a class that extends Server and implements the CustomTransportStrategy interface.

The Server class provides the getHandlers() method, which returns the MessagePattern mappings (an object in which each key is a pattern and each value is a callback), while CustomTransportStrategy requires you to implement both the listen() and close() methods.
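To make the shape of that mapping concrete, here is a minimal, library-free sketch. The names MessageHandler, handlers and findHandler are hypothetical and not part of Nest; the only thing taken from this article is that handlers are looked up by the JSON-stringified pattern.

```typescript
// Hypothetical handler type: receives the message payload and returns a result.
type MessageHandler = (data: unknown) => unknown;

// Stand-in for the mapping: stringified pattern -> callback.
const handlers: Record<string, MessageHandler> = {
  [JSON.stringify({ cmd: 'sum' })]: (data) =>
    (data as number[]).reduce((a, b) => a + b, 0),
};

// Look up the handler for an incoming pattern.
// Returns undefined when nothing is registered for it.
function findHandler(pattern: unknown): MessageHandler | undefined {
  return handlers[JSON.stringify(pattern)];
}

const sumHandler = findHandler({ cmd: 'sum' });
const sumResult = sumHandler ? sumHandler([1, 2, 3]) : undefined;
const missing = findHandler({ cmd: 'unknown' });
```

Because the key is a JSON string, two patterns with the same properties in a different order produce different keys, so the sender and the handler should build the pattern object the same way.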

Let's create a simple RabbitMQServer class. We will use the amqplib library.

import * as amqp from 'amqplib';
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Observable } from 'rxjs';

export class RabbitMQServer extends Server implements CustomTransportStrategy {
    private server = null;
    private channel = null;

    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    public async listen(callback: () => void) {
        await this.init();
        await this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), { noAck: true });
        // Signal that the server is ready to receive messages.
        callback();
    }

    public close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }

    private handleMessage(message) {
        const { content } = message;
        const msg = JSON.parse(content.toString());

        // Handlers are keyed by the stringified pattern.
        const handlers = this.getHandlers();
        const pattern = JSON.stringify(msg.pattern);
        const handler = handlers[pattern];
        if (!handler) {
            return;
        }

        const response$ = handler(msg.data) as Observable<any>;
        response$ && this.send(response$, (data) => this.sendMessage(data));
    }

    private sendMessage(message) {
        this.channel.sendToQueue(`${this.queue}_pub`, Buffer.from(JSON.stringify(message)));
    }

    private async init() {
        this.server = await amqp.connect(this.host);
        this.channel = await this.server.createChannel();
        // Wait until both queues exist before consuming or publishing.
        await this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
        await this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
    }
}

The most interesting method is handleMessage(). Its responsibility is to match the pattern with the appropriate handler and call it with the received data. Also, notice that we used the send() method inherited from the Server class. You should use it too, so that you don't have to handle details such as sending the disposed message yourself when the Observable completes.
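The following is a rough, library-free approximation of that behaviour, not Nest's actual implementation. It assumes each emitted value is wrapped in an envelope carrying err, response and disposed (the fields the client in this article reads), and it replaces the Observable with a hypothetical callback-based Source type. The names Envelope, Source and sendAll are made up for illustration.

```typescript
// Envelope fields mirror what the client in this article destructures.
interface Envelope {
  err?: unknown;
  response?: unknown;
  disposed?: boolean;
}

// Hypothetical stand-in for an Observable: pushes values, then signals completion.
type Source<T> = (next: (value: T) => void, complete: () => void) => void;

// Forward every value as a response packet, then emit one final
// "disposed" packet so the receiver knows the stream has ended.
function sendAll<T>(source: Source<T>, respond: (packet: Envelope) => void): void {
  source(
    (value) => respond({ err: null, response: value }),
    () => respond({ disposed: true }),
  );
}

const sent: Envelope[] = [];
const twoValues: Source<number> = (next, complete) => {
  next(1);
  next(2);
  complete();
};
sendAll(twoValues, (packet) => sent.push(packet));
```

Under these assumptions, sent ends up holding two response packets followed by a single packet with disposed set to true, which is the signal the receiving side uses to stop listening.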

The last step is to set up our RabbitMQ strategy:

const app = NestFactory.createMicroservice(ApplicationModule, {
    strategy: new RabbitMQServer('amqp://localhost', 'example'),
});

That's all!

Client

The RabbitMQ server is listening for messages. Now we must create a client class, which should extend the built-in ClientProxy. We only have to override the abstract sendSingleMessage() method.

Let's create the RabbitMQClient class:

import * as amqp from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';

export class RabbitMQClient extends ClientProxy {
    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    protected async sendSingleMessage(msg, callback: (err, result, disposed?: boolean) => void) {
        const server = await amqp.connect(this.host);
        const channel = await server.createChannel();
        const sub = this.getSubscriberQueue();
        const pub = this.getPublisherQueue();

        // Wait until both queues exist before consuming or publishing.
        await channel.assertQueue(sub, { durable: false });
        await channel.assertQueue(pub, { durable: false });

        channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
        channel.sendToQueue(sub, Buffer.from(JSON.stringify(msg)));
    }

    private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
        const { content } = message;
        const { err, response, disposed } = JSON.parse(content.toString());
        if (disposed) {
            server.close();
        }
        callback(err, response, disposed);
    }

    private getPublisherQueue(): string {
        return `${this.queue}_pub`;
    }

    private getSubscriberQueue(): string {
        return `${this.queue}_sub`;
    }
}

How do you use it? There is nothing special; just create an instance:

export class ClientController {
    private readonly client = new RabbitMQClient('amqp://localhost', 'example');
}

The rest works the same way as with the built-in transports (use the send() method).
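As a small illustration of what happens to each reply on the client side, the sketch below decodes raw reply bodies and forwards them to a callback with the same (err, result, disposed) shape that sendSingleMessage() receives. It is library-free; ReplyCallback, dispatchReply and the sample replies are hypothetical, and plain JSON strings stand in for the message content.

```typescript
// Same argument shape as the callback passed to sendSingleMessage().
type ReplyCallback = (err: unknown, result: unknown, disposed?: boolean) => void;

// Decode one raw reply body (JSON text) and forward it to the callback.
// Returns true when the reply marks the end of the stream.
function dispatchReply(raw: string, callback: ReplyCallback): boolean {
  const { err, response, disposed } = JSON.parse(raw);
  callback(err, response, disposed);
  return disposed === true;
}

const results: unknown[] = [];
let finished = false;

// Sample replies: one value, then the final "disposed" marker.
const rawReplies = [
  JSON.stringify({ err: null, response: 6 }),
  JSON.stringify({ disposed: true }),
];

for (const raw of rawReplies) {
  const done = dispatchReply(raw, (err, result, disposed) => {
    if (disposed) {
      return; // end of stream, nothing to collect
    }
    if (err) {
      throw err;
    }
    results.push(result);
  });
  if (done) {
    finished = true;
  }
}
```

In the RabbitMQClient above, the point where this sketch sets finished is where the connection is closed.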