"""A group of useful functions"""
import inspect
import os
import numpy as np
from numpy import polymul, polyadd
import scipy
import scipy.linalg
import scipy.signal
from .py2to3 import range
[docs]class UndefinedError(Exception): pass
[docs]def atleast_2d_row(array):
"""Converts 1d arrays to 2d arrays, but always as row vectors"""
array = np.array(array)
if array.ndim < 2:
return np.atleast_2d(array)
else:
return array
[docs]def atleast_2d_col(array):
"""Converts 1d arrays to 2d arrays, but always as column vectors"""
array = np.array(array)
if array.ndim < 2:
return np.atleast_2d(array).T
else:
return array
[docs]def make_iterable(arg):
"""Checks if ``arg`` is iterable. If not, makes it a one-element list.
Otherwise returns ``arg``."""
try:
iterator = iter(arg)
return arg
except TypeError:
return [arg]
[docs]def flatten_list(my_list):
"""Flatten a list of lists into a single list."""
return [num for elem in my_list for num in elem]
[docs]def save_array_text(array, file_name, delimiter=None):
"""Saves a 1D or 2D array to a text file.
Args:
``array``: 1D or 2D or array to save to file.
``file_name``: Filepath to location where data is to be saved.
Kwargs:
``delimiter``: Delimiter in file. Default is same as ``numpy.savetxt``.
Format of saved files is::
2.3 3.1 2.1 ...
5.1 2.2 9.8 ...
7.6 3.1 5.5 ...
0.1 1.9 9.1 ...
...
Complex data is saved in the following format (as floats)::
real[0,0] imag[0,0] real[0,1] imag[0,1] ...
real[1,0] imag[1,0] real[1,1] imag[1,1] ...
...
Files can be read in Matlab with the provided functions or often
with Matlab's ``load``.
"""
# Force data to be an array
array = np.array(array)
# If array is 1d, then make it into a 2d column vector
if array.ndim == 1:
array = atleast_2d_col(array)
elif array.ndim > 2:
raise RuntimeError('Cannot save an array with >2 dimensions')
# Save data
if delimiter is None:
np.savetxt(file_name, array.view(float))
else:
np.savetxt(file_name, array.view(float), delimiter=delimiter)
[docs]def load_array_text(file_name, delimiter=None, is_complex=False):
"""Reads data saved in a text file, returns an array.
Args:
``file_name``: Name of file from which to load data.
Kwargs:
``delimiter``: Delimiter in file. Default is same as ``numpy.loadtxt``.
``is_complex``: Boolean describing whether the data to be loaded is
complex valued.
Returns:
``array``: 2D array containing loaded data.
See :py:func:`save_array_text` for the format used by this function.
"""
# Set data type
if is_complex:
dtype = complex
else:
dtype = float
# Load data
array = np.loadtxt(file_name, delimiter=delimiter, ndmin=2)
if is_complex and array.shape[1] % 2 != 0:
raise ValueError(
('Cannot load complex data, file %s has an odd number of columns. '
'Maybe it has real data.') % file_name)
# Cast as an array, copies to make it C-contiguous memory
return np.array(array.view(dtype))
[docs]def get_file_list(directory, file_extension=None):
"""Returns list of files in ``directory`` with ``file_extension``."""
files = os.listdir(directory)
if file_extension is not None:
if len(file_extension) == 0:
print('Warning: gave an empty file extension.')
filtered_files = []
for f in files:
if f[-len(file_extension):] == file_extension:
filtered_files.append(f)
return filtered_files
else:
return files
[docs]def get_data_members(obj):
"""Returns a dictionary containing data members of ``obj``."""
data_members = {}
for name in dir(obj):
value = getattr(obj, name)
if not name.startswith('__') and not inspect.ismethod(value):
data_members[name] = value
return data_members
[docs]def sum_arrays(arr1, arr2):
"""Used for ``allreduce`` command."""
return np.array(arr1) + np.array(arr2)
[docs]def sum_lists(list1, list2):
"""Sums the elements of each list, returns a new list.
This function is used in MPI reduce commands, but could be used
elsewhere too."""
assert len(list1) == len(list2)
return [list1[i] + list2[i] for i in range(len(list1))]
[docs]def smart_eq(arg1, arg2):
"""Checks for equality, accounting for the fact that numpy's ``==`` doesn't
return a bool. In that case, returns True only if all elements are equal."""
if type(arg1) != type(arg2):
return False
if isinstance(arg1, np.ndarray):
if arg1.shape != arg2.shape:
return False
return (arg1 == arg2).all()
return arg1 == arg2
[docs]class InnerProductBlock(object):
"""Only used in tests. Takes inner product of all vectors."""
def __init__(self, inner_product):
self.inner_product = inner_product
def __call__(self, vecs1, vecs2):
n1 = len(vecs1)
n2 = len(vecs2)
IP_array = np.zeros(
(n1, n2),
dtype=type(self.inner_product(vecs1[0], vecs2[0])))
for i in range(n1):
for j in range(n2):
IP_array[i, j] = self.inner_product(vecs1[i], vecs2[j])
return IP_array
[docs]def svd(array, atol=1e-13, rtol=None):
"""Wrapper for ``numpy.linalg.svd``, computes the singular value
decomposition of an array.
Args:
``array``: Array to take singular value decomposition of.
Kwargs:
``atol``: Level below which singular values are truncated.
``rtol``: Maximum relative difference between largest and smallest
singular values. Smaller ones are truncated.
Returns:
``U``: Array whose columns are left singular vectors.
``S``: 1D array of singular values.
``V``: Array whose columns are right singular vectors.
Truncates ``U``, ``S``, and ``V`` such that the singular values
obey both ``atol`` and ``rtol``.
"""
# Compute SVD (force data to be array)
U, S, V_conj_T = np.linalg.svd(np.array(array), full_matrices=False)
V = V_conj_T.conj().T
# Figure out how many singular values satisfy the tolerances
if atol is not None:
num_nonzeros_atol = (abs(S) > atol).sum()
else:
num_nonzeros_atol = S.size
if rtol is not None:
num_nonzeros_rtol = (
abs(S[:num_nonzeros_atol]) / abs(S[0]) > rtol).sum()
num_nonzeros = min(num_nonzeros_atol, num_nonzeros_rtol)
else:
num_nonzeros = num_nonzeros_atol
# Truncate arrays according to tolerances
U = U[:, :num_nonzeros]
V = V[:, :num_nonzeros]
S = S[:num_nonzeros]
return U, S, V
[docs]def eigh(array, atol=1e-13, rtol=None, is_positive_definite=False):
"""Wrapper for ``numpy.linalg.eigh``. Computes eigendecomposition of a
Hermitian array.
Args:
``array``: Array to take eigendecomposition of.
``atol``: Value below which eigenvalues (and corresponding
eigenvectors) are truncated.
``rtol``: Maximum relative difference between largest and smallest
eigenvalues. Smaller ones are truncated.
``is_positive_definite``: If true, array being decomposed will be
assumed to be positive definite. Tolerance will be automatically
adjusted (if necessary) so that only positive eigenvalues are returned.
Returns:
``eigvals``: 1D array of eigenvalues, sorted in descending order (of
magnitude).
``eigvecs``: Array whose columns are eigenvectors.
"""
# Compute eigendecomposition (force data to be array)
eigvals, eigvecs = np.linalg.eigh(np.array(array))
# Sort the vecs and eigvals by eigval magnitude. The first element will
# have the largest magnitude and the last element will have the smallest
# magnitude.
sort_indices = np.argsort(np.abs(eigvals))[::-1]
eigvals = eigvals[sort_indices]
eigvecs = eigvecs[:, sort_indices]
# Adjust absolute tolerance for positive definite case if there are negative
# eigenvalues and the most negative one has magnitude greater than the
# given tolerance. In that case, we assume the given tolerance is too
# samll (relative to the accuracy of the computation) and increase it to at
# least filter out negative eigenvalues.
if is_positive_definite and eigvals.min() < 0 and abs(eigvals.min()) > atol:
atol = abs(eigvals.min())
# Filter out small and negative eigenvalues, if necessary
if atol is not None:
num_nonzeros_atol = (abs(eigvals) > atol).sum()
else:
num_nonzeros_atol = eigvals.size
if rtol is not None:
num_nonzeros_rtol = (
abs(eigvals[:num_nonzeros_atol]) / abs(eigvals[0]) > rtol).sum()
num_nonzeros = min(num_nonzeros_atol, num_nonzeros_rtol)
else:
num_nonzeros = num_nonzeros_atol
eigvals = eigvals[:num_nonzeros]
eigvecs = eigvecs[:, :num_nonzeros]
return eigvals, eigvecs
[docs]def eig_biorthog(array, scale_choice='left'):
"""Wrapper for ``numpy.linalg.eig`` that returns both left and right
eigenvectors. Eigenvalues and eigenvectors are sorted and scaled so that
the left and right eigenvector arrays are orthonormal.
Args:
``array``: Array to take eigendecomposition of.
Kwargs:
``scale_choice``: Determines whether 'left' (default) or 'right'
eigenvectors will be scaled to yield a biorthonormal set. The other
eigenvectors will be left unscaled, leaving them with unit norms.
Returns:
``evals``: 1D array of eigenvalues.
``R_evecs``: Array whose columns are right eigenvectors.
``L_evecs``: Array whose columns are left eigenvectors.
"""
# Force data to be array
array = np.array(array)
# Compute eigendecomposition
evals, L_evecs, R_evecs = scipy.linalg.eig(array, left=True, right=True)
# Scale the evecs to get a biorthogonal set
scale_factors = np.diag(np.dot(L_evecs.conj().T, R_evecs))
if scale_choice.lower() == 'left':
L_evecs /= scale_factors.conj()
elif scale_choice.lower() == 'right':
R_evecs /= scale_factors
else:
raise ValueError('Invalid scale choice. Must be LEFT or RIGHT.')
return evals, R_evecs, L_evecs
[docs]def balanced_truncation(
A, B, C, order=None, return_sing_vals=False):
"""Balance and truncate discrete-time linear time-invariant (LTI) system
defined by A, B, C arrays. It's not very accurate due to numerical issues.
Args:
``A``, ``B``, ``C``: LTI discrete-time arrays.
Kwargs:
``order``: Order (number of states) of truncated system. Default is to
use the maximal possible value (can truncate system afterwards).
Returns:
``A_balanced``, ``B_balanced``, ``C_balanced``: LTI discrete-time
arrays of balanced system.
If ``return_sing_vals`` is True, also returns:
``sing_vals``: Hankel singular values.
Notes:
- ``D`` is unchanged by balanced truncation.
- This function is not computationally efficient or accurate relative to
Matlab's ``balancmr``.
"""
A = np.array(A)
B = np.array(B)
C = np.array(C)
gram_cont = scipy.linalg.solve_lyapunov(A, B.dot(B.transpose().conj()))
gram_obsv = scipy.linalg.solve_lyapunov(A.transpose().conj(),
C.transpose().conj().dot(C))
Uc, Ec, Vc = svd(gram_cont)
Uo, Eo, Vo = svd(gram_obsv)
Lc = Uc.dot(np.diag(Ec**0.5))
Lo = Uo.dot(np.diag(Eo**0.5))
U, E, V = svd(Lo.transpose().dot(Lc))
if order is None:
order = len(E)
SL = Lo.dot(U[:, :order]).dot(np.diag(E[:order]**-0.5))
SR = Lc.dot(V[:, :order]).dot(np.diag(E[:order]**-0.5))
A_bal_trunc = SL.transpose().dot(A).dot(SR)
B_bal_trunc = SL.transpose().dot(B)
C_bal_trunc = C.dot(SR)
if return_sing_vals:
return A_bal_trunc, B_bal_trunc, C_bal_trunc, E
else:
return A_bal_trunc, B_bal_trunc, C_bal_trunc
[docs]def lsim(A, B, C, inputs, initial_condition=None):
"""Simulates a discrete-time system with arbitrary inputs.
:math:`x(n+1) = Ax(n) + Bu(n)`
:math:`y(n) = Cx(n)`
Args:
``A``, ``B``, and ``C``: State-space system arrays.
``inputs``: Array of inputs :math:`u`, with dimensions
``[num_time_steps, num_inputs]``.
Kwargs:
``initial_condition``: Initial condition :math:`x(0)`.
Returns:
``outputs``: Array of outputs :math:`y`, with dimensions
``[num_time_steps, num_outputs]``.
``D`` array is assumed to be zero.
"""
A = np.array(A)
B = np.array(B)
C = np.array(C)
ss = scipy.signal.StateSpace(A, B, C,
np.zeros((C.shape[0], B.shape[1])), dt=1)
tout_dum, outputs, xout_dum = scipy.signal.dlsim(
ss, inputs, x0=initial_condition)
return outputs
[docs]def impulse(A, B, C, num_time_steps=None):
"""Generates impulse response outputs for a discrete-time system.
Args:
``A``, ``B``, ``C``: State-space system arrays.
Kwargs:
``num_time_steps``: Number of time steps to simulate.
Returns:
``outputs``: Impulse response outputs, with indices corresponding to
[time step, output, input].
No D array is included, but one can simply be prepended to the output if
it is non-zero.
"""
ss = scipy.signal.StateSpace(A, B, C, np.zeros((C.shape[0], B.shape[1])), dt=1)
if num_time_steps is not None:
dum, Markovs = scipy.signal.dimpulse(ss, n=num_time_steps+1)
else:
dum, Markovs = scipy.signal.dimpulse(ss)
# Remove the first element, which is 0, since we define C*B as first output
# of impulse response, i.e., x(0) == B.
Markovs = np.array(Markovs).swapaxes(0, 1).swapaxes(1, 2)[1:]
return Markovs
[docs]def load_signals(signal_path, delimiter=None):
"""Loads signals from text files with columns [t signal1 signal2 ...].
Args:
``signal_paths``: List of filepaths to files containing signals.
Returns:
``time_values``: 1D array of time values.
``signals``: Array of signals with dimensions [time, signal].
Convenience function. Example file has format::
0 0.1 0.2
1 0.2 0.46
2 0.2 1.6
3 0.6 0.1
"""
raw_data = load_array_text(signal_path, delimiter=delimiter)
num_signals = raw_data.shape[1] - 1
if num_signals == 0:
raise ValueError('Data must have at least two columns')
time_values = raw_data[:, 0]
signals = raw_data[:, 1:]
# Guarantee that signals is 2D
if signals.ndim == 1:
signals = signals.reshape((signals.shape[0], 1))
return time_values, signals
[docs]def load_multiple_signals(signal_paths, delimiter=None):
"""Loads multiple signal files from text files with columns [t channel1
channel2 ...].
Args:
``signal_paths``: List of filepaths to files containing signals.
Returns:
``time_values``: 1D array of time values.
``all_signals``: Array of signals with indices [path, time, signal].
See :py:func:`load_signals`.
"""
num_signal_paths = len(signal_paths)
# Read the first file to get parameters
time_values, signals = load_signals(signal_paths[0], delimiter=delimiter)
num_time_values = len(time_values)
num_signals = signals.shape[1]
# Now allocate array and read all of the signals
all_signals = np.zeros((num_signal_paths, num_time_values, num_signals))
# Set the signals we already loaded
all_signals[0] = signals
# Load all remaining files
for path_num, signal_path in enumerate(signal_paths):
time_values_read, signals = load_signals(signal_path,
delimiter=delimiter)
if not np.allclose(time_values_read, time_values):
raise ValueError('Time values in %s are inconsistent with '
'other files')
all_signals[path_num] = signals
return time_values, all_signals
[docs]def Hankel(first_col, last_row=None):
"""
Construct a Hankel array, whose skew diagonals are constant.
Args:
``first_col``: 1D array corresponding to first column of Hankel array.
Kwargs:
``last_row``: 1D array corresponding to the last row of Hankel array.
First element will be ignored. Default is an array of zeros of the same
size as ``first_col``.
Returns:
Hankel: 2D array with dimensions ``[len(first_col), len(last_row)]``.
"""
first_col = np.array(first_col).flatten()
if last_row is None:
last_row = np.zeros(first_col.shape)
else:
last_row = last_row.flatten()
unique_vals = np.concatenate((first_col, last_row[1:]))
a, b = np.ogrid[0:len(first_col), 0:len(last_row)]
indices = a + b
return unique_vals[indices]
[docs]def Hankel_chunks(first_col_chunks, last_row_chunks=None):
"""
Construct a Hankel array using chunks, whose elements have Hankel structure
at the chunk level (constant along skew diagonals), rather than at the
element level.
Args:
``first_col_chunks``: List of 2D arrays corresponding to the first
column of Hankel array chunks.
Kwargs:
``last_row_chunks``: List of 2D arrays corresponding to the last row of
Hankel array chunks. Default is a list of arrays of zeros.
Returns:
Hankel: 2D array with dimension
``[len(first_col) * first_col[0].shape[0],
len(last_row) * last_row[0].shape[1]]``.
"""
# If nothing is passed in for last row, use a list of chunks where each
# chunk is an array of zeros, and each chunk has the same size as the chunks
# in the first column.
if last_row_chunks is None:
last_row_chunks = [
np.zeros(first_col_chunks[0].shape)] * len(first_col_chunks)
# Gather the unique chunks in one list
unique_chunks = first_col_chunks + last_row_chunks[1:]
# Use a list comprehension to create a list where each element of the list
# is an array corresponding a all the chunks in a row. To get that array,
# slice the list of unique chunks using the right index and call hstack on
# it. Finally, call vstack on the list comprehension to get the whole
# Hankel array.
return np.vstack([np.hstack(
np.array(unique_chunks[idx:idx + len(last_row_chunks)]))
for idx in range(len(first_col_chunks))])