-
Notifications
You must be signed in to change notification settings - Fork 48
Closed
Labels
Description
@jmcvey3 can you let me know if I am approaching this incorrectly?
Description:
The docstring of cpsd_quasisync_1D claims that I can pass a custom np.array window.
MHKiT-Python/mhkit/dolfyn/tools/fft.py
Lines 81 to 162 in 2ccc286
| def cpsd_quasisync_1D(a, b, nfft, fs, window='hann'): | |
| """ | |
| Compute the cross power spectral density (CPSD) of the signals `a` and `b`. | |
| Parameters | |
| ---------- | |
| a : numpy.ndarray | |
| The first signal. | |
| b : numpy.ndarray | |
| The second signal. | |
| nfft : int | |
| The number of points in the fft. | |
| fs : float | |
| The sample rate (e.g. sample/second). | |
| window : {None, 1, 'hann', numpy.ndarray} | |
| The window to use (default: 'hann'). Valid entries are: | |
| - None,1 : uses a 'boxcar' or ones window. | |
| - 'hann' : hanning window. | |
| - a length(nfft) array : use this as the window directly. | |
| Returns | |
| ------- | |
| cpsd : numpy.ndarray | |
| The cross-spectral density of `a` and `b`. | |
| See Also | |
| --------- | |
| :func:`psd`, | |
| :func:`coherence_1D`, | |
| :func:`cpsd_1D`, | |
| numpy.fft | |
| Notes | |
| ----- | |
| `a` and `b` do not need to be 'tightly' synchronized, and can even | |
| be different lengths, but the first- and last-index of both series | |
| should be synchronized (to whatever degree you want unbiased | |
| phases). | |
| This performs: | |
| .. math:: | |
| fft(a)*conj(fft(b)) | |
| Note that this is consistent with :func:`numpy.correlate`. | |
| It detrends the data and uses a minimum of 50% overlap for the | |
| shorter of `a` and `b`. For the longer, the overlap depends on the | |
| difference in size. 1-(l_short/l_long) data will be underutilized | |
| (where l_short and l_long are the length of the shorter and longer | |
| series, respectively). | |
| The units of the spectra is the product of the units of `a` and | |
| `b`, divided by the units of fs. | |
| """ | |
| if np.iscomplexobj(a) or np.iscomplexobj(b): | |
| raise Exception("Velocity cannot be complex") | |
| l = [len(a), len(b)] | |
| if l[0] == l[1]: | |
| return cpsd_1D(a, b, nfft, fs, window=window) | |
| elif l[0] > l[1]: | |
| a, b = b, a | |
| l = l[::-1] | |
| step = [0, 0] | |
| step[0], nens, nfft = _stepsize(l[0], nfft) | |
| step[1], nens, nfft = _stepsize(l[1], nfft, nens=nens) | |
| fs = np.float64(fs) | |
| window = _getwindow(window, nfft) | |
| fft_inds = slice(1, int(nfft / 2. + 1)) | |
| wght = 2. / (window ** 2).sum() | |
| pwr = fft(detrend_array(a[0:nfft]) * window)[fft_inds] * \ | |
| np.conj(fft(detrend_array(b[0:nfft]) * window)[fft_inds]) | |
| if nens - 1: | |
| for i1, i2 in zip(range(step[0], l[0] - nfft + 1, step[0]), | |
| range(step[1], l[1] - nfft + 1, step[1])): | |
| pwr += fft(detrend_array(a[i1:(i1 + nfft)]) * window)[fft_inds] * \ | |
| np.conj( | |
| fft(detrend_array(b[i2:(i2 + nfft)]) * window)[fft_inds]) | |
| pwr *= wght / nens / fs | |
| return pwr |
However _getwindow has not implemented that logic:
MHKiT-Python/mhkit/dolfyn/tools/fft.py
Lines 34 to 41 in 2ccc286
| def _getwindow(window, nfft): | |
| if window == 'hann': | |
| window = np.hanning(nfft) | |
| elif window == 'hamm': | |
| window = np.hamming(nfft) | |
| elif window is None or window == 1: | |
| window = np.ones(nfft) | |
| return window |
Test Case Issue:
The following test will fail when I pass a custom window.
def test_cpsd_quasisync_1D(self):
fs = 1000 # Sample rate
nfft = 512 # Number of points in the fft
# Test with signals of same length
a = np.random.normal(0, 1, 1000)
b = np.random.normal(0, 1, 1000)
# Test with a custom window
custom_window = np.hamming(nfft)
cpsd = tools.fft.cpsd_quasisync_1D(a, b, nfft, fs, window=custom_window)
self.assertEqual(cpsd.shape, (nfft // 2,))
Proposed solution
I think _getwindow simply needs a check to make sure the passed window is within bounds and then return the custom window
def _getwindow(window, nfft):
# Check if the window is a numpy array
if isinstance(window, np.ndarray):
# Check if the length of the custom window matches nfft
if len(window) != nfft:
raise ValueError("Custom window length must be equal to nfft")
return window
# Existing conditions for string-specified windows
elif window == "hann":
window = np.hanning(nfft)
elif window == "hamm":
window = np.hamming(nfft)
elif window is None or window == 1:
window = np.ones(nfft)
return window