AsyncSocket
BytesConverter
- class BytesConverter.BytesConverter[source]
- bytes_to_data(bytes_fields: List[bytes]) Union[None, bytes, str, bool, int, float, List, ndarray] [source]
Recover data from bytes fields. Available types: None, bytes, str, bool, signed int, float, list, ndarray.
- Parameters
bytes_fields – Bytes fields (Type, Data, Args).
- Returns
Converted data.
- data_to_bytes(data: Union[None, bytes, str, bool, int, float, List, ndarray], as_list: bool = False) Union[bytes, List[bytes]] [source]
Convert data to bytes. Available types: None, bytes, str, bool, signed int, float, list, ndarray.
- Parameters
data – Data to convert.
as_list – (For tests only, False by default) If False, the whole bytes message is returned. If True, the return will be a list of bytes fields.
- Returns
Concatenated bytes fields (Number of fields, Size of fields, Type, Data, Args).
TcpIpClient
Bases:
TcpIpObject.TcpIpObject
- class TcpIpClient.TcpIpClient(environment: Type[AbstractEnvironment], ip_address: str = 'localhost', port: int = 10000, instance_id: int = 0, instance_nb: int = 1)[source]
-
- async __communicate(server: socket) None
Communication protocol with a server. First receive a command from the client, then process the appropriate actions.
- Parameters
server – TcpIpServer to communicate with.
- async action_on_change_db(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘step’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_exit(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘exit’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_prediction(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘prediction’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_sample(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘sample’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_step(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘step’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
TcpIpObject
- class TcpIpObject.TcpIpObject(ip_address: str = 'localhost', port: int = 10000)[source]
- async __send_command(loop: AbstractEventLoop, receiver: socket, command: str = '') None
Send a bytes command among the available commands. Do not use this one. Use the dedicated function ‘send_command_{cmd}(…)’.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
command – Name of the command, must be in ‘self.command_dict’.
- async __send_unnamed_dict(dict_to_send: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None
Send a whole dictionary field by field as labeled data. Dictionary will be unnamed.
- Parameters
dict_to_send – Dictionary to send
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- __sync_send_command(receiver: socket, command: str = '') None
Send a bytes command among the available commands. Do not use this one. Use the dedicated function ‘sync_send_command_{cmd}(…)’. Synchronous version of ‘TcpIpObject.send_command’.
- Parameters
command – Name of the command to send.
receiver – TcpIpObject receiver.
- __sync_send_unnamed_dict(dict_to_send: Dict[Any, Any], receiver: Optional[socket] = None) None
Send a whole dictionary field by field as labeled data. Dictionary will be unnamed. Synchronous version of ‘TcpIpObject.receive_labeled_data’.
- Parameters
dict_to_send – Dictionary to send.
receiver – TcpIpObject receiver.
- async action_on_change_db(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘change_database’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_compute(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘compute’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_done(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘done’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_exit(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘exit’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_finished(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘finished’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_prediction(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘prediction’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_read(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘read’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_sample(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘sample’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_step(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘step’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async action_on_visualisation(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘visualisation’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async listen_while_not_done(loop: AbstractEventLoop, sender: socket, data_dict: Dict[Any, Any], client_id: Optional[int] = None) Dict[Any, Any] [source]
Compute actions until ‘done’ command is received.
- Parameters
loop – Asyncio event loop.
sender – TcpIpObject sender.
data_dict – Dictionary to collect data.
client_id – ID of a Client.
- async read_data(loop: AbstractEventLoop, sender: socket, read_size: int) bytes [source]
Read the data on the socket with value of buffer size as relatively small powers of 2.
- Parameters
loop – Asyncio event loop.
sender – Socket sender.
read_size – Amount of data to read on the socket.
- Returns
Bytes field with ‘read_size’ length.
- async receive_data(loop: AbstractEventLoop, sender: socket) Union[None, bytes, str, bool, int, float, List, ndarray] [source]
Receive data from a socket.
- Parameters
loop – Asyncio event loop.
sender – Socket sender.
- Returns
Converted data.
- async receive_dict(recv_to: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, sender: Optional[socket] = None) None [source]
Receive a whole dictionary field by field as labeled data.
- Parameters
recv_to – Dictionary to fill with received fields.
loop – Asyncio event loop.
sender – TcpIpObject sender.
- async receive_labeled_data(loop: AbstractEventLoop, sender: socket) Tuple[str, Union[None, bytes, str, bool, int, float, List, ndarray]] [source]
Receive data and an associated label.
- Parameters
loop – Asyncio event loop.
sender – TcpIpObject sender.
- Returns
Label, Data.
- async send_command_change_db(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘change_database’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_compute(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘compute’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_done(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘done’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_exit(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘exit’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_finished(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘finished’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_prediction(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘prediction’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_read(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘read’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_sample(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘sample’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_step(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘step’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_command_visualisation(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send the ‘visualisation’ command.
- Parameters
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send data through the given socket.
- Parameters
data_to_send – Data that will be sent on socket.
loop – Asyncio event loop.
receiver – Socket receiver.
- async send_dict(name: str, dict_to_send: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None [source]
Send a whole dictionary field by field as labeled data.
- Parameters
name – Name of the dictionary.
dict_to_send – Dictionary to send.
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
- async send_labeled_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], label: str, loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None, send_read_command: bool = True) None [source]
Send data with an associated label.
- Parameters
data_to_send – Data that will be sent on socket.
label – Associated label.
loop – Asyncio event loop.
receiver – TcpIpObject receiver.
send_read_command – If True, the command ‘read’ is sent before sending data.
- sync_read_data(read_size: int) bytes [source]
Read the data on the socket with value of buffer size as relatively small powers of 2. Synchronous version of ‘TcpIpObject.read_data’.
- Parameters
read_size – Amount of data to read on the socket.
- Returns
Bytes field with ‘read_size’ length.
- sync_receive_data() Union[None, bytes, str, bool, int, float, List, ndarray] [source]
Receive data from a socket. Synchronous version of ‘TcpIpObject.receive_data’.
- Returns
Converted data.
- sync_receive_dict(recv_to: Dict[Any, Any], sender: Optional[socket] = None) None [source]
Receive a whole dictionary field by field as labeled data. Synchronous version of ‘TcpIpObject.receive_labeled_data’.
- Parameters
recv_to – Dictionary to fill with received fields.
sender – TcpIpObject sender.
- sync_receive_labeled_data() Tuple[str, Union[None, bytes, str, bool, int, float, List, ndarray]] [source]
Receive data and an associated label. Synchronous version of ‘TcpIpObject.receive_labeled_data’.
- Returns
Label, Data.
- sync_send_command_change_db(receiver: Optional[socket] = None) None [source]
Send the ‘change_database’ command. Synchronous version of ‘TcpIpObject.send_command_change_database’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_compute(receiver: Optional[socket] = None) None [source]
Send the ‘compute’ command. Synchronous version of ‘TcpIpObject.send_command_compute’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_done(receiver: Optional[socket] = None) None [source]
Send the ‘done’ command. Synchronous version of ‘TcpIpObject.send_command_done’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_exit(receiver: Optional[socket] = None) None [source]
Send the ‘exit’ command. Synchronous version of ‘TcpIpObject.send_command_exit’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_finished(receiver: Optional[socket] = None) None [source]
Send the ‘finished’ command. Synchronous version of ‘TcpIpObject.send_command_finished’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_prediction(receiver: Optional[socket] = None) None [source]
Send the ‘prediction’ command. Synchronous version of ‘TcpIpObject.send_command_prediction’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_read(receiver: Optional[socket] = None) None [source]
Send the ‘read’ command. Synchronous version of ‘TcpIpObject.send_command_read’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_sample(receiver: Optional[socket] = None) None [source]
Send the ‘sample’ command. Synchronous version of ‘TcpIpObject.send_command_sample’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_step(receiver: Optional[socket] = None) None [source]
Send the ‘step’ command. Synchronous version of ‘TcpIpObject.send_command_step’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_command_visualisation(receiver: Optional[socket] = None) None [source]
Send the ‘visualisation’ command. Synchronous version of ‘TcpIpObject.send_command_visualisation’.
- Parameters
receiver – TcpIpObject receiver.
- sync_send_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], receiver: Optional[socket] = None) None [source]
Send data through the given socket. Synchronous version of ‘TcpIpObject.send_data’.
- Parameters
data_to_send – Data that will be sent on socket.
receiver – Socket receiver.
- sync_send_dict(name: str, dict_to_send: Dict[Any, Any], receiver: Optional[socket] = None) None [source]
Send a whole dictionary field by field as labeled data. Synchronous version of ‘TcpIpObject.receive_labeled_data’.
- Parameters
name – Name of the dictionary.
dict_to_send – Dictionary to send.
receiver – TcpIpObject receiver.
- sync_send_labeled_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], label: str, receiver: Optional[socket] = None, send_read_command: bool = True) None [source]
Send data with an associated label. Synchronous version of ‘TcpIpObject.send_labeled_data’.
- Parameters
data_to_send – Data that will be sent on socket.
label – Associated label.
receiver – TcpIpObject receiver.
send_read_command – If True, the command ‘read’ is sent before sending data.
TcpIpServer
Base:
TcpIpObject.TcpIpObject
- class TcpIpServer.TcpIpServer(ip_address: str = 'localhost', port: int = 10000, nb_client: int = 5, max_client_count: int = 10, batch_size: int = 5, manager: Optional[Any] = None)[source]
-
- async __communicate(client: Optional[socket] = None, client_id: Optional[int] = None, animate: bool = True) None
Communication protocol with a client.
- Parameters
client – TcpIpObject client to communicate with.
client_id – Index of the client.
animate – If True, triggers an environment step.
- async __initialize(env_kwargs: Dict[str, Any], visualization_db: Optional[Tuple[str, str]] = None) None
Send parameters to the clients to create their environments.
- Parameters
env_kwargs – Additional arguments to pass to the Environment.
visualization_db – Path to the visualization Database to connect to.
- async __request_data_to_clients(animate: bool = True) None
Trigger a communication protocol for each client. Wait for all clients before to launch another communication protocol while the batch is not full.
- Parameters
animate – If True, triggers an environment step
- async __shutdown(client: socket, idx: int) None
Send exit command to all clients.
- Parameters
client – TcpIpObject client.
idx – Client index.
- async action_on_prediction(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘prediction’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – asyncio.get_event_loop() return.
sender – TcpIpObject sender.
- async action_on_visualisation(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None [source]
Action to run when receiving the ‘visualisation’ command.
- Parameters
data – Dict storing data.
client_id – ID of the TcpIpClient.
loop – asyncio.get_event_loop() return.
sender – TcpIpObject sender.
- get_batch(animate: bool = True) List[List[int]] [source]
Build a batch from clients samples.
- Parameters
animate – If True, triggers an environment step.
- initialize(env_kwargs: Dict[str, Any], visualization_db: Optional[Tuple[str, str]] = None) None [source]
Send parameters to the clients to create their environments.
- Parameters
env_kwargs – Additional arguments to pass to the Environment.
visualization_db – Path to the visualization Database to connect to.