diff --git a/Connector.py b/Connector.py index a9c6a84..d6b58f5 100644 --- a/Connector.py +++ b/Connector.py @@ -2,6 +2,10 @@ from copy import deepcopy import queue import socket +import pickle +import codecs + +import asyncio import logging @@ -16,6 +20,10 @@ def connect(self, destinationIP, destinationPort): pass + @abstractmethod + def disconnect(self): + pass + @abstractmethod def send(self, message): @@ -52,45 +60,79 @@ def getMessage(self): pass +class ServerProtocol(asyncio.DatagramProtocol): + + def __init__(self, queue): + self.queue = queue + + + def connection_made(self, transport): + self.transport = transport + + + def datagram_received(self, data, addr): + #logging.info("ServerProtocol::datagram_received => Received datagram '%s' from %s" % (data, addr)) + message = pickle.loads(codecs.decode(data, "base64")) + self.queue.put(message) + + class RealNetworkInterface(NetworkInterface): + + def __init__(self, senderIP, senderPort): self.senderIP = senderIP self.senderPort = senderPort - self.serverSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.receiveQueue = queue.Queue() + self.loop = asyncio.get_event_loop() + def connect(self, destinationIP, destinationPort): + + self.destinationIP = destinationIP + self.destinationPort = destinationPort + #print(self.destinationIP) + #print(self.destinationPort) + #logging.info("RealNetwork::connect => Connecting to (%s:%d)" % (self.destinationIP, self.destinationPort)) self.clientSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + def disconnect(self): + self.listen.close() + + def send(self, message): - self.clientSock.sendto( (UDP_IP_ADDRESS, UDP_PORT_NO)) - pass + #logging.info("RealNetwork::send => Sending message '%s' to (%s:%d)" % (message.getType(), self.destinationIP, self.destinationPort)) + data = codecs.encode(pickle.dumps(message), "base64") + result = self.clientSock.sendto(data, (self.destinationIP, self.destinationPort)) + #logging.info("RealNetwork::send => Results <%s>" % result) def receive(self, message): - pass + #logging.info("RealNetwork::receive => Received a message" ) + self.receiveQueue.put(message) def bind(self): - serverSock.bind(self.senderIP, self.senderPort) - pass + logging.info("RealNetworkInterface::bind => Registering (IP:Port) = (%s:%d )" % (self.senderIP, self.senderPort)) + self.listen = self.loop.create_datagram_endpoint( + lambda: ServerProtocol(self.receiveQueue), local_addr=(self.senderIP, self.senderPort)) + self.loop.run_until_complete(self.listen) def getIP(self): - pass + return self.senderIP def getPort(self): - pass + return self.senderPort def getNumberOfMessages(self): - pass + return self.receiveQueue.qsize() def getMessage(self): - pass + return self.receiveQueue.get() class FakeNetworkInterface(NetworkInterface): @@ -103,22 +145,26 @@ def __init__(self, senderIP, senderPort): def connect(self, destinationIP, destinationPort): - logging.debug("Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) - self.destinationInterface = interfaces[destinationIP+":"+destinationPort] + logging.info("FakeNetwork::connect => Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) + self.destinationInterface = interfaces[destinationIP+":"+str(destinationPort)] + + + def disconnect(self): + interfaces[self.senderIP + ":" + self.senderPort] = None def bind(self): logging.info("FakeNetworkInterface::bind => Registering (IP:Port) = (%s:%s )" % (self.senderIP, self.senderPort)) - interfaces[self.senderIP+":"+self.senderPort] = self + interfaces[self.senderIP+":"+str(self.senderPort)] = self def send(self, message): - logging.debug("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort())) + #logging.info("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort())) self.destinationInterface.receive(deepcopy(message)) def receive(self, message): - logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) + #logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) self.receiveQueue.put(message) logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) diff --git a/Message.py b/Message.py index 5713d3b..a7a9956 100644 --- a/Message.py +++ b/Message.py @@ -1,3 +1,52 @@ +class JoinMessage(): + def __init__(self, IP, port): + self.nodeID = None + self.type = None # join, query, matcher + self.IP = IP + self.port = port + self.registeredNodes = {} + + + def setNodeID(self, nodeID): + self.nodeID = nodeID + + + def getNodeID(self): + return self.nodeID + + + def setIP(self, IP): + self.IP = IP + + + def setPort(self, port): + self.port = port + + + def getIP(self): + return self.IP + + + def getPort(self): + return self.port + + + def getType(self): + return self.type + + + def setType(self, type): + self.type = type + + + def setRegisteredNodes(self, nodes): + self.registeredNodes = nodes + + + def getRegisteredNodes(self): + return self.registeredNodes + + class Message(object): diff --git a/Node.py b/Node.py index dcd721b..78378e5 100644 --- a/Node.py +++ b/Node.py @@ -1,7 +1,69 @@ from VAST import VASTInterface from collections import defaultdict -from Message import Message +from Message import Message, JoinMessage import logging +from Connector import NetworkInterface + +class Gateway(): + def __init__(self, interface): + self.interface = interface + self.ID = 0 + self.registeredNodes = {} + self.matchers = [] + + + def initialiseNetworkInterface(self): + self.interface.bind() + + def processSingleMessage(self): + + #logging.info("Gateway::processSingleMessage => Number of received messages <%d>" % self.interface.getNumberOfMessages()) + if (self.interface.getNumberOfMessages()): + message = self.interface.getMessage() + type = message.getType() + #logging.info("Gateway::processSingleMessage => Received message of type <%s>" % type) + if (type == 'join'): + IP = message.getIP() + port = message.getPort() + nodeID = self.generateID(IP, port) + self.registerNode(IP, port, nodeID) + message.setRegisteredNodes(self.registeredNodes) + for ID in self.registeredNodes: + (nodeIP, nodePort) = self.registeredNodes[ID] + message.setIP(nodeIP) + message.setPort(nodePort) + message.setNodeID(ID) + self.interface.connect(nodeIP, nodePort) + self.interface.send(message) + elif (type == 'matcher'): + nodeID = message.getNodeID() + self.registerMatcher(nodeID) + elif (type == 'query'): + nodeID = message.getNodeID() + (IP, port) = self.queryNodeID(nodeID) + message.setIP(IP) + message.setPort(port) + self.interface.connect(IP, port) + self.interface.send(message) + + + def generateID(self, IP, port): + nodeID = self.ID + self.ID += 1 + return nodeID + + + def registerNode(self, IP, port, nodeID): + self.registeredNodes[nodeID] = (IP, port) + + + def registerMatcher(self, nodeID): + self.matchers.append(nodeID) + + + def queryNodeID(self, nodeID): + (IP, port) = self.registeredNodes[nodeID] + return (IP, port) class SPSNode(object): @@ -49,17 +111,23 @@ def unsubscribeFromArea(self, area): class VASTNode(object): - def __init__(self, nodeID, networkInterface, VASTInterface): - self.nodeID = nodeID + def __init__(self, networkInterface, VASTInterface): + self.nodeID = None self.networkInterface = networkInterface self.VAST = VASTInterface - def registerID(self): + def initialiseNetworkInterface(self): + self.networkInterface.bind() + + + def registerID(self, gatewayIP, gatewayPort): IP = self.networkInterface.getIP() port = self.networkInterface.getPort() - self.VAST.join(self.nodeID, IP, port) - self.networkInterface.bind() + message = JoinMessage(IP, port) + message.setType('join') + self.networkInterface.connect(gatewayIP, gatewayPort) + self.networkInterface.send(message) def connect(self, IP, port): @@ -69,19 +137,35 @@ def connect(self, IP, port): def send(self, destinationNodeID, message): #logging.info("VASTNode::send => Node [%s] is sending message with content '%s' to node [%s]" % (self.nodeID, message.getPayload(), destinationNodeID)) (IP, port) = self.VAST.getIPPort(destinationNodeID) + #message.setSenderID(self.getNodeID()) self.networkInterface.connect(IP, port) self.networkInterface.send(message=message) def processSingleMessage(self): - #while (self.networkInterface.getNumberOfMessages()): + #logging.info("Gateway::processSingleMessage => Number of received messages <%d>" % self.networkInterface.getNumberOfMessages()) + if (self.networkInterface.getNumberOfMessages()): message = self.networkInterface.getMessage() - payload = message.getPayload() - senderID = message.getSenderID() type = message.getType() - logging.info("VASTNode::processSingleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) - # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) + #logging.info("VASTNode::processSingleMessage => Received message of type <%s>" % type) + if (type == 'join'): + self.nodeID = message.getNodeID() + IP = self.networkInterface.getIP() + port = self.networkInterface.getPort() + self.VAST.join(IP, port, self.nodeID) + registeredNodes = message.getRegisteredNodes() + for nodeID in registeredNodes: + (nodeIP, nodePort) = registeredNodes[nodeID] + self.VAST.join(nodeIP, nodePort, nodeID) + logging.info("VASTNode::handleMessage => MatcherNode::handleMessage => Received nodeID <%s> from gateway" % self.nodeID) + else: + payload = message.getPayload() + senderID = message.getSenderID() + channel = message.getChannel() + logging.info("VASTNode::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) + # logging.info("VASTNode::processSingleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) + # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) def processMessages(self): @@ -90,8 +174,9 @@ def processMessages(self): payload = message.getPayload() senderID = message.getSenderID() type = message.getType() - logging.info("VASTNode::processMessages => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) - # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) + channel = message.getChannel() + logging.info("VASTNode::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) +# # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) def receive(self, message, **kwargs): @@ -105,8 +190,8 @@ def getNodeID(self): class MatcherNode(VASTNode): - def __init__(self, nodeID, networkInterface, VASTInterface): - self.nodeID = nodeID + def __init__(self, networkInterface, VASTInterface): + self.nodeID = None self.networkInterface = networkInterface self.VAST = VASTInterface self.channelSubscriptions = defaultdict() @@ -115,17 +200,20 @@ def __init__(self, nodeID, networkInterface, VASTInterface): def handleMessage(self, message): type = message.getType() - senderID = message.getSenderID() - logging.info("Matcher::handleMessage => Matcher handling message <%s> from node <%s>" % (type, senderID)) + # + #logging.info("Matcher::handleMessage => Matcher handling message <%s> from node <%s>" % (type, senderID)) if type == 'sub': + senderID = message.getSenderID() channel = message.getChannel() logging.info("Matcher::handleMessage => Node <%s> subscribing to channel <%s>" % (senderID, channel)) self.addChannelSubscription(senderID, channel) elif type == 'unsub': + senderID = message.getSenderID() channel = message.getChannel() logging.info("Matcher::handleMessage => Node <%s> unsubscribing from channel <%s>" % (senderID, channel)) self.removeChannelSubscription(senderID, channel) elif type == 'pub': + senderID = message.getSenderID() channel = message.getChannel() payload = message.getPayload() logging.info("Matcher::handleMessage => Node <%s> publishing message '%s' to channel <%s>" % (senderID, payload, channel)) @@ -134,21 +222,34 @@ def handleMessage(self, message): payload = message.getPayload() senderID = message.getSenderID() type = message.getType() - logging.info("Matcher::handleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) + channel = message.getChannel() + logging.info("Matcher::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) elif type == 'spatialsub': + senderID = message.getSenderID() area = message.getArea() logging.info("Matcher::handleMessage => Node <%s> subscribing to area <%d,%d,%d>" % (senderID, area.position[0], area.position[1],area.radius)) self.addSpatialSubscription(senderID, area) elif type == 'spatialunsub': + senderID = message.getSenderID() area = message.getArea() logging.info("Matcher::handleMessage => Node <%s> unsubscribing from area <%d,%d,%d>" % (senderID, area.position[0], area.position[1],area.radius)) self.removeSpatialSubscription(senderID, area) elif type == 'spatialpub': + senderID = message.getSenderID() area = message.getArea() payload = message.getPayload() logging.info("Matcher::handleMessage => MatcherNode::handleMessage => Node <%s> publishing message '%s' to area <%d,%d,%d>" % (senderID, payload, area.position[0], area.position[1],area.radius)) self.publishSpatial(senderID, area, payload) - + if (type == 'join'): + self.nodeID = message.getNodeID() + IP = self.networkInterface.getIP() + port = self.networkInterface.getPort() + self.VAST.join(IP, port, self.nodeID) + registeredNodes = message.getRegisteredNodes() + for nodeID in registeredNodes: + (nodeIP, nodePort) = registeredNodes[nodeID] + self.VAST.join(nodeIP, nodePort, nodeID) + logging.info("Matcher::handleMessage => MatcherNode::handleMessage => Received nodeID <%s> from gateway" % self.nodeID) def addChannelSubscription(self, nodeID, channel): if (channel in self.channelSubscriptions): diff --git a/Simulator.py b/Simulator.py index 529d497..e7cccbe 100644 --- a/Simulator.py +++ b/Simulator.py @@ -3,7 +3,7 @@ import asyncio from Node import MatcherNode -from Node import VASTNode, SPSNode +from Node import VASTNode, SPSNode, Gateway from VAST import VASTInterface from Message import Message from Connector import * @@ -19,67 +19,63 @@ async def work(node): async def simulate(matcher, nodes, loop): + await asyncio.sleep(2) matcherNodeID = matcher.getNodeID() - # nodes 'joioniconnect to matcher - matcher.registerID() - for node in nodes: - node.registerID() - # Channel based pub/sub logging.info("") logging.info("Simulator::main => TESTING CHANNEL-BASED PUB/SUB") logging.info("") # Node '0' subscribes to channel 'test' - message = Message(senderNodeID='0', type='sub', channel='test', payload=None) + message = Message(senderNodeID=matcher.getNodeID(), type='sub', channel='test', payload=None) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - # Node '1' subscribes to channel 'test' - message = Message(senderNodeID='1', type='sub', channel='test', payload=None) + # Node '1' subscribes to channel 'test2' + message = Message(senderNodeID=nodes[0].getNodeID(), type='sub', channel='test', payload=None) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - # Node '1' subscribes to channel 'test' - message = Message(senderNodeID='2', type='sub', channel='test2', payload=None) + # Node '1' subscribes to channel 'test2' + message = Message(senderNodeID=nodes[1].getNodeID(), type='sub', channel='test2', payload=None) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test2', payload='publication message 1') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test2', payload='publication message 1') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='1', type='pub', channel='test2', payload='publication message 2') + message = Message(senderNodeID=nodes[0].getNodeID(), type='pub', channel='test2', payload='publication message 2') nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 3') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 3') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '0' unsubscribes from channel 'test' - message = Message(senderNodeID='0', type='unsub', channel='test') + message = Message(senderNodeID=matcher.getNodeID(), type='unsub', channel='test') matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 4') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 4') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '0' unsubscribes from channel 'test' - message = Message(senderNodeID='1', type='unsub', channel='test') + message = Message(senderNodeID=nodes[0].getNodeID(), type='unsub', channel='test') nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 5') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 5') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '2' subscribes to channel 'test' - message = Message(senderNodeID='2', type='unsub', channel='test2') + message = Message(senderNodeID=nodes[1].getNodeID(), type='unsub', channel='test2') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -92,7 +88,7 @@ async def simulate(matcher, nodes, loop): # Node '0' subscribes to area (x,y,r)=(0,0,10) pos1 = (0, 0) radius1 = 10 - message = Message(senderNodeID='0', type='spatialsub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialsub') message.setArea(area=Area(pos1, radius1)) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -100,7 +96,7 @@ async def simulate(matcher, nodes, loop): # Node '1' subscribes to area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='1', type='spatialsub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialsub') message.setArea(area=Area(pos2, radius2)) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -108,7 +104,7 @@ async def simulate(matcher, nodes, loop): # Node '2' subscribes to area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='2', type='spatialsub') + message = Message(senderNodeID=nodes[1].getNodeID(), type='spatialsub') message.setArea(area=Area(pos2, radius2)) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -116,7 +112,7 @@ async def simulate(matcher, nodes, loop): # Node '1' publishes to area (x,y,r)=(0,5,1) pos2 = (0, 5) radius3 = 1 - message = Message(senderNodeID='1', type='spatialpub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialpub') message.setArea(area=Area(pos2, radius3)) message.setPayload(payload='spatial publication message 1') nodes[0].send(destinationNodeID=matcherNodeID, message=message) @@ -125,7 +121,7 @@ async def simulate(matcher, nodes, loop): # Node '2' unsubscribes from area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='2', type='spatialunsub') + message = Message(senderNodeID=nodes[1].getNodeID(), type='spatialunsub') message.setArea(area=Area(pos2, radius2)) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -133,7 +129,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,5,1) pos2 = (0, 5) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos2, radius3)) message.setPayload(payload='spatial publication message 2') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -142,7 +138,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,-5,1) pos3 = (0, -5) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos3, radius3)) message.setPayload(payload='spatial publication message 3') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -151,7 +147,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,20,1) pos4 = (0, 20) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos4, radius3)) message.setPayload(payload='spatial publication message 4') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -161,7 +157,7 @@ async def simulate(matcher, nodes, loop): # Node '0' unsubscribes from area (x,y,r)=(0,0,10) pos1 = (0, 0) radius1 = 10 - message = Message(senderNodeID='0', type='spatialunsub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialunsub') message.setArea(area=Area(pos1, radius1)) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -169,7 +165,7 @@ async def simulate(matcher, nodes, loop): # Node '1' unsubscribes from area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='1', type='spatialunsub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialunsub') message.setArea(area=Area(pos2, radius2)) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -279,16 +275,46 @@ def main(): logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") + gatewayIP = '127.0.0.1' + gatewayPort = 10000 + VAST = VASTInterface() - matcherNodeID = '0' - matcher = MatcherNode(nodeID=matcherNodeID, - networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1000'),VASTInterface=VAST) - nodes = [VASTNode(nodeID='1', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1001'), VASTInterface=VAST), - VASTNode(nodeID='2', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1002'), VASTInterface=VAST)] + RealNetwork = True + + if (RealNetwork): + gateway = Gateway(RealNetworkInterface(senderIP=gatewayIP, senderPort=gatewayPort)) + matcher = MatcherNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12000), VASTInterface=VAST) + nodes = [VASTNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12001), VASTInterface=VAST), + VASTNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12002), VASTInterface=VAST)] + + + else: + gateway = Gateway(RealNetworkInterface(senderIP=gatewayIP, senderPort=gatewayPort)) + matcher = MatcherNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12000), + VASTInterface=VAST) + nodes = [VASTNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12001), + VASTInterface=VAST), + VASTNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12002), + VASTInterface=VAST)] + + # Initialise network interface + gateway.initialiseNetworkInterface() + matcher.initialiseNetworkInterface() + for node in nodes: + node.initialiseNetworkInterface() + + # Matcher obtain nodeID from gateway + matcher.registerID(gatewayIP, gatewayPort) + matcherNodeID = matcher.getNodeID() + + # Nodes obtain nodeIDs from gateway + for node in nodes: + node.registerID(gatewayIP, gatewayPort) loop = asyncio.get_event_loop() try: asyncio.ensure_future(work(matcher)) + asyncio.ensure_future(work(gateway)) for node in nodes: asyncio.ensure_future(work(node)) asyncio.ensure_future(simulate(matcher, nodes, loop)) @@ -297,6 +323,7 @@ def main(): pass finally: logging.info("Closing Loop") + loop.close() diff --git a/StandaloneMatcherNode.py b/StandaloneMatcherNode.py new file mode 100644 index 0000000..6b30365 --- /dev/null +++ b/StandaloneMatcherNode.py @@ -0,0 +1,70 @@ +import sys +import getopt + + +from Node import MatcherNode +from VAST import VASTInterface +from Connector import * + + +messageProcessingTick = 0.001 +simulationTick = 0.01 + +async def processReceiveMessage(node): + while True: + await asyncio.sleep(messageProcessingTick) + node.processSingleMessage() + +def main(argv): + + + IP = '' + port = '' + try: + opts, args = getopt.getopt(argv, "hi:p:") + except getopt.GetoptError: + print + 'singlenode.py -i -p ' + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print + 'singlenode.py -i -p ' + sys.exit() + elif opt in ("-ip"): + IP = arg + elif opt in ("-port"): + port = int(arg) + + print('(IP:Port) = (%s:%d)' % (IP, port)) + + format = "%(asctime)s: %(message)s" + logging.basicConfig(format=format, level=logging.INFO, + datefmt="%H:%M:%S") + + gatewayIP = '127.0.0.1' + gatewayPort = 49152 + + VAST = VASTInterface() + #matcherNodeID = 0 + #VAST.join('127.0.0.1', 49153, matcherNodeID) + #VAST.join('127.0.0.1', 49154, 1) + #VAST.join('127.0.0.1', 49155, 2) + + matcher = MatcherNode(networkInterface=RealNetworkInterface(senderIP=IP, senderPort=49153), VASTInterface=VAST) + matcher.initialiseNetworkInterface() + matcher.registerID(gatewayIP, gatewayPort) + + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(processReceiveMessage(matcher)) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + logging.info("Closing Loop") + + loop.close() + +if __name__ == '__main__': + main(sys.argv[1:]) \ No newline at end of file diff --git a/StandaloneNode.py b/StandaloneNode.py new file mode 100644 index 0000000..61beff3 --- /dev/null +++ b/StandaloneNode.py @@ -0,0 +1,100 @@ +import sys +import getopt + + +from Node import VASTNode +from VAST import VASTInterface +from Message import Message +from Connector import * +import random + +messageProcessingTick = 0.001 + + +async def processReceiveMessage(node): + while True: + await asyncio.sleep(messageProcessingTick) + node.processSingleMessage() + + +async def publishMessage(node, channel, subchannel, destinationID, messageNumber): + subscribed = False + while True: + publishTick = random.randint(0, 2) + await asyncio.sleep(publishTick) + if (node.getNodeID() != None): + logging:info("Have received nodeID <%s>" % node.getNodeID()) + if (subscribed == False): + # Node '1' subscribes to channel 'test2' + message = Message(senderNodeID=node.getNodeID(), type='sub', channel=subchannel, payload=None) + node.send(destinationNodeID=0, message=message) + subscribed = True + else: + + payload = 'publication message %d' % messageNumber + message = Message(senderNodeID=node.getNodeID(), type='pub', channel=channel, payload=payload) + node.send(destinationNodeID=destinationID, message=message) + messageNumber += 1 + +def main(argv): + + IP = '' + port = '' + subchannel = '' + pubchannel = '' + try: + opts, args = getopt.getopt(argv, "hi:p:c:s:") + except getopt.GetoptError: + print + 'singlenode.py -i -p ' + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print + 'singlenode.py -i -p ' + sys.exit() + elif opt in ("-i"): + IP = arg + elif opt in ("-p"): + port = int(arg) + elif opt in ("-c"): + pubchannel = arg + elif opt in ("-s"): + subchannel = arg + + + print('(IP:Port) = (%s:%d)' % (IP, port)) + + format = "%(asctime)s: %(message)s" + logging.basicConfig(format=format, level=logging.INFO, + datefmt="%H:%M:%S") + + gatewayIP = '127.0.0.1' + gatewayPort = 49152 + + VAST = VASTInterface() + matcherNodeID = 0 + #VAST.join('127.0.0.1', 49153, matcherNodeID) + #VAST.join('127.0.0.1', 49154, 1) + #VAST.join('127.0.0.1', 49155, 2) + + + node = VASTNode(networkInterface=RealNetworkInterface(senderIP=IP, senderPort=port), VASTInterface=VAST) + node.initialiseNetworkInterface() + node.registerID(gatewayIP, gatewayPort) + + + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(processReceiveMessage(node)) + asyncio.ensure_future(publishMessage(node, pubchannel, subchannel, matcherNodeID, 0)) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + logging.info("Closing Loop") + + loop.close() + +if __name__ == '__main__': + main(sys.argv[1:]) \ No newline at end of file diff --git a/VAST.py b/VAST.py index 8e9488e..6d041ee 100644 --- a/VAST.py +++ b/VAST.py @@ -1,12 +1,19 @@ +from Connector import NetworkInterface +from Message import JoinMessage + class VASTInterface(object): def __init__(self): self.registeredNodes = {} + def join(self, IP, port, ID): + self.registerNode(IP, port, ID) - def join(self, nodeID, IP, port): + def registerNode(self, IP, port, nodeID): self.registeredNodes[nodeID] = (IP, port) + #print(self.registeredNodes) def getIPPort(self, nodeID): + #print(self.registeredNodes) result = self.registeredNodes[nodeID] return result \ No newline at end of file