话说写nodejs忽然也两年多了,让后发现事件编程写起来的确和原来的流式程序感觉不一样,其实在C#里面借助的委托和事件其实也能达到类似效果,不过只是那样写的人少一些而已,因为在C#里面的大call并不是所有都是异步的。而node里面则不然,基本上都是异步需要回调的。
于是你就会发现一个问题,多异步的call应该怎么去控制触发的时间,然后再如何去控制回调里面的操作,以及多个call之间的先后问题。其实如果是同步方法,这一切也就是几行代码的问题。但如果是异步的,那么你就必须十分小心了。
当然,一般处理复杂的业务流的时候基本上都是事务操作的,用异步回调来处理复杂流操作其实也是有点醉的。
在node里面,为了避免回调的大坑,我们一半用async库来集中处理,其实现本身也是一个循环函数链不停的侦听,+1和-1的操作而已。回过头想想原来写MFC程序时候,里面封装的消息pipline,大致也是如此方式,只不过那边主数据结构是一个队列,而这边用的是一个链表而已。
下面的程序展示了以下几个点,供自己以后回忆记录来用。
1,利用waterfall来控制查询,从队列的长度来判断看是否需要从队列里面取东西,
2,串行或者并行的处理取到item,确认是从ios或者android的通道进行处理。
3,对于多array的for在nodejs里面,为了避免array的长度过长而导致nodejs缓存超出限制,所以利用eachlimit来进行每次异步遍历提交的数量。
4,在异步的forEach当中,需要遍历完毕所有项目才进行回调。避免单次for当中的cb多次被for给撸了。
5,v8的调用栈深度的内存是有限制的,如果在大的遍历中,自测大概5000以上,一般cb就会溢出了,所以呢需要在cb的时候强制给释放一下,其实就是setimmediate或者用process.nextTick来处理一下便可。
//(function loop() { setInterval(function () { process.nextTick(function () { async.waterfall([ function (cb1) { redisClient.queueLength(function (err, dt) { cb1(err, dt); }); }, function (qlenth, cb2) { if (qlenth > 0) { redisClient.queuePop(function (err, item) { if (err) { console.log(err); return cb2(err, "pop"); } var tpItem = JSON.parse(item[1]);//get the item from the queue if (tpItem) { Notification.find({ ftID: {$in: tpItem.targetUID}, Subscribe: true }).lean().exec(function (err, dt) { if (err) return cb2(err, "pop"); if (dt && dt.length > 0) {//get the push item from the mongo notification async.series([function iosPush(cbIOS) { var tpIOSDT = _.pluck(_.filter(dt, function (it) { return tools.compareAcceptSubArray(tpItem.content.messageType, it.accept) && it.deviceType == 'ios'; }), 'pushID'); if (tpIOSDT.length > 0) { Notification.sendApplePushMult({ pushID: tpIOSDT, content: tpItem.content }, function (err, result) { console.log(err + JSON.stringify(result)); cbIOS(err, result); }); } else { cbIOS(null, 'pop'); } }, function androidPush(cbAndroid) { var tpAndroidDT = _.pluck(_.filter(dt, function (it) { return tools.compareAcceptSubArray(tpItem.content.messageType, it.accept) && it.deviceType == 'android'; }), 'pushID'); if (tpAndroidDT.length > 0) { Notification.sendAndroidPushNotifyMult({ pushID: tpAndroidDT, content: tpItem.content }, function (err, result) { console.log(err + JSON.stringify(result)); cbAndroid(err, result); }) } else { cbAndroid(null, 'pop'); } }], function (err, result) { console.log(err + JSON.stringify(result)); setImmediate(function () { cb2(err, result) }); }); } else { cb2(null, "pop"); } }); } else { cb2(null, "pop"); } }); } else { cb2(null, 'pop'); } }], function (err, results) { console.log(new Date().getMilliseconds() + " one round done "); //setImmediate(loop); }); }) //}); //}()); // //while(true) { //process.nextTick(function () { //todo old //} }, 1000);