Source code for BytesConverter

from typing import Callable, Dict, Union, List
from numpy import ndarray, array, frombuffer, zeros
from struct import pack, unpack, calcsize

Convertible = Union[type(None), bytes, str, bool, int, float, List, ndarray]


[docs]class BytesConverter: def __init__(self): """ Convert usual types to bytes and vice versa. Available types: None, bytes, str, bool, int, float, list, ndarray. """ # Data to bytes conversions self.__data_to_bytes_conversion: Dict[type, Callable[[Convertible], bytes]] = { type(None): lambda d: b'0', bytes: lambda d: d, str: lambda d: d.encode('utf-8'), bool: lambda d: bytearray(pack('?', d)), int: lambda d: bytearray(pack('i', d)), float: lambda d: bytearray(pack('f', d)), list: lambda d: array(d, dtype=float).tobytes(), ndarray: lambda d: array(d, dtype=float).tobytes(), } # Bytes to data conversions self.__bytes_to_data_conversion: Dict[str, Callable[[...], Convertible]] = { type(None).__name__: lambda b: None, bytes.__name__: lambda b: b, str.__name__: lambda b: b.decode('utf-8'), bool.__name__: lambda b: unpack('?', b)[0], int.__name__: lambda b: unpack('i', b)[0], float.__name__: lambda b: unpack('f', b)[0], list.__name__: lambda b, t, s: frombuffer(b).astype(t).reshape(s).tolist(), ndarray.__name__: lambda b, t, s: frombuffer(b).astype(t).reshape(s), } # Size of a bytes field self.size_to_bytes: Callable[[bytes], bytes] = lambda i: self.__data_to_bytes_conversion[int](len(i)) self.size_from_bytes: Callable[[bytes], int] = lambda b: self.__bytes_to_data_conversion[int.__name__](b) self.int_size: int = calcsize("i")
[docs] def data_to_bytes(self, data: Convertible, as_list: bool = False) -> Union[bytes, List[bytes]]: """ Convert data to bytes. Available types: None, bytes, str, bool, signed int, float, list, ndarray. :param data: Data to convert. :param as_list: (For tests only, False by default) If False, the whole bytes message is returned. If True, the return will be a list of bytes fields. :return: Concatenated bytes fields (Number of fields, Size of fields, Type, Data, Args). """ # Convert the type of 'data' from str to bytes type_data = self.__data_to_bytes_conversion[str](type(data).__name__) # Convert 'data' to bytes data_bytes = self.__data_to_bytes_conversion[type(data)](data) # Store the sizes of the bytes fields, sizes will have a constant number of 4 bytes sizes = (self.size_to_bytes(type_data), self.size_to_bytes(data_bytes)) # Additional arguments are required for some types of data args = () # Shape and datatype for list and array if type(data) in [list, ndarray]: # Get python native datatype of array dtype = type(zeros(1, dtype=array(data).dtype).item()).__name__ # Convert datatype of array from str to bytes dtype_bytes = self.__data_to_bytes_conversion[str](dtype) # Convert data shape from array to bytes shape_bytes = self.__data_to_bytes_conversion[ndarray](array(data).shape) # Store the sizes of the bytes fields sizes += (self.size_to_bytes(dtype_bytes), self.size_to_bytes(shape_bytes)) # Add the bytes fields to additional arguments args += (dtype_bytes, shape_bytes) # Convert the number of bytes fields to bytes (type_data, data_bytes, args) nb_fields = self.__data_to_bytes_conversion[int](2 + len(args)) # Gather all bytes fields in the desired order fields = [nb_fields, *sizes, type_data, data_bytes, *args] if as_list: return fields # Concatenate bytes fields bytes_message = fields[0] for f in fields[1:]: bytes_message += f return bytes_message
[docs] def bytes_to_data(self, bytes_fields: List[bytes]) -> Convertible: """ Recover data from bytes fields. Available types: None, bytes, str, bool, signed int, float, list, ndarray. :param bytes_fields: Bytes fields (Type, Data, Args). :return: Converted data. """ # Recover the data type data_type = self.__bytes_to_data_conversion[str.__name__](bytes_fields[0]) # Recover additional arguments args = () # Shape and data type for list and array if data_type in [list.__name__, ndarray.__name__]: # Recover datatype of array args += (self.__bytes_to_data_conversion[str.__name__](bytes_fields[2]),) # Recover shape of array args += (self.__bytes_to_data_conversion[ndarray.__name__](bytes_fields[3], int, -1),) # Convert bytes to data return self.__bytes_to_data_conversion[data_type](bytes_fields[1], *args)