AsyncSocket

BytesConverter

class BytesConverter.BytesConverter[source]
bytes_to_data(bytes_fields: List[bytes]) Union[None, bytes, str, bool, int, float, List, ndarray][source]

Recover data from bytes fields. Available types: None, bytes, str, bool, signed int, float, list, ndarray.

Parameters

bytes_fields – Bytes fields (Type, Data, Args).

Returns

Converted data.

data_to_bytes(data: Union[None, bytes, str, bool, int, float, List, ndarray], as_list: bool = False) Union[bytes, List[bytes]][source]

Convert data to bytes. Available types: None, bytes, str, bool, signed int, float, list, ndarray.

Parameters
  • data – Data to convert.

  • as_list – (For tests only, False by default) If False, the whole bytes message is returned. If True, a list of bytes fields is returned.

Returns

Concatenated bytes fields (Number of fields, Size of fields, Type, Data, Args).
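
The field layout named above (number of fields, size of fields, type, data) can be illustrated with a small sketch. This is not the BytesConverter implementation: the helpers `encode`, `decode`, `pack_fields` and `unpack_fields`, the 4-byte big-endian headers and the type tags are all assumptions made for illustration, and only str, int and float are covered.

```python
import struct
from typing import List, Union

Value = Union[str, int, float]


def encode(value: Value) -> List[bytes]:
    """Turn a value into [type tag, data] fields (hypothetical tags)."""
    # Note: bool is a subclass of int, so it would be tagged as int here;
    # a real converter needs a dedicated tag for it.
    if isinstance(value, str):
        return [b"str", value.encode("utf-8")]
    if isinstance(value, int):
        return [b"int", value.to_bytes(8, "big", signed=True)]
    if isinstance(value, float):
        return [b"float", struct.pack(">d", value)]
    raise TypeError(f"unsupported type: {type(value).__name__}")


def decode(fields: List[bytes]) -> Value:
    """Inverse of encode: rebuild the value from [type tag, data]."""
    tag, data = fields
    if tag == b"str":
        return data.decode("utf-8")
    if tag == b"int":
        return int.from_bytes(data, "big", signed=True)
    if tag == b"float":
        return struct.unpack(">d", data)[0]
    raise ValueError(f"unknown type tag {tag!r}")


def pack_fields(fields: List[bytes]) -> bytes:
    """Concatenate as: field count, each field size, then the raw fields."""
    header = struct.pack(">I", len(fields))
    sizes = b"".join(struct.pack(">I", len(f)) for f in fields)
    return header + sizes + b"".join(fields)


def unpack_fields(message: bytes) -> List[bytes]:
    """Split a message produced by pack_fields back into its fields."""
    (count,) = struct.unpack_from(">I", message, 0)
    sizes = struct.unpack_from(f">{count}I", message, 4)
    offset = 4 + 4 * count
    fields = []
    for size in sizes:
        fields.append(message[offset:offset + size])
        offset += size
    return fields
```

In this sketch, `pack_fields(encode(x))` plays the role of the concatenated message and `encode(x)` alone plays the role of the `as_list=True` output.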

TcpIpClient

Bases: TcpIpObject.TcpIpObject

class TcpIpClient.TcpIpClient(environment: Type[AbstractEnvironment], ip_address: str = 'localhost', port: int = 10000, instance_id: int = 0, instance_nb: int = 1)[source]
async __close() None

Close the environment and shutdown the client.

async __communicate(server: socket) None

Communication protocol with a server. First receive a command from the server, then process the appropriate actions.

Parameters

server – TcpIpServer to communicate with.

async __initialize() None

Receive parameters from the server to create environment.

async __launch() None

Trigger the main communication protocol with the server.

async action_on_change_db(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘change_database’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_exit(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘exit’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_prediction(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘prediction’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_sample(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘sample’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_step(data: ndarray, client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘step’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

get_prediction(**kwargs) Dict[str, ndarray][source]

Request a prediction from Network.

Returns

Prediction of the Network.

initialize() None[source]

Receive parameters from the server to create environment.

launch() None[source]

Trigger the main communication protocol with the server.

request_update_visualization() None[source]

Trigger the Visualizer update.

TcpIpObject

class TcpIpObject.TcpIpObject(ip_address: str = 'localhost', port: int = 10000)[source]
async __send_command(loop: AbstractEventLoop, receiver: socket, command: str = '') None

Send a bytes command among the available commands. Do not call this method directly; use the dedicated function ‘send_command_{cmd}(…)’ instead.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

  • command – Name of the command, must be in ‘self.command_dict’.
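
As an illustration of looking up a command name before sending it, here is a minimal sketch. The contents and encoding of ‘self.command_dict’ are not given in this reference, so the fixed-width 4-byte codes and the helpers `build_command_dict`, `encode_command` and `decode_command` are assumptions. Only the command names come from the handlers documented here.

```python
from typing import Dict, Iterable

# Command names taken from the handlers documented in this reference.
COMMAND_NAMES = (
    "exit", "step", "done", "finished", "prediction",
    "compute", "read", "sample", "visualisation", "change_database",
)


def build_command_dict(names: Iterable[str]) -> Dict[str, bytes]:
    """Map each command name to a fixed-width 4-byte code (assumed encoding)."""
    command_dict = {name: name[:4].encode("ascii") for name in names}
    # Codes must stay unique so the receiver can decode them unambiguously.
    if len(set(command_dict.values())) != len(command_dict):
        raise ValueError("command codes collide")
    return command_dict


def encode_command(command_dict: Dict[str, bytes], command: str) -> bytes:
    """Return the bytes code for a command, rejecting unknown names."""
    if command not in command_dict:
        raise ValueError(f"'{command}' is not in command_dict")
    return command_dict[command]


def decode_command(command_dict: Dict[str, bytes], code: bytes) -> str:
    """Recover the command name from its bytes code."""
    for name, value in command_dict.items():
        if value == code:
            return name
    raise ValueError(f"unknown command code {code!r}")


command_dict = build_command_dict(COMMAND_NAMES)
```

A fixed width is convenient in such a scheme because the receiver can always read the same number of bytes to obtain the next command.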

async __send_unnamed_dict(dict_to_send: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None

Send a whole dictionary field by field as labeled data. Dictionary will be unnamed.

Parameters
  • dict_to_send – Dictionary to send.

  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

__sync_send_command(receiver: socket, command: str = '') None

Send a bytes command among the available commands. Do not call this method directly; use the dedicated function ‘sync_send_command_{cmd}(…)’ instead. Synchronous version of ‘TcpIpObject.__send_command’.

Parameters
  • command – Name of the command to send.

  • receiver – TcpIpObject receiver.

__sync_send_unnamed_dict(dict_to_send: Dict[Any, Any], receiver: Optional[socket] = None) None

Send a whole dictionary field by field as labeled data. Dictionary will be unnamed. Synchronous version of ‘TcpIpObject.__send_unnamed_dict’.

Parameters
  • dict_to_send – Dictionary to send.

  • receiver – TcpIpObject receiver.

async action_on_change_db(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘change_database’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_compute(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘compute’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_done(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘done’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_exit(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘exit’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_finished(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘finished’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_prediction(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘prediction’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_read(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘read’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_sample(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘sample’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_step(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘step’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_visualisation(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘visualisation’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async listen_while_not_done(loop: AbstractEventLoop, sender: socket, data_dict: Dict[Any, Any], client_id: Optional[int] = None) Dict[Any, Any][source]

Compute actions until ‘done’ command is received.

Parameters
  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

  • data_dict – Dictionary to collect data.

  • client_id – ID of a Client.

async read_data(loop: AbstractEventLoop, sender: socket, read_size: int) bytes[source]

Read data from the socket in chunks, using a relatively small power of 2 as the buffer size.

Parameters
  • loop – Asyncio event loop.

  • sender – Socket sender.

  • read_size – Amount of data to read on the socket.

Returns

Bytes field with ‘read_size’ length.
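
A minimal sketch of reading an exact number of bytes in bounded chunks with asyncio is shown below. It is not the library's implementation: the function name `read_exactly` and the 4096-byte chunk limit are assumptions. `loop.sock_recv` and `loop.sock_sendall` are standard asyncio event loop methods and require non-blocking sockets.

```python
import asyncio
import socket


async def read_exactly(
    loop: asyncio.AbstractEventLoop,
    sender: socket.socket,
    read_size: int,
    max_chunk: int = 4096,  # assumed "relatively small power of 2"
) -> bytes:
    """Read exactly read_size bytes, asking for at most max_chunk per call."""
    chunks = []
    remaining = read_size
    while remaining > 0:
        chunk = await loop.sock_recv(sender, min(max_chunk, remaining))
        if not chunk:
            raise ConnectionError("socket closed before read_size bytes arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


async def demo() -> bytes:
    loop = asyncio.get_running_loop()
    left, right = socket.socketpair()
    left.setblocking(False)
    right.setblocking(False)
    try:
        payload = bytes(range(256)) * 40  # 10240 bytes, larger than one chunk
        # Send and receive concurrently so a small socket buffer cannot stall.
        _, received = await asyncio.gather(
            loop.sock_sendall(left, payload),
            read_exactly(loop, right, len(payload)),
        )
        return received
    finally:
        left.close()
        right.close()
```

Looping on the remaining byte count matters because a single receive call may return fewer bytes than requested.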

async receive_data(loop: AbstractEventLoop, sender: socket) Union[None, bytes, str, bool, int, float, List, ndarray][source]

Receive data from a socket.

Parameters
  • loop – Asyncio event loop.

  • sender – Socket sender.

Returns

Converted data.

async receive_dict(recv_to: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, sender: Optional[socket] = None) None[source]

Receive a whole dictionary field by field as labeled data.

Parameters
  • recv_to – Dictionary to fill with received fields.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async receive_labeled_data(loop: AbstractEventLoop, sender: socket) Tuple[str, Union[None, bytes, str, bool, int, float, List, ndarray]][source]

Receive data and an associated label.

Parameters
  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

Returns

Label, Data.

async send_command_change_db(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘change_database’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_compute(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘compute’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_done(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘done’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_exit(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘exit’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_finished(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘finished’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_prediction(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘prediction’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_read(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘read’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_sample(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘sample’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_step(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘step’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_command_visualisation(loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send the ‘visualisation’ command.

Parameters
  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send data through the given socket.

Parameters
  • data_to_send – Data that will be sent on socket.

  • loop – Asyncio event loop.

  • receiver – Socket receiver.

async send_dict(name: str, dict_to_send: Dict[Any, Any], loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None) None[source]

Send a whole dictionary field by field as labeled data.

Parameters
  • name – Name of the dictionary.

  • dict_to_send – Dictionary to send.

  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

async send_labeled_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], label: str, loop: Optional[AbstractEventLoop] = None, receiver: Optional[socket] = None, send_read_command: bool = True) None[source]

Send data with an associated label.

Parameters
  • data_to_send – Data that will be sent on socket.

  • label – Associated label.

  • loop – Asyncio event loop.

  • receiver – TcpIpObject receiver.

  • send_read_command – If True, the command ‘read’ is sent before sending data.
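
The pairing of a label with its data, optionally preceded by the ‘read’ command, can be sketched with length-prefixed frames on a blocking socket. The framing and the helpers `send_frame`, `recv_frame`, `send_labeled` and `receive_labeled` are assumptions for illustration. The actual wire format is not specified in this reference.

```python
import socket
import struct
from typing import Tuple


def recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly size bytes from a blocking socket."""
    buffer = bytearray()
    while len(buffer) < size:
        chunk = sock.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError("socket closed early")
        buffer.extend(chunk)
    return bytes(buffer)


def send_frame(sock: socket.socket, payload: bytes) -> None:
    """Send a 4-byte big-endian length followed by the payload."""
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_frame(sock: socket.socket) -> bytes:
    """Receive one frame written by send_frame."""
    (size,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, size)


def send_labeled(sock: socket.socket, label: str, payload: bytes,
                 send_read_command: bool = True) -> None:
    """Optionally announce the transfer with 'read', then send label and data."""
    if send_read_command:
        send_frame(sock, b"read")
    send_frame(sock, label.encode("utf-8"))
    send_frame(sock, payload)


def receive_labeled(sock: socket.socket,
                    expect_read_command: bool = True) -> Tuple[str, bytes]:
    """Mirror of send_labeled: returns (label, data)."""
    if expect_read_command and recv_frame(sock) != b"read":
        raise ValueError("expected the 'read' command first")
    label = recv_frame(sock).decode("utf-8")
    return label, recv_frame(sock)


def demo() -> Tuple[str, bytes]:
    left, right = socket.socketpair()
    try:
        send_labeled(left, "position", b"\x01\x02")
        return receive_labeled(right)
    finally:
        left.close()
        right.close()
```

Sending a dictionary field by field then amounts to calling such a labeled send once per key.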

sync_read_data(read_size: int) bytes[source]

Read data from the socket in chunks, using a relatively small power of 2 as the buffer size. Synchronous version of ‘TcpIpObject.read_data’.

Parameters

read_size – Amount of data to read on the socket.

Returns

Bytes field with ‘read_size’ length.

sync_receive_data() Union[None, bytes, str, bool, int, float, List, ndarray][source]

Receive data from a socket. Synchronous version of ‘TcpIpObject.receive_data’.

Returns

Converted data.

sync_receive_dict(recv_to: Dict[Any, Any], sender: Optional[socket] = None) None[source]

Receive a whole dictionary field by field as labeled data. Synchronous version of ‘TcpIpObject.receive_dict’.

Parameters
  • recv_to – Dictionary to fill with received fields.

  • sender – TcpIpObject sender.

sync_receive_labeled_data() Tuple[str, Union[None, bytes, str, bool, int, float, List, ndarray]][source]

Receive data and an associated label. Synchronous version of ‘TcpIpObject.receive_labeled_data’.

Returns

Label, Data.

sync_send_command_change_db(receiver: Optional[socket] = None) None[source]

Send the ‘change_database’ command. Synchronous version of ‘TcpIpObject.send_command_change_db’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_compute(receiver: Optional[socket] = None) None[source]

Send the ‘compute’ command. Synchronous version of ‘TcpIpObject.send_command_compute’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_done(receiver: Optional[socket] = None) None[source]

Send the ‘done’ command. Synchronous version of ‘TcpIpObject.send_command_done’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_exit(receiver: Optional[socket] = None) None[source]

Send the ‘exit’ command. Synchronous version of ‘TcpIpObject.send_command_exit’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_finished(receiver: Optional[socket] = None) None[source]

Send the ‘finished’ command. Synchronous version of ‘TcpIpObject.send_command_finished’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_prediction(receiver: Optional[socket] = None) None[source]

Send the ‘prediction’ command. Synchronous version of ‘TcpIpObject.send_command_prediction’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_read(receiver: Optional[socket] = None) None[source]

Send the ‘read’ command. Synchronous version of ‘TcpIpObject.send_command_read’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_sample(receiver: Optional[socket] = None) None[source]

Send the ‘sample’ command. Synchronous version of ‘TcpIpObject.send_command_sample’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_step(receiver: Optional[socket] = None) None[source]

Send the ‘step’ command. Synchronous version of ‘TcpIpObject.send_command_step’.

Parameters

receiver – TcpIpObject receiver.

sync_send_command_visualisation(receiver: Optional[socket] = None) None[source]

Send the ‘visualisation’ command. Synchronous version of ‘TcpIpObject.send_command_visualisation’.

Parameters

receiver – TcpIpObject receiver.

sync_send_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], receiver: Optional[socket] = None) None[source]

Send data through the given socket. Synchronous version of ‘TcpIpObject.send_data’.

Parameters
  • data_to_send – Data that will be sent on socket.

  • receiver – Socket receiver.

sync_send_dict(name: str, dict_to_send: Dict[Any, Any], receiver: Optional[socket] = None) None[source]

Send a whole dictionary field by field as labeled data. Synchronous version of ‘TcpIpObject.send_dict’.

Parameters
  • name – Name of the dictionary.

  • dict_to_send – Dictionary to send.

  • receiver – TcpIpObject receiver.

sync_send_labeled_data(data_to_send: Union[None, bytes, str, bool, int, float, List, ndarray], label: str, receiver: Optional[socket] = None, send_read_command: bool = True) None[source]

Send data with an associated label. Synchronous version of ‘TcpIpObject.send_labeled_data’.

Parameters
  • data_to_send – Data that will be sent on socket.

  • label – Associated label.

  • receiver – TcpIpObject receiver.

  • send_read_command – If True, the command ‘read’ is sent before sending data.

TcpIpServer

Bases: TcpIpObject.TcpIpObject

class TcpIpServer.TcpIpServer(ip_address: str = 'localhost', port: int = 10000, nb_client: int = 5, max_client_count: int = 10, batch_size: int = 5, manager: Optional[Any] = None)[source]
async __close() None

Run server shutdown protocol.

async __communicate(client: Optional[socket] = None, client_id: Optional[int] = None, animate: bool = True) None

Communication protocol with a client.

Parameters
  • client – TcpIpObject client to communicate with.

  • client_id – Index of the client.

  • animate – If True, triggers an environment step.

async __connect() None

Accept connections from clients.

__database_handler_partitions() None

Partition update event of the DatabaseHandler.

async __initialize(env_kwargs: Dict[str, Any], visualization_db: Optional[Tuple[str, str]] = None) None

Send parameters to the clients to create their environments.

Parameters
  • env_kwargs – Additional arguments to pass to the Environment.

  • visualization_db – Path to the visualization Database to connect to.

async __request_data_to_clients(animate: bool = True) None

Trigger a communication protocol for each client. Wait for all clients to finish before launching another round of communication, and repeat while the batch is not full.

Parameters

animate – If True, triggers an environment step.
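
The round-based collection described above can be sketched with `asyncio.gather`. The coroutine names `communicate` and `request_data`, the one-sample-per-client rounds and the truncation of the final batch are assumptions made for illustration, not the server's actual behavior.

```python
import asyncio
from typing import List


async def communicate(client_id: int) -> List[int]:
    """Hypothetical exchange with one client: returns a single sample."""
    await asyncio.sleep(0)  # stands in for network I/O
    return [client_id]


async def request_data(nb_client: int, batch_size: int) -> List[List[int]]:
    """Query all clients concurrently, round after round, until the batch is full."""
    batch: List[List[int]] = []
    while len(batch) < batch_size:
        # gather waits for every client before the next round starts.
        samples = await asyncio.gather(
            *(communicate(client_id) for client_id in range(nb_client))
        )
        batch.extend(samples)
    # Assumption: extra samples from the last round are dropped.
    return batch[:batch_size]
```

`asyncio.gather` returns results in the order the coroutines were passed, so each round's samples stay in client order.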

async __shutdown(client: socket, idx: int) None

Send the ‘exit’ command to the given client.

Parameters
  • client – TcpIpObject client.

  • idx – Client index.

async action_on_prediction(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘prediction’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

async action_on_visualisation(data: Dict[Any, Any], client_id: int, sender: socket, loop: AbstractEventLoop) None[source]

Action to run when receiving the ‘visualisation’ command.

Parameters
  • data – Dict storing data.

  • client_id – ID of the TcpIpClient.

  • loop – Asyncio event loop.

  • sender – TcpIpObject sender.

close() None[source]

Run __close method with asyncio.
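
The pattern of a synchronous public method driving a private coroutine can be sketched as follows. The `Server` class here is a hypothetical stand-in, and the use of `asyncio.run` is an assumption: the actual server may drive its event loop differently.

```python
import asyncio


class Server:
    """Hypothetical class exposing a sync wrapper around an async method."""

    def __init__(self) -> None:
        self.closed = False

    async def __close(self) -> None:
        # Stands in for the asynchronous shutdown protocol.
        await asyncio.sleep(0)
        self.closed = True

    def close(self) -> None:
        # Run the private coroutine to completion from synchronous code.
        asyncio.run(self.__close())
```

With this wrapper, callers that are not running inside an event loop can simply call `Server().close()`.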

connect() None[source]

Accept connections from clients.

get_batch(animate: bool = True) List[List[int]][source]

Build a batch from clients samples.

Parameters

animate – If True, triggers an environment step.

get_database_handler() DatabaseHandler[source]

Get the DatabaseHandler of the TcpIpServer.

initialize(env_kwargs: Dict[str, Any], visualization_db: Optional[Tuple[str, str]] = None) None[source]

Send parameters to the clients to create their environments.

Parameters
  • env_kwargs – Additional arguments to pass to the Environment.

  • visualization_db – Path to the visualization Database to connect to.

set_dataset_batch(data_lines: List[int]) None[source]

Receive a batch of data from the Dataset. Samples will be dispatched between clients.

Parameters

data_lines – Batch of indices of samples.
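
One possible way to dispatch a batch of indices between clients is a round-robin split, sketched below. The dispatch strategy is not stated in this reference, so both the `dispatch` helper and the round-robin choice are assumptions.

```python
from typing import List


def dispatch(data_lines: List[int], nb_client: int) -> List[List[int]]:
    """Assign indices to clients in round-robin order (assumed strategy)."""
    if nb_client <= 0:
        raise ValueError("nb_client must be positive")
    # Client i receives the indices at positions i, i + nb_client, ...
    return [data_lines[i::nb_client] for i in range(nb_client)]
```

For example, `dispatch([10, 11, 12, 13, 14], 2)` gives the first client `[10, 12, 14]` and the second `[11, 13]`.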