Files

copied
Last update 6 years 3 months by Olivier Gillet
Fileselementsresources
..
__init__.py
audio_io.py
lookup_tables.py
resources.py
samples.py
audio_io.py
#!/usr/bin/python2.6 # # Author: Olivier Gillet (ol.gillet@gmail.com) """Class and functions to read and write numpy array from and to audio files.""" import copy import logging import numpy import struct import sys sys.path.append('.') # Constant used when converting between unsigned char and float. A different # value is used in both directions to avoid clipping. _UNSIGNED_CHAR_TO_FLOAT_SCALE = 128.0 _FLOAT_TO_UNSIGNED_CHAR_SCALE = 127.0 _DATA_CHUNK_HEADER_SIZE = 8 _FMT_CHUNK_DATA_SIZE = 16 _FMT_CHUNK_HEADER_SIZE = 8 _RIFF_FORMAT_DESCRIPTOR_SIZE = 4 class AudioIoException(Exception): """An error indicating a failure in audio file reading/writing.""" def __init__(self, message): """Initializes an AudioIoException object.""" Exception.__init__(self, 'Audio IO error: %s' % message) def _ReadBytesOrFail(file_object, num_bytes, error_message): """Read a given number of bytes from the file or raise an error. Args: file_object: file object. num_bytes: int. number of bytes to read. error_message: string. text message of the exception thrown when the number of bytes could not be read (for example, identifying which section the caller attempted to read.) Returns: String with the bytes read from the file. Raises: AudioIoException: - The required number of bytes could not be read from the file. """ read = file_object.read(num_bytes) if len(read) < num_bytes: raise AudioIoException(error_message) return read def _GoToIffChunk(file_object, iff_chunk_id): """Jump to a named chunk in a (R)IFF file. Args: file_object: file object. iff_chunk_id: 4 chars ID of the chunk. Returns: length of the chunk in bytes. -1 if the chunk has not been found. If the chunk is found, file_object is positioned at the beginning of the chunk. Otherwise, it is positioned at the end of the file. """ while True: chunk_id = file_object.read(4) if len(chunk_id) < 4: return -1 chunk_size = file_object.read(4) if len(chunk_size) < 4: return -1 chunk_size = struct.unpack('<L', chunk_size) if iff_chunk_id == chunk_id: return chunk_size[0] else: file_object.seek(chunk_size, 1) def ReadWavFile(file_name, scale=True): """Read a .wav file into a numpy array. Note: the FFmpeg based AudioDecoder is more generic, use this only as a low-level alternative to AudioDecoder. Args: file_name: string. name of the local file to load. scale: boolean. if True, returns float data in the [-1, 1] range instead of integers. Returns: 2-dimensional numpy array of size (num_samples, num_channels) Raises: AudioIoException: - The file header is corrupted. - The file uses an unsupported sampling rate, bitdepth or codec. """ f = file(file_name, 'r') header = f.read(12) if len(header) < 12 or header[:4] != 'RIFF' or header[8:] != 'WAVE': raise AudioIoException('Corrupted header') format_header_size = _GoToIffChunk(f, 'fmt ') if format_header_size < 0 or format_header_size != 16: raise AudioIoException('Invalid header size') format_header = _ReadBytesOrFail(f, 16, 'Corrupted header') compression, num_channels, sample_rate, _, _, bitdepth = struct.unpack( '<HHLLHH', format_header) if compression != 1: raise AudioIoException('Unknown .wav codec: %d' % compression) if not num_channels: raise AudioIoException('Wrong number of channels') if sample_rate < 1000 or sample_rate > 96000: raise AudioIoException('Invalid sample rate') if bitdepth != 8 and bitdepth != 16: raise AudioIoException('Unsupported bit depth') sample_data_size = _GoToIffChunk(f, 'data') num_samples = sample_data_size / (bitdepth / 8) # Make sure we are reading a number of samples which is a multiple of the # number of channels. Some corrupted stereo .wav files may contain 5 samples! num_samples -= num_samples % num_channels if bitdepth == 8: samples = numpy.fromfile(f, dtype=numpy.ubyte, count=num_samples) if scale: samples = (samples / _UNSIGNED_CHAR_TO_FLOAT_SCALE) - 1.0 else: bytes = bitdepth / 8 samples = numpy.fromfile(f, dtype='<i%d' % bytes, count=num_samples) if scale: # Semantics of x = x / y and x /= y are different when x and y are # numpy arrays of a different type. x /= y casts to y's type, while # x = x / y casts to x's type. # pylint: disable-msg=C6407 samples = samples / float(1 << (bitdepth - 1)) return (samples.reshape(-1, num_channels), sample_rate) def Quantize(signal, bitdepth, normalize=True): """Convert an array of float to an array of integers. Args: signal: numpy array. source signal. bitdepth: int. size of the integer in bits. normalize: boolean. whether samples should be scaled to use all the available dynamic range. Returns: array of integers. """ norm = numpy.abs(signal).max() # Normalization or clipping. if normalize and norm > 0: scaled_signal = signal / norm else: scaled_signal = copy.copy(signal) if norm > 1.0: logging.warning('Some samples will be clipped.') # Clip samples above 1 and below -1. scaled_signal[scaled_signal < -1] = -1 scaled_signal[scaled_signal > 1] = 1 if bitdepth == 8: scaled_signal = (scaled_signal + 1.0) * _FLOAT_TO_UNSIGNED_CHAR_SCALE scaled_signal = numpy.array(scaled_signal, dtype=numpy.uint8) else: scale = (1 << (bitdepth - 1)) - 1 # pylint: disable-msg=C6407 scaled_signal = scaled_signal * scale scaled_signal = numpy.array(scaled_signal, dtype='i%d' % (bitdepth / 8)) return scaled_signal def WriteWavFile(signal, sample_rate, file_name, bitdepth=16, normalize=True): """Write a .wav file from a numpy array. Args: signal: 2-dimensional numpy array, of size (num_samples, num_channels). sample_rate: int. sample rate of the signal in Hz. file_name: string. name of the destination file. bitdepth: int. bitdepth in bits (default 16). normalize: boolean. if set to True, scale the data to the [-1, 1] range before writing. """ if signal.dtype == numpy.uint8 or signal.dtype == numpy.int16: bitdepth = signal.dtype.itemsize * 8 scaled_signal = signal else: scaled_signal = Quantize(signal, bitdepth, normalize=normalize) if scaled_signal.ndim == 1: num_channels = 1 else: num_channels = scaled_signal.shape[1] # Compute the total size of the output .wav file, minus the size of the # first two fields of the RIFF header. # RIFF Format. total_size = _RIFF_FORMAT_DESCRIPTOR_SIZE # 'fmt ' chunk. total_size += _FMT_CHUNK_HEADER_SIZE + _FMT_CHUNK_DATA_SIZE # 'data' chunk. total_size += _DATA_CHUNK_HEADER_SIZE + scaled_signal.nbytes f = file(file_name, 'w') try: f.write('RIFF') f.write(struct.pack('<L', total_size)) f.write('WAVEfmt ') bitrate = sample_rate * num_channels * (bitdepth / 8) bits_per_sample = num_channels * (bitdepth / 8) f.write(struct.pack('<LHHLLHH', 16, 1, num_channels, sample_rate, bitrate, bits_per_sample, bitdepth)) f.write('data') f.write(struct.pack('<L', scaled_signal.nbytes)) scaled_signal.tofile(f) finally: f.close()
Report a bug