|
| 1 | +""" |
| 2 | + The Horn-Schunck method estimates the optical flow for every single pixel of |
| 3 | + a sequence of images. |
| 4 | + It works by assuming brightness constancy between two consecutive frames |
| 5 | + and smoothness in the optical flow. |
| 6 | +
|
| 7 | + Useful resources: |
| 8 | + Wikipedia: https://en.wikipedia.org/wiki/Horn%E2%80%93Schunck_method |
| 9 | + Paper: http://image.diku.dk/imagecanon/material/HornSchunckOptical_Flow.pdf |
| 10 | +""" |
| 11 | + |
| 12 | +import numpy as np |
| 13 | +from scipy.ndimage.filters import convolve |
| 14 | +from typing_extensions import SupportsIndex |
| 15 | + |
| 16 | + |
| 17 | +def warp( |
| 18 | + image: np.ndarray, horizontal_flow: np.ndarray, vertical_flow: np.ndarray |
| 19 | +) -> np.ndarray: |
| 20 | + """ |
| 21 | + Warps the pixels of an image into a new image using the horizontal and vertical |
| 22 | + flows. |
| 23 | + Pixels that are warped from an invalid location are set to 0. |
| 24 | +
|
| 25 | + Parameters: |
| 26 | + image: Grayscale image |
| 27 | + horizontal_flow: Horizontal flow |
| 28 | + vertical_flow: Vertical flow |
| 29 | +
|
| 30 | + Returns: Warped image |
| 31 | +
|
| 32 | + >>> warp(np.array([[0, 1, 2], [0, 3, 0], [2, 2, 2]]), \ |
| 33 | + np.array([[0, 1, -1], [-1, 0, 0], [1, 1, 1]]), \ |
| 34 | + np.array([[0, 0, 0], [0, 1, 0], [0, 0, 1]])) |
| 35 | + array([[0, 0, 0], |
| 36 | + [3, 1, 0], |
| 37 | + [0, 2, 3]]) |
| 38 | + """ |
| 39 | + flow = np.stack((horizontal_flow, vertical_flow), 2) |
| 40 | + |
| 41 | + # Create a grid of all pixel coordinates and subtract the flow to get the |
| 42 | + # target pixels coordinates |
| 43 | + grid = np.stack( |
| 44 | + np.meshgrid(np.arange(0, image.shape[1]), np.arange(0, image.shape[0])), 2 |
| 45 | + ) |
| 46 | + grid = np.round(grid - flow).astype(np.int32) |
| 47 | + |
| 48 | + # Find the locations outside of the original image |
| 49 | + invalid = (grid < 0) | (grid >= np.array([image.shape[1], image.shape[0]])) |
| 50 | + grid[invalid] = 0 |
| 51 | + |
| 52 | + warped = image[grid[:, :, 1], grid[:, :, 0]] |
| 53 | + |
| 54 | + # Set pixels at invalid locations to 0 |
| 55 | + warped[invalid[:, :, 0] | invalid[:, :, 1]] = 0 |
| 56 | + |
| 57 | + return warped |
| 58 | + |
| 59 | + |
| 60 | +def horn_schunck( |
| 61 | + image0: np.ndarray, |
| 62 | + image1: np.ndarray, |
| 63 | + num_iter: SupportsIndex, |
| 64 | + alpha: float | None = None, |
| 65 | +) -> tuple[np.ndarray, np.ndarray]: |
| 66 | + """ |
| 67 | + This function performs the Horn-Schunck algorithm and returns the estimated |
| 68 | + optical flow. It is assumed that the input images are grayscale and |
| 69 | + normalized to be in [0, 1]. |
| 70 | +
|
| 71 | + Parameters: |
| 72 | + image0: First image of the sequence |
| 73 | + image1: Second image of the sequence |
| 74 | + alpha: Regularization constant |
| 75 | + num_iter: Number of iterations performed |
| 76 | +
|
| 77 | + Returns: estimated horizontal & vertical flow |
| 78 | +
|
| 79 | + >>> np.round(horn_schunck(np.array([[0, 0, 2], [0, 0, 2]]), \ |
| 80 | + np.array([[0, 2, 0], [0, 2, 0]]), alpha=0.1, num_iter=110)).\ |
| 81 | + astype(np.int32) |
| 82 | + array([[[ 0, -1, -1], |
| 83 | + [ 0, -1, -1]], |
| 84 | + <BLANKLINE> |
| 85 | + [[ 0, 0, 0], |
| 86 | + [ 0, 0, 0]]], dtype=int32) |
| 87 | + """ |
| 88 | + if alpha is None: |
| 89 | + alpha = 0.1 |
| 90 | + |
| 91 | + # Initialize flow |
| 92 | + horizontal_flow = np.zeros_like(image0) |
| 93 | + vertical_flow = np.zeros_like(image0) |
| 94 | + |
| 95 | + # Prepare kernels for the calculation of the derivatives and the average velocity |
| 96 | + kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25 |
| 97 | + kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25 |
| 98 | + kernel_t = np.array([[1, 1], [1, 1]]) * 0.25 |
| 99 | + kernel_laplacian = np.array( |
| 100 | + [[1 / 12, 1 / 6, 1 / 12], [1 / 6, 0, 1 / 6], [1 / 12, 1 / 6, 1 / 12]] |
| 101 | + ) |
| 102 | + |
| 103 | + # Iteratively refine the flow |
| 104 | + for _ in range(num_iter): |
| 105 | + warped_image = warp(image0, horizontal_flow, vertical_flow) |
| 106 | + derivative_x = convolve(warped_image, kernel_x) + convolve(image1, kernel_x) |
| 107 | + derivative_y = convolve(warped_image, kernel_y) + convolve(image1, kernel_y) |
| 108 | + derivative_t = convolve(warped_image, kernel_t) + convolve(image1, -kernel_t) |
| 109 | + |
| 110 | + avg_horizontal_velocity = convolve(horizontal_flow, kernel_laplacian) |
| 111 | + avg_vertical_velocity = convolve(vertical_flow, kernel_laplacian) |
| 112 | + |
| 113 | + # This updates the flow as proposed in the paper (Step 12) |
| 114 | + update = ( |
| 115 | + derivative_x * avg_horizontal_velocity |
| 116 | + + derivative_y * avg_vertical_velocity |
| 117 | + + derivative_t |
| 118 | + ) |
| 119 | + update = update / (alpha**2 + derivative_x**2 + derivative_y**2) |
| 120 | + |
| 121 | + horizontal_flow = avg_horizontal_velocity - derivative_x * update |
| 122 | + vertical_flow = avg_vertical_velocity - derivative_y * update |
| 123 | + |
| 124 | + return horizontal_flow, vertical_flow |
| 125 | + |
| 126 | + |
| 127 | +if __name__ == "__main__": |
| 128 | + import doctest |
| 129 | + |
| 130 | + doctest.testmod() |
0 commit comments