Source code for vr180_convert.cli

from datetime import datetime, timezone
from enum import auto
from hashlib import sha256
from logging import DEBUG, INFO, basicConfig, getLogger
from pathlib import Path
from typing import Any, Sequence

import cv2 as cv
import numpy as np
import typer
from quaternion import *  # noqa
from rich.logging import RichHandler
from strenum import StrEnum
from typing_extensions import Annotated

from vr180_convert.transformer import *  # noqa
from vr180_convert.transformer import (
    EquirectangularEncoder,
    Euclidean3DRotator,
    FisheyeDecoder,
    MultiTransformer,
    TransformerBase,
)

from .remapper import apply, apply_lr, match_lr

LOG = getLogger(__name__)
DEFAULT_EXTENSION = "png"

app = typer.Typer()


@app.callback()
def _main(verbose: bool = typer.Option(False, "--verbose", "-v")) -> None:
    level = INFO
    if verbose:
        level = DEBUG
    basicConfig(
        level=level,
        format="%(message)s",
        datefmt="[%X]",
        handlers=[RichHandler(rich_tracebacks=True)],
    )


class _InterpolationFlags(StrEnum):
    """Interpolation flags enum for typer."""

    INTER_NEAREST = auto()
    INTER_LINEAR = auto()
    INTER_CUBIC = auto()
    INTER_AREA = auto()
    INTER_LANCZOS4 = auto()
    INTER_MAX = auto()
    WARP_FILL_OUTLIERS = auto()
    WARP_INVERSE_MAP = auto()


class _BorderTypes(StrEnum):
    """Border types enum for typer."""

    BORDER_CONSTANT = auto()
    BORDER_REPLICATE = auto()
    BORDER_REFLECT = auto()
    BORDER_WRAP = auto()
    BORDER_REFLECT_101 = auto()
    BORDER_TRANSPARENT = auto()
    BORDER_ISOLATED = auto()


def _get_position_gui(image_paths: Sequence[Path]) -> list[tuple[int, int]]:
    """Get the position of the GUI window."""
    window_name = "Select position"
    cv.namedWindow(window_name, cv.WND_PROP_FULLSCREEN)
    cv.setWindowProperty(window_name, cv.WND_PROP_FULLSCREEN, cv.WINDOW_FULLSCREEN)

    i = 0
    res = []

    def on_mouse(event: int, x: int, y: int, flags: int, param: Any) -> None:
        nonlocal res, i
        if event == cv.EVENT_LBUTTONDOWN:
            res.append((x, y))
            LOG.info(f"Position {i}: ({x}, {y})")

    cv.setMouseCallback(window_name, on_mouse)
    cv.imshow(window_name, cv.imread(image_paths[i].as_posix()))
    while True:
        cv.waitKey(10)
        if len(res) == i + 1:
            i += 1
            if i == len(image_paths):
                break
            cv.imshow(window_name, cv.imread(image_paths[i].as_posix()))
    cv.destroyAllWindows()
    return res


[docs] @app.command() def lr( left_path: Annotated[Path, typer.Argument(help="Left image path")], right_path: Annotated[Path, typer.Argument(help="Right image path")], transformer: Annotated[ str, typer.Option(help="Transformer Python code (to be `eval()`ed)") ] = "", out_path: Annotated[ Path, typer.Option( help="Output image path, defaults to left_path.with_suffix('.out.jpg')" ), ] = Path(""), size: Annotated[ str, typer.Option(help="Output image size, defaults to 4096x4096") ] = "4096x4096", interpolation: Annotated[ _InterpolationFlags, typer.Option(help="Interpolation method, defaults to lanczos4"), ] = _InterpolationFlags.INTER_LANCZOS4, # type: ignore border_mode: Annotated[ _BorderTypes, typer.Option(help="Border mode, defaults to constant") ] = _BorderTypes.BORDER_CONSTANT, # type: ignore border_value: int = 0, radius: Annotated[ str, typer.Option(help="Radius of the fisheye image, defaults to 'auto'") ] = "auto", merge: Annotated[bool, typer.Option(help="Export as an anaglyph")] = False, autosearch_timestamp_calib_r_earlier_l: Annotated[ float, typer.Option( "--autosearch-timestamp-calib-r-earlier-l", "-ac", help="Autosearch timestamp calibration " "(right timestamp -= autosearch_timestamp_calib_r_earlier_l) (in seconds)", ), ] = 0.0, swap: Annotated[ bool, typer.Option(help="Swap left and right images as well as transformer, etc."), ] = False, name_unique: Annotated[bool, typer.Option(help="Make output name unique")] = False, automatch: Annotated[ str, typer.Option( help='Calibrate rotation. e.g. "0,0;0,0;1,1;1,1". If "gui", use GUI' ), ] = "", ) -> None: """Remap a pair of fisheye images to a pair of SBS equirectangular images.""" if swap: left_path, right_path = right_path, left_path autosearch_timestamp_calib_r_earlier_l = -autosearch_timestamp_calib_r_earlier_l # find closest time-matched images if left_path.is_dir() and not right_path.is_dir(): # find closest time-matched right image # sort with time right_time = right_path.stat().st_mtime left_path_candidates = sorted( left_path.rglob("*"), key=lambda p: abs( p.stat().st_mtime - right_time + autosearch_timestamp_calib_r_earlier_l ), reverse=False, ) left_path_candidates = [ p for p in left_path_candidates if (p != right_path) and (p.suffix == right_path.suffix) ] if len(left_path_candidates) == 0: raise ValueError("No time-matched left image found") left_path = left_path_candidates[0] elif not left_path.is_dir() and right_path.is_dir(): # find closest time-matched left image # sort with time left_time = left_path.stat().st_mtime right_path_candidates = sorted( right_path.rglob("*"), key=lambda p: abs( p.stat().st_mtime - left_time - autosearch_timestamp_calib_r_earlier_l ), reverse=False, ) right_path_candidates = [ p for p in right_path_candidates if (p != left_path) and (p.suffix == left_path.suffix) ] if len(right_path_candidates) == 0: raise ValueError("No time-matched right image found") right_path = right_path_candidates[0] elif left_path.is_dir() and right_path.is_dir(): raise ValueError("Both left and right paths must not be directories") LOG.info( f"L: {left_path}" f"@{datetime.fromtimestamp(left_path.stat().st_mtime, timezone.utc)}, " f"R: {right_path}" f"@{datetime.fromtimestamp(right_path.stat().st_mtime, timezone.utc)}" ) # evaluate automatch transformer_: TransformerBase | tuple[TransformerBase, TransformerBase] # evaluate transformer if transformer == "": transformer_ = EquirectangularEncoder() * FisheyeDecoder("equidistant") else: transformer_ = eval(transformer) # noqa if automatch != "": if not isinstance(transformer_, MultiTransformer): raise ValueError("Automatch requires MultiTransformer") transformer_is_encoder = [ x.__class__.__name__.endswith("Encoder") for x in transformer_.transformers ] transformer_first_encoder_index = transformer_is_encoder.index(True) transformer_until_encoder = MultiTransformer( transformer_.transformers[: transformer_first_encoder_index + 1] ) transformer_after_encoder = MultiTransformer( transformer_.transformers[transformer_first_encoder_index + 1 :] ) LOG.debug(f"{transformer_until_encoder=}, {transformer_after_encoder=}") if automatch == "gui": automatch_ = _get_position_gui( [left_path, right_path, left_path, right_path] ) LOG.info( f"Automatched position: {';'.join([','.join(map(str, p)) for p in automatch_])}" ) else: automatch_ = [tuple(chunk.split(",")) for chunk in automatch.split(";")] # type: ignore q = match_lr( FisheyeDecoder("equidistant"), # odd automatch_[1::2], # even automatch_[::2], radius=float(radius) if radius not in ["auto", "max"] else radius, # type: ignore in_paths=[left_path, right_path], ) LOG.info(f"Automatched quaternion: {q}") transformer_ = ( transformer_until_encoder * Euclidean3DRotator(q) * transformer_after_encoder, transformer_, ) LOG.info(f"Automatched transformer: {transformer_}") # if swap: # if isinstance(transformer_, tuple) and automatch == "": # transformer_ = transformer_[1], transformer_[0] # apply transformer name_unique_content = ( ( "-" + sha256( "".join( [ transformer, size, interpolation, border_mode, str(border_value), radius, str(merge), str(autosearch_timestamp_calib_r_earlier_l), str(swap), ] ).encode("utf-8") ).hexdigest()[:8] ) if name_unique else "" ) filename_default = ( f"{Path(left_path).stem}-" + f"{Path(right_path).stem}{name_unique_content}.{DEFAULT_EXTENSION}" ) apply_lr( transformer=transformer_, left_path=left_path, right_path=right_path, out_path=( Path(left_path).parent / filename_default if out_path == Path("") else out_path / filename_default if out_path.is_dir() else out_path ), radius=float(radius) if radius not in ["auto", "max"] else radius, # type: ignore size_output=tuple(map(int, size.split("x"))), # type: ignore interpolation=getattr(cv, interpolation.upper()), boarder_mode=getattr(cv, border_mode.upper()), boarder_value=border_value, merge=merge, )
[docs] @app.command() def s( in_paths: Annotated[list[Path], typer.Argument(help="Image paths")], transformer: Annotated[ str, typer.Option(help="Transformer Python code (to be `eval()`ed)") ] = "", out_path: Annotated[ Path, typer.Option( help="Output image path, defaults to left_path.with_suffix('.out.jpg')" ), ] = Path(""), size: Annotated[ str, typer.Option(help="Output image size, defaults to 4096x4096") ] = "4096x4096", interpolation: Annotated[ _InterpolationFlags, typer.Option(help="Interpolation method, defaults to lanczos4"), ] = _InterpolationFlags.INTER_LANCZOS4, # type: ignore boarder_mode: Annotated[ _BorderTypes, typer.Option(help="Border mode, defaults to constant") ] = _BorderTypes.BORDER_CONSTANT, # type: ignore boarder_value: int = 0, radius: Annotated[ str, typer.Option(help="Radius of the fisheye image, defaults to 'auto'") ] = "auto", ) -> None: """Remap fisheye images to SBS equirectangular images.""" if transformer == "": transformer_ = EquirectangularEncoder() * FisheyeDecoder("equidistant") else: transformer_ = eval(transformer) # noqa if out_path == Path(""): out_paths = [p.with_suffix(f".out.{DEFAULT_EXTENSION}") for p in in_paths] elif out_path.is_dir(): out_paths = [out_path / p.name for p in in_paths] else: if len(in_paths) > 1: raise ValueError( "Output path must be a directory when multiple input paths are provided" ) out_paths = [out_path for p in in_paths] apply( transformer=transformer_, in_paths=in_paths, out_paths=out_paths, radius=float(radius) if radius not in ["auto", "max"] else radius, # type: ignore size_output=tuple(map(int, size.split("x"))), # type: ignore interpolation=getattr(cv, interpolation.upper()), boarder_mode=getattr(cv, boarder_mode.upper()), boarder_value=boarder_value, )
[docs] @app.command() def xmp( in_paths: Annotated[list[Path], typer.Argument(help="Image paths")], wslpath: Annotated[ bool, typer.Option( "-wsl", "--wslpath", help="Convert Windows path to WSL path, be careful as it uses subprocess", ), ] = False, ) -> None: """Add XMP metadata to the image.""" import base64 import subprocess as sp from tempfile import NamedTemporaryFile try: from libxmp import XMPFiles, XMPMeta except Exception as e: import os import sys if os.name != "nt": raise e LOG.info("Trying to install this package in WSL...") command = ( "wsl -- sudo apt install -y exempi pipx python3.11 " "&& pipx run --python=python3.11 --spec=vr180-convert[xmp] " f'vr180-convert {" ".join(sys.argv[1:])} -wsl' ) LOG.info(f"Running command: {command}") sp.run(command, check=True) # noqa return for in_path in in_paths: if wslpath: in_path = Path( sp.run(["wslpath", "-u", "-a", in_path], capture_output=True) # noqa .stdout.decode() .strip() ) left_path = in_path.with_suffix(f".xmp{in_path.suffix}") # read combined image image = cv.imread(in_path.as_posix()) # extract left image left_image = image[:, : image.shape[1] // 2] right_image = image[:, image.shape[1] // 2 :] width = image.shape[1] height = image.shape[0] with NamedTemporaryFile(suffix=left_path.suffix) as right_file: cv.imwrite(left_path.as_posix(), left_image) cv.imwrite(right_file.name, right_image) # use left file as a base xmpfile = XMPFiles(file_path=left_path.as_posix(), open_forupdate=True) lxmp = XMPMeta() LOG.debug(f"{in_path=}, {lxmp=}") # Google's namespace XMP_GIMAGE = "http://ns.google.com/photos/1.0/image/" XMP_GPANO = "http://ns.google.com/photos/1.0/panorama/" XMP_NOTE = "http://ns.adobe.com/xmp/note/" XMPMeta.register_namespace(XMP_GIMAGE, "GImage") XMPMeta.register_namespace(XMP_GPANO, "GPano") XMPMeta.register_namespace(XMP_NOTE, "xmpNote") # Set GPano properties lxmp.set_property(XMP_GPANO, "UsePanoramaViewer", "True") lxmp.set_property(XMP_GPANO, "ProjectionType", "equirectangular") lxmp.set_property_int(XMP_GPANO, "CroppedAreaImageWidthPixels", width / 2) lxmp.set_property_int(XMP_GPANO, "CroppedAreaImageHeightPixels", height) lxmp.set_property_int(XMP_GPANO, "CroppedAreaLeftPixels", width / 4) lxmp.set_property_int(XMP_GPANO, "CroppedAreaTopPixels", 0) lxmp.set_property_int(XMP_GPANO, "FullPanoWidthPixels", width) lxmp.set_property_int(XMP_GPANO, "FullPanoHeightPixels", height) lxmp.set_property_int(XMP_GPANO, "PosePitchDegrees", 0) lxmp.set_property_int(XMP_GPANO, "PoseRollDegrees", 0) lxmp.set_property_int(XMP_GPANO, "InitialViewHeadingDegrees", 180) # Set GImage properties lxmp.set_property(XMP_GIMAGE, "Mime", "image/jpeg") lxmp.set_property( XMP_GIMAGE, "Data", base64.b64encode(right_file.read()).decode() ) # Set xmpNote properties and write right image lxmp.set_property( XMP_NOTE, "HasExtendedXMP", "06A56CB0A1A7FAFDA459CA3FAA14B474" ) if not xmpfile.can_put_xmp(lxmp): raise ValueError(f"Cannot put XMP to {in_path}") xmpfile.put_xmp(lxmp) xmpfile.close_file()
[docs] @app.command() def swap( in_paths: Annotated[list[Path], typer.Argument(help="Image paths")], ) -> None: """Swap left and right images.""" for in_path in in_paths: out_path = in_path.with_suffix(f".swap{in_path.suffix}") image = cv.imread(in_path.as_posix()) left, right = image[:, : image.shape[1] // 2], image[:, image.shape[1] // 2 :] image_swapped = np.hstack([right, left]) cv.imwrite(out_path.as_posix(), image_swapped)