Update.
authorFrançois Fleuret <francois@fleuret.org>
Tue, 29 Oct 2024 08:21:05 +0000 (09:21 +0100)
committerFrançois Fleuret <francois@fleuret.org>
Tue, 29 Oct 2024 08:21:05 +0000 (09:21 +0100)
distributed.py

index 6fe3f6b..91d33ee 100755 (executable)
@@ -10,7 +10,7 @@ parser = argparse.ArgumentParser(
     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
 )
 
-parser.add_argument("--server_host", type=str, default=None)
+parser.add_argument("--server", type=str, default=None)
 
 parser.add_argument("--port", type=int, default=12021)
 
@@ -56,44 +56,50 @@ class SocketConnection:
 ######################################################################
 
 
-def create_server(port, reader, writer):
+def start_server(port, core, reader):
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     s.bind(("0.0.0.0", port))
     s.listen(5)
-    self.nb_accepts = 0
+    nb_accepts = 0
 
-    def reading_loop(self, link, client_nb):
+    def reader_thread(reader, link, client_nb):
         try:
             while True:
-                reader(link.receive())
+                reader(link.receive(), client_nb)
         except EOFError:
-            print(f"closing #{client_nb} on EOF")
+            print(f"closing reader #{client_nb} on EOFError")
+
+    def core_thread(writer, client_nb):
+        try:
+            core(writer, client_nb)
+        except BrokenPipeError:
+            print(f"closing core #{client_nb} on BrokenPipeError")
 
     while True:
         client_socket, ip_and_port = s.accept()
         link = SocketConnection(client_socket)
 
         threading.Thread(
-            target=writer,
-            kwargs={"link": link.send, "client_nb": self.nb_accepts},
+            target=core_thread,
+            kwargs={"writer": link.send, "client_nb": nb_accepts},
             daemon=True,
         ).start()
 
         threading.Thread(
-            target=reading_loop,
-            kwargs={"link": link, "client_nb": self.nb_accepts},
+            target=reader_thread,
+            kwargs={"reader": reader, "link": link, "client_nb": nb_accepts},
             daemon=True,
         ).start()
 
-        self.nb_accepts += 1
+        nb_accepts += 1
 
 
 ######################################################################
 
 
-def create_client(server_hostname, port, reader):
+def create_client(servername, port, reader):
     server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_socket.connect((server_hostname, port))
+    server_socket.connect((servername, port))
     link = SocketConnection(server_socket)
 
     def reader_thread(reader):
@@ -101,7 +107,7 @@ def create_client(server_hostname, port, reader):
             reader(link.receive())
 
     def writer(x):
-        self.link.send(x)
+        link.send(x)
 
     threading.Thread(
         target=reader_thread, kwargs={"reader": reader}, daemon=True
@@ -112,11 +118,30 @@ def create_client(server_hostname, port, reader):
 
 ######################################################################
 
-if args.server_host is None:
+if args.server is None:
     print(f"Starting server port {args.port}")
-    CultureServer(args.port)
+
+    def reader(x, nb):
+        print(f'Server received from client #{nb} "{x}"')
+
+    def core(writer, client_nb):
+        writer(f"HELLO {client_nb}")
+        while True:
+            writer(f"PONG {time.localtime().tm_sec}")
+            time.sleep(3)
+
+    start_server(port=args.port, core=core, reader=reader)
+
 else:
-    print(f"Starting client connecting to {args.server_host}:{args.port}")
-    CultureClient(args.server_host, args.port)
+    print(f"Starting client connecting to {args.server}:{args.port}")
+
+    def reader(x):
+        print(f'Client received "{x}"')
+
+    writer = create_client(args.server, args.port, reader)
+
+    while True:
+        writer(f"PING {time.localtime().tm_sec}")
+        time.sleep(3)
 
 ######################################################################