@idearium/amqp
Explicitly connect to an AMQP server and then publish and consume messages.
Installation
$ yarn add -E @idearium/amqp
Beta installation
If you need to install a beta version, you can:
$ yarn add -E @idearium/amqp@beta
Usage
To use @idearium/amqp
, you'll need to:
- Connect to an AMQP server.
- Setup consumers.
- Publish messages.
Connect to an AMQP server
Use the following to create a connection to an AMQP server.
const amqp = require('@idearium/amqp');
await amqp.connect('amqps://localhost:5671')
Setup consumers
Start by setting up consumers so that messages will be processed:
const amqp = require('@idearium/amqp');
amqp.consume(
'consumer-name',
async (data) => {
console.log('Consuming data', data);
return true;
},
{
exchange: 'ampq-test',
queue: 'ampq-test',
routingKey: 'ampq-test',
}
)
Publish messages
Now you can start publishing messages:
const amqp = require('@idearium/amqp');
amqp.publish('test-b', { test: true }, {
exchange: 'ampq-test',
routingKey: 'ampq-test',
persistent: true,
});
Examples
Certificates
This example shows how to load certificates and pass it to connect
to allow making secured connections.
// lib/certs.js
const fs = require('fs/promises');
const { join } = require('path');
const promiseAllSettled = require('@idearium/promise-all-settled');
const loadFile = async (path) => fs.readFile(path, 'utf-8');
const readDir = async (path) => fs.readdir(path);
module.exports = async (dir) => {
const content = await readDir(dir);
const certs = {};
const certPath = content.find((path) => /\.ce?rt$/.test(path));
const keyPath = content.find((path) => /\.key$/.test(path));
if (certPath) {
certs.crt = await loadFile(join(dir, certPath));
}
if (keyPath) {
certs.key = await loadFile(join(dir, keyPath));
}
if (content.includes('ca')) {
[, certs.ca] = await promiseAllSettled(
(await readDir(join(dir, 'ca')))
.filter((path) => /\.ce?rt$/.test(path))
.map((path) => loadFile(join(dir, 'ca', path)))
);
}
return certs;
};
const amqp = require('@idearium/amqp');
const certs = require('./lib/certs');
const createConnection = async () => {
const opts = await certs(`${process.cwd()}/amqp-certs`);
return client.connect(
'amqps://localhost:5671',
opts
);
};
module.exports = async (opts = {}) => {
await createConnection();
// Setup consumers
// Publish messages
};