channel.js 3.2 KB
/**
 * Module dependencies.
 */
var Receiver = require('./receiver')

/**
 * Expose `Channel`.
 */
module.exports = Channel

/**
 * Constants.
 */
var CLOSED_ERROR_MSG = 'Cannot add to closed channel'

/**
 * Initialize a `Channel`.
 *
 * @param {Function|Object} [empty=Object]
 * @api private
 */
function Channel(bufferSize) {
  this.pendingAdds = []
  this.pendingGets = []
  this.items       = []
  this.bufferSize  = parseInt(bufferSize, 10) || 0
  this.isClosed    = false
  this.isDone      = false
  this.empty       = {}
}

/**
 * Static reference to the most recently called callback
 */
Channel.lastCalled = null

/**
 * Get an item with `cb`.
 *
 * @param {Function} cb
 * @api private
 */
Channel.prototype.get = function (cb){
  if (this.done()) {
    this.callEmpty(cb)
  } else if (this.items.length > 0 || this.pendingAdds.length > 0) {
    this.call(cb, this.nextItem())
  } else {
    this.pendingGets.push(cb)
  }
}

/**
 * Remove `cb` from the queue.
 *
 * @param {Function} cb
 * @api private
 */
Channel.prototype.removeGet = function (cb) {
  var idx = this.pendingGets.indexOf(cb)
  if (idx > -1) {
    this.pendingGets.splice(idx, 1)
  }
}

/**
 * Get the next item and pull from pendingAdds to fill the buffer.
 *
 * @return {Mixed}
 * @api private
 */
Channel.prototype.nextItem = function () {
  if (this.pendingAdds.length > 0) {
    this.items.push(this.pendingAdds.shift().add())
  }
  return this.items.shift()
}

/**
 * Add `val` to the channel.
 *
 * @param {Mixed} val
 * @return {Function} thunk
 * @api private
 */
Channel.prototype.add = function (val){
  var receiver = new Receiver(val)

  if (this.isClosed) {
    receiver.error(Error(CLOSED_ERROR_MSG))
  } else if (this.pendingGets.length > 0) {
    this.call(this.pendingGets.shift(), receiver.add())
  } else if (this.items.length < this.bufferSize) {
    this.items.push(receiver.add())
  } else {
    this.pendingAdds.push(receiver)
  }

  return function (cb) {
    receiver.callback(cb)
  }
}

/**
 * Invoke `cb` with `val` facilitate both
 * `chan(value)` and the `chan(error, value)`
 * use-cases.
 *
 * @param {Function} cb
 * @param {Mixed} val
 * @api private
 */
Channel.prototype.call = function (cb, val) {
  Channel.lastCalled = this.func
  if (val instanceof Error) {
    cb(val)
  } else {
    cb(null, val)
  }
  this.done()
}

/**
 * Invoke `cb` callback with the empty value.
 *
 * @param {Function} cb
 * @api private
 */
Channel.prototype.callEmpty = function (cb) {
  this.call(cb, this.empty)
}

/**
 * Prevennt future values from being added to
 * the channel.
 *
 * @return {Boolean}
 * @api public
 */
Channel.prototype.close = function () {
  this.isClosed = true
  var receiver
  while (receiver = this.pendingAdds.shift()) {
    receiver.error(Error(CLOSED_ERROR_MSG))
  }
  return this.done()
}

/**
 * Check to see if the channel is done and
 * call pending callbacks if necessary.
 *
 * @return {Boolean}
 * @api private
 */
Channel.prototype.done = function () {
  if (!this.isDone && this.isClosed && this.items.length === 0) {
    this.isDone = true
    // call each pending callback with the empty value
    this.pendingGets.forEach(function (cb) { this.callEmpty(cb) }, this)
  }
  return this.isDone
}