【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

news/2024/8/26 16:57:41 标签: mongodb, redis, 分布式

用Redis实现消息队列并搭建分布式邮件消息系统

  • 系统介绍
  • Redis实现消息队列
    • 思路分析
    • 代码实现
  • MongoDB监听数据变化
    • 思路分析
    • 代码实现
      • Mongoose测试连接
      • 监听mongodb数据变化
  • 注意点

系统介绍

本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。

  • Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
  • Redis:用于搭建消息队列,实现消息的分布式
  • MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。

Redis实现消息队列

思路分析

主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
在这里插入图片描述
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
在这里插入图片描述

代码实现

const { transemail } = require('../email_list/email.js');
const redis = require('promise-redis-client');
const redisHost = 'localhost';
const redisPort = 6379;

// 配置 Redis 客户端
const createRedisClient = () => {
    return new Promise((resolve, reject) => {
        let client = redis.createClient({ host: redisHost, port: redisPort });
        client.on('error', err => {
            console.log('Redis 连接出错');
            reject(err);
        });
        client.on('ready', () => {
            console.log('Redis ready');
            resolve(client);
        });
    });
};

async function startWaitMsg(redisClient) {
    while (true) {
        let res = null;
        try {
            res = await redisClient.brpop('bookChanges', 0);
            console.log('收到消息', res);
        } catch (err) {
            console.log('brpop 出错,重新 brpop');
            continue;
        }
        res = res.toString();
        transemail(res);
    }
}

async function listenredis() {
    try {
        // 启动生产者
        // startProducer();

        // 创建 Redis 客户端
        const redisClient = await createRedisClient();

        // 启动消息监听
        startWaitMsg(redisClient);
    } catch (error) {
        console.error('Error:', error);
    }
}
//测试的时候使用的代码
listenredis().catch(console.error);

// 处理退出信号以关闭客户端
process.on('SIGINT', async () => {
    console.log('Closing clients...');
    process.exit(0);
});

MongoDB监听数据变化

思路分析

由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。

  1. 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
  2. 将数据变化发送到redis消息队列中。

首先在命令行中将服务启动:
在这里插入图片描述

代码实现

Mongoose测试连接

const mongoose = require('mongoose');

mongoose.connect('mongodb://localhost/test', {
  useNewUrlParser: true,
  useUnifiedTopology: true
}).then(() => {
  console.log('Successfully connected to MongoDB');

  const bookSchema = new mongoose.Schema({
    title: String,
    author: String
  });

  const Book = mongoose.model('Book', bookSchema);

  const bookChangeStream = Book.watch();

  bookChangeStream.on('change', (change) => {
    console.log('Collection changed:', change);
    if (change.operationType === 'insert') {
      console.log('New book added:', change.fullDocument);
    }
  });
}).catch((error) => {
  console.log('Error connecting to MongoDB:', error);
});

在这里插入图片描述
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
在这里插入图片描述
证明测试成功,可以进行下一步操作啦!

mongodb_121">监听mongodb数据变化

const redis = require('redis');
const mongoose = require('mongoose');
// 创建 Redis 客户端
const redisClient = redis.createClient({
	host: 'localhost',
	port: 6379
  });
  
  // 连接到 Redis
redisClient.connect();
  
//连接mongodb数据库并检测变化发送到redis消息队列
async function connectAndMonitorMongoDB(redisClient) {
  try {
    await mongoose.connect('mongodb://localhost/test', {
      useNewUrlParser: true,
      useUnifiedTopology: true
    });
    console.log('Successfully connected to MongoDB');

    const bookSchema = new mongoose.Schema({
      title: String,
      author: String
    });

    const Book = mongoose.model('Book', bookSchema);

    const bookChangeStream = Book.watch();
	try{
		bookChangeStream.on('change', (change) => {
			console.log('Collection changed:', change);
			console.log("type of change:",typeof(change));
			msg = JSON.stringify(change.fullDocument);
			msg = msg.replace(/{|}/g, '');
			msg = "New message received:"+msg;
			console.log("massage:",msg);
			console.log("type of message:",typeof(msg));
			if (change.operationType === 'insert') {
			  console.log('New book added:', msg);
			  redisClient.lPush('bookChanges', msg, function(err, reply) {
				if (err) {
				  console.log('Error storing JSON to Redis:', err);
				} else {
				  console.log('JSON stored successfully, list length:', reply);
				}})
			}
		  });
	}catch (err){
		console.log("error while loading data into redis:", err)
	}
  } catch (error) {
    console.log('Error connecting to MongoDB:', error);
  }
}

// module.exports = { connectAndMonitorMongoDB };
async function main() {
  try {
    await connectAndMonitorMongoDB(redisClient);
    console.log('Monitoring MongoDB changes...');
  } catch (error) {
    console.error('Failed to start monitoring:', error);
  }
}

main();

注意点

在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:

msg = JSON.stringify(change.fullDocument);
msg = msg.replace(/{|}/g, '');
msg = "New message received:"+msg;

http://www.niftyadmin.cn/n/5558271.html

相关文章

Netty UDP

Netty在UDP(用户数据报协议,User Datagram Protocol)通信中的应用非常广泛,特别是在对实时性要求较高、对数据准确性要求相对较低的场景中,如视频传输、语音通信等。以下是对Netty在UDP通信中的详细解析: …

定制化服务发现:Eureka中服务实例偏好的高级配置

定制化服务发现:Eureka中服务实例偏好的高级配置 在微服务架构中,服务实例的智能管理和优化是保证系统高效运行的关键。Eureka作为Netflix开源的服务注册与发现框架,提供了丰富的配置选项来满足不同场景下的需求。服务实例偏好配置允许开发者…

Android 音频通道切换HDMI,蓝牙,喇叭

Android 音频通道切换HDMI,蓝牙,喇叭 private void speakerSound() {if (soundOutput.equals("speaker")) {return;}soundOutput "speaker";audoManager (AudioManager) mContext.getSystemService(Context.AUDIO_SERVICE);audoManager.setMode(AudioMa…

OpenCV中的GrabCut图像分割算法的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 功能描述 GrabCut 算法是一种用于图像分割的技术,由 Carsten Rother、Vladimir Kolmogorov 和 Andrew Blake 在 2004 年 SIGGRAPH 会议的论文《…

图论建模技巧搜集

一些经典题目 找可达路径 UVa - 11604 General Sultan 平面图最小割对偶图最短路 UVa - 1376 Animal Run 最小割建模 UVa - 1515 Pool construction 费用流建模 洛谷P3159 [CQOI2012] 交换棋子 其他人写的博客 最详细(也可能现在不是了)网络流建模…

昇思25天学习打卡营第12天|Vision Transformer图像分类、SSD目标检测

Vision Transformer(ViT)简介 近些年,随着基于自注意(Self-Attention)结构的模型的发展,特别是Transformer模型的提出,极大地促进了自然语言处理模型的发展。由于Transformers的计算效率和可扩…

Hbase、hive以及ClickHouse的区别?

HBase、Hive以及ClickHouse是三种在大数据领域广泛使用的数据库系统,它们各自具有独特的特点和适用场景。以下是它们之间的主要区别: 一、数据模型与存储方式 系统数据模型存储方式HBase分布式、面向列的NoSQL数据库基于Hadoop的HDFS平台,数…

git 代理错误拒绝连接

git 克隆项目拒绝连接 现象 Failed to connect to 127.0.0.1 port 15732: 拒绝连接 问题描述 代理错误解决方法 取消代理 git config --global --unset http.proxy