Source code for pyspedas.tplot_tools.tplot_math.dpwrspc

"""
Compute power spectra for data.

Notes
-----
Similar to dpwrspc.pro in IDL SPEDAS.

"""

import logging
import numpy as np


[docs] def dpwrspc( time, quantity, nboxpoints=256, nshiftpoints=128, bin=3, tbegin=-1.0, tend=-1.0, noline=False, nohanning=False, notperhz=False, notmvariance=False, tm_sensitivity=None, ): """ Compute power spectra. Parameters ---------- time: list of float Time array. quantity: list of float Data array. nboxpoints: int, optional The number of points to use for the hanning window. The default is 256. nshiftpoints: int, optional The number of points to shift for each spectrum. The default is 128. bin: int, optional Size for binning of the data along the frequency domain. The default is 3. tbegin: float, optional Start time for the calculation. If -1.0, the start time is the first time in the time array. tend: float, optional End time for the calculation. If -1.0, the end time is the last time in the time array. noline: bool, optional If True, no straight line is subtracted. The default is False. nohanning: bool, optional If True, no hanning window is applied to the input. The default is False. notperhz: bool, optional If True, the output units are the square of the input units. The default is False. notmvariance: bool, optional If True, replace output spectrum for any windows that have variable cadence with NaNs. The default is False. tm_sensitivity: float, optional If noTmVariance is set, this number controls how much of a dt anomaly is accepted. The default is None. Returns ------- tuple tdps: array of float The time array for the dynamic power spectrum, the center time of the interval used for the spectrum. fdps: array of float The frequency array (units =1/time units). dps: array of float The power spectrum, (units of quantity)^2/frequency_units. Examples -------- >>> # Compute the power spectrum of a given time series >>> import numpy as np >>> time = range(3000) >>> quantity = np.random.random(3000) >>> power = pyspedas.dpwrspc(time, quantity) """ tdps, fdps, dps = np.array(-1.0), np.array(-1.0), np.array(-1.0) tdps0, fdps0, dps0 = np.array(-1.0), np.array(-1.0), np.array(-1.0) if nohanning is False: window = np.array(np.hanning(nboxpoints), dtype=np.float64) else: window = np.ones(nboxpoints) time = np.array(time, dtype=np.float64) quantity = np.array(quantity, dtype=np.float64) if tbegin == -1.0: tbegin = time[0] if tend == -1.0: tend = time[-1] # Find the time array that correspond to the start and end times tbegin_idx = np.where(time >= tbegin)[0][0] tend_idx = np.where(time <= tend)[0][-1] if tend_idx - tbegin_idx < nboxpoints: logging.error("Not enough points for a calculation") return tdps0, fdps0, dps0 time = time[tbegin_idx : tend_idx + 1] quantity = quantity[tbegin_idx : tend_idx + 1] # remove NaNs from the data where_finite = np.where(np.isnan(quantity) == False) quantity2process = quantity[where_finite[0]] times2process = time[where_finite[0]] nboxpnts = int(nboxpoints) nshiftpnts = nshiftpoints totalpoints = len(times2process) nspectra = int((totalpoints - nboxpnts / 2.0) / nshiftpnts) # test nspectra, if the value of nshiftpnts is much smaller than # nboxpnts/2 strange things happen nbegin = np.array([nshiftpnts * i for i in range(nspectra)], dtype=np.int64) nend = nbegin + nboxpnts okspec = np.where(nend <= totalpoints - 1) if len(okspec[0]) <= 0: logging.error("Not enough points for a calculation") return tdps0, fdps0, dps0 else: nspectra = len(okspec[0]) tdps = np.zeros(nspectra) nfreqs = int(int(nboxpnts / 2) / bin) if nfreqs <= 1: logging.error("Not enough frequencies for a calculation") return tdps0, fdps0, dps0 dps = np.zeros([nspectra, nfreqs]) fdps = np.zeros([nspectra, nfreqs]) # Main calculation loop for nthspectrum in range(0, nspectra): nbegin = int(nthspectrum * nshiftpnts) nend = nbegin + nboxpnts if nend <= totalpoints: t = times2process[nbegin:nend] t0 = t[0] t = t - t0 x = quantity2process[nbegin:nend] # Use center time tdps[nthspectrum] = (times2process[nbegin] + times2process[nend - 1]) / 2.0 if noline is False: coef = np.polyfit(t, x, 1) poly1d_fn = np.poly1d(coef) line = poly1d_fn(t) x = x - line if nohanning is False: x = x * window bign = nboxpnts if bign % 2 != 0: logging.warning( "dpwrspc: needs an even number of data points, dropping last point..." ) t = t[0 : bign - 1] x = x[0 : bign - 1] bign = bign - 1 n_tm = len(t) # time variance can break power spectrum # this keyword skips over those gaps if notmvariance and n_tm > 1: if tm_sensitivity is not None: tmsn = tm_sensitivity else: tmsn = 100.0 tdiff = t[1:n_tm] - t[0 : n_tm - 1] med_diff = np.median(tdiff) idx = np.where(np.abs(tdiff / med_diff - 1) > 1.0 / tmsn) if len(idx[0]) > 0: dps[nthspectrum, :] = float("nan") fdps[nthspectrum, :] = float("nan") continue # following Numerical recipes in Fortran, p. 421, sort of... # (actually following the IDL implementation) k = np.array(range(int(bign / 2) + 1)) tres = np.median(t[1 : len(t)] - t[0 : len(t) - 1]) fk = k / (bign * tres) xs2 = np.abs(np.fft.fft(x)) ** 2 pwr = np.zeros(int(bign / 2 + 1)) pwr[0] = xs2[0] / bign**2 ptmp = 1 + np.array(range(int(bign / 2 - 1))) pwr[1 : int(bign / 2)] = ( xs2[1 : int(bign / 2)] + xs2[bign - ptmp] ) / bign**2 pwr[int(bign / 2)] = xs2[int(bign / 2)] / bign**2 if nohanning is False: wss = float(bign) * float(np.sum(window**2)) pwr = bign**2 * pwr / wss dfreq = bin * (fk[1] - fk[0]) npwr = len(pwr) - 1 nfinal = int(npwr / bin) iarray = np.array(range(nfinal)) power = np.zeros(nfinal) # Note: zeroth point includes zero freq. power. freqcenter = (fk[iarray * bin + 1] + fk[iarray * bin + bin]) / 2.0 for i in range(bin): power = power + pwr[iarray * bin + i + 1] if notperhz is False: power = power / dfreq dps[nthspectrum, :] = power fdps[nthspectrum, :] = freqcenter return tdps, fdps, dps