Update.
authorFrançois Fleuret <francois@fleuret.org>
Mon, 28 Oct 2024 12:16:09 +0000 (13:16 +0100)
committerFrançois Fleuret <francois@fleuret.org>
Mon, 28 Oct 2024 12:16:09 +0000 (13:16 +0100)
distributed.py [new file with mode: 0755]

diff --git a/distributed.py b/distributed.py
new file mode 100755 (executable)
index 0000000..adaa36f
--- /dev/null
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+
+import time, socket, threading, struct, pickle
+
+import math, sys, argparse
+
+######################################################################
+
+parser = argparse.ArgumentParser(
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+)
+
+parser.add_argument("--server_host", type=str, default=None)
+
+parser.add_argument("--port", type=int, default=12021)
+
+args = parser.parse_args()
+
+######################################################################
+
+
+class SocketConnection:
+    def __init__(self, established_socket, read_len=16384):
+        self.read_len = read_len
+        self.socket = established_socket
+        self.socket.setblocking(1)
+        self.buffer = b""
+        self.SEND_LOCK = threading.Lock()
+        self.RECEIVE_LOCK = threading.Lock()
+        self.failed = False
+
+    def send(self, x):
+        with self.SEND_LOCK:
+            data = pickle.dumps(x)
+            self.socket.send(struct.pack("!i", len(data)))
+            self.socket.sendall(data)
+
+    def raw_read(self, l):
+        while len(self.buffer) < l:
+            d = self.socket.recv(self.read_len)
+            if d:
+                self.buffer += d
+            else:
+                raise EOFError()
+
+        x = self.buffer[:l]
+        self.buffer = self.buffer[l:]
+        return x
+
+    def receive(self):
+        with self.RECEIVE_LOCK:
+            l = struct.unpack("!i", self.raw_read(4))[0]
+            return pickle.loads(self.raw_read(l))
+
+
+######################################################################
+
+
+class CultureServer:
+    def __init__(self, port):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.bind(("0.0.0.0", port))
+        s.listen(5)
+        self.nb_accepts = 0
+
+        while True:
+            client_socket, ip_and_port = s.accept()
+            link = SocketConnection(client_socket)
+
+            threading.Thread(
+                target=self.client_loop,
+                kwargs={
+                    "link": link,
+                    "nb": self.nb_accepts,
+                },
+                daemon=True,
+            ).start()
+
+            self.nb_accepts += 1
+
+    def client_loop(self, link, nb):
+        link.send(f"HELLO #{nb}")
+        try:
+            while True:
+                r = link.receive()
+                print(f'from #{nb} receive "{r}"')
+                link.send(f"ACK {r}")
+                if r == "BYE":
+                    break
+        except EOFError:
+            print(f"closing #{nb} on EOF")
+
+
+######################################################################
+
+
+class CultureClient:
+    def __init__(self, server_hostname, port):
+        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server_socket.connect((server_hostname, port))
+        self.link = SocketConnection(server_socket)
+
+        threading.Thread(target=self.receive, daemon=True).start()
+
+        self.send()
+        # threading.Thread(target=self.send, daemon=True).start()
+
+    def receive(self):
+        try:
+            while True:
+                x = self.link.receive()
+                print(f'CultureClient receive "{x}"')
+        except EOFError:
+            print(f"closing connection on EOF")
+
+    def send(self):
+        try:
+            self.link.send(f"HELLO")
+            x = 0
+            while True:
+                time.sleep(5)
+                print(f'CultureClient send "{x}"')
+                self.link.send(x)
+                x += 1
+        except BrokenPipeError:
+            print(f"closing connection on broken pipe")
+
+
+######################################################################
+
+if args.server_host is None:
+    print(f"Starting server port {args.port}")
+    CultureServer(args.port)
+else:
+    print(f"Starting client connecting to {args.server_host}:{args.port}")
+    CultureClient(args.server_host, args.port)
+
+######################################################################