def steering_matrix(doas, wavelength, sensor_locations):
"""
Generates a steering matrix.
:param doas: A numpy array of DOAs. Must be a row vector.
:param wavelength: Wavelength.
:param sensor_locations: A numpy array of sensor locations.
:returns: A steering matrix.
"""
return np.exp(1j * 2 * math.pi / wavelength * np.reshape(sensor_locations, (-1, 1)) * np.sin(doas))
def generate_cov_mat(doas, wavelength, sensor_locations, noise_power=1., snr=0., n_snapshots=1):
"""
Generates a sample covariance matrix.
:param doas: A numpy array of DOAs. Must be a row vector.
:param wavelength: Wavelength.
:param sensor_locations: A numpy array of sensor locations.
:param noise_power: Variance of the additive noise.
:param snr: Signal-to-noise ratio in dB.
:param n_snapshots: Number of snapshots.
:returns: A sample covariance matrix.
"""
noise_power = 10**(-snr / 10)
A = steering_matrix(doas, wavelength, sensor_locations)
# Noise
N = 1j * np.random.randn(sensor_locations.size, n_snapshots)
N += np.random.randn(sensor_locations.size, n_snapshots)
N *= math.sqrt(noise_power)
# Source
S = 1j * np.random.randn(doas.size, n_snapshots)
S += np.random.randn(doas.size, n_snapshots)
S *= math.sqrt(0.5)
# Snapshot model
Y = A @ S + N
return (Y @ Y.conj().T) / n_snapshots
def process_cov_mat(R):
"""
Convert a sample covariance matrix for the input layer of our
neural network.
:param R: An MxM sample covariance matrix.
:returns: Converted sample covariance matrix. An M^2 x 1 vector.
"""
R /= np.linalg.norm(R, 'fro')
# Because R is Hermitian, we only need the upper triangular part.
# Hence we can combine the real and imaginary part here.
R_comb = np.triu(R.real) + np.tril(R.imag)
# Vectorize
return R_comb.reshape((-1,))
# Generates flatten and normalized covariance matrices.
class CovarianceMatrixGenerator:
def __init__(self, batch_size, doa_grid, sensor_locations, wavelength=1, min_n_doas=1, max_n_doas=None,
min_n_snapshots=50, max_n_snapshots=1000, min_snr=-10, max_snr=10):
"""
Creates a data generator that generates sample covariance matrices.
Note: The data generation process is not cheap. Ideally we want to
run the data generator on a separate thread. Overall the
training process is CPU bound.
:param batch_size: Batch size.
:param doa_grid: A numpy vector representing the DOA grid.
:param sensor_locations: A numpy array of sensor locations.
:param wavelength: Wavelength.
:param min_n_doas: Minimum number of DOAs.
:param max_n_doas: Maximum number of DOAs.
:param min_n_snapshots: Minimum number of snapshots.
:param max_n_snapshots: Maximum number of snapshots.
:param min_snr: Minimum SNR.
:param max_snr: Maximum SNR.
"""
self.batch_size = batch_size
self.doa_grid = doa_grid
self.sensor_locations = sensor_locations
self.wavelength = wavelength
self.min_n_doas = min_n_doas
if max_n_doas is None:
self.max_n_doas = sensor_locations.size - 1
else:
# Cannot have too many source.
self.max_n_doas = min(sensor_locations.size - 1, max_n_doas)
self.min_n_snapshots = min_n_snapshots
self.max_n_snapshots = max_n_snapshots
self.min_snr = min_snr
self.max_snr = max_snr
def __pick_doas(self):
"""
Picks doas on grid.
"""
k = np.random.randint(self.min_n_doas, self.max_n_doas + 1)
while True:
ind = np.random.choice(self.doa_grid.size, k)
# ensure minimal separation
if k == 1 or np.min(np.diff(np.sort(ind))) > 2:
break
return self.doa_grid[ind], ind
def generate(self):
"""
Generate a batch of training samples.
"""
r_dim = self.sensor_locations.size**2
n_grid_points = self.doa_grid.size
# An infinite generator.
while True:
x = np.zeros((self.batch_size, r_dim), dtype=np.float32)
y = np.zeros((self.batch_size, n_grid_points), dtype=np.float32)
for i in range(self.batch_size):
doas, ind = self.__pick_doas()
snr = np.random.uniform(self.min_snr, self.max_snr)
n_snapshots = np.random.randint(self.min_n_snapshots, self.max_n_snapshots + 1)
# We use normalized noise variance.
R = generate_cov_mat(doas, self.wavelength, self.sensor_locations, 1., snr, n_snapshots)
x[i,:] = process_cov_mat(R)
y[i,ind] = 1.0
yield x, y
class CombinedLoss:
def __init__(self, weight=0.5):
"""
Creates a combined loss function.
:param weight: Weight between the binary cross entropy loss and the L1 loss.
"""
self.weight = weight
def compute_loss(self, y_true, y_pred):
return binary_crossentropy(y_true, y_pred) * self.weight + mean_absolute_error(y_true, y_pred) * (1.0 - self.weight)
class PlotCallback(Callback):
def __init__(self, doa_grid, x, true_doas, outputdir):
"""
Plots the result for a fixed input after each epoch.
:param doa_grid: DOA grid.
:param x: Input to the neural network.
:param true_doas: True doas.
"""
super().__init__()
self.doa_grid = doa_grid
self.x = x
self.true_doas = true_doas
self.outputdir = outputdir
def on_epoch_end(self, epoch, logs=None):
plt.ioff()
y_pred = self.model.predict(self.x, verbose=0)
plt.figure()
plt.plot(self.doa_grid, y_pred[0,:])
plt.stem(self.true_doas, np.ones(self.true_doas.shape), '--', basefmt=' ')
plt.legend(['Estimated', 'True DOAs'])
plt.xlabel('DOA')
plt.ylabel('Output')
plt.savefig(self.outputdir + '/output_%d.png' % (epoch,))
plt.close()
plt.ion()