diff --git a/pool.js b/pool.js index d6d0a49..aecc6df 100644 --- a/pool.js +++ b/pool.js @@ -3,11 +3,12 @@ var GO, // GO is global object, for passing to a REPL or finding in a core dump EventEmitter = require("events").EventEmitter, Stream = require("stream"), - inherits = require("util").inherits; + inherits = require("util").inherits, + http = require('http'); // Pool - manages a set of equivalent endpoints and distributes requests among them // -// http: which endpoint HTTP module to use, either http.js or https.js +// protocol: which endpoint HTTP module to use, either http.js or https.js // endpoints: array of strings formatted like "ip:port" // options: { // max_pending: number of pending requests allowed (1000) @@ -21,14 +22,14 @@ var GO, // GO is global object, for passing to a REPL or finding in a core dump // max_retries: number (default = 5) // agent_options: {} an object for passing options directly to the HTTP(S) Agent // } -function Pool(http, endpoints, options) { - if (!http || !http.request || !http.Agent) { +function Pool(protocol, endpoints, options) { + if (!protocol || !protocol.request || !protocol.Agent) { throw new Error("invalid http module"); } if (! Array.isArray(endpoints)) { throw new Error("endpoints must be an array"); } - this.http = http; + this.protocol = protocol; options = options || {}; options.retry_filter = options.retry_filter || options.retryFilter; @@ -52,6 +53,8 @@ function Pool(http, endpoints, options) { this.max_pending = options.max_pending || options.maxPending || 1000; this.endpoints = []; this.endpoints_by_name = {}; + this.keep_alive = options.keep_alive || options.keepAlive; + this.agent_options = options.agent_options || options.agentOptions; this.length = 0; for (var i = 0; i < endpoints.length; i++) { @@ -62,6 +65,17 @@ function Pool(http, endpoints, options) { throw new Error("no valid endpoints"); } + + if (this.keep_alive) { + if (protocol === http) { + this.agent = new GO.KeepAliveAgent.HTTP(this.agent_options); + } else { + this.agent = new GO.KeepAliveAgent.HTTPS(this.agent_options); + } + } else { + this.agent = new protocol.Agent(this.agent_options); + } + // this special endpoint is returned when the pool is overloaded this.overloaded_endpoint = new GO.PoolEndpoint({Agent: Object}, null, null, {timeout: 0}); this.overloaded_endpoint.special_endpoint = "overloaded"; @@ -309,7 +323,7 @@ Pool.prototype.add_endpoint = function (host_port) { var ip = ip_port[0]; var port = +ip_port[1]; if (port > 0 && port < 65536) { - var endpoint = new GO.PoolEndpoint(this.http, ip, port, this.options); + var endpoint = new GO.PoolEndpoint(this.protocol, ip, port, this.options); endpoint.on("health", this.endpoint_health_changed.bind(this)); endpoint.on("timeout", this.endpoint_timed_out.bind(this)); this.endpoints.push(endpoint); diff --git a/pool_endpoint.js b/pool_endpoint.js index 178b430..6edec8a 100644 --- a/pool_endpoint.js +++ b/pool_endpoint.js @@ -33,14 +33,18 @@ function PoolEndpoint(protocol, ip, port, options) { this.keep_alive = options.keep_alive || options.keepAlive; this.agent_options = options.agent_options || options.agentOptions; - if (this.keep_alive) { - if (protocol === http) { - this.agent = new GO.KeepAliveAgent.HTTP(this.agent_options); + if (options.agent) { + this.agent = options.agent; + } else { + if (this.keep_alive) { + if (protocol === http) { + this.agent = new GO.KeepAliveAgent.HTTP(this.agent_options); + } else { + this.agent = new GO.KeepAliveAgent.HTTPS(this.agent_options); + } } else { - this.agent = new GO.KeepAliveAgent.HTTPS(this.agent_options); + this.agent = new protocol.Agent(this.agent_options); } - } else { - this.agent = new protocol.Agent(this.agent_options); } if (this.agent && typeof this.agent.getName === 'function') {