From 79f9eff4d409f2671628a2f1024d65cac0ba3cce Mon Sep 17 00:00:00 2001 From: =?utf8?q?Fran=C3=A7ois=20Fleuret?= Date: Mon, 28 Oct 2024 13:14:42 +0100 Subject: [PATCH] Update. --- covid19.py | 51 +++++++++--------- distributed.py | 138 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 25 deletions(-) create mode 100755 distributed.py diff --git a/covid19.py b/covid19.py index beccb9b..fdfe16e 100755 --- a/covid19.py +++ b/covid19.py @@ -13,30 +13,32 @@ import urllib.request ###################################################################### -def gentle_download(url, delay = 86400): - filename = url[url.rfind('/') + 1:] + +def gentle_download(url, delay=86400): + filename = url[url.rfind("/") + 1 :] if not os.path.isfile(filename) or os.path.getmtime(filename) < time.time() - delay: - print(f'Retrieving {url}') + print(f"Retrieving {url}") urllib.request.urlretrieve(url, filename) return filename + ###################################################################### nbcases_filename = gentle_download( - 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv' + "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv" ) ###################################################################### -with open(nbcases_filename, newline='') as csvfile: - reader = csv.reader(csvfile, delimiter=',') +with open(nbcases_filename, newline="") as csvfile: + reader = csv.reader(csvfile, delimiter=",") times = [] nb_cases = {} time_col = 5 for row_nb, row in enumerate(reader): for col_nb, field in enumerate(row): if row_nb == 0 and col_nb >= time_col: - times.append(time.mktime(time.strptime(field, '%m/%d/%y'))) + times.append(time.mktime(time.strptime(field, "%m/%d/%y"))) if row_nb >= 1: if col_nb == 1: country = field @@ -48,41 +50,40 @@ with open(nbcases_filename, newline='') as csvfile: countries = list(nb_cases.keys()) countries.sort() -print('Countries: ', countries) +print("Countries: ", countries) -nb_cases['World'] = sum(nb_cases.values()) +nb_cases["World"] = sum(nb_cases.values()) ###################################################################### fig = plt.figure() ax = fig.add_subplot(1, 1, 1) -ax.yaxis.grid(color='gray', linestyle='-', linewidth=0.25) -ax.set_title('Nb. of COVID-19 cases') -ax.set_xlabel('Date', labelpad = 10) -ax.set_yscale('log') +ax.yaxis.grid(color="gray", linestyle="-", linewidth=0.25) +ax.set_title("Nb. of COVID-19 cases") +ax.set_xlabel("Date", labelpad=10) +ax.set_yscale("log") -myFmt = mdates.DateFormatter('%b %d') +myFmt = mdates.DateFormatter("%b %d") ax.xaxis.set_major_formatter(myFmt) dates = mdates.epoch2num(times) for key, color, label in [ - ('World', 'blue', 'World'), - ('Switzerland', 'red', 'Switzerland'), - ('France', 'lightgreen', 'France'), - ('US', 'black', 'USA'), - ('Korea, South', 'gray', 'South Korea'), - ('Italy', 'purple', 'Italy'), - ('China', 'orange', 'China') + ("World", "blue", "World"), + ("Switzerland", "red", "Switzerland"), + ("France", "lightgreen", "France"), + ("US", "black", "USA"), + ("Korea, South", "gray", "South Korea"), + ("Italy", "purple", "Italy"), + ("China", "orange", "China"), ]: - ax.plot(dates, nb_cases[key], - color = color, label = label, linewidth = 2) + ax.plot(dates, nb_cases[key], color=color, label=label, linewidth=2) -ax.legend(frameon = False) +ax.legend(frameon=False) plt.show() -fig.savefig('covid19_nb_cases.png') +fig.savefig("covid19_nb_cases.png") ###################################################################### diff --git a/distributed.py b/distributed.py new file mode 100755 index 0000000..adaa36f --- /dev/null +++ b/distributed.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python + +import time, socket, threading, struct, pickle + +import math, sys, argparse + +###################################################################### + +parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, +) + +parser.add_argument("--server_host", type=str, default=None) + +parser.add_argument("--port", type=int, default=12021) + +args = parser.parse_args() + +###################################################################### + + +class SocketConnection: + def __init__(self, established_socket, read_len=16384): + self.read_len = read_len + self.socket = established_socket + self.socket.setblocking(1) + self.buffer = b"" + self.SEND_LOCK = threading.Lock() + self.RECEIVE_LOCK = threading.Lock() + self.failed = False + + def send(self, x): + with self.SEND_LOCK: + data = pickle.dumps(x) + self.socket.send(struct.pack("!i", len(data))) + self.socket.sendall(data) + + def raw_read(self, l): + while len(self.buffer) < l: + d = self.socket.recv(self.read_len) + if d: + self.buffer += d + else: + raise EOFError() + + x = self.buffer[:l] + self.buffer = self.buffer[l:] + return x + + def receive(self): + with self.RECEIVE_LOCK: + l = struct.unpack("!i", self.raw_read(4))[0] + return pickle.loads(self.raw_read(l)) + + +###################################################################### + + +class CultureServer: + def __init__(self, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("0.0.0.0", port)) + s.listen(5) + self.nb_accepts = 0 + + while True: + client_socket, ip_and_port = s.accept() + link = SocketConnection(client_socket) + + threading.Thread( + target=self.client_loop, + kwargs={ + "link": link, + "nb": self.nb_accepts, + }, + daemon=True, + ).start() + + self.nb_accepts += 1 + + def client_loop(self, link, nb): + link.send(f"HELLO #{nb}") + try: + while True: + r = link.receive() + print(f'from #{nb} receive "{r}"') + link.send(f"ACK {r}") + if r == "BYE": + break + except EOFError: + print(f"closing #{nb} on EOF") + + +###################################################################### + + +class CultureClient: + def __init__(self, server_hostname, port): + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.connect((server_hostname, port)) + self.link = SocketConnection(server_socket) + + threading.Thread(target=self.receive, daemon=True).start() + + self.send() + # threading.Thread(target=self.send, daemon=True).start() + + def receive(self): + try: + while True: + x = self.link.receive() + print(f'CultureClient receive "{x}"') + except EOFError: + print(f"closing connection on EOF") + + def send(self): + try: + self.link.send(f"HELLO") + x = 0 + while True: + time.sleep(5) + print(f'CultureClient send "{x}"') + self.link.send(x) + x += 1 + except BrokenPipeError: + print(f"closing connection on broken pipe") + + +###################################################################### + +if args.server_host is None: + print(f"Starting server port {args.port}") + CultureServer(args.port) +else: + print(f"Starting client connecting to {args.server_host}:{args.port}") + CultureClient(args.server_host, args.port) + +###################################################################### -- 2.39.5