标题:[实践OK]利用Redis的notifications功能实现延时任务,任务失败重试N次的机制实现,Node之Error: Cannot find module 'redis。 出处:向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除 时间:Thu, 22 Jun 2017 23:45:37 +0000 作者:jackxiang 地址:http://jackxiang.com/post/9201/ 内容: PHP版本的实践:https://www.imooc.com/article/10431 背景:在工作中经常会遇到一些关于定时任务的实际场景,比如每天凌晨1点自动备份数据库,或者,每隔1小时执行一次爬虫脚本,这种固定时间执行固定动作的需求我们称之为定时任务,利用crontab即可轻松实现。如果我们对自动备份数据库这个定时任务改变一下需求(这种情况就像你邀请一个人,一天内如果没有人来或有人来你通知下你,你邀请的人来了,这种任务。二、再就是公司没啥好的设备,钱少,网太烂了搞一个任务比如Mysql备份数据库的脚本,比如备份Redis的数据Bgsave的Scp拷贝经常出现网络不好,第一次备份会失败,于是得第二次这种垃圾需求。有垃圾需求就有解决办法,于于优雅或不优雅是一回事,但得技术人员觉得有一个流程总比没有流程好,本来没有方案的,于是就有技术方案。),如图: 点击在新窗口中浏览此图片 如果仍然利用crontab来实现,就有点勉强了。类似这种需求最常见的是服务器之间的消息通知,假如服务器B由于网络不稳定或者服务器压力较大导致不能即时对服务器A的消息作出正确响应,那么服务器A就会延迟一段时间再次发送消息,直到收到服务器B的正确响应或者超出最大通知次数为止。过去的做法是定时扫表,把通知失败的消息再次发送一遍,虽然可以多次发送通知,但是发送间隔太短会增加服务器B的压力,发送间隔太长消息的时效性就不能保证,显然处理这种延时任务用crontab根本不能解决问题。 Node之Error: Cannot find module 'redis: #npm install -g redis /usr/lib └─┬ redis@2.7.1 ├── double-ended-queue@2.1.0-0 ├── redis-commands@1.3.1 └── redis-parser@2.6.0 环境变量: #rpm -ql nodejs-6.10.3-1.el7.x86_64 /usr/bin/node /usr/lib/node_modules export NODE_PATH=/usr/lib/node_modules #echo $NODE_PATH /usr/lib/node_modules #node notice.js 订阅成功 select 3 OK setex msg_2 2 chokingwin OK client.on("pmessage", function(pattern, channel, expiredKey) { console.log(pattern + "|" + channel + "|" + expiredKey); _ _keyevent@3__:expired|__keyevent@3__:expired|msg_2 从Redis 2.8.0版本起,加入了"Keyspace notifications"(即"键空间通知")的功能。按照官方的说法:键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。比如:所有改变给定key的命令;所有经过lpush操作的key;所有在0号数据库中过期的key等等。我们在处理延时任务的时候,先把通知失败的消息ID作为key的一部分存到redis缓存中,并设定过期时间(相当于延时),当这条缓存数据失效的时候,通过订阅关系(用NodeJS实现)就可以收到消息,通过分析消息就可以知道过期KEY,这样就可以再次发送消息通知,从而实现延时任务。 不过,需要注意一点:Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。 核心部分是两个Redis的终端,分别连接上Redis,并打开这个特性,另一个终端是监控的,这块里面用代码进行编写订阅,如下: 订阅,作者用的是Node,我在这儿不得不打下广告了,Swoole是不是应该也能支持这个功能?https://wiki.swoole.com/wiki/page/523.html ,http://blog.csdn.net/koastal/article/details/52869140,subscribe。 psubscribe来自:https://wiki.swoole.com/wiki/page/590.html set(array( 'worker_num' => 8, //工作进程数量 'daemonize' => false, //是否作为守护进程 )); $serv->on('connect', function ($serv, $fd){ echo "Client:Connect.\n"; }); $serv->on('receive', function ($serv, $fd, $from_id, $data) { $val = ""; $redis = new Swoole\Coroutine\Redis(); $redis->connect('10.51.77.34', 6379); while (true) { $val = $redis->psubscribe(['psubscribe __keyevent@3__:expired']); //订阅的channel,以第一次调用subscribe时的channel为准,后续的subscribe调用是为了收取Redis Server>的回包 //如果需要改变订阅的channel,请close掉连接,再调用subscribe var_dump($val); } }); $serv->on('close', function ($serv, $fd) { echo "Client: Close.\n"; }); $serv->start(); Swoole的这个Redis的Coroutine必须要有一个端口暴露,这是和Node最大的不同吧?上面这个图我试着使用了一下,感觉有点问题。 ============================================================================= #redis-cli -h 10.51.77.34 10.51.77.34:6379> psubscribe __keyevent@0__:expired Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "__keyevent@0__:expired" 3) (integer) 1 1) "pmessage" 2) "__keyevent@0__:expired" 3) "__keyevent@0__:expired" 4) "name" 塞一个数据进去: #redis-cli -h 10.51.77.34 10.51.77.34:6379> config set notify-keyspace-events Ex OK 10.51.77.34:6379> setex name 10 chokingwin OK =================================================================================== 关于expired事件通知的发送时间 Redis使用以下两种方式删除过期的键:a:当一个键被访问时,程序会对这个键进行检查,如果键已过期,则删除该键;b:系统会在后台定期扫描并删除那些过期的键。 当过期键被以上两种方式中的任意一种发现并且删除时,才会产生expired事件通知。 Redis不保证生存时间(TTL)变为 0 的键会立即被删除:如果没有命令访问这个键,或者设置生存时间的键非常多的话,那么在键的生存时间变为0,到该键真正被删除,这中间可能会有一段比较显著的时间间隔。 因此,Redis产生expired事件通知的时间,是过期键被删除的时候,而不是键的生存时间变为 0 的时候。 接下来我们开始代码实现(假定阅读本文的同学已正确安装Nginx/PHP/Redis/NodeJS的环境)。 一、与本文相关的环境信息 Redis配置文件路径:/etc/redis/6379.conf 测试用的Redis库编号为:3 监听消息的NodeJS文件:/NodeApp/notice.js 发送消息的PHP代码为:/send.php 接收redis数据的PHP代码:/test.php 业务流程:首先运行notice.js开启监听,然后运行send.php发送消息,如果没有收到成功响应,将消息ID存入redis缓存,之后按照10秒、30秒、60秒、120秒、300秒的时间间隔,再次发送消息通知,直到收到对消息的成功响应,或者超出最大通知次数为止。 二、修改Redis配置文件 因为键空间通知功能需要耗费一定的CPU时间,因此默认情况下,该功能是关闭的。可以通过修改配置文件,或者通过CONFIG SET命令,设置notify-keyspace-events选项,来启用或关闭该功能。 该选项的值为空字符串时,该功能禁用,选项值为非空字符串时,启用该功能,非空字符串由特定的多个字符组成,每个字符表示不同的意义: K keyspace事件,事件以__keyspace@__为前缀进行发布 E keyevent事件,事件以__keyevent@__为前缀进行发布 g 一般性的,非特定类型的命令,比如del,expire,rename等 $ 字符串特定命令 l 列表特定命令 s 集合特定命令 h 哈希特定命令 z 有序集合特定命令 x 过期事件,当某个键过期并删除时会产生该事件 e 驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件 A g$lshzxe的别名,因此”AKE”意味着所有事件 注意:该选项的值中至少需要包含K或者E,否则不会发布任何事件。比如,如果需要开启针对列表的keyspace事件通知,则该选项需要配置为"Kl"。 我们在服务器上运行vim /etc/redis/6379.conf,找到notify-keyspace-events开头的一行,将其配置为:notify-keyspace-events Ex,含义为:发布keyevent事件,使用过期事件(当每一个key失效时,都会生成该事件)。保存退出,并重启redis服务。如图: 点击在新窗口中浏览此图片 三、安装Node扩展 在网站根目录下,依次运行: npm init #初始化创建package.json npm install redis #安装redis扩展 npm install mysql #安装mysql扩展 四、实现send.php 为了便于实现延时的计算,我们将存入redis的key格式设计为:固定前缀+消息ID+时间戳+次数,如:noticeId_12345678_1482991887_2点击在新窗口中浏览此图片 关键代码: $delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s $res=doSomething();//发送消息 $content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n"; if($res==true){ $content.='消息发送成功'."\n"; }else{//未收到对方回应 $content.='消息发送失败,等待下次重发'."\n"; $expTime=$delayArr[$nums]; $nums++; saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存 } //记录日志 file_put_contents($root.'/tmp.log',$content,FILE_APPEND); 五、实现 notice.js 服务器端运行notice.js后,会一直监听redis的Expired事件,取到ExpiredKey后,把消息ID、时间、通知次数,POST给test.php,从而实现再次发送消息。 关键代码: var client = redis.createClient('6379', '127.0.0.1'); client.psubscribe("__keyevent@"+redisDB+"__:expired",function(){ //console.log('订阅成功'); }); client.on("pmessage", function(pattern, channel, expiredKey) { var tmpArr=expiredKey.split('_'); if(tmpArr[0]==keyPrefix){ console.log('-----expired Key-----',expiredKey); var noticeId=tmpArr[1]; var stamp=parseInt(tmpArr[2]); var nums=parseInt(tmpArr[3]); sendPost(noticeId,stamp,nums,logFile);//向test.php发送数据 }else{ console.log('-----error Key-----',expiredKey); writeLog(logFile,'The key "'+expiredKey+'" is a error key.'); } }); 六、实现 test.php 点击在新窗口中浏览此图片 关键代码: $delayArr=[0,10,30,60,120,300];//延时间隔,相对于首次通知时间,单位为 s $res=doSomething();//发送消息 $content=date('Y-m-d H:i:s').' 第 '.$nums.' 次发送通知,消息ID为:'.$noticeId."\n"; if($res==true){ $content.='消息发送成功'."\n"; }else{//未收到对方回应 if($nums && $nums>=6){ $content.='消息ID:'.$noticeId."已达到最大通知次数,任务停止\n"; }else{ $content.='消息发送失败,等待下次重发'."\n"; $expTime=$stamp+$delayArr[$nums]-time(); $nums++; saveNoticeToRedis($noticeId,$stamp,$nums,$expTime);//存入缓存 } } //记录日志 file_put_contents($root.'/tmp.log',$content,FILE_APPEND); 七、测试结果 点击在新窗口中浏览此图片 八、其他说明 本文内容为个人原创,首发今日头条,同时提供代码下载地址,供大家学习交流。本人以后还会发布更多原创干货,如果觉得有用,希望及时关注本头条号。 代码下载地址:http://www.i1981.com/zb_users/upload/2016/12/20161223.zip DownLoad: 下载文件 点击这里下载文件 From: http://www.toutiao.com/a6369425996433408257/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=11032449540&utm_medium=toutiao_ios&wxshare_count=1 Generated by Jackxiang's Bo-blog 2.1.1 Release