stomp.js | |
---|---|
stompThe stomp.StompAn instance of the
If debug is set to true, extra output will be printed to the console. | |
Helpers to handle frames, and do parsing | var net = require('net'),
tls = require('tls'),
sys = require('util'),
frame = require('./frame'),
stomp_utils = require('./stomp-utils'),
exceptions = require('./stomp-exceptions'),
utils = new stomp_utils.StompUtils(),
log = null;
function parse_command(data) {
var command,
this_string = data.toString('utf8', 0, data.length);
command = this_string.split('\n');
return command[0];
};
function parse_headers(headers_str) {
var these_headers = {},
one_header = [],
header_key = null,
header_val = null,
headers_split = headers_str.split('\n');
for (var i = 0; i < headers_split.length; i++) {
one_header = headers_split[i].split(':');
if (one_header.length > 1) {
header_key = one_header.shift();
header_val = one_header.join(':');
these_headers[header_key] = header_val;
}
else {
these_headers[one_header[0]] = one_header[1];
}
}
return these_headers;
};
function parse_frame(chunk) {
var args = {},
data = null,
command = null,
headers = null,
body = null,
headers_str = null;
if (!utils.really_defined(chunk))
return null;
command = parse_command(chunk);
data = chunk.slice(command.length + 1, chunk.length);
data = data.toString('utf8', 0, data.length);
var the_rest = data.split('\n\n');
headers = parse_headers(the_rest[0]);
body = the_rest.slice(1, the_rest.length);
if ('content-length' in headers)
headers['bytes_message'] = true;
args = {
command: command,
headers: headers,
body: body
}
var this_frame = new frame.Frame();
var return_frame = this_frame.build_frame(args);
return return_frame;
};
function _connect(stomp) {
log = stomp.log;
if (stomp.ssl) {
log.debug('Connecting to ' + stomp.host + ':' + stomp.port + ' using SSL');
stomp.socket = tls.connect(stomp.port, stomp.host, stomp.ssl_options, function() {
log.debug('SSL connection complete');
if (!stomp.socket.authorized) {
log.error('SSL is not authorized: '+stomp.socket.authorizationError);
if (stomp.ssl_validate) {
_disconnect(stomp);
return;
}
}
_setupListeners(stomp);
});
} else {
log.debug('Connecting to ' + stomp.host + ':' + stomp.port);
stomp.socket = new net.Socket();
stomp.socket.connect(stomp.port, stomp.host);
_setupListeners(stomp);
}
}
function _setupListeners(stomp) {
function _connected() {
log.debug('Connected to socket');
var headers = {};
if (utils.really_defined(stomp.login) &&
utils.really_defined(stomp.passcode)) {
headers.login = stomp.login;
headers.passcode = stomp.passcode;
}
if (utils.really_defined(stomp["client-id"])) {
headers["client-id"] = stomp["client-id"];
}
stomp_connect(stomp, headers);
}
var socket = stomp.socket;
socket.on('drain', function(data) {
log.debug('draining');
});
var buffer = '';
socket.on('data', function(chunk) {
buffer += chunk;
var frames = buffer.split('\0\n'); |
Temporary fix : NULL,LF is not a guranteed standard, the LF is optional, so lets deal with it. (Rauls) | if (frames.length == 1) {
frames = buffer.split('\0');
}
if (frames.length == 1) return;
buffer = frames.pop();
var parsed_frame = null;
var _frame = null;
while (_frame = frames.shift()) {
parsed_frame = parse_frame(_frame);
stomp.handle_new_frame(parsed_frame);
}
});
socket.on('end', function() {
log.debug("end");
});
socket.on('error', function(error) {
log.error(error.stack + 'error name: ' + error.name);
stomp.emit("error", error);
});
socket.on('close', function(error) {
log.debug('disconnected');
if (error) {
log.error('Disconnected with error: ' + error);
}
stomp.emit("disconnected", error);
});
if (stomp.ssl) {
_connected();
} else {
socket.on('connect', _connected);
}
};
function stomp_connect(stomp, headers) {
var _frame = new frame.Frame(),
args = {},
headers = headers || {};
args['command'] = 'CONNECT';
args['headers'] = headers;
var frame_to_send = _frame.build_frame(args);
send_frame(stomp, frame_to_send);
};
function _disconnect(stomp) {
var socket = stomp.socket;
socket.end();
if (socket.readyState == 'readOnly')
socket.destroy();
log.debug('disconnect called');
};
function send_command(stomp, command, headers, body, want_receipt) {
var want_receipt = want_receipt || false;
if (!utils.really_defined(headers))
headers = {};
var args = {
'command': command,
'headers': headers,
'body': body
};
var _frame = new frame.Frame();
var this_frame = _frame.build_frame(args, want_receipt);
send_frame(stomp, this_frame);
return this_frame;
};
function send_frame(stomp, _frame) {
var socket = stomp.socket;
var frame_str = _frame.as_string();
if (socket.write(frame_str) === false) {
log.debug('Write buffered');
}
return true;
}; |
Stomp - Client APITakes an argument object | function Stomp(args) {
this.port = args['port'] || 61613;
this.host = args['host'] || "127.0.0.1";
this.debug = args['debug'];
this.login = args['login'] || null;
this.passcode = args['passcode'] || null;
this.log = new StompLogging(this.debug);
this._subscribed_to = {};
this.session = null;
this.ssl = args['ssl'] ? true : false;
this.ssl_validate = args['ssl_validate'] ? true : false;
this.ssl_options = args['ssl_options'] || {};
this['client-id'] = args['client-id'] || null;
}; |
Stomp is an EventEmitter | Stomp.prototype = new process.EventEmitter(); |
Stomp.connect()Begin connection | Stomp.prototype.connect = function() {
_connect(this);
}; |
Stomp.handle_new_frame()Handle frame based on type. Emit events when needed. Takes a | Stomp.prototype.handle_new_frame = function(this_frame) {
switch (this_frame.command) {
case "MESSAGE":
if (utils.really_defined(this_frame.headers['message-id'])) {
if (this_frame.headers !== null && this_frame.headers.destination !== null && this._subscribed_to[this_frame.headers.destination] !== null) {
var subscription = this._subscribed_to[this_frame.headers.destination];
if (subscription.enabled && subscription.callback !== null && typeof(subscription.callback) == 'function') {
subscription.callback(this_frame.body, this_frame.headers);
}
}
this.emit('message', this_frame);
}
break;
case "CONNECTED":
log.debug('Connected to STOMP');
this.session = this_frame.headers['session'];
this.emit('connected');
break;
case "RECEIPT":
this.emit('receipt', this_frame.headers['receipt-id']);
break;
case "ERROR":
this.emit('error', this_frame);
break;
default:
console.log("Could not parse command: " + this_frame.command);
}
}; |
Stomp.disconnect()Disconnect from STOMP broker | Stomp.prototype.disconnect = function() {
_disconnect(this);
} |
Stomp.subscribe(headers, callback)Subscribe to destination (queue or topic) Takes a header object Takes a callback function | Stomp.prototype.subscribe = function(headers, callback) {
var destination = headers['destination'];
headers['session'] = this.session;
send_command(this, 'SUBSCRIBE', headers);
/**
/ Maybe we could subscribe to mulitple queues?
/ if (destination instanceof Array) {
/ for (var = i; i < 0; i++) {
/ this._subscribed_to[destination[i]] = { enabled: true, callback: callback };
/ }
/ }
/ else {
/ this._subscribed_to[destination] = { enabled: true, callback: callback };
/ }
/
*/
this._subscribed_to[destination] = { enabled: true, callback: callback };
this.log.debug('subscribed to: ' + destination + ' with headers ' + sys.inspect(headers));
}; |
Stomp.unsubscribe()Unsubscribe from destination (queue or topic) Takes a header object | Stomp.prototype.unsubscribe = function(headers) {
var destination = headers['destination'];
headers['session'] = this.session;
send_command(this, 'UNSUBSCRIBE', headers);
this._subscribed_to[destination].enabled = false;
this.log.debug('no longer subscribed to: ' + destination);
}; |
Stomp.ack()Acknowledge received message Takes a string representing the message id to ack | Stomp.prototype.ack = function(message_id) {
send_command(this, 'ACK', {'message-id': message_id});
this.log.debug('acknowledged message: ' + message_id);
}; |
Stomp.begin()Begin transaction Return a string representing the generated transaction id | Stomp.prototype.begin = function() {
var transaction_id = Math.floor(Math.random()*99999999999).toString();
send_command(this, 'BEGIN', {'transaction': transaction_id});
this.log.debug('begin transaction: ' + transaction_id);
return transaction_id;
}; |
Stomp.commit()Commit transaction Takes a string representing the transaction id generated by stomp.Stomp.begin() | Stomp.prototype.commit = function(transaction_id) {
send_command(this, 'COMMIT', {'transaction': transaction_id});
this.log.debug('commit transaction: ' + transaction_id);
}; |
Stomp.abort()Abort transaction Takes a string representing the transaction id generated by stomp.Stomp.begin() | Stomp.prototype.abort = function(transaction_id) {
send_command(this, 'ABORT', {'transaction': transaction_id});
this.log.debug('abort transaction: ' + transaction_id);
}; |
Stomp.send()Send MESSAGE to STOMP broker Takes a header object (destination is required) Takes a boolean requesting recipt of the sent message Returns a | Stomp.prototype.send = function(headers, want_receipt) {
var destination = headers['destination'],
body = headers['body'] || null;
delete headers['body'];
headers['session'] = this.session;
return send_command(this, 'SEND', headers, body, want_receipt)
};
module.exports.Stomp = Stomp;
|