[实践OK]利用Redis的notifications功能实现延时任务,任务失败重试N次的机制实现,Node之Error: Cannot find module 'redis。
PHP版本的实践:https://www.imooc.com/article/10431
背景:在工作中经常会遇到一些关于定时任务的实际场景,比如每天凌晨1点自动备份数据库,或者,每隔1小时执行一次爬虫脚本,这种固定时间执行固定动作的需求我们称之为定时任务,利用crontab即可轻松实现。如果我们对自动备份数据库这个定时任务改变一下需求(这种情况就像你邀请一个人,一天内如果没有人来或有人来你通知下你,你邀请的人来了,这种任务。二、再就是公司没啥好的设备,钱少,网太烂了搞一个任务比如Mysql备份数据库的脚本,比如备份Redis的数据Bgsave的Scp拷贝经常出现网络不好,第一次备份会失败,于是得第二次这种垃圾需求。有垃圾需求就有解决办法,于于优雅或不优雅是一回事,但得技术人员觉得有一个流程总比没有流程好,本来没有方案的,于是就有技术方案。),如图:
点击在新窗口中浏览此图片
如果仍然利用crontab来实现,就有点勉强了。类似这种需求最常见的是服务器之间的消息通知,假如服务器B由于网络不稳定或者服务器压力较大导致不能即时对服务器A的消息作出正确响应,那么服务器A就会延迟一段时间再次发送消息,直到收到服务器B的正确响应或者超出最大通知次数为止。过去的做法是定时扫表,把通知失败的消息再次发送一遍,虽然可以多次发送通知,但是发送间隔太短会增加服务器B的压力,发送间隔太长消息的时效性就不能保证,显然处理这种延时任务用crontab根本不能解决问题。
Node之Error: Cannot find module 'redis:
#npm install -g redis
/usr/lib
└─┬ redis@2.7.1
├── double-ended-queue@2.1.0-0
├── redis-commands@1.3.1
└── redis-parser@2.6.0
环境变量:
#rpm -ql nodejs-6.10.3-1.el7.x86_64
/usr/bin/node
/usr/lib/node_modules
export NODE_PATH=/usr/lib/node_modules
#echo $NODE_PATH
/usr/lib/node_modules
#node notice.js
订阅成功
select 3
OK
setex msg_2 2 chokingwin
OK
client.on("pmessage", function(pattern, channel, expiredKey) {
console.log(pattern + "|" + channel + "|" + expiredKey);
_ _keyevent@3__:expired|__keyevent@3__:expired|msg_2
从Redis 2.8.0版本起,加入了"Keyspace notifications"(即"键空间通知")的功能。按照官方的说法:键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。比如:所有改变给定key的命令;所有经过lpush操作的key;所有在0号数据库中过期的key等等。我们在处理延时任务的时候,先把通知失败的消息ID作为key的一部分存到redis缓存中,并设定过期时间(相当于延时),当这条缓存数据失效的时候,通过订阅关系(用NodeJS实现)就可以收到消息,通过分析消息就可以知道过期KEY,这样就可以再次发送消息通知,从而实现延时任务。
不过,需要注意一点:Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
核心部分是两个Redis的终端,分别连接上Redis,并打开这个特性,另一个终端是监控的,这块里面用代码进行编写订阅,如下:
订阅,作者用的是Node,我在这儿不得不打下广告了,Swoole是不是应该也能支持这个功能?https://wiki.swoole.com/wiki/page/523.html ,http://blog.csdn.net/koastal/article/details/52869140,subscribe。
psubscribe来自:https://wiki.swoole.com/wiki/page/590.html
<?php
$serv = new Swoole\Server("127.0.0.1", 9501);
$serv->set(array(
'worker_num' => 8, //工作进程数量
'daemonize' => false, //是否作为守护进程
));
$serv->on('connect', function ($serv, $fd){
echo "Client:Connect.\n";
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
$val = "";
$redis = new Swoole\Coroutine\Redis();
$redis->connect('10.51.77.34', 6379);
while (true) {
$val = $redis->psubscribe(['psubscribe __keyevent@3__:expired']);
//订阅的channel,以第一次调用subscribe时的channel为准,后续的subscribe调用是为了收取Redis Server>的回包
//如果需要改变订阅的channel,请close掉连接,再调用subscribe
var_dump($val);
}
});
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
$serv->start();
Swoole的这个Redis的Coroutine必须要有一个端口暴露,这是和Node最大的不同吧?上面这个图我试着使用了一下,感觉有点问题。
=============================================================================
#redis-cli -h 10.51.77.34
10.51.77.34:6379> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@0__:expired"
3) "__keyevent@0__:expired"
4) "name"
塞一个数据进去:
#redis-cli -h 10.51.77.34
10.51.77.34:6379> config set notify-keyspace-events Ex
OK
10.51.77.34:6379> setex name 10 chokingwin
OK
===================================================================================
关于expired事件通知的发送时间
Redis使用以下两种方式删除过期的键:a:当一个键被访问时,程序会对这个键进行检查,如果键已过期,则删除该键;b:系统会在后台定期扫描并删除那些过期的键。
当过期键被以上两种方式中的任意一种发现并且删除时,才会产生expired事件通知。
Redis不保证生存时间(TTL)变为 0 的键会立即被删除:如果没有命令访问这个键,或者设置生存时间的键非常多的话,那么在键的生存时间变为0,到该键真正被删除,这中间可能会有一段比较显著的时间间隔。
因此,Redis产生expired事件通知的时间,是过期键被删除的时候,而不是键的生存时间变为 0 的时候。
接下来我们开始代码实现(假定阅读本文的同学已正确安装Nginx/PHP/Redis/NodeJS的环境)。
一、与本文相关的环境信息
Redis配置文件路径:/etc/redis/6379.conf
测试用的Redis库编号为:3
监听消息的NodeJS文件:/NodeApp/notice.js
发送消息的PHP代码为:/send.php
接收redis数据的PHP代码:/test.php
业务流程:首先运行notice.js开启监听,然后运行send.php发送消息,如果没有收到成功响应,将消息ID存入redis缓存,之后按照10秒、30秒、60秒、120秒、300秒的时间间隔,再次发送消息通知,直到收到对消息的成功响应,或者超出最大通知次数为止。
二、修改Redis配置文件
因为键空间通知功能需要耗费一定的CPU时间,因此默认情况下,该功能是关闭的。可以通过修改配置文件,或者通过CONFIG SET命令,设置notify-keyspace-events选项,来启用或关闭该功能。
该选项的值为空字符串时,该功能禁用,选项值为非空字符串时,启用该功能,非空字符串由特定的多个字符组成,每个字符表示不同的意义:
K keyspace事件,事件以__keyspace@<db>__为前缀进行发布
E keyevent事件,事件以__keyevent@<db>__为前缀进行发布
g 一般性的,非特定类型的命令,比如del,expire,rename等
$ 字符串特定命令
l 列表特定命令
s 集合特定命令
h 哈希特定命令
z 有序集合特定命令
x 过期事件,当某个键过期并删除时会产生该事件
e 驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
A g$lshzxe的别名,因此”AKE”意味着所有事件
注意:该选项的值中至少需要包含K或者E,否则不会发布任何事件。比如,如果需要开启针对列表的keyspace事件通知,则该选项需要配置为"Kl"。
我们在服务器上运行vim /etc/redis/6379.conf,找到notify-keyspace-events开头的一行,将其配置为:notify-keyspace-events Ex,含义为:发布keyevent事件,使用过期事件(当每一个key失效时,都会生成该事件)。保存退出,并重启redis服务。如图:
点击在新窗口中浏览此图片
三、安装Node扩展
在网站根目录下,依次运行:
npm init #初始化创建package.json
npm install redis #安装redis扩展
npm install mysql #安装mysql扩展
四、实现send.php
为了便于实现延时的计算,我们将存入redis的key格式设计为:固定前缀+消息ID+时间戳+次数,如:noticeId_12345678_1482991887_2点击在新窗口中浏览此图片
关键代码:
$delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s
$res=doSomething();//发送消息
$content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n";
if($res==true){
$content.='消息发送成功'."\n";
}else{//未收到对方回应
$content.='消息发送失败,等待下次重发'."\n";
$expTime=$delayArr[$nums];
$nums++;
saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存
}
//记录日志
file_put_contents($root.'/tmp.log',$content,FILE_APPEND);
五、实现 notice.js
服务器端运行notice.js后,会一直监听redis的Expired事件,取到ExpiredKey后,把消息ID、时间、通知次数,POST给test.php,从而实现再次发送消息。
关键代码:
var client = redis.createClient('6379', '127.0.0.1');
client.psubscribe("__keyevent@"+redisDB+"__:expired",function(){
//console.log('订阅成功');
});
client.on("pmessage", function(pattern, channel, expiredKey) {
var tmpArr=expiredKey.split('_');
if(tmpArr[0]==keyPrefix){
console.log('-----expired Key-----',expiredKey);
var noticeId=tmpArr[1];
var stamp=parseInt(tmpArr[2]);
var nums=parseInt(tmpArr[3]);
sendPost(noticeId,stamp,nums,logFile);//向test.php发送数据
}else{
console.log('-----error Key-----',expiredKey);
writeLog(logFile,'The key "'+expiredKey+'" is a error key.');
}
});
六、实现 test.php
点击在新窗口中浏览此图片
关键代码:
$delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s
$res=doSomething();//发送消息
$content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n";
if($res==true){
$content.='消息发送成功'."\n";
}else{//未收到对方回应
if($nums && $nums>=6){
$content.='消息ID:'.$noticeId."已达到最大通知次数,任务停止\n";
}else{
$content.='消息发送失败,等待下次重发'."\n";
$expTime=$stamp+$delayArr[$nums]-time();
$nums++;
saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存
}
}
//记录日志
file_put_contents($root.'/tmp.log',$content,FILE_APPEND);
七、测试结果
点击在新窗口中浏览此图片
八、其他说明
本文内容为个人原创,首发今日头条,同时提供代码下载地址,供大家学习交流。本人以后还会发布更多原创干货,如果觉得有用,希望及时关注本头条号。
代码下载地址:http://www.i1981.com/zb_users/upload/2016/12/20161223.zip
DownLoad:
下载文件
点击这里下载文件
From: http://www.toutiao.com/a6369425996433408257/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=11032449540&utm_medium=toutiao_ios&wxshare_count=1
背景:在工作中经常会遇到一些关于定时任务的实际场景,比如每天凌晨1点自动备份数据库,或者,每隔1小时执行一次爬虫脚本,这种固定时间执行固定动作的需求我们称之为定时任务,利用crontab即可轻松实现。如果我们对自动备份数据库这个定时任务改变一下需求(这种情况就像你邀请一个人,一天内如果没有人来或有人来你通知下你,你邀请的人来了,这种任务。二、再就是公司没啥好的设备,钱少,网太烂了搞一个任务比如Mysql备份数据库的脚本,比如备份Redis的数据Bgsave的Scp拷贝经常出现网络不好,第一次备份会失败,于是得第二次这种垃圾需求。有垃圾需求就有解决办法,于于优雅或不优雅是一回事,但得技术人员觉得有一个流程总比没有流程好,本来没有方案的,于是就有技术方案。),如图:
点击在新窗口中浏览此图片
如果仍然利用crontab来实现,就有点勉强了。类似这种需求最常见的是服务器之间的消息通知,假如服务器B由于网络不稳定或者服务器压力较大导致不能即时对服务器A的消息作出正确响应,那么服务器A就会延迟一段时间再次发送消息,直到收到服务器B的正确响应或者超出最大通知次数为止。过去的做法是定时扫表,把通知失败的消息再次发送一遍,虽然可以多次发送通知,但是发送间隔太短会增加服务器B的压力,发送间隔太长消息的时效性就不能保证,显然处理这种延时任务用crontab根本不能解决问题。
Node之Error: Cannot find module 'redis:
#npm install -g redis
/usr/lib
└─┬ redis@2.7.1
├── double-ended-queue@2.1.0-0
├── redis-commands@1.3.1
└── redis-parser@2.6.0
环境变量:
#rpm -ql nodejs-6.10.3-1.el7.x86_64
/usr/bin/node
/usr/lib/node_modules
export NODE_PATH=/usr/lib/node_modules
#echo $NODE_PATH
/usr/lib/node_modules
#node notice.js
订阅成功
select 3
OK
setex msg_2 2 chokingwin
OK
client.on("pmessage", function(pattern, channel, expiredKey) {
console.log(pattern + "|" + channel + "|" + expiredKey);
_ _keyevent@3__:expired|__keyevent@3__:expired|msg_2
从Redis 2.8.0版本起,加入了"Keyspace notifications"(即"键空间通知")的功能。按照官方的说法:键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。比如:所有改变给定key的命令;所有经过lpush操作的key;所有在0号数据库中过期的key等等。我们在处理延时任务的时候,先把通知失败的消息ID作为key的一部分存到redis缓存中,并设定过期时间(相当于延时),当这条缓存数据失效的时候,通过订阅关系(用NodeJS实现)就可以收到消息,通过分析消息就可以知道过期KEY,这样就可以再次发送消息通知,从而实现延时任务。
不过,需要注意一点:Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
核心部分是两个Redis的终端,分别连接上Redis,并打开这个特性,另一个终端是监控的,这块里面用代码进行编写订阅,如下:
订阅,作者用的是Node,我在这儿不得不打下广告了,Swoole是不是应该也能支持这个功能?https://wiki.swoole.com/wiki/page/523.html ,http://blog.csdn.net/koastal/article/details/52869140,subscribe。
psubscribe来自:https://wiki.swoole.com/wiki/page/590.html
<?php
$serv = new Swoole\Server("127.0.0.1", 9501);
$serv->set(array(
'worker_num' => 8, //工作进程数量
'daemonize' => false, //是否作为守护进程
));
$serv->on('connect', function ($serv, $fd){
echo "Client:Connect.\n";
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
$val = "";
$redis = new Swoole\Coroutine\Redis();
$redis->connect('10.51.77.34', 6379);
while (true) {
$val = $redis->psubscribe(['psubscribe __keyevent@3__:expired']);
//订阅的channel,以第一次调用subscribe时的channel为准,后续的subscribe调用是为了收取Redis Server>的回包
//如果需要改变订阅的channel,请close掉连接,再调用subscribe
var_dump($val);
}
});
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
$serv->start();
Swoole的这个Redis的Coroutine必须要有一个端口暴露,这是和Node最大的不同吧?上面这个图我试着使用了一下,感觉有点问题。
=============================================================================
#redis-cli -h 10.51.77.34
10.51.77.34:6379> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@0__:expired"
3) "__keyevent@0__:expired"
4) "name"
塞一个数据进去:
#redis-cli -h 10.51.77.34
10.51.77.34:6379> config set notify-keyspace-events Ex
OK
10.51.77.34:6379> setex name 10 chokingwin
OK
===================================================================================
关于expired事件通知的发送时间
Redis使用以下两种方式删除过期的键:a:当一个键被访问时,程序会对这个键进行检查,如果键已过期,则删除该键;b:系统会在后台定期扫描并删除那些过期的键。
当过期键被以上两种方式中的任意一种发现并且删除时,才会产生expired事件通知。
Redis不保证生存时间(TTL)变为 0 的键会立即被删除:如果没有命令访问这个键,或者设置生存时间的键非常多的话,那么在键的生存时间变为0,到该键真正被删除,这中间可能会有一段比较显著的时间间隔。
因此,Redis产生expired事件通知的时间,是过期键被删除的时候,而不是键的生存时间变为 0 的时候。
接下来我们开始代码实现(假定阅读本文的同学已正确安装Nginx/PHP/Redis/NodeJS的环境)。
一、与本文相关的环境信息
Redis配置文件路径:/etc/redis/6379.conf
测试用的Redis库编号为:3
监听消息的NodeJS文件:/NodeApp/notice.js
发送消息的PHP代码为:/send.php
接收redis数据的PHP代码:/test.php
业务流程:首先运行notice.js开启监听,然后运行send.php发送消息,如果没有收到成功响应,将消息ID存入redis缓存,之后按照10秒、30秒、60秒、120秒、300秒的时间间隔,再次发送消息通知,直到收到对消息的成功响应,或者超出最大通知次数为止。
二、修改Redis配置文件
因为键空间通知功能需要耗费一定的CPU时间,因此默认情况下,该功能是关闭的。可以通过修改配置文件,或者通过CONFIG SET命令,设置notify-keyspace-events选项,来启用或关闭该功能。
该选项的值为空字符串时,该功能禁用,选项值为非空字符串时,启用该功能,非空字符串由特定的多个字符组成,每个字符表示不同的意义:
K keyspace事件,事件以__keyspace@<db>__为前缀进行发布
E keyevent事件,事件以__keyevent@<db>__为前缀进行发布
g 一般性的,非特定类型的命令,比如del,expire,rename等
$ 字符串特定命令
l 列表特定命令
s 集合特定命令
h 哈希特定命令
z 有序集合特定命令
x 过期事件,当某个键过期并删除时会产生该事件
e 驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
A g$lshzxe的别名,因此”AKE”意味着所有事件
注意:该选项的值中至少需要包含K或者E,否则不会发布任何事件。比如,如果需要开启针对列表的keyspace事件通知,则该选项需要配置为"Kl"。
我们在服务器上运行vim /etc/redis/6379.conf,找到notify-keyspace-events开头的一行,将其配置为:notify-keyspace-events Ex,含义为:发布keyevent事件,使用过期事件(当每一个key失效时,都会生成该事件)。保存退出,并重启redis服务。如图:
点击在新窗口中浏览此图片
三、安装Node扩展
在网站根目录下,依次运行:
npm init #初始化创建package.json
npm install redis #安装redis扩展
npm install mysql #安装mysql扩展
四、实现send.php
为了便于实现延时的计算,我们将存入redis的key格式设计为:固定前缀+消息ID+时间戳+次数,如:noticeId_12345678_1482991887_2点击在新窗口中浏览此图片
关键代码:
$delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s
$res=doSomething();//发送消息
$content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n";
if($res==true){
$content.='消息发送成功'."\n";
}else{//未收到对方回应
$content.='消息发送失败,等待下次重发'."\n";
$expTime=$delayArr[$nums];
$nums++;
saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存
}
//记录日志
file_put_contents($root.'/tmp.log',$content,FILE_APPEND);
五、实现 notice.js
服务器端运行notice.js后,会一直监听redis的Expired事件,取到ExpiredKey后,把消息ID、时间、通知次数,POST给test.php,从而实现再次发送消息。
关键代码:
var client = redis.createClient('6379', '127.0.0.1');
client.psubscribe("__keyevent@"+redisDB+"__:expired",function(){
//console.log('订阅成功');
});
client.on("pmessage", function(pattern, channel, expiredKey) {
var tmpArr=expiredKey.split('_');
if(tmpArr[0]==keyPrefix){
console.log('-----expired Key-----',expiredKey);
var noticeId=tmpArr[1];
var stamp=parseInt(tmpArr[2]);
var nums=parseInt(tmpArr[3]);
sendPost(noticeId,stamp,nums,logFile);//向test.php发送数据
}else{
console.log('-----error Key-----',expiredKey);
writeLog(logFile,'The key "'+expiredKey+'" is a error key.');
}
});
六、实现 test.php
点击在新窗口中浏览此图片
关键代码:
$delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s
$res=doSomething();//发送消息
$content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n";
if($res==true){
$content.='消息发送成功'."\n";
}else{//未收到对方回应
if($nums && $nums>=6){
$content.='消息ID:'.$noticeId."已达到最大通知次数,任务停止\n";
}else{
$content.='消息发送失败,等待下次重发'."\n";
$expTime=$stamp+$delayArr[$nums]-time();
$nums++;
saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存
}
}
//记录日志
file_put_contents($root.'/tmp.log',$content,FILE_APPEND);
七、测试结果
点击在新窗口中浏览此图片
八、其他说明
本文内容为个人原创,首发今日头条,同时提供代码下载地址,供大家学习交流。本人以后还会发布更多原创干货,如果觉得有用,希望及时关注本头条号。
代码下载地址:http://www.i1981.com/zb_users/upload/2016/12/20161223.zip
DownLoad:
下载文件
点击这里下载文件
From: http://www.toutiao.com/a6369425996433408257/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=11032449540&utm_medium=toutiao_ios&wxshare_count=1
作者:jackxiang@向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除
地址:https://jackxiang.com/post/9201/
版权所有。转载时必须以链接形式注明作者和原始出处及本声明!
最后编辑: jackxiang 编辑于2019-9-20 17:37
评论列表