Skip to content

Added implementation of the Horn-Schunck algorithm #5333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 2, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions computer_vision/horn_schunck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
The Horn-Schunck method estimates the optical flow for every single pixel of
a sequence of images.
It works by assuming brightness constancy between two consecutive frames
and smoothness in the optical flow.

Useful resources:
Wikipedia: https://en.wikipedia.org/wiki/Horn%E2%80%93Schunck_method
Paper: http://image.diku.dk/imagecanon/material/HornSchunckOptical_Flow.pdf
"""

from typing import Optional

import numpy as np
from scipy.ndimage.filters import convolve
from typing_extensions import SupportsIndex


def warp(
image: np.ndarray, horizontal_flow: np.ndarray, vertical_flow: np.ndarray
) -> np.ndarray:
"""
Warps the pixels of an image into a new image using the horizontal and vertical
flows.
Pixels that are warped from an invalid location are set to 0.

Parameters:
image: Grayscale image
horizontal_flow: Horizontal flow
vertical_flow: Vertical flow

Returns: Warped image

"""
flow = np.stack((horizontal_flow, vertical_flow), 2)

# Create a grid of all pixel coordinates and subtract the offsets
grid = np.stack(
np.meshgrid(np.arange(0, image.shape[1]), np.arange(0, image.shape[0])), 2
)
grid = np.round(grid - flow).astype(np.int32)

# Find the locations outside of the original image
invalid = (grid < 0) | (grid >= np.array([image.shape[1], image.shape[0]]))
grid[invalid] = 0

warped = image[grid[:, :, 1], grid[:, :, 0]]

# Set pixels at invalid locations to 0
warped[invalid[:, :, 0] | invalid[:, :, 1]] = 0

return warped


def horn_schunck(
image0: np.ndarray,
image1: np.ndarray,
num_iter: SupportsIndex,
alpha: Optional[float] = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
This function performs the Horn-Schunck algorithm and returns the estimated
optical flow. It is assumed that the input images are grayscale and
normalized to be in [0, 1].

Parameters:
image0: First image of the sequence
image1: Second image of the sequence
alpha: Regularization constant
num_iter: Number of iterations performed

Returns:
horizontal_flow: Horizontal flow
vertical_flow: Vertical flow
"""
if alpha is None:
alpha = 0.1

# Initialize flow
horizontal_flow = np.zeros_like(image0)
vertical_flow = np.zeros_like(image0)

# Prepare kernels for the calculation of the derivatives and the average velocity
kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25
kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25
kernel_t = np.array([[1, 1], [1, 1]]) * 0.25
laplacian_kernel = np.array(
[[1 / 12, 1 / 6, 1 / 12], [1 / 6, 0, 1 / 6], [1 / 12, 1 / 6, 1 / 12]]
)

# Iteratively refine the flow
for _ in range(num_iter):
warped_image = warp(image0, horizontal_flow, vertical_flow)
derivative_x = convolve(warped_image, kernel_x) + convolve(image1, kernel_x)
derivative_y = convolve(warped_image, kernel_y) + convolve(image1, kernel_y)
derivative_t = convolve(warped_image, kernel_t) + convolve(image1, -kernel_t)

avg_horizontal_velocity = convolve(horizontal_flow, laplacian_kernel)
avg_vertical_velocity = convolve(vertical_flow, laplacian_kernel)

# This updates the flow as proposed in the paper (Step 12)
update = (
derivative_x * avg_horizontal_velocity
+ derivative_y * avg_vertical_velocity
+ derivative_t
)
update = update / (alpha ** 2 + derivative_x ** 2 + derivative_y ** 2)

horizontal_flow = avg_horizontal_velocity - derivative_x * update
vertical_flow = avg_vertical_velocity - derivative_y * update

return horizontal_flow, vertical_flow