diff --git a/torch_np/__init__.py b/torch_np/__init__.py index 4690798a..8ea1bf3c 100644 --- a/torch_np/__init__.py +++ b/torch_np/__init__.py @@ -1,4 +1,4 @@ -from . import linalg, random +from . import fft, linalg, random from ._dtypes import * from ._funcs import * from ._getlimits import errstate, finfo, iinfo diff --git a/torch_np/fft.py b/torch_np/fft.py new file mode 100644 index 00000000..b6bb763c --- /dev/null +++ b/torch_np/fft.py @@ -0,0 +1,30 @@ +def fft(): + raise NotImplementedError + + +def ifft(): + raise NotImplementedError + + +def fftn(): + raise NotImplementedError + + +def ifftn(): + raise NotImplementedError + + +def rfftn(): + raise NotImplementedError + + +def irfftn(): + raise NotImplementedError + + +def fft2(): + raise NotImplementedError + + +def ifft2(): + raise NotImplementedError diff --git a/torch_np/tests/numpy_tests/fft/test_helper.py b/torch_np/tests/numpy_tests/fft/test_helper.py new file mode 100644 index 00000000..41650354 --- /dev/null +++ b/torch_np/tests/numpy_tests/fft/test_helper.py @@ -0,0 +1,172 @@ +"""Test functions for fftpack.helper module + +Copied from fftpack.helper by Pearu Peterson, October 2005 + +""" +import torch_np as np +from torch_np.testing import assert_array_almost_equal +from torch_np import fft, pi + +import pytest + +@pytest.mark.xfail(reason="TODO") +class TestFFTShift: + + def test_definition(self): + x = [0, 1, 2, 3, 4, -4, -3, -2, -1] + y = [-4, -3, -2, -1, 0, 1, 2, 3, 4] + assert_array_almost_equal(fft.fftshift(x), y) + assert_array_almost_equal(fft.ifftshift(y), x) + x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1] + y = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4] + assert_array_almost_equal(fft.fftshift(x), y) + assert_array_almost_equal(fft.ifftshift(y), x) + + def test_inverse(self): + for n in [1, 4, 9, 100, 211]: + x = np.random.random((n,)) + assert_array_almost_equal(fft.ifftshift(fft.fftshift(x)), x) + + def test_axes_keyword(self): + freqs = [[0, 1, 2], [3, 4, -4], [-3, -2, -1]] + shifted = [[-1, -3, -2], [2, 0, 1], [-4, 3, 4]] + assert_array_almost_equal(fft.fftshift(freqs, axes=(0, 1)), shifted) + assert_array_almost_equal(fft.fftshift(freqs, axes=0), + fft.fftshift(freqs, axes=(0,))) + assert_array_almost_equal(fft.ifftshift(shifted, axes=(0, 1)), freqs) + assert_array_almost_equal(fft.ifftshift(shifted, axes=0), + fft.ifftshift(shifted, axes=(0,))) + + assert_array_almost_equal(fft.fftshift(freqs), shifted) + assert_array_almost_equal(fft.ifftshift(shifted), freqs) + + def test_uneven_dims(self): + """ Test 2D input, which has uneven dimension sizes """ + freqs = [ + [0, 1], + [2, 3], + [4, 5] + ] + + # shift in dimension 0 + shift_dim0 = [ + [4, 5], + [0, 1], + [2, 3] + ] + assert_array_almost_equal(fft.fftshift(freqs, axes=0), shift_dim0) + assert_array_almost_equal(fft.ifftshift(shift_dim0, axes=0), freqs) + assert_array_almost_equal(fft.fftshift(freqs, axes=(0,)), shift_dim0) + assert_array_almost_equal(fft.ifftshift(shift_dim0, axes=[0]), freqs) + + # shift in dimension 1 + shift_dim1 = [ + [1, 0], + [3, 2], + [5, 4] + ] + assert_array_almost_equal(fft.fftshift(freqs, axes=1), shift_dim1) + assert_array_almost_equal(fft.ifftshift(shift_dim1, axes=1), freqs) + + # shift in both dimensions + shift_dim_both = [ + [5, 4], + [1, 0], + [3, 2] + ] + assert_array_almost_equal(fft.fftshift(freqs, axes=(0, 1)), shift_dim_both) + assert_array_almost_equal(fft.ifftshift(shift_dim_both, axes=(0, 1)), freqs) + assert_array_almost_equal(fft.fftshift(freqs, axes=[0, 1]), shift_dim_both) + assert_array_almost_equal(fft.ifftshift(shift_dim_both, axes=[0, 1]), freqs) + + # axes=None (default) shift in all dimensions + assert_array_almost_equal(fft.fftshift(freqs, axes=None), shift_dim_both) + assert_array_almost_equal(fft.ifftshift(shift_dim_both, axes=None), freqs) + assert_array_almost_equal(fft.fftshift(freqs), shift_dim_both) + assert_array_almost_equal(fft.ifftshift(shift_dim_both), freqs) + + def test_equal_to_original(self): + """ Test that the new (>=v1.15) implementation (see #10073) is equal to the original (<=v1.14) """ + from numpy.core import asarray, concatenate, arange, take + + def original_fftshift(x, axes=None): + """ How fftshift was implemented in v1.14""" + tmp = asarray(x) + ndim = tmp.ndim + if axes is None: + axes = list(range(ndim)) + elif isinstance(axes, int): + axes = (axes,) + y = tmp + for k in axes: + n = tmp.shape[k] + p2 = (n + 1) // 2 + mylist = concatenate((arange(p2, n), arange(p2))) + y = take(y, mylist, k) + return y + + def original_ifftshift(x, axes=None): + """ How ifftshift was implemented in v1.14 """ + tmp = asarray(x) + ndim = tmp.ndim + if axes is None: + axes = list(range(ndim)) + elif isinstance(axes, int): + axes = (axes,) + y = tmp + for k in axes: + n = tmp.shape[k] + p2 = n - (n + 1) // 2 + mylist = concatenate((arange(p2, n), arange(p2))) + y = take(y, mylist, k) + return y + + # create possible 2d array combinations and try all possible keywords + # compare output to original functions + for i in range(16): + for j in range(16): + for axes_keyword in [0, 1, None, (0,), (0, 1)]: + inp = np.random.rand(i, j) + + assert_array_almost_equal(fft.fftshift(inp, axes_keyword), + original_fftshift(inp, axes_keyword)) + + assert_array_almost_equal(fft.ifftshift(inp, axes_keyword), + original_ifftshift(inp, axes_keyword)) + + +@pytest.mark.xfail(reason="TODO") +class TestFFTFreq: + + def test_definition(self): + x = [0, 1, 2, 3, 4, -4, -3, -2, -1] + assert_array_almost_equal(9*fft.fftfreq(9), x) + assert_array_almost_equal(9*pi*fft.fftfreq(9, pi), x) + x = [0, 1, 2, 3, 4, -5, -4, -3, -2, -1] + assert_array_almost_equal(10*fft.fftfreq(10), x) + assert_array_almost_equal(10*pi*fft.fftfreq(10, pi), x) + + +@pytest.mark.xfail(reason="TODO") +class TestRFFTFreq: + + def test_definition(self): + x = [0, 1, 2, 3, 4] + assert_array_almost_equal(9*fft.rfftfreq(9), x) + assert_array_almost_equal(9*pi*fft.rfftfreq(9, pi), x) + x = [0, 1, 2, 3, 4, 5] + assert_array_almost_equal(10*fft.rfftfreq(10), x) + assert_array_almost_equal(10*pi*fft.rfftfreq(10, pi), x) + + +@pytest.mark.xfail(reason="TODO") +class TestIRFFTN: + + def test_not_last_axis_success(self): + ar, ai = np.random.random((2, 16, 8, 32)) + a = ar + 1j*ai + + axes = (-2,) + + # Should not raise error + fft.irfftn(a, axes=axes) diff --git a/torch_np/tests/numpy_tests/fft/test_pocketfft.py b/torch_np/tests/numpy_tests/fft/test_pocketfft.py new file mode 100644 index 00000000..e5b4c770 --- /dev/null +++ b/torch_np/tests/numpy_tests/fft/test_pocketfft.py @@ -0,0 +1,315 @@ +import torch_np as np +import pytest +from pytest import raises as assert_raises + +from torch_np.random import random +from torch_np.testing import ( + assert_array_equal, assert_allclose #, IS_WASM + ) +import threading +import queue + +IS_WASM = False + + +def fft1(x): + L = len(x) + phase = -2j * np.pi * (np.arange(L) / L) + phase = np.arange(L).reshape(-1, 1) * phase + return np.sum(x*np.exp(phase), axis=1) + + +@pytest.mark.xfail(reason='TODO') +class TestFFTShift: + + def test_fft_n(self): + assert_raises(ValueError, np.fft.fft, [1, 2, 3], 0) + + +@pytest.mark.xfail(reason='TODO') +class TestFFT1D: + + def test_identity(self): + maxlen = 512 + x = random(maxlen) + 1j*random(maxlen) + xr = random(maxlen) + for i in range(1, maxlen): + assert_allclose(np.fft.ifft(np.fft.fft(x[0:i])), x[0:i], + atol=1e-12) + assert_allclose(np.fft.irfft(np.fft.rfft(xr[0:i]), i), + xr[0:i], atol=1e-12) + + def test_fft(self): + x = random(30) + 1j*random(30) + assert_allclose(fft1(x), np.fft.fft(x), atol=1e-6) + assert_allclose(fft1(x), np.fft.fft(x, norm="backward"), atol=1e-6) + assert_allclose(fft1(x) / np.sqrt(30), + np.fft.fft(x, norm="ortho"), atol=1e-6) + assert_allclose(fft1(x) / 30., + np.fft.fft(x, norm="forward"), atol=1e-6) + + @pytest.mark.parametrize('norm', (None, 'backward', 'ortho', 'forward')) + def test_ifft(self, norm): + x = random(30) + 1j*random(30) + assert_allclose( + x, np.fft.ifft(np.fft.fft(x, norm=norm), norm=norm), + atol=1e-6) + # Ensure we get the correct error message + with pytest.raises(ValueError, + match='Invalid number of FFT data points'): + np.fft.ifft([], norm=norm) + + def test_fft2(self): + x = random((30, 20)) + 1j*random((30, 20)) + assert_allclose(np.fft.fft(np.fft.fft(x, axis=1), axis=0), + np.fft.fft2(x), atol=1e-6) + assert_allclose(np.fft.fft2(x), + np.fft.fft2(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.fft2(x) / np.sqrt(30 * 20), + np.fft.fft2(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.fft2(x) / (30. * 20.), + np.fft.fft2(x, norm="forward"), atol=1e-6) + + def test_ifft2(self): + x = random((30, 20)) + 1j*random((30, 20)) + assert_allclose(np.fft.ifft(np.fft.ifft(x, axis=1), axis=0), + np.fft.ifft2(x), atol=1e-6) + assert_allclose(np.fft.ifft2(x), + np.fft.ifft2(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.ifft2(x) * np.sqrt(30 * 20), + np.fft.ifft2(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.ifft2(x) * (30. * 20.), + np.fft.ifft2(x, norm="forward"), atol=1e-6) + + def test_fftn(self): + x = random((30, 20, 10)) + 1j*random((30, 20, 10)) + assert_allclose( + np.fft.fft(np.fft.fft(np.fft.fft(x, axis=2), axis=1), axis=0), + np.fft.fftn(x), atol=1e-6) + assert_allclose(np.fft.fftn(x), + np.fft.fftn(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.fftn(x) / np.sqrt(30 * 20 * 10), + np.fft.fftn(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.fftn(x) / (30. * 20. * 10.), + np.fft.fftn(x, norm="forward"), atol=1e-6) + + def test_ifftn(self): + x = random((30, 20, 10)) + 1j*random((30, 20, 10)) + assert_allclose( + np.fft.ifft(np.fft.ifft(np.fft.ifft(x, axis=2), axis=1), axis=0), + np.fft.ifftn(x), atol=1e-6) + assert_allclose(np.fft.ifftn(x), + np.fft.ifftn(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.ifftn(x) * np.sqrt(30 * 20 * 10), + np.fft.ifftn(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.ifftn(x) * (30. * 20. * 10.), + np.fft.ifftn(x, norm="forward"), atol=1e-6) + + def test_rfft(self): + x = random(30) + for n in [x.size, 2*x.size]: + for norm in [None, 'backward', 'ortho', 'forward']: + assert_allclose( + np.fft.fft(x, n=n, norm=norm)[:(n//2 + 1)], + np.fft.rfft(x, n=n, norm=norm), atol=1e-6) + assert_allclose( + np.fft.rfft(x, n=n), + np.fft.rfft(x, n=n, norm="backward"), atol=1e-6) + assert_allclose( + np.fft.rfft(x, n=n) / np.sqrt(n), + np.fft.rfft(x, n=n, norm="ortho"), atol=1e-6) + assert_allclose( + np.fft.rfft(x, n=n) / n, + np.fft.rfft(x, n=n, norm="forward"), atol=1e-6) + + def test_irfft(self): + x = random(30) + assert_allclose(x, np.fft.irfft(np.fft.rfft(x)), atol=1e-6) + assert_allclose(x, np.fft.irfft(np.fft.rfft(x, norm="backward"), + norm="backward"), atol=1e-6) + assert_allclose(x, np.fft.irfft(np.fft.rfft(x, norm="ortho"), + norm="ortho"), atol=1e-6) + assert_allclose(x, np.fft.irfft(np.fft.rfft(x, norm="forward"), + norm="forward"), atol=1e-6) + + def test_rfft2(self): + x = random((30, 20)) + assert_allclose(np.fft.fft2(x)[:, :11], np.fft.rfft2(x), atol=1e-6) + assert_allclose(np.fft.rfft2(x), + np.fft.rfft2(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.rfft2(x) / np.sqrt(30 * 20), + np.fft.rfft2(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.rfft2(x) / (30. * 20.), + np.fft.rfft2(x, norm="forward"), atol=1e-6) + + def test_irfft2(self): + x = random((30, 20)) + assert_allclose(x, np.fft.irfft2(np.fft.rfft2(x)), atol=1e-6) + assert_allclose(x, np.fft.irfft2(np.fft.rfft2(x, norm="backward"), + norm="backward"), atol=1e-6) + assert_allclose(x, np.fft.irfft2(np.fft.rfft2(x, norm="ortho"), + norm="ortho"), atol=1e-6) + assert_allclose(x, np.fft.irfft2(np.fft.rfft2(x, norm="forward"), + norm="forward"), atol=1e-6) + + def test_rfftn(self): + x = random((30, 20, 10)) + assert_allclose(np.fft.fftn(x)[:, :, :6], np.fft.rfftn(x), atol=1e-6) + assert_allclose(np.fft.rfftn(x), + np.fft.rfftn(x, norm="backward"), atol=1e-6) + assert_allclose(np.fft.rfftn(x) / np.sqrt(30 * 20 * 10), + np.fft.rfftn(x, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.rfftn(x) / (30. * 20. * 10.), + np.fft.rfftn(x, norm="forward"), atol=1e-6) + + def test_irfftn(self): + x = random((30, 20, 10)) + assert_allclose(x, np.fft.irfftn(np.fft.rfftn(x)), atol=1e-6) + assert_allclose(x, np.fft.irfftn(np.fft.rfftn(x, norm="backward"), + norm="backward"), atol=1e-6) + assert_allclose(x, np.fft.irfftn(np.fft.rfftn(x, norm="ortho"), + norm="ortho"), atol=1e-6) + assert_allclose(x, np.fft.irfftn(np.fft.rfftn(x, norm="forward"), + norm="forward"), atol=1e-6) + + def test_hfft(self): + x = random(14) + 1j*random(14) + x_herm = np.concatenate((random(1), x, random(1))) + x = np.concatenate((x_herm, x[::-1].conj())) + assert_allclose(np.fft.fft(x), np.fft.hfft(x_herm), atol=1e-6) + assert_allclose(np.fft.hfft(x_herm), + np.fft.hfft(x_herm, norm="backward"), atol=1e-6) + assert_allclose(np.fft.hfft(x_herm) / np.sqrt(30), + np.fft.hfft(x_herm, norm="ortho"), atol=1e-6) + assert_allclose(np.fft.hfft(x_herm) / 30., + np.fft.hfft(x_herm, norm="forward"), atol=1e-6) + + def test_ihfft(self): + x = random(14) + 1j*random(14) + x_herm = np.concatenate((random(1), x, random(1))) + x = np.concatenate((x_herm, x[::-1].conj())) + assert_allclose(x_herm, np.fft.ihfft(np.fft.hfft(x_herm)), atol=1e-6) + assert_allclose(x_herm, np.fft.ihfft(np.fft.hfft(x_herm, + norm="backward"), norm="backward"), atol=1e-6) + assert_allclose(x_herm, np.fft.ihfft(np.fft.hfft(x_herm, + norm="ortho"), norm="ortho"), atol=1e-6) + assert_allclose(x_herm, np.fft.ihfft(np.fft.hfft(x_herm, + norm="forward"), norm="forward"), atol=1e-6) + + @pytest.mark.parametrize("op", [np.fft.fftn, np.fft.ifftn, + np.fft.rfftn, np.fft.irfftn]) + def test_axes(self, op): + x = random((30, 20, 10)) + axes = [(0, 1, 2), (0, 2, 1), (1, 0, 2), (1, 2, 0), (2, 0, 1), (2, 1, 0)] + for a in axes: + op_tr = op(np.transpose(x, a)) + tr_op = np.transpose(op(x, axes=a), a) + assert_allclose(op_tr, tr_op, atol=1e-6) + + def test_all_1d_norm_preserving(self): + # verify that round-trip transforms are norm-preserving + x = random(30) + x_norm = np.linalg.norm(x) + n = x.size * 2 + func_pairs = [(np.fft.fft, np.fft.ifft), + (np.fft.rfft, np.fft.irfft), + # hfft: order so the first function takes x.size samples + # (necessary for comparison to x_norm above) + (np.fft.ihfft, np.fft.hfft), + ] + for forw, back in func_pairs: + for n in [x.size, 2*x.size]: + for norm in [None, 'backward', 'ortho', 'forward']: + tmp = forw(x, n=n, norm=norm) + tmp = back(tmp, n=n, norm=norm) + assert_allclose(x_norm, + np.linalg.norm(tmp), atol=1e-6) + + @pytest.mark.parametrize("dtype", [np.half, np.single, np.double]) + def test_dtypes(self, dtype): + # make sure that all input precisions are accepted and internally + # converted to 64bit + x = random(30).astype(dtype) + assert_allclose(np.fft.ifft(np.fft.fft(x)), x, atol=1e-6) + assert_allclose(np.fft.irfft(np.fft.rfft(x)), x, atol=1e-6) + + +@pytest.mark.xfail(reason='TODO') +@pytest.mark.parametrize( + "dtype", + [np.float32, np.float64, np.complex64, np.complex128]) +@pytest.mark.parametrize("order", ["F", 'non-contiguous']) +@pytest.mark.parametrize( + "fft", + [np.fft.fft, np.fft.fft2, np.fft.fftn, + np.fft.ifft, np.fft.ifft2, np.fft.ifftn]) +def test_fft_with_order(dtype, order, fft): + # Check that FFT/IFFT produces identical results for C, Fortran and + # non contiguous arrays + rng = np.random.RandomState(42) + X = rng.rand(8, 7, 13).astype(dtype, copy=False) + # See discussion in pull/14178 + _tol = 8.0 * np.sqrt(np.log2(X.size)) * np.finfo(X.dtype).eps + if order == 'F': + Y = np.asfortranarray(X) + else: + # Make a non contiguous array + Y = X[::-1] + X = np.ascontiguousarray(X[::-1]) + + if fft.__name__.endswith('fft'): + for axis in range(3): + X_res = fft(X, axis=axis) + Y_res = fft(Y, axis=axis) + assert_allclose(X_res, Y_res, atol=_tol, rtol=_tol) + elif fft.__name__.endswith(('fft2', 'fftn')): + axes = [(0, 1), (1, 2), (0, 2)] + if fft.__name__.endswith('fftn'): + axes.extend([(0,), (1,), (2,), None]) + for ax in axes: + X_res = fft(X, axes=ax) + Y_res = fft(Y, axes=ax) + assert_allclose(X_res, Y_res, atol=_tol, rtol=_tol) + else: + raise ValueError() + + +@pytest.mark.xfail(reason='TODO') +@pytest.mark.skipif(IS_WASM, reason="Cannot start thread") +class TestFFTThreadSafe: + threads = 16 + input_shape = (800, 200) + + def _test_mtsame(self, func, *args): + def worker(args, q): + q.put(func(*args)) + + q = queue.Queue() + expected = func(*args) + + # Spin off a bunch of threads to call the same function simultaneously + t = [threading.Thread(target=worker, args=(args, q)) + for i in range(self.threads)] + [x.start() for x in t] + + [x.join() for x in t] + # Make sure all threads returned the correct value + for i in range(self.threads): + assert_array_equal(q.get(timeout=5), expected, + 'Function returned wrong value in multithreaded context') + + def test_fft(self): + a = np.ones(self.input_shape) * 1+0j + self._test_mtsame(np.fft.fft, a) + + def test_ifft(self): + a = np.ones(self.input_shape) * 1+0j + self._test_mtsame(np.fft.ifft, a) + + def test_rfft(self): + a = np.ones(self.input_shape) + self._test_mtsame(np.fft.rfft, a) + + def test_irfft(self): + a = np.ones(self.input_shape) * 1+0j + self._test_mtsame(np.fft.irfft, a)