Update. master
authorFrançois Fleuret <francois@fleuret.org>
Mon, 28 Oct 2024 12:14:42 +0000 (13:14 +0100)
committerFrançois Fleuret <francois@fleuret.org>
Mon, 28 Oct 2024 12:14:42 +0000 (13:14 +0100)
covid19.py
distributed.py [new file with mode: 0755]

index beccb9b..fdfe16e 100755 (executable)
@@ -13,30 +13,32 @@ import urllib.request
 
 ######################################################################
 
-def gentle_download(url, delay = 86400):
-    filename = url[url.rfind('/') + 1:]
+
+def gentle_download(url, delay=86400):
+    filename = url[url.rfind("/") + 1 :]
     if not os.path.isfile(filename) or os.path.getmtime(filename) < time.time() - delay:
-        print(f'Retrieving {url}')
+        print(f"Retrieving {url}")
         urllib.request.urlretrieve(url, filename)
     return filename
 
+
 ######################################################################
 
 nbcases_filename = gentle_download(
-    'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv'
+    "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv"
 )
 
 ######################################################################
 
-with open(nbcases_filename, newline='') as csvfile:
-    reader = csv.reader(csvfile, delimiter=',')
+with open(nbcases_filename, newline="") as csvfile:
+    reader = csv.reader(csvfile, delimiter=",")
     times = []
     nb_cases = {}
     time_col = 5
     for row_nb, row in enumerate(reader):
         for col_nb, field in enumerate(row):
             if row_nb == 0 and col_nb >= time_col:
-                times.append(time.mktime(time.strptime(field, '%m/%d/%y')))
+                times.append(time.mktime(time.strptime(field, "%m/%d/%y")))
             if row_nb >= 1:
                 if col_nb == 1:
                     country = field
@@ -48,41 +50,40 @@ with open(nbcases_filename, newline='') as csvfile:
 
 countries = list(nb_cases.keys())
 countries.sort()
-print('Countries: ', countries)
+print("Countries: ", countries)
 
-nb_cases['World'] = sum(nb_cases.values())
+nb_cases["World"] = sum(nb_cases.values())
 
 ######################################################################
 
 fig = plt.figure()
 ax = fig.add_subplot(1, 1, 1)
 
-ax.yaxis.grid(color='gray', linestyle='-', linewidth=0.25)
-ax.set_title('Nb. of COVID-19 cases')
-ax.set_xlabel('Date', labelpad = 10)
-ax.set_yscale('log')
+ax.yaxis.grid(color="gray", linestyle="-", linewidth=0.25)
+ax.set_title("Nb. of COVID-19 cases")
+ax.set_xlabel("Date", labelpad=10)
+ax.set_yscale("log")
 
-myFmt = mdates.DateFormatter('%b %d')
+myFmt = mdates.DateFormatter("%b %d")
 
 ax.xaxis.set_major_formatter(myFmt)
 dates = mdates.epoch2num(times)
 
 for key, color, label in [
-        ('World', 'blue', 'World'),
-        ('Switzerland', 'red', 'Switzerland'),
-        ('France', 'lightgreen', 'France'),
-        ('US', 'black', 'USA'),
-        ('Korea, South', 'gray', 'South Korea'),
-        ('Italy', 'purple', 'Italy'),
-        ('China', 'orange', 'China')
+    ("World", "blue", "World"),
+    ("Switzerland", "red", "Switzerland"),
+    ("France", "lightgreen", "France"),
+    ("US", "black", "USA"),
+    ("Korea, South", "gray", "South Korea"),
+    ("Italy", "purple", "Italy"),
+    ("China", "orange", "China"),
 ]:
-    ax.plot(dates, nb_cases[key],
-            color = color, label = label, linewidth = 2)
+    ax.plot(dates, nb_cases[key], color=color, label=label, linewidth=2)
 
-ax.legend(frameon = False)
+ax.legend(frameon=False)
 
 plt.show()
 
-fig.savefig('covid19_nb_cases.png')
+fig.savefig("covid19_nb_cases.png")
 
 ######################################################################
diff --git a/distributed.py b/distributed.py
new file mode 100755 (executable)
index 0000000..adaa36f
--- /dev/null
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+
+import time, socket, threading, struct, pickle
+
+import math, sys, argparse
+
+######################################################################
+
+parser = argparse.ArgumentParser(
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+)
+
+parser.add_argument("--server_host", type=str, default=None)
+
+parser.add_argument("--port", type=int, default=12021)
+
+args = parser.parse_args()
+
+######################################################################
+
+
+class SocketConnection:
+    def __init__(self, established_socket, read_len=16384):
+        self.read_len = read_len
+        self.socket = established_socket
+        self.socket.setblocking(1)
+        self.buffer = b""
+        self.SEND_LOCK = threading.Lock()
+        self.RECEIVE_LOCK = threading.Lock()
+        self.failed = False
+
+    def send(self, x):
+        with self.SEND_LOCK:
+            data = pickle.dumps(x)
+            self.socket.send(struct.pack("!i", len(data)))
+            self.socket.sendall(data)
+
+    def raw_read(self, l):
+        while len(self.buffer) < l:
+            d = self.socket.recv(self.read_len)
+            if d:
+                self.buffer += d
+            else:
+                raise EOFError()
+
+        x = self.buffer[:l]
+        self.buffer = self.buffer[l:]
+        return x
+
+    def receive(self):
+        with self.RECEIVE_LOCK:
+            l = struct.unpack("!i", self.raw_read(4))[0]
+            return pickle.loads(self.raw_read(l))
+
+
+######################################################################
+
+
+class CultureServer:
+    def __init__(self, port):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.bind(("0.0.0.0", port))
+        s.listen(5)
+        self.nb_accepts = 0
+
+        while True:
+            client_socket, ip_and_port = s.accept()
+            link = SocketConnection(client_socket)
+
+            threading.Thread(
+                target=self.client_loop,
+                kwargs={
+                    "link": link,
+                    "nb": self.nb_accepts,
+                },
+                daemon=True,
+            ).start()
+
+            self.nb_accepts += 1
+
+    def client_loop(self, link, nb):
+        link.send(f"HELLO #{nb}")
+        try:
+            while True:
+                r = link.receive()
+                print(f'from #{nb} receive "{r}"')
+                link.send(f"ACK {r}")
+                if r == "BYE":
+                    break
+        except EOFError:
+            print(f"closing #{nb} on EOF")
+
+
+######################################################################
+
+
+class CultureClient:
+    def __init__(self, server_hostname, port):
+        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server_socket.connect((server_hostname, port))
+        self.link = SocketConnection(server_socket)
+
+        threading.Thread(target=self.receive, daemon=True).start()
+
+        self.send()
+        # threading.Thread(target=self.send, daemon=True).start()
+
+    def receive(self):
+        try:
+            while True:
+                x = self.link.receive()
+                print(f'CultureClient receive "{x}"')
+        except EOFError:
+            print(f"closing connection on EOF")
+
+    def send(self):
+        try:
+            self.link.send(f"HELLO")
+            x = 0
+            while True:
+                time.sleep(5)
+                print(f'CultureClient send "{x}"')
+                self.link.send(x)
+                x += 1
+        except BrokenPipeError:
+            print(f"closing connection on broken pipe")
+
+
+######################################################################
+
+if args.server_host is None:
+    print(f"Starting server port {args.port}")
+    CultureServer(args.port)
+else:
+    print(f"Starting client connecting to {args.server_host}:{args.port}")
+    CultureClient(args.server_host, args.port)
+
+######################################################################