Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 61 additions & 15 deletions Connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
from copy import deepcopy
import queue
import socket
import pickle
import codecs

import asyncio

import logging

Expand All @@ -16,6 +20,10 @@ def connect(self, destinationIP, destinationPort):
pass


@abstractmethod
def disconnect(self):
pass


@abstractmethod
def send(self, message):
Expand Down Expand Up @@ -52,45 +60,79 @@ def getMessage(self):
pass


class ServerProtocol(asyncio.DatagramProtocol):

def __init__(self, queue):
self.queue = queue


def connection_made(self, transport):
self.transport = transport


def datagram_received(self, data, addr):
#logging.info("ServerProtocol::datagram_received => Received datagram '%s' from %s" % (data, addr))
message = pickle.loads(codecs.decode(data, "base64"))
self.queue.put(message)


class RealNetworkInterface(NetworkInterface):


def __init__(self, senderIP, senderPort):
self.senderIP = senderIP
self.senderPort = senderPort
self.serverSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.receiveQueue = queue.Queue()
self.loop = asyncio.get_event_loop()


def connect(self, destinationIP, destinationPort):

self.destinationIP = destinationIP
self.destinationPort = destinationPort
#print(self.destinationIP)
#print(self.destinationPort)
#logging.info("RealNetwork::connect => Connecting to (%s:%d)" % (self.destinationIP, self.destinationPort))
self.clientSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)


def disconnect(self):
self.listen.close()


def send(self, message):
self.clientSock.sendto( (UDP_IP_ADDRESS, UDP_PORT_NO))
pass
#logging.info("RealNetwork::send => Sending message '%s' to (%s:%d)" % (message.getType(), self.destinationIP, self.destinationPort))
data = codecs.encode(pickle.dumps(message), "base64")
result = self.clientSock.sendto(data, (self.destinationIP, self.destinationPort))
#logging.info("RealNetwork::send => Results <%s>" % result)


def receive(self, message):
pass
#logging.info("RealNetwork::receive => Received a message" )
self.receiveQueue.put(message)


def bind(self):
serverSock.bind(self.senderIP, self.senderPort)
pass
logging.info("RealNetworkInterface::bind => Registering (IP:Port) = (%s:%d )" % (self.senderIP, self.senderPort))
self.listen = self.loop.create_datagram_endpoint(
lambda: ServerProtocol(self.receiveQueue), local_addr=(self.senderIP, self.senderPort))
self.loop.run_until_complete(self.listen)


def getIP(self):
pass
return self.senderIP


def getPort(self):
pass
return self.senderPort


def getNumberOfMessages(self):
pass
return self.receiveQueue.qsize()


def getMessage(self):
pass
return self.receiveQueue.get()


class FakeNetworkInterface(NetworkInterface):
Expand All @@ -103,22 +145,26 @@ def __init__(self, senderIP, senderPort):


def connect(self, destinationIP, destinationPort):
logging.debug("Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort))
self.destinationInterface = interfaces[destinationIP+":"+destinationPort]
logging.info("FakeNetwork::connect => Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort))
self.destinationInterface = interfaces[destinationIP+":"+str(destinationPort)]


def disconnect(self):
interfaces[self.senderIP + ":" + self.senderPort] = None


def bind(self):
logging.info("FakeNetworkInterface::bind => Registering (IP:Port) = (%s:%s )" % (self.senderIP, self.senderPort))
interfaces[self.senderIP+":"+self.senderPort] = self
interfaces[self.senderIP+":"+str(self.senderPort)] = self


def send(self, message):
logging.debug("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort()))
#logging.info("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort()))
self.destinationInterface.receive(deepcopy(message))


def receive(self, message):
logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize()))
#logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize()))
self.receiveQueue.put(message)
logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize()))

Expand Down
49 changes: 49 additions & 0 deletions Message.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,52 @@
class JoinMessage():
def __init__(self, IP, port):
self.nodeID = None
self.type = None # join, query, matcher
self.IP = IP
self.port = port
self.registeredNodes = {}


def setNodeID(self, nodeID):
self.nodeID = nodeID


def getNodeID(self):
return self.nodeID


def setIP(self, IP):
self.IP = IP


def setPort(self, port):
self.port = port


def getIP(self):
return self.IP


def getPort(self):
return self.port


def getType(self):
return self.type


def setType(self, type):
self.type = type


def setRegisteredNodes(self, nodes):
self.registeredNodes = nodes


def getRegisteredNodes(self):
return self.registeredNodes


class Message(object):


Expand Down
Loading