"""Small caching HTTP proxy.

The sections below correspond to the modules of the original patch
(message, httparser, cache, connection, worker, acceptor, test, main).
They are laid out in dependency order so the whole text is one importable
module; the script entry point therefore comes last.
"""

import logging
import sys
from email.utils import formatdate
from re import search
from socket import AF_INET, SHUT_WR, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from threading import RLock, Thread
from time import sleep, time
from unittest import TestCase


# ---------------------------------------------------------------------------
# http-proxy/message.py
# ---------------------------------------------------------------------------

NEW_LINE = "\r\n"
NEW_LINE_B = b"\r\n"
HTTP_PORT = 80
SUPPORTED_METHODS = ["GET", "POST", "HEAD"]
NOT_MODIFIED = 304


def not_implemented_response():
    """Return a bare ``501 Not Implemented`` response message."""
    response = Message()
    response.set_start_line("HTTP/1.1 501 Not Implemented")
    return response


def bad_gateway_response():
    """Return a bare ``502 Bad Gateway`` response message."""
    response = Message()
    response.set_start_line("HTTP/1.1 502 Bad Gateway")
    return response


class Message:
    """An HTTP request or response: start line, headers and raw body.

    NOTE: headers are kept in a plain dict, so a header repeated under the
    same name keeps only its last value.
    """

    def __init__(self):
        self.__body = b""
        self.__start_line = ""
        self.__headers = {}

    def set_start_line(self, start_line):
        """Set the request line / status line (without the trailing CRLF)."""
        self.__start_line = start_line

    def add_header(self, header_title, header_value):
        """Set a header, replacing any same-named header regardless of case."""
        lowered = header_title.lower()
        stale = [
            title
            for title in self.__headers
            if title != header_title and title.lower() == lowered
        ]
        for title in stale:
            del self.__headers[title]
        self.__headers[header_title] = header_value

    def append_to_body(self, chunk):
        """Append *chunk* to the body and return True once the body is complete.

        For chunked transfer encoding, completion is detected by the body
        ending with the terminating ``0\\r\\n\\r\\n`` chunk (trailers are not
        supported); otherwise by reaching ``Content-Length`` bytes.
        """
        self.__body += chunk
        if self.__is_chunked():
            return self.__body.endswith(b"0" + NEW_LINE_B * 2)
        return len(self.__body) >= self.__get_body_len()

    def get_body(self):
        """Return the raw body bytes received so far."""
        return self.__body

    def get_status(self):
        """Return the integer status code of a response, or None.

        The code is read from the status line (``HTTP/1.1 304 Not Modified``);
        requests and malformed start lines yield None.
        """
        tokens = self.__start_line.split()
        if len(tokens) >= 2 and tokens[0].startswith("HTTP/") and tokens[1].isdigit():
            return int(tokens[1])
        return None

    def to_bytes(self):
        """Serialize the message to the exact bytes to put on the wire."""
        lines = [self.__start_line]
        lines.extend("%s: %s" % header for header in self.__headers.items())
        message = (NEW_LINE.join(lines) + NEW_LINE * 2).encode()
        if self.__get_body_len() > 0 or self.__is_chunked():
            # The body is sent verbatim: nothing may follow Content-Length
            # bytes, and a chunked body already ends with its own CRLF.
            message += self.__body
        return message

    def get_host(self):
        """Return ``(host, port)`` from the Host header, or ``(None, None)``.

        A missing or non-numeric port falls back to HTTP_PORT.
        """
        host = self.__get_header("Host")
        if not host:
            return None, None
        host = host.strip()
        name, separator, port = host.rpartition(":")
        # A value ending in "]" is a bare IPv6 literal without a port.
        if separator and not host.endswith("]"):
            return name, int(port) if port.isdigit() else HTTP_PORT
        return host, HTTP_PORT

    def can_cache(self):
        """Return False when Cache-Control forbids caching this message."""
        cache_control = self.__get_header("Cache-Control")
        if cache_control is None:
            return True
        return search(r"no-cache|no-store", cache_control.lower()) is None

    def can_cache_with_size(self, cache_size):
        """Return True if the message is cacheable and fits in *cache_size*."""
        return self.can_cache() and sys.getsizeof(self) <= cache_size

    def is_modify(self):
        """Return False only for a ``304 Not Modified`` response."""
        return self.get_status() != NOT_MODIFIED

    def add_modify_request(self, timestamp):
        """Add an If-Modified-Since header for the epoch time *timestamp*."""
        # HTTP dates must be expressed in GMT, never in local time.
        self.add_header("If-Modified-Since", formatdate(timestamp, usegmt=True))

    def is_method_supported(self):
        """Return True if the request method is one the proxy forwards."""
        method = self.__start_line.split(" ")[0]
        return method in SUPPORTED_METHODS

    def __get_header(self, name, default=None):
        """Look up a header value case-insensitively."""
        lowered = name.lower()
        for title, value in self.__headers.items():
            if title.lower() == lowered:
                return value
        return default

    def __get_body_len(self):
        """Return the declared Content-Length; 0 when absent or malformed."""
        try:
            return max(int(self.__get_header("Content-Length", "0")), 0)
        except ValueError:
            return 0

    def __is_chunked(self):
        return "chunked" in self.__get_header("Transfer-Encoding", "").lower()

    def __sizeof__(self):
        # sys.getsizeof() is shallow by default; report the payload as well
        # so that cache size limits reflect what is actually stored.
        headers = sum(
            len(str(title)) + len(str(value))
            for title, value in self.__headers.items()
        )
        return (
            object.__sizeof__(self)
            + len(self.__start_line)
            + headers
            + len(self.__body)
        )

    def __hash__(self):
        return hash(self.__body) + hash(self.__start_line) + hash(self.get_host())

    def __eq__(self, other):
        if not isinstance(other, Message):
            return False
        return (
            self.__body == other.__body
            and self.__start_line == other.__start_line
            and self.__headers == other.__headers
        )

    def __str__(self):
        # This text doubles as the cache key, so its format must stay stable.
        host, port = self.get_host()
        body = self.__body if self.__body is not None else ""
        if host is not None:
            return "%s %s:%s %s" % (self.__start_line, host, port, body)
        return "%s %s" % (self.__start_line, body)


# ---------------------------------------------------------------------------
# http-proxy/httparser.py
# ---------------------------------------------------------------------------

class HTTParser:
    """Incremental parser that turns received chunks into one Message."""

    def __init__(self):
        self.__message = Message()
        # Raw bytes of the head received so far; None once the head is parsed.
        self.__head = b""

    def append(self, chunk):
        """Feed *chunk*; return the Message once complete, otherwise None."""
        if self.__head is None:
            return self.__parse_body_part(chunk)
        # Search the accumulated buffer, not the single chunk: the blank line
        # that ends the head may be split across two recv() calls.
        self.__head += chunk
        head, separator, rest = self.__head.partition(NEW_LINE_B * 2)
        if not separator:
            return None
        self.__parse_head(head)
        return self.__parse_body_part(rest)

    def __parse_head(self, raw_head):
        """Fill the start line and headers of the message from *raw_head*."""
        # Decode once, on the whole head, so a multi-byte character can never
        # be cut in half at a chunk boundary.
        lines = raw_head.decode(errors="replace").splitlines()
        self.__message.set_start_line(lines[0] if lines else "")
        for line in lines[1:]:
            title, separator, value = line.partition(":")
            if separator:  # silently skip lines that are not "name: value"
                self.__message.add_header(title.strip(), value.strip())
        self.__head = None

    def __parse_body_part(self, chunk):
        return self.__message if self.__message.append_to_body(chunk) else None


# ---------------------------------------------------------------------------
# http-proxy/cache.py
# ---------------------------------------------------------------------------

class Cache:
    """Thread-safe response cache with expiry and a total size limit.

    Entries are keyed by ``str(request)`` and stored as
    ``(insertion_time, response)``.
    """

    def __init__(self, expire, max_size):
        self.__cache = {}
        self.__expire = expire
        self.__max_size = max_size
        self.__dict_mutex = RLock()
        self.__cache_size = 0

    def get(self, request):
        """Return ``(timestamp, response)`` for *request* or ``(None, None)``."""
        with self.__dict_mutex:
            if not request.can_cache() or self.__check_expire(request):
                return None, None
            logging.info("Get from cache: %s", request)
            return self.__cache.get(str(request), (None, None))

    def put(self, request, response):
        """Store *response* for *request* if both allow caching and it fits."""
        if response is None or not request.can_cache():
            return
        if not response.can_cache_with_size(self.__max_size):
            return
        key = str(request)
        with self.__dict_mutex:
            if key in self.__cache:
                # Drop the old entry first so its size is not counted twice.
                self.__pop_key_and_log(key)
            self.__clear_old_cache(response)
            if self.__check_cache_size(response):
                self.__cache[key] = (time(), response)
                self.__cache_size += sys.getsizeof(response)

    def __check_expire(self, request):
        """Return True if *request* has no usable entry (missing or expired).

        An expired entry is removed as a side effect.
        """
        set_time, _ = self.__cache.get(str(request), (None, None))
        if set_time is None:
            return True
        if set_time + self.__expire < time():
            self.__pop_key_and_log(request)
            return True
        return False

    def __check_cache_size(self, response):
        """Return True if *response* fits into the remaining capacity."""
        return self.__cache_size + sys.getsizeof(response) < self.__max_size

    def __clear_old_cache(self, response):
        """Evict expired, then oldest, entries until *response* fits."""
        if self.__check_cache_size(response):
            return
        now = time()
        expired = [
            key
            for key, (set_time, _) in self.__cache.items()
            if set_time + self.__expire < now
        ]
        for key in expired:
            self.__pop_key_and_log(key)
        # Oldest first: order by insertion timestamp, not by key text.
        oldest_first = sorted(self.__cache.items(), key=lambda item: item[1][0])
        for key, _ in oldest_first:
            if self.__check_cache_size(response):
                return
            self.__pop_key_and_log(key)

    def __pop_key_and_log(self, key):
        key = str(key)
        _, value = self.__cache.pop(key)
        logging.info("Remove from cache: %s", value)
        self.__cache_size -= sys.getsizeof(value)


# ---------------------------------------------------------------------------
# http-proxy/connection.py
# ---------------------------------------------------------------------------

MAX_CHUNK_LEN = 1024


class Connection:
    """A TCP connection that sends and receives whole HTTP messages."""

    def __init__(self, sock=None):
        self.__socket = sock
        self.__host = "client"

    def establish(self, host):
        """Connect to *host*, a ``(name, port)`` pair. Raises OSError on failure."""
        self.__host = host
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            sock.connect(host)
        except Exception:
            sock.close()  # do not leak the descriptor of a failed connect
            raise
        self.__socket = sock

    def close(self):
        """Shut down and close the socket; safe to call more than once."""
        if self.__socket is None:
            return
        try:
            self.__socket.shutdown(SHUT_WR)
        except OSError:
            # Already closed, reset by the peer or never connected.
            pass
        finally:
            # Always release the descriptor, even when shutdown() failed.
            self.__socket.close()
            logging.info("Socket closed")

    def receive_message(self):
        """Read until one full message has arrived; None if the peer aborts."""
        parser = HTTParser()
        while True:
            try:
                chunk = self.__socket.recv(MAX_CHUNK_LEN)
            except OSError:
                # Includes resets and a socket closed by another thread.
                chunk = b""
            if not chunk:
                logging.error("Connection aborted by %s" % (self.__host,))
                return None
            message = parser.append(chunk)
            if message is not None:
                return message

    def send_message(self, message):
        """Send *message* completely; a failure is logged, not raised."""
        if message is None:
            return
        payload = memoryview(message.to_bytes())
        total_sent = 0
        while total_sent < len(payload):
            try:
                sent = self.__socket.send(payload[total_sent:])
            except OSError:
                sent = 0
            if not sent:
                logging.error("Connection aborted by %s" % (self.__host,))
                break
            total_sent += sent


# ---------------------------------------------------------------------------
# http-proxy/worker.py
# ---------------------------------------------------------------------------

class Worker(Thread):
    """Serves one client connection: read request, answer, close."""

    def __init__(self, sock, cache):
        super().__init__()
        self.__socket = sock
        self.__cache = cache
        self.__client_connection = Connection(self.__socket)

    def interrupt(self):
        """Close the client connection to stop the worker."""
        self.__client_connection.close()

    def run(self):
        try:
            self.__serve()
        except Exception:
            # Thread boundary: log instead of dying with a bare traceback.
            logging.exception("Worker failed")
        finally:
            # The client socket is released on every path.
            self.__client_connection.close()

    def __serve(self):
        """Handle exactly one request from the client connection."""
        request = self.__client_connection.receive_message()
        if request is None:
            logging.info("Could not get request")
            return
        logging.info("Get request: %s", request)
        if not request.is_method_supported():
            self.__reply(not_implemented_response())
            return
        cache_answer = self.__cache.get(request)
        response = Worker.__get_from_cache_with_check(cache_answer, request)
        if response is None:
            response = Worker.__connect_and_send(request)
        if response is None:
            logging.error("No response from upstream for: %s", request)
            self.__reply(bad_gateway_response())
            return
        self.__reply(response)
        # Locally generated errors are never stored, and neither is a bare
        # 304 that answers the client's own conditional request.
        if response.is_modify():
            self.__cache.put(request, response)

    def __reply(self, response):
        logging.info("Send response: %s", response)
        self.__client_connection.send_message(response)

    @staticmethod
    def __connect_and_send(request):
        """Forward *request* to its origin; return the response or None."""
        host = request.get_host()
        if host[0] is None:
            logging.error("Request has no Host header: %s", request)
            return None
        server_connection = Connection()
        try:
            server_connection.establish(host)
            server_connection.send_message(request)
            return server_connection.receive_message()
        except OSError as error:
            logging.error("Could not reach %s:%s: %s", host[0], host[1], error)
            return None
        finally:
            server_connection.close()

    @staticmethod
    def __get_from_cache_with_check(cache_answer, request):
        """Revalidate a cached response with the origin server.

        Returns None when nothing is cached (the caller must fetch), the
        cached response when the origin answers 304 or cannot be reached,
        and the fresh response when the resource has changed. Adds an
        If-Modified-Since header to *request* when an entry is cached.
        """
        timestamp, cached = cache_answer
        if timestamp is None:
            return cached
        request.add_modify_request(timestamp)
        fresh = Worker.__connect_and_send(request)
        if fresh is None or not fresh.is_modify():
            return cached
        # The conditional request already returned the new content; reuse it
        # instead of asking the origin a second time.
        return fresh


# ---------------------------------------------------------------------------
# http-proxy/acceptor.py
# ---------------------------------------------------------------------------

BACKLOG_SIZE = 10


class Acceptor(Thread):
    """Listens on ``address:port`` and starts one Worker per connection."""

    def __init__(self, address, port, cache_expire, cache_max_size):
        super().__init__()
        logging.info("Accept connection address: %s, port %s" % (address, port))
        self.__address = address
        self.__port = port
        self.__cache_expire = cache_expire
        self.__cache_max_size = cache_max_size
        self.__server_socket = socket(AF_INET, SOCK_STREAM)
        self.__server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        self.__server_socket.bind((address, port))
        self.__server_socket.listen(BACKLOG_SIZE)
        self.__interrupted_mutex = RLock()
        self.__interrupted = False

    def interrupt(self):
        """Ask the accept loop to stop and wake it with a dummy connection."""
        with self.__interrupted_mutex:
            self.__interrupted = True
        # A wildcard bind address is not a valid connect target everywhere.
        host = self.__address
        if host in ("", "0.0.0.0"):
            host = "127.0.0.1"
        try:
            # Ask the socket for its port: it differs from the requested one
            # when the server was bound to port 0.
            port = self.__server_socket.getsockname()[1]
            with socket(AF_INET, SOCK_STREAM) as waker:
                waker.connect((host, port))
        except OSError:
            logging.exception("Could not wake the accept loop")

    def run(self):
        workers = []
        cache = Cache(self.__cache_expire, self.__cache_max_size)
        try:
            while True:
                client_socket, peer = self.__server_socket.accept()
                with self.__interrupted_mutex:
                    interrupted = self.__interrupted
                if interrupted:
                    client_socket.close()  # this is only the wake-up call
                    break
                logging.info("Accept new connection from: %s" % str(peer))
                worker = Worker(client_socket, cache)
                workers.append(worker)
                worker.start()
                # Forget finished workers so the list cannot grow forever.
                workers = [item for item in workers if item.is_alive()]
        finally:
            self.__server_socket.close()
            for worker in workers:
                worker.interrupt()
                worker.join()


# ---------------------------------------------------------------------------
# http-proxy/test.py
# ---------------------------------------------------------------------------

BODY = b'Click here'


class Test(TestCase):
    """Unit tests for Message, HTTParser and Cache."""

    @staticmethod
    def _message(path="/background.png", host="example.org"):
        message = Message()
        message.set_start_line("GET %s HTTP/1.0" % path)
        if host is not None:
            message.add_header("Host", host)
        message.add_header("Content-Length", "110")
        message.append_to_body(BODY)
        return message

    def test_create_message(self):
        message = self._message()
        self.assertEqual(b'GET /background.png HTTP/1.0\r\n'
                         b'Host: example.org\r\nContent-Length: 110\r\n\r\n'
                         b'Click here', message.to_bytes())

    def test_message_without_headers(self):
        self.assertEqual(b'HTTP/1.1 501 Not Implemented\r\n\r\n',
                         not_implemented_response().to_bytes())

    def test_get_host_message(self):
        self.assertEqual(("example.org", 80), self._message().get_host())
        self.assertEqual(("example.org", 8080),
                         self._message(host="example.org:8080").get_host())

    def test_status_is_read_from_start_line(self):
        response = Message()
        response.set_start_line("HTTP/1.1 304 Not Modified")
        self.assertEqual(304, response.get_status())
        self.assertFalse(response.is_modify())
        self.assertIsNone(self._message().get_status())

    def test_need_cache_message(self):
        message = self._message()
        can = message.can_cache()
        message.add_header("Cache-Control", "no-store")
        cant = message.can_cache()
        self.assertEqual(True, can)
        self.assertEqual(False, cant)

    def test_parser_handles_split_terminator(self):
        parser = HTTParser()
        self.assertIsNone(parser.append(b'GET / HTTP/1.1\r\nhost: a.org\r\n'
                                        b'Content-Length: 2\r\n\r'))
        message = parser.append(b'\nhi')
        self.assertIsNotNone(message)
        self.assertEqual(("a.org", 80), message.get_host())
        self.assertEqual(b'hi', message.get_body())

    def test_put_in_cache(self):
        message = self._message()
        cache = Cache(1000, 100000)
        cache.put(message, message)
        _, getting_message = cache.get(message)
        self.assertEqual(message, getting_message)

    def test_put_none_response(self):
        message = self._message()
        cache = Cache(1000, 100000)
        cache.put(message, None)
        self.assertEqual((None, None), cache.get(message))

    def test_expire_cache(self):
        message = self._message()
        cache = Cache(0, 100000)
        cache.put(message, message)
        sleep(0.05)
        getting_message = cache.get(message)
        self.assertEqual((None, None), getting_message)

    def test_size_cache(self):
        message = self._message()
        message1 = self._message(host=None)
        # Room for either message alone, but never for both together.
        cache = Cache(100000, sys.getsizeof(message) + 1)
        cache.put(message, message)
        _, getting_message = cache.get(message)
        cache.put(message1, message1)
        _, getting_message1 = cache.get(message1)
        empty_message = cache.get(message)
        self.assertEqual(message, getting_message)
        self.assertEqual(message1, getting_message1)
        self.assertEqual((None, None), empty_message)

    def test_oldest_entry_is_evicted_first(self):
        # "/b" is older but sorts after "/a" by key; it must still go first.
        first, second, third = (self._message(path=p) for p in ("/b", "/a", "/c"))
        cache = Cache(100000, 2 * sys.getsizeof(first) + 1)
        for message in (first, second, third):
            cache.put(message, message)
        self.assertEqual((None, None), cache.get(first))
        self.assertEqual(second, cache.get(second)[1])
        self.assertEqual(third, cache.get(third)[1])


# ---------------------------------------------------------------------------
# http-proxy/main.py
# ---------------------------------------------------------------------------

def main(arguments=None):
    """Run the proxy until the operator presses Enter.

    *arguments* defaults to ``sys.argv[1:]`` and must hold the port, the
    cache expiry in seconds and the maximum cache size.
    """
    arguments = sys.argv[1:] if arguments is None else arguments
    if len(arguments) != 3:
        raise SystemExit("usage: main.py PORT CACHE_EXPIRE CACHE_MAX_SIZE")
    port = int(arguments[0])
    cache_expire = int(arguments[1])
    cache_max_size = int(arguments[2])
    logging.basicConfig(filename="proxy.log", level=logging.INFO)
    logging.info("Start proxy on port %s, with cache expire %s and cache size %s"
                 % (port, cache_expire, cache_max_size))
    my_server = Acceptor("", port, cache_expire, cache_max_size)
    my_server.start()
    input("input anything to exit\n")
    my_server.interrupt()
    my_server.join()


if __name__ == "__main__":
    main()