Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.
5 changes: 2 additions & 3 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License.

This code is written by Prateek Reddy Yammanuru, Shiva Manognya Kandikuppa, Uday Kumar Mydam, Nirup TNL, Sandeep Reddy G, Deepak Kumar
and updated by Ashish Gupta, Tarun Mohandas, Suriya Prakash, Srinivasa Burli, Jishnu Surendran and Bhairavi Balakrishnan*/
and updated by Ashish Gupta, Tarun Mohandas, Suriya Prakash, Srinivasa Burli, Jishnu Surendran and Bhairavi Balakrishnan,
enhanced by wave 4 team - Baskaran,Neel,Pallabee,Rahil,Shefali*/

//var mongoose = require('./mongoose');
//var db = mongoose();
Expand Down Expand Up @@ -66,8 +67,6 @@ var dimensionRouter = require('./routes/defineData/dimensions');
var measuresRouter = require('./routes/defineData/measures');
var namespaceRouter = require('./routes/defineData/logdata');
var expressions=require('./routes/realTimeLogs/queryBuilder/expressions.js')
// wave 4 code starts here
//require('./database');
var saveQuery = require('./routes/queryBuilder/saveQuery');

// wave 4 code ends here
Expand Down
2 changes: 1 addition & 1 deletion bin/www
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var app = require('../app');
var debug = require('debug')('LogAggregatorExpress:server');
var http = require('http');

var port = normalizePort(process.env.PORT || '8686')
var port = normalizePort(process.env.PORT || '8585')
app.set('port', port);

var server = http.createServer(app);
Expand Down
87 changes: 50 additions & 37 deletions components/RTdatamapper/RTdatamapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,68 @@ var WebSocketClient = require('websocket').client;
var WebSocketServer = require('ws').Server;
// var wss = new WebSocketServer({port: 8484});
var WebSocket1 = new WebSocketClient();
var sift = require('sift');
var wss = new WebSocketServer({
port: 5050
});
var serverWs;
wss.on('connection', function(ws) {
console.log('CONNECTED');
ws.send('Connected');
// ws.send('Connected');
serverWs = ws;
//ws.setMaxListeners(ws.getMaxListeners() + 1);
});

module.exports = function(namespaceId, measures) {
console.log("inside component", measures);

WebSocket1.on('connect', function(connection) {
console.log("Connected..Waiting for some message");
var streamData = {};
_('message', connection).map(function(msg) {
var measures = [];
var source;
// var forwardPort;
module.exports = function(namespaceId,Namespace) {
Namespace.findNamespace(namespaceId, function(err, namespace) {
if (namespace != null) {
// console.log("routes",namespace.dimensions);
// res.send(namespace.dimensions);
measures = [];
measures = namespace.measures;
source = namespace.source;
//console.log(measures);
console.log("called --------------------------------------------------------------------------------------------------------------");
}
});
};
console.log("inside component", measures);
WebSocket1.on('connect', function(connection) {
console.log("Connected..Waiting for some message");
streamData = {};
_('message', connection).map(function(msg) {
var sourceData = JSON.parse(msg.utf8Data)[0];
if (source === sourceData) {
streamData = JSON.parse(msg.utf8Data)[2];
var keys = Object.keys(streamData); //array of keys in the streaming data
for (var k = 0; k < measures.length; k++) {
for (var i = 0; i < keys.length; i++) {
if (keys[i] === measures[k].eventField) {
var keyValue = keys[i];
if (measures[k].measureType === "radioField") {
var displayValue = measures[k].displayName;
streamData[displayValue] = true;
} else {
if (streamData[keyValue] === measures[k].eventValue) {
var displayValue = measures[k].displayName;
streamData[displayValue] = true;
} else {
var displayValue = measures[k].displayName;
streamData[displayValue] = false;
}
}
}
console.log(streamData);
// var keys = Object.keys(streamData); //array of keys in the streaming data
var condition;
var sifter;
for (var k = 0; k < measures.length; k++)
{
var displayValue=measures[k].displayName;
// streamdata, keys
if(measures[k].measureType === "fieldMeasure") {
condition = {};

condition[measures[k].eventField] = {$exists: true};
sifter = sift(condition);
streamData[displayValue] = sifter(streamData) ? streamData[measures[k].eventField] : 0
} else {
condition = {};
condition[measures[k].eventField] = measures[k].eventValue;
sifter = sift(condition);
streamData[displayValue] = sifter(streamData) ? 1 : 0;
}
}
if (serverWs) {
serverWs.send(JSON.stringify(streamData));
}
console.log(streamData);
}).done(function() {
console.log('Done');
});
}
// console.log(streamData);
}).done(function() {
console.log('Done');
});
WebSocket1.connect('ws://172.23.238.253:7070');

// function msg
/* ending of program*/
}
});
WebSocket1.connect('ws://172.23.238.253:7070');
4 changes: 3 additions & 1 deletion components/aggregator/avg-aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module.exports=function (arr){
var i=0;
var sum=0;
var avg=0;

for(i=0;i<len;i++)
{
sum=sum+arr[i];
var k=parseInt(arr[i]);
sum=sum+k;
}
avg=sum/len;
return avg;
Expand Down
4 changes: 4 additions & 0 deletions components/aggregator/max-aggregator.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
module.exports=function (arr){
var max=0;
for(var i=0;i<arr.length;i++)
{
arr[i]=parseInt(arr[i]);
}
max = Math.max.apply(null, arr);
return max;
}
6 changes: 6 additions & 0 deletions components/aggregator/min-aggregator.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
module.exports=function (arr){


for(var i=0;i<arr.length;i++)
{
arr[i]=parseInt(arr[i]);
}
var min=0;
min = Math.min.apply(null, arr);
return min;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ module.exports=function (arr){
var sum=0;
for(i=0;i<len;i++)
{
sum=sum+arr[i];
sum=sum+parseInt(arr[i]);
}

return sum;
}
}
138 changes: 85 additions & 53 deletions components/queryexecutor/mainquery.js
Original file line number Diff line number Diff line change
@@ -1,68 +1,100 @@
var _ = require('highland');
var QueryExecutor = require('./query-executor');
var query = {
select:['remote','host','method','code'],
eval: {
val1: {
rolling: {
evaluate: 'average',
over: {
count: 10
},
on: 'code' // measure
}
},
val2: {
rolling: {
evaluate: 'average',
over: {
count: 10
},
on: 'insertions'
}
},
},
project: {
// $highlight: {$condition: {val1: {$eq: eval['val2']}}}
$highlight: {$condition: 'val1 ==val2'}
},
to: 'streamB'
};
// var query = {
// select:['remote','host','method','code'],
// eval: {
// val1: {
// rolling: {
// evaluate: 'average',
// over: {
// count: 10
// },
// on: 'code' // measure
// }
// },
// val2: {
// rolling: {
// evaluate: 'average',
// over: {
// count: 10
// },
// on: 'insertions'
// }
// },
// },
// project: {
// // $highlight: {$condition: {val1: {$eq: eval['val2']}}}
// $highlight: {$condition: 'val1 ==val2'}
// },
// to: 'streamB'
// };

var WebSocketClient = require('websocket').client;
var WebSocket1 = new WebSocketClient();
var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({port: 8080});
var wss = new WebSocketServer({port: 9000});
var serverWs;

wss.on('connection', function(ws) {
console.log('CONNECTED');
ws.send('Connected');
//console.log('CONNECTED');
// ws.send('Connected');
serverWs = ws;
ws.setMaxListeners(ws.getMaxListeners() + 1);
//ws.setMaxListeners(ws.getMaxListeners() + 1);
});

module.exports=function (queryy) {
var isClientConnected = false;
var stream;
var con;
var pipeline;


var executor = new QueryExecutor(query);
var pipeline = executor.getPipeline();
WebSocket1.on('connect', function(connection) {
console.log("Connected..Waiting for some message");
var streamData = {};
_('message', connection).pipe(_.pipeline(_.map(function(msg) {
console.log('data received');
return JSON.parse(msg.utf8Data)[2];
})
))
.pipe(pipeline)
.pipe(_.pipeline(
_.map(function(msg) {
if(serverWs) {
serverWs.send(JSON.stringify(msg));
}
})
)).done();
function bootstrapStream() {
console.log("connected in mainQuery");
stream=_('message',con);
stream.pipe(_.pipeline(_.map(function(msg) {
streamData=msg;
console.log('data received ');
return JSON.parse(streamData.utf8Data);
})
))
.pipe(pipeline)
.pipe(_.pipeline(
_.map(function(msg) {
if(serverWs) {
serverWs.send(JSON.stringify(msg));
}
})
)).done();
}

module.exports=function (queryy) {
var executor = new QueryExecutor(queryy);
pipeline = executor.getPipeline();

connection.setMaxListeners(connection.getMaxListeners() + 1);
});
if (isClientConnected) {
// isClientConnected=false;
console.log('before stream destroyed ');
// for (var variable in stream) {
// console.log('>>> '+variable);
// }
stream.end(function() {
console.log('after stream destroyed');
bootstrapStream();
});
console.log('after stream destroyed');
}
else{
WebSocket1.on('connect', function(connection) {
isClientConnected = true;
console.log(" Connected..Waiting for some message inQuery");
var streamData = {};
connection.on('close', function() {
console.log('connection closed ***');
});
con=connection;
bootstrapStream();
// connection.setMaxListeners(connection.getMaxListeners() + 1);
});
WebSocket1.connect('ws://localhost:5050');
}
}
Loading