142 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
		
		
			
		
	
	
			142 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
|  | var async = require('./async.js'); | ||
|  | 
 | ||
|  | // API
 | ||
|  | module.exports = { | ||
|  |   iterator: wrapIterator, | ||
|  |   callback: wrapCallback | ||
|  | }; | ||
|  | 
 | ||
|  | /** | ||
|  |  * Wraps iterators with long signature | ||
|  |  * | ||
|  |  * @this    ReadableAsyncKit# | ||
|  |  * @param   {function} iterator - function to wrap | ||
|  |  * @returns {function} - wrapped function | ||
|  |  */ | ||
|  | function wrapIterator(iterator) | ||
|  | { | ||
|  |   var stream = this; | ||
|  | 
 | ||
|  |   return function(item, key, cb) | ||
|  |   { | ||
|  |     var aborter | ||
|  |       , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key)) | ||
|  |       ; | ||
|  | 
 | ||
|  |     stream.jobs[key] = wrappedCb; | ||
|  | 
 | ||
|  |     // it's either shortcut (item, cb)
 | ||
|  |     if (iterator.length == 2) | ||
|  |     { | ||
|  |       aborter = iterator(item, wrappedCb); | ||
|  |     } | ||
|  |     // or long format (item, key, cb)
 | ||
|  |     else | ||
|  |     { | ||
|  |       aborter = iterator(item, key, wrappedCb); | ||
|  |     } | ||
|  | 
 | ||
|  |     return aborter; | ||
|  |   }; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Wraps provided callback function | ||
|  |  * allowing to execute snitch function before | ||
|  |  * real callback | ||
|  |  * | ||
|  |  * @this    ReadableAsyncKit# | ||
|  |  * @param   {function} callback - function to wrap | ||
|  |  * @returns {function} - wrapped function | ||
|  |  */ | ||
|  | function wrapCallback(callback) | ||
|  | { | ||
|  |   var stream = this; | ||
|  | 
 | ||
|  |   var wrapped = function(error, result) | ||
|  |   { | ||
|  |     return finisher.call(stream, error, result, callback); | ||
|  |   }; | ||
|  | 
 | ||
|  |   return wrapped; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Wraps provided iterator callback function | ||
|  |  * makes sure snitch only called once, | ||
|  |  * but passes secondary calls to the original callback | ||
|  |  * | ||
|  |  * @this    ReadableAsyncKit# | ||
|  |  * @param   {function} callback - callback to wrap | ||
|  |  * @param   {number|string} key - iteration key | ||
|  |  * @returns {function} wrapped callback | ||
|  |  */ | ||
|  | function wrapIteratorCallback(callback, key) | ||
|  | { | ||
|  |   var stream = this; | ||
|  | 
 | ||
|  |   return function(error, output) | ||
|  |   { | ||
|  |     // don't repeat yourself
 | ||
|  |     if (!(key in stream.jobs)) | ||
|  |     { | ||
|  |       callback(error, output); | ||
|  |       return; | ||
|  |     } | ||
|  | 
 | ||
|  |     // clean up jobs
 | ||
|  |     delete stream.jobs[key]; | ||
|  | 
 | ||
|  |     return streamer.call(stream, error, {key: key, value: output}, callback); | ||
|  |   }; | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Stream wrapper for iterator callback | ||
|  |  * | ||
|  |  * @this  ReadableAsyncKit# | ||
|  |  * @param {mixed} error - error response | ||
|  |  * @param {mixed} output - iterator output | ||
|  |  * @param {function} callback - callback that expects iterator results | ||
|  |  */ | ||
|  | function streamer(error, output, callback) | ||
|  | { | ||
|  |   if (error && !this.error) | ||
|  |   { | ||
|  |     this.error = error; | ||
|  |     this.pause(); | ||
|  |     this.emit('error', error); | ||
|  |     // send back value only, as expected
 | ||
|  |     callback(error, output && output.value); | ||
|  |     return; | ||
|  |   } | ||
|  | 
 | ||
|  |   // stream stuff
 | ||
|  |   this.push(output); | ||
|  | 
 | ||
|  |   // back to original track
 | ||
|  |   // send back value only, as expected
 | ||
|  |   callback(error, output && output.value); | ||
|  | } | ||
|  | 
 | ||
|  | /** | ||
|  |  * Stream wrapper for finishing callback | ||
|  |  * | ||
|  |  * @this  ReadableAsyncKit# | ||
|  |  * @param {mixed} error - error response | ||
|  |  * @param {mixed} output - iterator output | ||
|  |  * @param {function} callback - callback that expects final results | ||
|  |  */ | ||
|  | function finisher(error, output, callback) | ||
|  | { | ||
|  |   // signal end of the stream
 | ||
|  |   // only for successfully finished streams
 | ||
|  |   if (!error) | ||
|  |   { | ||
|  |     this.push(null); | ||
|  |   } | ||
|  | 
 | ||
|  |   // back to original track
 | ||
|  |   callback(error, output); | ||
|  | } |