import struct import logging import gevent from gevent import socket from gevent import queue from locust.exception import LocustError from .protocol import Message logger = logging.getLogger(__name__) def _recv_bytes(sock, bytes): data = "" while bytes: temp = sock.recv(bytes) if not temp: raise Exception("Connection reset by peer? Received so far: %r" % (data, )) bytes -= len(temp) data += temp return data def _send_obj(sock, msg): data = msg.serialize() packed = struct.pack('!i', len(data)) + data try: sock.sendall(packed) except Exception as e: try: sock.close() except: pass finally: raise LocustError("Slave has disconnected") def _recv_obj(sock): d = _recv_bytes(sock, 4) bytes, = struct.unpack('!i', d) data = _recv_bytes(sock, bytes) return Message.unserialize(data) class Client(object): def __init__(self, host, port): self.host = host self.port = port self.command_queue = gevent.queue.Queue() self.socket = self._connect() def _connect(self): sock = socket.create_connection((self.host, self.port)) def handle(): try: while True: self.command_queue.put_nowait(_recv_obj(sock)) except Exception as e: try: sock.close() except: pass gevent.spawn(handle) return sock def send(self, event): _send_obj(self.socket, event) def recv(self): return self.command_queue.get() class Server(object): def __init__(self, host, port): self.host = "0.0.0.0" if host == "*" else host self.port = port self.event_queue = gevent.queue.Queue() self.command_dispatcher = self._listen() def send(self, msg): self.command_dispatcher(msg) def recv(self): return self.event_queue.get() def _listen(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(256) self.slave_index = 0 slaves = [] def dispatch_command(cmd): _send_obj(slaves[self.slave_index], cmd) self.slave_index += 1 if self.slave_index == len(slaves): self.slave_index = 0 def handle_slave(sock): try: while True: self.event_queue.put_nowait(_recv_obj(sock)) except Exception as e: logger.info("Slave disconnected") slaves.remove(sock) if self.slave_index == len(slaves) and len(slaves) > 0: self.slave_index -= 1 try: sock.close() except: pass def listener(): while True: _socket, _addr = sock.accept() logger.info("Slave connected") slaves.append(_socket) gevent.spawn(lambda: handle_slave(_socket)) gevent.spawn(listener) return dispatch_command
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 21680 | Robert Cowham |
Populate -o //guest/robert_cowham/p4benchmark/main/locust/... //guest/robert_cowham/p4benchmark/github/locust/.... |
||
//guest/robert_cowham/p4benchmark/main/locust/locust/rpc/socketrpc.py | |||||
#1 | 19772 | Robert Cowham | Initial version of locust |