Nodejs实践二三事

二三事

  1. HTTP请求的超时处理 (HTTP Request Timeout Handle)

    Nodejs不支持设置超时参数?怎样增加超时处理呢?为什么要处理这些呢?

  2. Stream & 读写大文件

    Stream是干什么的?V8有1G堆栈内存限制,那么Nodejs该如何读写大文件?

  3. 串、并行的异步调用

    什么?异步调用还有串行和并行?Callback、Callback ... and Callback

Who am I?

Twitter: @fengmk2, Weibo: @Python发烧友, FaWave

HTTP请求的超时处理

基本HTTP请求

Nodejs中请求一个HTTP URL非常方便:http.request, 封装url get方法:

var http = require('http')
  , parse = require('url').parse;
function urlget(url, callback) {
    var info = parse(url)
      , path = info.pathname + (info.search || '')
      , options = { host: info.hostname, 
            port: info.port || 80, 
            path: path, 
            method: 'GET' };
    var req = http.request(options, function(res) {
    
    var req = http.request(options, function(res) {
        var chunks = [], length = 0;
        res.on('data', function(chunk) {
            length += chunk.length;
            chunks.push(chunk);
        }).on('end', function() {
            var data = new Buffer(length), pos = 0
              , l = chunks.length;
            for(var i = 0; i < l; i++) {
                chunks[i].copy(data, pos);
                pos += chunks[i].length;
            }
            res.body = data;
            callback(null, res);
        }).on('error', function(err) {
            callback(err, res);
        });
    }).on('error', function(err) {
        callback(err);
    });
    req.end();
};

请求正常的URL

一般来说,urlget都能很好地按我们想象的方式正常工作:

var good_url = 'http://www.google.com/';
urlget(good_url, function(err, res) {
    console.log('\nGET', good_url);
    if(err) {
        console.log('error:', err, '\nHeaders:\n', 
            res ? res.headers : null);
    } else {
        console.log('Headers:\n', res.headers, 
            '\nBody:\n', res.body.toString());
    }
});
    

如果请求的URL响应时间很长呢?

例如一些被墙的服务,一些耗时不稳定的服务。即使是你自己的服务,也会有不稳定的时候。

var timeout_url = 'http://t.co/';
urlget(timeout_url, function(err, res) {
    console.log('\nGET', timeout_url);
    if(err) {
        console.log('error:', err, '\nResponse:\n', 
            res ? res.headers : null);
    } else {
        console.log('Headers:\n', res.headers, 
            '\nBody:\n', res.body.toString());
    }
});

t.co, bit.ly 感谢GFW

Request Timeout: 75秒后超时了

等不及了,请求连接超时(Request timeout)异常触发,这是我本机之前的测试结果:

GET http://t.co/
75584
error: { stack: [Getter/Setter],
  arguments: undefined,
  type: undefined,
  message: 'ETIMEDOUT, Operation timed out',
  errno: 60,
  code: 'ETIMEDOUT',
  syscall: 'connect' } 
Response:
 null
    

我能自己设置超时时间吗?很抱歉,在Nodejs文档搜索得到的结果是:

404 Not Found

不会吧,这会白白浪费生命啊,自己掌控不了很不自在啊。。。

我的命,我自己操盘!

《窃听风云2》

为urlget增加请求超时

http.request 即使没有提供参数让我们设置超时,但是人类已经无法阻止setTimeout的出现。

没错,就是setTimeout, 它一直都在:

var req = null, request_timeout = null;
request_timeout = setTimeout(function() {
    request_timeout = null;
    // 终止请求
    req.abort();
    // 回调返回超时异常
    callback(new Error('Request timeout'));
}, 5000);
    

增加请求超时后,urlget完整代码会是怎样的呢?

function urlget(url, callback) {
    // ... 此处忽略相同部分的代码 ...
    var req = null, request_timeout = null;
    request_timeout = setTimeout(function() {
        request_timeout = null; 
        req.abort();
        callback(new Error('Request timeout'));
    }, 5000);
    req = http.request(options, function(res) {
        clearTimeout(request_timeout);
        var chunks = [], length = 0;
        // ... 此处忽略相同部分的代码 ...
    }).on('error', function(err) {
        // node0.5.x及以上,
        // req.abort()会触发一次“socket hang up”
        // 所以需要判断是否超时
        if(request_timeout) {
            clearTimeout(request_timeout);
            callback(err);
        }
    });
    req.end();
};  

什么!?还有响应超时!

处理请求超时,还有的情况是响应返回时间过长的问题,先看看模拟响应时间过长的服务器端代码:

var http = require('http')
  , parse = require('url').parse;
http.createServer(function(req, res) {
    var info = parse(req.url, true);
    var s = +info.query.s;
    // 响应第一批数据
    res.write('Please waitting for ' 
        + s + ' seconds...');
    setTimeout(function() {
        // 模拟响应处理时间
        res.end(s + ' seconds, url: ' + req.url);
    }, s * 1000);
}).listen(1984);
console.log('Server http://localhost:1984/');

响应超时了!

从代码中可以看到,一个请求过来,会马上响应第一批数据, 接着在参数指定的时间后响应剩余的数据。对于这种情况,我们仅仅有请求超时还是不够的:

var url = 'http://localhost:1984/foo?s=10';
urlget(url, function(err, res) {
    if(err) {
        console.log('error:', err, 
            '\nHeaders:\n', res ? res.headers);
    } else {
        console.log('Headers:\n', res.headers, 
            '\nBody:\n', res.body.toString());
    }
}); 
function urlget(url, callback) {
    // ... 此处忽略相同部分的代码 ...
    req = http.request(options, function(res) {
        clearTimeout(request_timeout);
        var chunks = [], length = 0, response_timeout = null;
        response_timeout = setTimeout(function() {
            response_timeout = null;
            req.abort();
            callback(new Error('Response timeout'));
        }, 5000);
        res.on('data', function(chunk) {
            length += chunk.length;
            chunks.push(chunk);
        }).on('end', function() {
            if(response_timeout) {
                // node0.5.x及以上:req.abort()会触发res的end事件
                clearTimeout(response_timeout);
                var data = new Buffer(length);
                // ... 此处忽略相同部分的代码 ...
            }
        }).on('error', function(err) {
            // ... 此处忽略相同部分的代码 ...
        }).on('aborted', function() {
            if(response_timeout) {
                // node0.5.x及以上:当res有效的时候,
                // req.abort()会触发res的aborted事件
                callback(new Error('Response aborted'), res);
            }
        });
    }).on('error', function(err) {
        // ... 此处忽略相同部分的代码 ...

请求超时(Request Timeout):

HTTP客户端发起请求到接受到HTTP服务器端返回响应头的这段时间, 如果超出设定时间,则表示请求超时。

示例代码:http-request-with-timeout-demo.js

响应超时(Response Timeout):

HTTP服务器端开始发送响应数据到HTTP客户端接收全部数据的这段时间, 如果超出设定时间,则表示响应超时。

示例代码:http-request-with-timeout-all-demo.js

尼玛,超时伤不起!为什么要处理这些?

随便请求一个URL确实很容易,但是你想让你的代码健壮性足够好,你必须处理每一个细节。

一个挂起的socket可能你无法注意到,但是瞬间100w个socket挂起了,你的服务器就无法控制了。

使用这么多定时器,会不会有性能问题?

为了解决这个疑惑,就得深入源代码了,查看lib/timers_legacy.js。 从代码中会看到这段注释:

// IDLE TIMEOUTS
//
// Because often many sockets will have the same idle timeout we will not
// use one timeout watcher per item. It is too much overhead.  Instead
// we'll use a single watcher for all sockets with the same timeout value
// and a linked list. This technique is described in the libev manual:
// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts

Nodejs已经知道会出现大量定时器的情景,于是利用单定时器和双向链表实现了一个非常高效的方案。

像我示例中,超时时间都是固定的5秒和10秒,就算同时发起100000个请求,也只是使用一个Timer进行处理。 因为使用双向链表,从链接remove的时间复杂度是O(1),所以clearTimeout也是非常高效的。

Stream
&
读写大文件

Stream

流(Stream),流窜在Nodejs的各个角落。

Stream有3种:只读,只写,读写。

Stream继承了EventEmitter,正是Stream抽象出各种标准的事件,让文件流,网络流等有了标准的处理方式。

Stream可以说是Nodejs事件驱动编程绝好范例。

读写大文件 & 内存限制

由于V8 1G堆栈内存限制问题,不要直接使用fs.readFilefs.writeFile 进行大文件读写操作。

必须使用Stream版本:fs.ReadStreamfs.WriteStream来对文件进行读写操作。

上传大文件:fs.ReadStream

创建只读文件流

var readstream = fs.createReadStream(uploadfile);

通过监听文件的data事件,获取数据,就像它自己会吐数据出来一样 而不用自己去调用read方法,一点一点地去取数据

readstream.on('data', function(chunk) {
    console.log('write', chunk.length);
    // 向服务器发送数据
    req.write(chunk);
});

通过end事件可以判断文件数据是否全部读取完了

readstream.on('end', function() {
    req.end();
});

Pipe:将水管连接起来

嫌既监听data又要监听end事件很麻烦?那就试试pipe吧,简直像安装水管那么简单。

直接使用pipe,想象两端水管,我们只需将他们按照水流方向连接起来即可(吐数据 ==> 收数据) 当数据读取完,会自动触发req.end()

readstream.pipe(req);

你没眼花,就是一行代码这么简单,所有数据就会自动发出去了。

:) 呵呵,原来程序员也是水电工。

完整示例代码

通过readStream读取大文件并发送到网络中去:upload_file.js

下载大文件:fs.WriteStream

创建只写文件流

var writestream = fs.createWriteStream(savefile);

继续做水电工,安装水管,还是以水流的方向安装
(吐数据 ==> 收数据) 这次网络流变成吐数据,文件流变成收数据

res.pipe(writestream);

PS:非pipe方式请发挥你的水电工想象力。

文件句柄已关闭,回调结果

writestream.on('close', function() {
    callback(null, res);
});

通过WriteStream接收网络中得到的数据:download_file.js

程序员水电工:Pipe

串、并行的异步调用

串行调用

嵌套Callback

db.users.findOne({name: 'foo'}, function(err, user) {
    if(err) {
        return next(err);
    }
    user.visit_count += 1;
    var updates = {visit_count: user.visit_count};
    db.users.update({_id: user._id}, {$set: updates}, 
            function(err) {
        if(err) {
            return next(err);
        }
        res.render('profile', {user: user});
    });
});

并行调用

需要计数器辅助判断异步请求是否都完成了

var counter = 2, post = null, comments = null;
function handle() {
    if(--counter === 0) {
        callback(post, comments);
    }
};
db.posts.findOne({_id: post_id}, function(err, p) {
    post = p;
    handle();
});
db.comments.find({pid: post_id}).toArray(function(err, cms) {
    comments = cms;
    handle();
});

Callback, Callback, Callback and Callback

你肯定见过这样的代码


foo(bar, function(err, data) {
    // ... callback again again and again
    // ...
                            })
                    // ...
                })
            })
        })
    })
});

事件驱动 | EventProxy

将之前的串行和并行调用都混在一起

var ep = new EventProxy();
ep.assgin('user', 'post', 'comments', function(user, post, comments) {
    callback(user, post, comments);
});
ep.on('user', function(user) {
    db.users.update(...);
});
db.users.findOne({name: 'foo'}, function(err, user) {
    ep.emit('user', user);
});
db.posts.findOne({_id: post_id}, function(err, post) {
    ep.emit('post', post);
});
db.comments.find({pid: post_id})
        .toArray(function(err, comments) {
    ep.emit('comments', comments);
});

更多关注 @朴灵 的分享

Q & A
知乎者也

next("Thanks ^_^")

/

#