Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,7 @@ logger.log("What we've got here is...failure to communicate", "Some men you just
);
```

Flush all log messages and close down:
```javascript
logger.close(function(){
console.log('All done - cookie now?');
process.exit();
});
```
The connection will be closed automatically at the end of your program.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or rather, when the logger is garbage collected?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs say:

Calling unref on a socket will allow the program to exit if this is the only
active socket in the event system [...]

...so I think that the socket remains active until the program exits, but I suppose it would be functionally identical if it were closed as soon as it was GC'd


## Example

Expand Down
167 changes: 49 additions & 118 deletions graylog.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
var zlib = require('zlib'),
crypto = require('crypto'),
dgram = require('dgram'),
util = require('util'),
var zlib = require('zlib'),
crypto = require('crypto'),
dgram = require('dgram'),
util = require('util'),
EventEmitter = require('events').EventEmitter;

/**
Expand All @@ -12,66 +12,49 @@ var zlib = require('zlib'),
var graylog = function graylog(config) {
EventEmitter.call(this);

this.config = config;
this.config = config;

this.servers = config.servers;
this.client = null;
this.hostname = config.hostname || require('os').hostname();
this.facility = config.facility || 'Node.js';
this.servers = config.servers;
this.hostname = config.hostname || require('os').hostname();
this.facility = config.facility || 'Node.js';

this._unsentMessages = 0;
this._unsentChunks = 0;
this._callCount = 0;
this._callCount = 0;

this._onClose = null;
this._isDestroyed = false;
this._bufferSize = config.bufferSize || this.DEFAULT_BUFFERSIZE;

this.client = dgram.createSocket("udp4");

// unref so we don't need to close it explicitly. since we aren't listening
// for messages, there's no need to keep it open
this.client.unref();

var that = this;
this.client.on('error', function (err) {
that.emit('error', err);
});

this._bufferSize = config.bufferSize || this.DEFAULT_BUFFERSIZE;
};

util.inherits(graylog, EventEmitter);

graylog.prototype.DEFAULT_BUFFERSIZE = 1400; // a bit less than a typical MTU of 1500 to be on the safe side

graylog.prototype.level = {
EMERG: 0, // system is unusable
ALERT: 1, // action must be taken immediately
CRIT: 2, // critical conditions
ERR: 3, // error conditions
ERROR: 3, // because people WILL typo
WARNING: 4, // warning conditions
NOTICE: 5, // normal, but significant, condition
INFO: 6, // informational message
DEBUG: 7 // debug level message
EMERG: 0, // system is unusable
ALERT: 1, // action must be taken immediately
CRIT: 2, // critical conditions
ERR: 3, // error conditions
ERROR: 3, // because people WILL typo
WARNING: 4, // warning conditions
NOTICE: 5, // normal, but significant, condition
INFO: 6, // informational message
DEBUG: 7 // debug level message
};

graylog.prototype.getServer = function () {
return this.servers[this._callCount++ % this.servers.length];
};

graylog.prototype.getClient = function () {
if (!this.client && !this._isDestroyed) {
this.client = dgram.createSocket("udp4");

var that = this;
this.client.on('error', function (err) {
that.emit('error', err);
});
}

return this.client;
};

graylog.prototype.destroy = function () {
if (this.client) {
this.client.close();
this.client.removeAllListeners();
this.client = null;
this._onClose = null;
this._isDestroyed = true;
}
};

graylog.prototype.emergency = function (short_message, full_message, additionalFields, timestamp) {
return this._log(short_message, full_message, additionalFields, timestamp, this.level.EMERG);
};
Expand Down Expand Up @@ -107,35 +90,33 @@ graylog.prototype.debug = function (short_message, full_message, additionalField
};

graylog.prototype._log = function log(short_message, full_message, additionalFields, timestamp, level) {
this._unsentMessages += 1;

var payload,
fileinfo,
that = this,
field = '',
that = this,
field = '',
message = {
version : '1.0',
timestamp : (timestamp || new Date()).getTime() / 1000,
host : this.hostname,
facility : this.facility,
level : level
version : '1.0',
timestamp : (timestamp || new Date()).getTime() / 1000,
host : this.hostname,
facility : this.facility,
level : level
};

if (typeof(short_message) !== 'object' && typeof(full_message) === 'object' && additionalFields === undefined) {
// Only short message and additional fields are available
message.short_message = short_message;
message.full_message = short_message;
message.short_message = short_message;
message.full_message = short_message;

additionalFields = full_message;
} else if (typeof(short_message) !== 'object') {
} else if (typeof(short_message) !== 'object') {
// We normally set the data
message.short_message = short_message;
message.full_message = full_message || short_message;
message.short_message = short_message;
message.full_message = full_message || short_message;
} else if (short_message.stack && short_message.message) {

// Short message is an Error message, we process accordingly
message.short_message = short_message.message;
message.full_message = short_message.stack;
message.full_message = short_message.stack;

// extract error file and line
fileinfo = message.stack.split('\n')[0];
Expand Down Expand Up @@ -166,31 +147,27 @@ graylog.prototype._log = function log(short_message, full_message, additionalFie

zlib.deflate(payload, function (err, buffer) {
if (err) {
that._unsentMessages -= 1;
return that.emitError(err);
}

// If it all fits, just send it
if (buffer.length <= that._bufferSize) {
that._unsentMessages -= 1;
return that.send(buffer, that.getServer());
}

// It didn't fit, so prepare for a chunked stream

var bufferSize = that._bufferSize;
var dataSize = bufferSize - 12; // the data part of the buffer is the buffer size - header size
var dataSize = bufferSize - 12; // the data part of the buffer is the buffer size - header size
var chunkCount = Math.ceil(buffer.length / dataSize);

if (chunkCount > 128) {
that._unsentMessages -= 1;
return that.emitError('Cannot log messages bigger than ' + (dataSize * 128) + ' bytes');
}

// Generate a random id in buffer format
crypto.randomBytes(8, function (err, id) {
if (err) {
that._unsentMessages -= 1;
return that.emitError(err);
}

Expand All @@ -213,8 +190,8 @@ graylog.prototype._log = function log(short_message, full_message, additionalFie

function send(err) {
if (err || chunkSequenceNumber >= chunkCount) {
// We have reached the end, or had an error (which will already have been emitted)
that._unsentMessages -= 1;
// We have reached the end, or had an error (which will
// already have been emitted)
return;
}

Expand All @@ -223,7 +200,7 @@ graylog.prototype._log = function log(short_message, full_message, additionalFie

// Copy data from full buffer into the chunk
var start = chunkSequenceNumber * dataSize;
var stop = Math.min((chunkSequenceNumber + 1) * dataSize, buffer.length);
var stop = Math.min((chunkSequenceNumber + 1) * dataSize, buffer.length);

buffer.copy(chunk, 12, start, stop);

Expand All @@ -239,65 +216,19 @@ graylog.prototype._log = function log(short_message, full_message, additionalFie
};

graylog.prototype.send = function (chunk, server, cb) {
var that = this,
client = this.getClient();

if (!client) {
var error = new Error('Socket was already destroyed');

this.emit('error', error);
return cb(error);
}

this._unsentChunks += 1;

client.send(chunk, 0, chunk.length, server.port, server.host, function (err) {
that._unsentChunks -= 1;

var that = this;
this.client.send(chunk, 0, chunk.length, server.port, server.host, function (err) {
if (err) {
that.emit('error', err);
}

if (cb) {
cb(err);
}

if (that._unsentChunks === 0 && that._unsentMessages === 0 && that._onClose) {
that._onClose();
}
});
};

graylog.prototype.emitError = function (err) {
this.emit('error', err);

if (this._unsentChunks === 0 && this._unsentMessages === 0 && this._onClose) {
this._onClose();
}
};

graylog.prototype.close = function (cb) {
var that = this;

if (this._onClose || this._isDestroyed) {
return process.nextTick(function () {
var error = new Error('Close was already called once');

if (cb) {
return cb(error);
}

that.emit('error', error);
});
}

this._onClose = function () {
that.destroy();

if (cb) {
cb();
}
};
};

exports.graylog = graylog;
9 changes: 4 additions & 5 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
var graylog = require('./graylog'),
fs = require('fs'),
fs = require('fs'),
file,
data,
servers = [
Expand Down Expand Up @@ -49,7 +49,6 @@ client.log('ParametersTest - Short message and full message', 'Full message');
client.log('ParametersTest - Short Message with full message and json', 'Full message', {cool: 'beans'});
console.log('');

client.close(function () {
console.log('Insertion complete. Please check', 'http://' + servers[0].host + ':3000', 'and verify that insertion was successfull');
console.log('');
});

console.log('Insertion complete. Please check', 'http://' + servers[0].host + ':3000', 'and verify that insertion was successfull');
console.log('');