from typing import Dict, Any, List, Union, Tuple, Optional
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from asyncio import get_event_loop
from asyncio import AbstractEventLoop as EventLoop
from numpy import ndarray
from DeepPhysX.Core.AsyncSocket.BytesConverter import BytesConverter
import sys
if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
import asyncio
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
Convertible = Union[type(None), bytes, str, bool, int, float, List, ndarray]
[docs]class TcpIpObject:
def __init__(self,
ip_address: str = 'localhost',
port: int = 10000):
"""
TcpIpObject defines communication protocols to send and receive data and commands.
:param ip_address: IP address of the TcpIpObject.
:param port: Port number of the TcpIpObject.
"""
self.name: str = self.__class__.__name__
# Define socket
self.sock: socket = socket(AF_INET, SOCK_STREAM)
self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# Register IP and PORT
self.ip_address: str = ip_address
self.port: int = port
# Create data converter
self.data_converter: BytesConverter = BytesConverter()
# Available commands
self.command_dict: Dict[str, bytes] = {'exit': b'exit', 'step': b'step', 'done': b'done', 'finished': b'fini',
'prediction': b'pred', 'read': b'read', 'sample': b'samp',
'visualisation': b'visu', 'db': b'chdb'}
self.action_on_command: Dict[bytes, Any] = {
self.command_dict["exit"]: self.action_on_exit,
self.command_dict["step"]: self.action_on_step,
self.command_dict["done"]: self.action_on_done,
self.command_dict["finished"]: self.action_on_finished,
self.command_dict["prediction"]: self.action_on_prediction,
self.command_dict["read"]: self.action_on_read,
self.command_dict["sample"]: self.action_on_sample,
self.command_dict["visualisation"]: self.action_on_visualisation,
self.command_dict['db']: self.action_on_change_db
}
##########################################################################################
##########################################################################################
# LOW level of send & receive data on network #
##########################################################################################
##########################################################################################
[docs] async def send_data(self,
data_to_send: Convertible,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send data through the given socket.
:param data_to_send: Data that will be sent on socket.
:param loop: Asyncio event loop.
:param receiver: Socket receiver.
"""
loop = get_event_loop() if loop is None else loop
receiver = self.sock if receiver is None else receiver
# Cast data to bytes fields
data_as_bytes = self.data_converter.data_to_bytes(data_to_send)
# Send the whole message
if await loop.sock_sendall(sock=receiver, data=data_as_bytes) is not None:
ValueError(f"[{self.name}] Could not send all of the data for an unknown reason")
[docs] def sync_send_data(self,
data_to_send: Convertible,
receiver: Optional[socket] = None) -> None:
"""
Send data through the given socket.
Synchronous version of 'TcpIpObject.send_data'.
:param data_to_send: Data that will be sent on socket.
:param receiver: Socket receiver.
"""
receiver = self.sock if receiver is None else receiver
# Cast data to bytes fields
data_as_bytes = self.data_converter.data_to_bytes(data_to_send)
# Send the whole message
receiver.sendall(data_as_bytes)
[docs] async def receive_data(self,
loop: EventLoop,
sender: socket) -> Convertible:
"""
Receive data from a socket.
:param loop: Asyncio event loop.
:param sender: Socket sender.
:return: Converted data.
"""
# Receive the number of fields to receive
nb_bytes_fields_b = await loop.sock_recv(sender, self.data_converter.int_size)
nb_bytes_fields = self.data_converter.size_from_bytes(nb_bytes_fields_b)
# Receive the sizes in bytes of all the relevant fields
sizes_b = [await loop.sock_recv(sender, self.data_converter.int_size) for _ in range(nb_bytes_fields)]
sizes = [self.data_converter.size_from_bytes(size_b) for size_b in sizes_b]
# Receive each byte field
bytes_fields = [await self.read_data(loop, sender, size) for size in sizes]
# Return the data in the expected format
return self.data_converter.bytes_to_data(bytes_fields)
[docs] def sync_receive_data(self) -> Convertible:
"""
Receive data from a socket.
Synchronous version of 'TcpIpObject.receive_data'.
:return: Converted data.
"""
self.sock.setblocking(True)
# Receive the number of fields to receive
nb_bytes_fields_b = self.sock.recv(self.data_converter.int_size)
nb_bytes_fields = self.data_converter.size_from_bytes(nb_bytes_fields_b)
# Receive the sizes in bytes of all the relevant fields
sizes_b = [self.sock.recv(self.data_converter.int_size) for _ in range(nb_bytes_fields)]
sizes = [self.data_converter.size_from_bytes(size_b) for size_b in sizes_b]
# Receive each byte field
bytes_fields = [self.sync_read_data(size) for size in sizes]
# Return the data in the expected format
return self.data_converter.bytes_to_data(bytes_fields)
[docs] async def read_data(self,
loop: EventLoop,
sender: socket,
read_size: int) -> bytes:
"""
Read the data on the socket with value of buffer size as relatively small powers of 2.
:param loop: Asyncio event loop.
:param sender: Socket sender.
:param read_size: Amount of data to read on the socket.
:return: Bytes field with 'read_size' length.
"""
# Maximum read size
read_sizes = [8192, 4096]
bytes_field = b''
while read_size > 0:
# Select the good amount of bytes to read
read_size_idx = 0
while read_size_idx < len(read_sizes) and read_size < read_sizes[read_size_idx]:
read_size_idx += 1
# If the amount of bytes to read is too small then read it all
chunk_size_to_read = read_size if read_size_idx >= len(read_sizes) else read_sizes[read_size_idx]
# Try to read at most chunk_size_to_read bytes from the socket
data_received_as_bytes = await loop.sock_recv(sender, chunk_size_to_read)
# Todo: add security with <<await asyncio.wait_for(loop.sock_recv(sender, chunk_size_to_read), timeout=1.)>>
# Accumulate the data
bytes_field += data_received_as_bytes
read_size -= len(data_received_as_bytes)
return bytes_field
[docs] def sync_read_data(self,
read_size: int) -> bytes:
"""
Read the data on the socket with value of buffer size as relatively small powers of 2.
Synchronous version of 'TcpIpObject.read_data'.
:param read_size: Amount of data to read on the socket.
:return: Bytes field with 'read_size' length.
"""
# Maximum read sizes array
read_sizes = [8192, 4096]
bytes_field = b''
while read_size > 0:
# Select the good amount of bytes to read
read_size_idx = 0
while read_size_idx < len(read_sizes) and read_size < read_sizes[read_size_idx]:
read_size_idx += 1
# If the amount of bytes to read is too small then read it all
chunk_size_to_read = read_size if read_size_idx >= len(read_sizes) else read_sizes[read_size_idx]
# Try to read at most chunk_size_to_read bytes from the socket
data_received_as_bytes = self.sock.recv(chunk_size_to_read)
# Accumulate the data
bytes_field += data_received_as_bytes
read_size -= len(data_received_as_bytes)
return bytes_field
##########################################################################################
##########################################################################################
# Send & receive abstract named data #
##########################################################################################
##########################################################################################
[docs] async def send_labeled_data(self,
data_to_send: Convertible,
label: str,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None,
send_read_command: bool = True) -> None:
"""
Send data with an associated label.
:param data_to_send: Data that will be sent on socket.
:param label: Associated label.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
:param send_read_command: If True, the command 'read' is sent before sending data.
"""
loop = get_event_loop() if loop is None else loop
receiver = self.sock if receiver is None else receiver
# Send a 'read' command before data if specified
if send_read_command:
await self.send_command_read(loop=loop, receiver=receiver)
# Send label
await self.send_data(data_to_send=label, loop=loop, receiver=receiver)
# Send data
await self.send_data(data_to_send=data_to_send, loop=loop, receiver=receiver)
[docs] def sync_send_labeled_data(self,
data_to_send: Convertible,
label: str,
receiver: Optional[socket] = None,
send_read_command: bool = True) -> None:
"""
Send data with an associated label.
Synchronous version of 'TcpIpObject.send_labeled_data'.
:param data_to_send: Data that will be sent on socket.
:param label: Associated label.
:param receiver: TcpIpObject receiver.
:param send_read_command: If True, the command 'read' is sent before sending data.
"""
receiver = self.sock if receiver is None else receiver
# Send a 'read' command before data if specified
if send_read_command:
self.sync_send_command_read(receiver=receiver)
# Send label
self.sync_send_data(data_to_send=label, receiver=receiver)
# Send data
self.sync_send_data(data_to_send=data_to_send, receiver=receiver)
[docs] async def receive_labeled_data(self,
loop: EventLoop,
sender: socket) -> Tuple[str, Convertible]:
"""
Receive data and an associated label.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
:return: Label, Data.
"""
# Listen to sender
recv = await self.receive_data(loop=loop, sender=sender)
# 'recv' can be either a 'read' command either the label
if recv in self.command_dict.values():
label = await self.receive_data(loop=loop, sender=sender)
else:
label = recv
# Receive data
data = await self.receive_data(loop=loop, sender=sender)
return label, data
[docs] def sync_receive_labeled_data(self) -> Tuple[str, Convertible]:
"""
Receive data and an associated label.
Synchronous version of 'TcpIpObject.receive_labeled_data'.
:return: Label, Data.
"""
# Listen to sender
recv = self.sync_receive_data()
# 'recv' can be either a 'read' command either the label
if recv in self.command_dict.values():
label = self.sync_receive_data()
else:
label = recv
# Receive data
data = self.sync_receive_data()
return label, data
[docs] async def send_dict(self,
name: str,
dict_to_send: Dict[Any, Any],
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send a whole dictionary field by field as labeled data.
:param name: Name of the dictionary.
:param dict_to_send: Dictionary to send.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
loop = get_event_loop() if loop is None else loop
receiver = self.sock if receiver is None else receiver
# If dict is empty, the sending is finished
if dict_to_send is None or dict_to_send == {}:
await self.send_command_finished(loop=loop, receiver=receiver)
return
# Sends to make the listener start the receive_dict routine
await self.send_command_read(loop=loop, receiver=receiver)
await self.send_labeled_data(data_to_send=name, label="::dict::", loop=loop, receiver=receiver)
# Treat the dictionary field by field
for key in dict_to_send:
# If data is another dict, send as an unnamed dictionary
if type(dict_to_send[key]) == dict:
# Send key
await self.send_labeled_data(data_to_send=key, label="dict_id", loop=loop, receiver=receiver)
# Send data
await self.__send_unnamed_dict(dict_to_send=dict_to_send[key], loop=loop, receiver=receiver)
# If data is not a dict, send as labeled data
else:
# Send key and data
await self.send_labeled_data(data_to_send=dict_to_send[key], label=key, loop=loop, receiver=receiver)
# The sending is finished
await self.send_command_finished(loop=loop, receiver=receiver)
await self.send_command_finished(loop=loop, receiver=receiver)
[docs] def sync_send_dict(self,
name: str,
dict_to_send: Dict[Any, Any],
receiver: Optional[socket] = None) -> None:
"""
Send a whole dictionary field by field as labeled data.
Synchronous version of 'TcpIpObject.receive_labeled_data'.
:param name: Name of the dictionary.
:param dict_to_send: Dictionary to send.
:param receiver: TcpIpObject receiver.
"""
receiver = self.sock if receiver is None else receiver
# If dict is empty, the sending is finished
if dict_to_send is None or dict_to_send == {}:
self.sync_send_command_finished(receiver=receiver)
return
# Sends to make the listener start the receive_dict routine
self.sync_send_command_read()
self.sync_send_labeled_data(data_to_send=name, label="::dict::", receiver=receiver)
# Treat the dictionary field by field
for key in dict_to_send:
# If data is another dict, send as an unnamed dictionary
if type(dict_to_send[key]) == dict:
# Send key
self.sync_send_labeled_data(data_to_send=key, label="dict_id", receiver=receiver)
# Send data
self.__sync_send_unnamed_dict(dict_to_send=dict_to_send[key], receiver=receiver)
# If data is not a dict, send as labeled data
else:
# Send key and data
self.sync_send_labeled_data(data_to_send=dict_to_send[key], label=key, receiver=receiver)
# The sending is finished
self.sync_send_command_finished(receiver=receiver)
self.sync_send_command_finished(receiver=receiver)
async def __send_unnamed_dict(self,
dict_to_send: Dict[Any, Any],
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send a whole dictionary field by field as labeled data. Dictionary will be unnamed.
:param dict_to_send: Dictionary to send
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
loop = get_event_loop() if loop is None else loop
receiver = self.sock if receiver is None else receiver
# Treat the dictionary field by field
for key in dict_to_send:
# If data is another dict, send as an unnamed dictionary
if type(dict_to_send[key]) == dict:
# Send key
await self.send_labeled_data(data_to_send=key, label="dict_id", loop=loop, receiver=receiver)
# Send data
await self.__send_unnamed_dict(dict_to_send=dict_to_send[key], loop=loop, receiver=receiver)
# If data is not a dict, send as labeled data
else:
# Send key and data
await self.send_labeled_data(data_to_send=dict_to_send[key], label=key, loop=loop, receiver=receiver)
# The sending is finished
await self.send_command_finished(loop=loop, receiver=receiver)
def __sync_send_unnamed_dict(self,
dict_to_send: Dict[Any, Any],
receiver: Optional[socket] = None) -> None:
"""
Send a whole dictionary field by field as labeled data. Dictionary will be unnamed.
Synchronous version of 'TcpIpObject.receive_labeled_data'.
:param dict_to_send: Dictionary to send.
:param receiver: TcpIpObject receiver.
"""
receiver = self.sock if receiver is None else receiver
# Treat the dictionary field by field
for key in dict_to_send:
# If data is another dict, send as an unnamed dictionary
if type(dict_to_send[key]) == dict:
# Send key
self.sync_send_labeled_data(data_to_send=key, label="dict_id", receiver=receiver,
send_read_command=True)
# Send data
self.__sync_send_unnamed_dict(dict_to_send=dict_to_send[key], receiver=receiver)
# If data is not a dict, send as labeled data
else:
# Send key and data
self.sync_send_labeled_data(data_to_send=dict_to_send[key], label=key, receiver=receiver,
send_read_command=True)
# The sending is finished
self.sync_send_command_finished(receiver=receiver)
[docs] async def receive_dict(self,
recv_to: Dict[Any, Any],
loop: Optional[EventLoop] = None,
sender: Optional[socket] = None) -> None:
"""
Receive a whole dictionary field by field as labeled data.
:param recv_to: Dictionary to fill with received fields.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
loop = get_event_loop() if loop is None else loop
sender = self.sock if sender is None else sender
# Receive data while command 'finished' is not received
while (cmd := await self.receive_data(loop=loop, sender=sender)) != self.command_dict['finished']:
# Receive field as a labeled data
label, param = await self.receive_labeled_data(loop=loop, sender=sender)
# If label refers to dict keyword, receive an unnamed dict
if label in ["::dict::", "dict_id"]:
recv_to[param] = {}
await self.receive_dict(recv_to=recv_to[param], loop=loop, sender=sender)
# Otherwise, set the dict field directly
else:
recv_to[label] = param
[docs] def sync_receive_dict(self,
recv_to: Dict[Any, Any],
sender: Optional[socket] = None) -> None:
"""
Receive a whole dictionary field by field as labeled data.
Synchronous version of 'TcpIpObject.receive_labeled_data'.
:param recv_to: Dictionary to fill with received fields.
:param sender: TcpIpObject sender.
"""
sender = self.sock if sender is None else sender
# Receive data while command 'finished' is not received
while self.sync_receive_data() != self.command_dict['finished']:
# Receive field as a labeled data
label, param = self.sync_receive_labeled_data()
# If label refers to dict keyword, receive an unnamed dict
if label in ["::dict::", "dict_id"]:
recv_to[param] = {}
self.sync_receive_dict(recv_to=recv_to[param], sender=sender)
# Otherwise, set the dict field directly
else:
recv_to[label] = param
##########################################################################################
##########################################################################################
# Command related sends #
##########################################################################################
##########################################################################################
async def __send_command(self,
loop: EventLoop,
receiver: socket,
command: str = '') -> None:
"""
Send a bytes command among the available commands.
Do not use this one. Use the dedicated function 'send_command_{cmd}(...)'.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
:param command: Name of the command, must be in 'self.command_dict'.
"""
# Check if the command exists
try:
cmd = self.command_dict[command]
except KeyError:
raise KeyError(f"\"{command}\" is not a valid command. Use {self.command_dict.keys()} instead.")
# Send command as a byte data
await self.send_data(data_to_send=cmd, loop=loop, receiver=receiver)
def __sync_send_command(self,
receiver: socket,
command: str = '') -> None:
"""
Send a bytes command among the available commands.
Do not use this one. Use the dedicated function 'sync_send_command_{cmd}(...)'.
Synchronous version of 'TcpIpObject.send_command'.
:param command: Name of the command to send.
:param receiver: TcpIpObject receiver.
"""
# Check if the command exists
try:
cmd = self.command_dict[command]
except KeyError:
raise KeyError(f"\"{command}\" is not a valid command. Use {self.command_dict.keys()} instead.")
# Send command as a byte data
self.sync_send_data(data_to_send=cmd, receiver=receiver)
[docs] async def send_command_compute(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'compute' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='compute')
[docs] def sync_send_command_compute(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'compute' command.
Synchronous version of 'TcpIpObject.send_command_compute'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='compute')
[docs] async def send_command_done(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'done' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='done')
[docs] def sync_send_command_done(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'done' command.
Synchronous version of 'TcpIpObject.send_command_done'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='done')
[docs] async def send_command_exit(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'exit' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='exit')
[docs] def sync_send_command_exit(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'exit' command.
Synchronous version of 'TcpIpObject.send_command_exit'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='exit')
[docs] async def send_command_finished(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'finished' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='finished')
[docs] def sync_send_command_finished(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'finished' command.
Synchronous version of 'TcpIpObject.send_command_finished'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='finished')
[docs] async def send_command_prediction(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'prediction' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='prediction')
[docs] def sync_send_command_prediction(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'prediction' command.
Synchronous version of 'TcpIpObject.send_command_prediction'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='prediction')
[docs] async def send_command_read(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'read' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='read')
[docs] def sync_send_command_read(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'read' command.
Synchronous version of 'TcpIpObject.send_command_read'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='read')
[docs] async def send_command_sample(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'sample' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='sample')
[docs] def sync_send_command_sample(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'sample' command.
Synchronous version of 'TcpIpObject.send_command_sample'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='sample')
[docs] async def send_command_step(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'step' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='step')
[docs] def sync_send_command_step(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'step' command.
Synchronous version of 'TcpIpObject.send_command_step'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='step')
[docs] async def send_command_visualisation(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'visualisation' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='visualisation')
[docs] def sync_send_command_visualisation(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'visualisation' command.
Synchronous version of 'TcpIpObject.send_command_visualisation'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='visualisation')
[docs] async def send_command_change_db(self,
loop: Optional[EventLoop] = None,
receiver: Optional[socket] = None) -> None:
"""
Send the 'change_database' command.
:param loop: Asyncio event loop.
:param receiver: TcpIpObject receiver.
"""
await self.__send_command(loop=loop, receiver=receiver, command='db')
[docs] def sync_send_command_change_db(self,
receiver: Optional[socket] = None) -> None:
"""
Send the 'change_database' command.
Synchronous version of 'TcpIpObject.send_command_change_database'.
:param receiver: TcpIpObject receiver.
"""
self.__sync_send_command(receiver=receiver, command='db')
##########################################################################################
##########################################################################################
# Actions to perform on commands #
##########################################################################################
##########################################################################################
[docs] async def listen_while_not_done(self,
loop: EventLoop,
sender: socket,
data_dict: Dict[Any, Any],
client_id: int = None) -> Dict[Any, Any]:
"""
Compute actions until 'done' command is received.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
:param data_dict: Dictionary to collect data.
:param client_id: ID of a Client.
"""
# Compute actions until 'done' command is received
while (cmd := await self.receive_data(loop=loop, sender=sender)) != self.command_dict['done']:
# Compute the associated action
if cmd in self.command_dict.values():
await self.action_on_command[cmd](data=data_dict, client_id=client_id, sender=sender, loop=loop)
# Return collected data
return data_dict
[docs] async def action_on_compute(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'compute' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_done(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'done' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_exit(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'exit' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_finished(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'finished' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_prediction(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'prediction' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_read(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'read' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
# Receive labeled data
label, param = await self.receive_labeled_data(loop=loop, sender=sender)
# If data to receive appears to be a dict, receive dict
if label == "::dict::":
data[client_id][param] = {}
await self.receive_dict(recv_to=data[client_id][param], sender=sender, loop=loop)
# Otherwise add labeled data to data dict
else:
data[client_id][label] = param
[docs] async def action_on_sample(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'sample' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_step(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'step' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_visualisation(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'visualisation' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass
[docs] async def action_on_change_db(self,
data: Dict[Any, Any],
client_id: int,
sender: socket,
loop: EventLoop) -> None:
"""
Action to run when receiving the 'change_database' command.
:param data: Dict storing data.
:param client_id: ID of the TcpIpClient.
:param loop: Asyncio event loop.
:param sender: TcpIpObject sender.
"""
pass