diff --git a/.cursor/rules/dev-loop.mdc b/.cursor/rules/dev-loop.mdc index 1886aa702..b6ecb405f 100644 --- a/.cursor/rules/dev-loop.mdc +++ b/.cursor/rules/dev-loop.mdc @@ -16,7 +16,7 @@ uv run mypy Lint: ``` -uv run ruff check . --fix; uv run ruff format .; +uv run ruff check . --fix --show-fixes; uv run ruff format .; ``` Check tests: diff --git a/.cursor/rules/git-commits.mdc b/.cursor/rules/git-commits.mdc index 0a5fa1184..1090f5f95 100644 --- a/.cursor/rules/git-commits.mdc +++ b/.cursor/rules/git-commits.mdc @@ -2,81 +2,93 @@ description: git-commits: Git commit message standards and AI assistance globs: git-commits: Git commit message standards and AI assistance | *.git/* .gitignore .github/* CHANGELOG.md CHANGES.md --- -# Git Commit Standards +# Optimized Git Commit Standards -## Format +## Commit Message Format ``` -type(scope[component]): concise description +Component/File(commit-type[Subcomponent/method]): Concise description -why: explanation of necessity/impact -what: -- technical changes made -- keep focused on single topic +why: Explanation of necessity or impact. +what: +- Specific technical changes made +- Focused on a single topic -refs: #issue-number, breaking changes, links +refs: #issue-number, breaking changes, or relevant links ``` -## Commit Types -- `feat`: New features/enhancements -- `fix`: Bug fixes -- `refactor`: Code restructuring -- `docs`: Documentation changes -- `chore`: Maintenance tasks (deps, tooling) -- `test`: Test-related changes -- `style`: Code style/formatting - -## Guidelines -- Subject line: max 50 chars -- Body lines: max 72 chars -- Use imperative mood ("Add" not "Added") -- Single topic per commit -- Blank line between subject and body -- Mark breaking changes with "BREAKING:" -- Use "See also:" for external links - -## AI Assistance in Cursor -- Stage changes with `git add` -- Use `@commit` to generate initial message -- Review and adjust the generated message -- Ensure it follows format above - -## Examples - -Good commit: +## Component Patterns +### General Code Changes +``` +Component/File(feat[method]): Add feature +Component/File(fix[method]): Fix bug +Component/File(refactor[method]): Code restructure ``` -feat(subprocess[run]): Switch to unicode-only text handling -why: Improve consistency and type safety in subprocess handling -what: -- BREAKING: Changed run() to use text=True by default -- Removed console_to_str() helper and encoding logic -- Simplified output handling -- Updated type hints for better safety +### Packages and Dependencies +| Language | Standard Packages | Dev Packages | Extras / Sub-packages | +|------------|------------------------------------|-------------------------------|-----------------------------------------------| +| General | `lang(deps):` | `lang(deps[dev]):` | | +| Python | `py(deps):` | `py(deps[dev]):` | `py(deps[extra]):` | +| JavaScript | `js(deps):` | `js(deps[dev]):` | `js(deps[subpackage]):`, `js(deps[dev{subpackage}]):` | -refs: #485 -See also: https://docs.python.org/3/library/subprocess.html +#### Examples +- `py(deps[dev]): Update pytest to v8.1` +- `js(deps[ui-components]): Upgrade Button component package` +- `js(deps[dev{linting}]): Add ESLint plugin` + +### Documentation Changes +Prefix with `docs:` +``` +docs(Component/File[Subcomponent/method]): Update API usage guide ``` -Bad commit: +### Test Changes +Prefix with `tests:` ``` -updated some stuff and fixed bugs +tests(Component/File[Subcomponent/method]): Add edge case tests ``` -Cursor Rules: Add development QA and git commit standards (#cursor-rules) +## Commit Types Summary +- **feat**: New features or enhancements +- **fix**: Bug fixes +- **refactor**: Code restructuring without functional change +- **docs**: Documentation updates +- **chore**: Maintenance (dependencies, tooling, config) +- **test**: Test-related updates +- **style**: Code style and formatting + +## General Guidelines +- Subject line: Maximum 50 characters +- Body lines: Maximum 72 characters +- Use imperative mood (e.g., "Add", "Fix", not "Added", "Fixed") +- Limit to one topic per commit +- Separate subject from body with a blank line +- Mark breaking changes clearly: `BREAKING:` +- Use `See also:` to provide external references + +## AI Assistance Workflow in Cursor +- Stage changes with `git add` +- Use `@commit` to generate initial commit message +- Review and refine generated message +- Ensure adherence to these standards + +## Good Commit Example +``` +Pane(feat[capture_pane]): Add screenshot capture support -- Add dev-loop.mdc: QA process for code edits - - Type checking with mypy - - Linting with ruff - - Test validation with pytest - - Ensures edits are validated before commits +why: Provide visual debugging capability +what: +- Implement capturePane method with image export +- Integrate with existing Pane component logic +- Document usage in Pane README -- Add git-commits.mdc: Commit message standards - - Structured format with why/what sections - - Defined commit types and guidelines - - Examples of good/bad commits - - AI assistance instructions +refs: #485 +See also: https://example.com/docs/pane-capture +``` -Note: These rules help maintain code quality and commit history -consistency across the project. +## Bad Commit Example +``` +fixed stuff and improved some functions +``` -See also: https://docs.cursor.com/context/rules-for-ai \ No newline at end of file +These guidelines ensure clear, consistent commit histories, facilitating easier code review and maintenance. \ No newline at end of file diff --git a/CHANGES b/CHANGES index 451ce501c..ff4140759 100644 --- a/CHANGES +++ b/CHANGES @@ -15,6 +15,18 @@ $ pip install --user --upgrade --pre libtmux - _Future release notes will be placed here_ +### New features + +#### Waiting (#582) + +Added experimental `waiter.py` module for polling for terminal content in tmux panes: + +- Fluent API inspired by Playwright for better readability and chainable options +- Support for multiple pattern types (exact text, contains, regex, custom predicates) +- Composable waiting conditions with `wait_for_any_content` and `wait_for_all_content` +- Enhanced error handling with detailed timeouts and match information +- Robust shell prompt detection + ## libtmux 0.46.0 (2025-02-25) ### Breaking @@ -196,6 +208,43 @@ Add `TestServer` pytest fixture for creating temporary tmux servers (#565): - dev dependencies: Include `typing-extensions` for Python version < 3.11 via the `testing` and `lint` groups, via #564. +### Renamed commands + +- Renamed `Window.set_window_option()` to {meth}`Window.set_option()` (#516) + + Deprecated `Window.set_window_option()` + +- Renamed `Window.show_window_option()` to {meth}`Window.show_option()` (#516) + + Deprecated `Window.show_window_option()` + +- Renamed `Window.show_window_options()` to {meth}`Window.show_options()` (#516) + + Deprecated `Window.show_window_options()` + +### Improved options + +- Option support expanded to server, session, window, and pane +- Option support enhanced and streamlined via {class}`options.OptionsMixin`. + + - `set_option` + - `show_option` + - `show_options` + - `unset_option` + +- {meth}`Window.set_option()` (#516) + + Added arguments: + + - `format` -> `-F` + - `unset` -> `-u` + - `global` -> `-g` + - `unset_panes` -> `-U`: Also unset other panse in windows + - `prevent_overwrite`: `-o` + - `suppress_warnings`: `-q` + - `append`: `-a` + + ## libtmux 0.42.0 (2025-02-02) ### Bug fixes @@ -222,17 +271,6 @@ Add `TestServer` pytest fixture for creating temporary tmux servers (#565): - `Server`: Fix `colors` docstring to note it accepts `88` or `256`, Thank you @TravisDart! (via #544) -### Development - -#### chore: Implement PEP 563 deferred annotation resolution (#555) - -- Add `from __future__ import annotations` to defer annotation resolution and reduce unnecessary runtime computations during type checking. -- Enable Ruff checks for PEP-compliant annotations: - - [non-pep585-annotation (UP006)](https://docs.astral.sh/ruff/rules/non-pep585-annotation/) - - [non-pep604-annotation (UP007)](https://docs.astral.sh/ruff/rules/non-pep604-annotation/) - -For more details on PEP 563, see: https://peps.python.org/pep-0563/ - ## libtmux 0.40.1 (2024-12-24) ### Bug fix diff --git a/docs/api/hooks.md b/docs/api/hooks.md new file mode 100644 index 000000000..7c4d1cf8f --- /dev/null +++ b/docs/api/hooks.md @@ -0,0 +1,6 @@ +# Hooks + +```{eval-rst} +.. automodule:: libtmux.hooks + :members: +``` diff --git a/docs/api/index.md b/docs/api/index.md index 99d614fee..49c720a5c 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -11,6 +11,8 @@ servers sessions windows panes +options +hooks constants common exceptions diff --git a/docs/api/options.md b/docs/api/options.md new file mode 100644 index 000000000..5a5b9af3d --- /dev/null +++ b/docs/api/options.md @@ -0,0 +1,6 @@ +# Options + +```{eval-rst} +.. automodule:: libtmux.options + :members: +``` diff --git a/docs/internals/constants.md b/docs/internals/constants.md new file mode 100644 index 000000000..65059ce94 --- /dev/null +++ b/docs/internals/constants.md @@ -0,0 +1,15 @@ +# Internal Constants - `libtmux._internal.constants` + +:::{warning} +Be careful with these! These constants are private, internal as they're **not** covered by version policies. They can break or be removed between minor versions! + +If you need a data structure here made public or stabilized please [file an issue](https://github.com/tmux-python/libtmux/issues). +::: + +```{eval-rst} +.. automodule:: libtmux._internal.constants + :members: + :undoc-members: + :inherited-members: + :show-inheritance: +``` diff --git a/docs/internals/index.md b/docs/internals/index.md index 09d4a1d6f..1212dc39e 100644 --- a/docs/internals/index.md +++ b/docs/internals/index.md @@ -11,6 +11,8 @@ If you need an internal API stabilized please [file an issue](https://github.com ```{toctree} dataclasses query_list +waiter +constants ``` ## Environmental variables diff --git a/docs/internals/waiter.md b/docs/internals/waiter.md new file mode 100644 index 000000000..016d8b185 --- /dev/null +++ b/docs/internals/waiter.md @@ -0,0 +1,135 @@ +(waiter)= + +# Waiters - `libtmux._internal.waiter` + +The waiter module provides utilities for waiting on specific content to appear in tmux panes, making it easier to write reliable tests that interact with terminal output. + +## Key Features + +- **Fluent API**: Playwright-inspired chainable API for expressive, readable test code +- **Multiple Match Types**: Wait for exact matches, substring matches, regex patterns, or custom predicate functions +- **Composable Waiting**: Wait for any of multiple conditions or all conditions to be met +- **Flexible Timeout Handling**: Configure timeout behavior and error handling to suit your needs +- **Shell Prompt Detection**: Easily wait for shell readiness with built-in prompt detection +- **Robust Error Handling**: Improved exception handling and result reporting +- **Clean Code**: Well-formatted, linted code with proper type annotations + +## Basic Concepts + +When writing tests that interact with tmux sessions and panes, it's often necessary to wait for specific content to appear before proceeding with the next step. The waiter module provides a set of functions to help with this. + +There are multiple ways to match content: +- **Exact match**: The content exactly matches the specified string +- **Contains**: The content contains the specified string +- **Regex**: The content matches the specified regular expression +- **Predicate**: A custom function that takes the pane content and returns a boolean + +## Quick Start Examples + +### Simple Waiting + +Wait for specific text to appear in a pane: + +```{literalinclude} ../../tests/examples/_internal/waiter/test_wait_for_text.py +:language: python +``` + +### Advanced Matching + +Use regex patterns or custom predicates for more complex matching: + +```{literalinclude} ../../tests/examples/_internal/waiter/test_wait_for_regex.py +:language: python +``` + +```{literalinclude} ../../tests/examples/_internal/waiter/test_custom_predicate.py +:language: python +``` + +### Timeout Handling + +Control how long to wait and what happens when a timeout occurs: + +```{literalinclude} ../../tests/examples/_internal/waiter/test_timeout_handling.py +:language: python +``` + +### Waiting for Shell Readiness + +A common use case is waiting for a shell prompt to appear, indicating the command has completed. The example below uses a regular expression to match common shell prompt characters (`$`, `%`, `>`, `#`): + +```{literalinclude} ../../tests/examples/_internal/waiter/test_wait_until_ready.py +:language: python +``` + +> Note: This test is skipped in CI environments due to timing issues but works well for local development. + +## Fluent API (Playwright-inspired) + +For a more expressive and chainable API, you can use the fluent interface provided by the `PaneContentWaiter` class: + +```{literalinclude} ../../tests/examples/_internal/waiter/test_fluent_basic.py +:language: python +``` + +```{literalinclude} ../../tests/examples/_internal/waiter/test_fluent_chaining.py +:language: python +``` + +## Multiple Conditions + +The waiter module also supports waiting for multiple conditions at once: + +```{literalinclude} ../../tests/examples/_internal/waiter/test_wait_for_any_content.py +:language: python +``` + +```{literalinclude} ../../tests/examples/_internal/waiter/test_wait_for_all_content.py +:language: python +``` + +```{literalinclude} ../../tests/examples/_internal/waiter/test_mixed_pattern_types.py +:language: python +``` + +## Implementation Notes + +### Error Handling + +The waiting functions are designed to be robust and handle timing and error conditions gracefully: + +- All wait functions properly calculate elapsed time for performance tracking +- Functions handle exceptions consistently and provide clear error messages +- Proper handling of return values ensures consistent behavior whether or not raises=True + +### Type Safety + +The waiter module is fully type-annotated to ensure compatibility with static type checkers: + +- All functions include proper type hints for parameters and return values +- The ContentMatchType enum ensures that only valid match types are used +- Combined with runtime checks, this prevents type-related errors during testing + +### Example Usage in Documentation + +All examples in this documentation are actual test files from the libtmux test suite. The examples are included using `literalinclude` directives, ensuring that the documentation remains synchronized with the actual code. + +## API Reference + +```{eval-rst} +.. automodule:: libtmux._internal.waiter + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource +``` + +## Extended Retry Functionality + +```{eval-rst} +.. automodule:: libtmux.test.retry_extended + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource +``` diff --git a/docs/test-helpers/constants.md b/docs/test-helpers/constants.md index facbfb871..b7583a251 100644 --- a/docs/test-helpers/constants.md +++ b/docs/test-helpers/constants.md @@ -1,3 +1,5 @@ +(test_helpers_constants)= + # Constants Test-related constants used across libtmux test helpers. @@ -7,4 +9,5 @@ Test-related constants used across libtmux test helpers. :members: :undoc-members: :show-inheritance: -``` \ No newline at end of file + :member-order: bysource +``` diff --git a/docs/test-helpers/environment.md b/docs/test-helpers/environment.md index e385193a6..58b4bb549 100644 --- a/docs/test-helpers/environment.md +++ b/docs/test-helpers/environment.md @@ -1,3 +1,5 @@ +(test_helpers_environment)= + # Environment Environment variable mocking utilities for tests. @@ -7,4 +9,5 @@ Environment variable mocking utilities for tests. :members: :undoc-members: :show-inheritance: -``` \ No newline at end of file + :member-order: bysource +``` diff --git a/docs/test-helpers/index.md b/docs/test-helpers/index.md index b27fa8d3e..dd99384bf 100644 --- a/docs/test-helpers/index.md +++ b/docs/test-helpers/index.md @@ -8,10 +8,11 @@ Test helpers for libtmux and downstream libraries. constants environment random +retry temporary ``` ```{eval-rst} .. automodule:: libtmux.test :members: -``` \ No newline at end of file +``` diff --git a/docs/test-helpers/random.md b/docs/test-helpers/random.md index 2222a6cee..e4248a7fc 100644 --- a/docs/test-helpers/random.md +++ b/docs/test-helpers/random.md @@ -1,3 +1,5 @@ +(test_helpers_random)= + # Random Random string generation utilities for test names. @@ -7,4 +9,5 @@ Random string generation utilities for test names. :members: :undoc-members: :show-inheritance: -``` \ No newline at end of file + :member-order: bysource +``` diff --git a/docs/test-helpers/retry.md b/docs/test-helpers/retry.md new file mode 100644 index 000000000..6ec72e3c4 --- /dev/null +++ b/docs/test-helpers/retry.md @@ -0,0 +1,15 @@ +(test_helpers_retry)= + +# Retry Utilities + +Retry helper functions for libtmux test utilities. These utilities help manage testing operations that may require multiple attempts before succeeding. + +## Basic Retry Functionality + +```{eval-rst} +.. automodule:: libtmux.test.retry + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource +``` diff --git a/docs/test-helpers/temporary.md b/docs/test-helpers/temporary.md index f1ee07b2f..ea3b8ddf9 100644 --- a/docs/test-helpers/temporary.md +++ b/docs/test-helpers/temporary.md @@ -1,3 +1,5 @@ +(test_helpers_temporary_objects)= + # Temporary Objects Context managers for temporary tmux objects (sessions, windows). @@ -7,4 +9,5 @@ Context managers for temporary tmux objects (sessions, windows). :members: :undoc-members: :show-inheritance: -``` \ No newline at end of file + :member-order: bysource +``` diff --git a/pyproject.toml b/pyproject.toml index 1115cd419..86d5a99ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,6 +128,11 @@ files = [ "tests", ] +[[tool.mypy.overrides]] +module = "tests.examples.*" +disallow_untyped_defs = false +disallow_incomplete_defs = false + [tool.coverage.run] branch = true parallel = true diff --git a/src/libtmux/_internal/constants.py b/src/libtmux/_internal/constants.py new file mode 100644 index 000000000..eea6fd089 --- /dev/null +++ b/src/libtmux/_internal/constants.py @@ -0,0 +1,461 @@ +"""Internal constants.""" + +from __future__ import annotations + +import io +import logging +import typing as t +from dataclasses import dataclass, field + +from libtmux._internal.dataclasses import SkipDefaultFieldsReprMixin +from libtmux._internal.sparse_array import SparseArray, is_sparse_array_list + +if t.TYPE_CHECKING: + from typing_extensions import TypeAlias + + +T = t.TypeVar("T") + +TerminalFeatures = dict[str, list[str]] +HookArray: TypeAlias = "dict[str, SparseArray[str]]" + +logger = logging.getLogger(__name__) + + +@dataclass(repr=False) +class ServerOptions( + SkipDefaultFieldsReprMixin, +): + backspace: str | None = field(default=None) + buffer_limit: int | None = field(default=None) + command_alias: SparseArray[str] = field(default_factory=SparseArray) + default_terminal: str | None = field(default=None) + copy_command: str | None = field(default=None) + escape_time: int | None = field(default=None) + editor: str | None = field(default=None) + exit_empty: t.Literal["on", "off"] | None = field(default=None) + exit_unattached: t.Literal["on", "off"] | None = field(default=None) + extended_keys: t.Literal["on", "off", "always"] | None = field(default=None) + focus_events: t.Literal["on", "off"] | None = field(default=None) + history_file: str | None = field(default=None) + message_limit: int | None = field(default=None) + prompt_history_limit: int | None = field(default=None) + set_clipboard: t.Literal["on", "external", "off"] | None = field(default=None) + terminal_features: TerminalFeatures = field(default_factory=dict) + terminal_overrides: SparseArray[str] = field(default_factory=SparseArray) + user_keys: SparseArray[str] = field(default_factory=SparseArray) + + def __init__(self, **kwargs: object) -> None: + # Convert hyphenated keys to underscored attribute names and assign values + for key, value in kwargs.items(): + key_underscored = key.replace("-", "_") + setattr(self, key_underscored, value) + + +@dataclass(repr=False) +class SessionOptions( + SkipDefaultFieldsReprMixin, +): + activity_action: t.Literal["any", "none", "current", "other"] | None = field( + default=None, + ) + assume_paste_time: int | None = field(default=None) + base_index: int | None = field(default=None) + bell_action: t.Literal["any", "none", "current", "other"] | None = field( + default=None, + ) + default_command: str | None = field(default=None) + default_shell: str | None = field(default=None) + default_size: str | None = field(default=None) # Format "XxY" + destroy_unattached: t.Literal["on", "off"] | None = field(default=None) + detach_on_destroy: ( + t.Literal["off", "on", "no-detached", "previous", "next"] | None + ) = field(default=None) + display_panes_active_colour: str | None = field(default=None) + display_panes_colour: str | None = field(default=None) + display_panes_time: int | None = field(default=None) + display_time: int | None = field(default=None) + history_limit: int | None = field(default=None) + key_table: str | None = field(default=None) + lock_after_time: int | None = field(default=None) + lock_command: str | None = field(default=None) + menu_style: str | None = field(default=None) + menu_selected_style: str | None = field(default=None) + menu_border_style: str | None = field(default=None) + menu_border_lines: ( + t.Literal["single", "rounded", "double", "heavy", "simple", "padded", "none"] + | None + ) = field(default=None) + message_command_style: str | None = field(default=None) + message_line: int | None = field(default=None) + message_style: str | None = field(default=None) + mouse: t.Literal["on", "off"] | None = field(default=None) + prefix: str | None = field(default=None) + prefix2: str | None = field(default=None) + renumber_windows: t.Literal["on", "off"] | None = field(default=None) + repeat_time: int | None = field(default=None) + set_titles: t.Literal["on", "off"] | None = field(default=None) + set_titles_string: str | None = field(default=None) + silence_action: t.Literal["any", "none", "current", "other"] | None = field( + default=None, + ) + status: t.Literal["off", "on"] | int | None = field(default=None) + status_format: list[str] | None = field(default=None) + status_interval: int | None = field(default=None) + status_justify: t.Literal["left", "centre", "right", "absolute-centre"] | None = ( + field(default=None) + ) + status_keys: t.Literal["vi", "emacs"] | None = field(default=None) + status_left: str | None = field(default=None) + status_left_length: int | None = field(default=None) + status_left_style: str | None = field(default=None) + status_position: t.Literal["top", "bottom"] | None = field(default=None) + status_right: str | None = field(default=None) + status_right_length: int | None = field(default=None) + status_right_style: str | None = field(default=None) + status_style: str | None = field(default=None) + update_environment: SparseArray[str] = field(default_factory=SparseArray) + visual_activity: t.Literal["on", "off", "both"] | None = field(default=None) + visual_bell: t.Literal["on", "off", "both"] | None = field(default=None) + visual_silence: t.Literal["on", "off", "both"] | None = field(default=None) + word_separators: str | None = field(default=None) + + def __init__(self, **kwargs: object) -> None: + # Convert hyphenated keys to underscored attribute names and assign values + for key, value in kwargs.items(): + key_underscored = key.replace("-", "_") + setattr(self, key_underscored, value) + + +@dataclass(repr=False) +class WindowOptions( + SkipDefaultFieldsReprMixin, +): + aggressive_resize: t.Literal["on", "off"] | None = field(default=None) + automatic_rename: t.Literal["on", "off"] | None = field(default=None) + automatic_rename_format: str | None = field(default=None) + clock_mode_colour: str | None = field(default=None) + clock_mode_style: t.Literal["12", "24"] | None = field(default=None) + fill_character: str | None = field(default=None) + main_pane_height: int | str | None = field(default=None) + main_pane_width: int | str | None = field(default=None) + copy_mode_match_style: str | None = field(default=None) + copy_mode_mark_style: str | None = field(default=None) + copy_mode_current_match_style: str | None = field(default=None) + mode_keys: t.Literal["vi", "emacs"] | None = field(default=None) + mode_style: str | None = field(default=None) + monitor_activity: t.Literal["on", "off"] | None = field(default=None) + monitor_bell: t.Literal["on", "off"] | None = field(default=None) + monitor_silence: int | None = field(default=None) # Assuming seconds as int + other_pane_height: int | str | None = field(default=None) + other_pane_width: int | str | None = field(default=None) + pane_active_border_style: str | None = field(default=None) + pane_base_index: int | None = field(default=None) + pane_border_format: str | None = field(default=None) + pane_border_indicators: t.Literal["off", "colour", "arrows", "both"] | None = field( + default=None, + ) + pane_border_lines: ( + t.Literal["single", "double", "heavy", "simple", "number"] | None + ) = field(default=None) + pane_border_status: t.Literal["off", "top", "bottom"] | None = field( + default=None, + ) + pane_border_style: str | None = field(default=None) + popup_style: str | None = field(default=None) + popup_border_style: str | None = field(default=None) + popup_border_lines: ( + t.Literal["single", "rounded", "double", "heavy", "simple", "padded", "none"] + | None + ) = field(default=None) + window_status_activity_style: str | None = field(default=None) + window_status_bell_style: str | None = field(default=None) + window_status_current_format: str | None = field(default=None) + window_status_current_style: str | None = field(default=None) + window_status_format: str | None = field(default=None) + window_status_last_style: str | None = field(default=None) + window_status_separator: str | None = field(default=None) + window_status_style: str | None = field(default=None) + window_size: t.Literal["largest", "smallest", "manual", "latest"] | None = field( + default=None, + ) + wrap_search: t.Literal["on", "off"] | None = field(default=None) + + def __init__(self, **kwargs: object) -> None: + # Convert hyphenated keys to underscored attribute names and assign values + for key, value in kwargs.items(): + key_underscored = key.replace("-", "_") + setattr(self, key_underscored, value) + + +@dataclass(repr=False) +class PaneOptions( + SkipDefaultFieldsReprMixin, +): + allow_passthrough: t.Literal["on", "off", "all"] | None = field(default=None) + allow_rename: t.Literal["on", "off"] | None = field(default=None) + alternate_screen: t.Literal["on", "off"] | None = field(default=None) + cursor_colour: str | None = field(default=None) + pane_colours: list[str] | None = field(default=None) + cursor_style: ( + t.Literal[ + "default", + "blinking-block", + "block", + "blinking-underline", + "underline", + "blinking-bar", + "bar", + ] + | None + ) = field(default=None) + remain_on_exit: t.Literal["on", "off", "failed"] | None = field(default=None) + remain_on_exit_format: str | None = field(default=None) + scroll_on_clear: t.Literal["on", "off"] | None = field(default=None) + synchronize_panes: t.Literal["on", "off"] | None = field(default=None) + window_active_style: str | None = field(default=None) + window_style: str | None = field(default=None) + + def __init__(self, **kwargs: object) -> None: + # Convert hyphenated keys to underscored attribute names and assign values + for key, value in kwargs.items(): + key_underscored = key.replace("-", "_") + setattr(self, key_underscored, value) + + +@dataclass(repr=False) +class Options( + ServerOptions, + SessionOptions, + WindowOptions, + PaneOptions, + SkipDefaultFieldsReprMixin, +): + def __init__(self, **kwargs: object) -> None: + # Convert hyphenated keys to underscored attribute names and assign values + # Remove asaterisk from inherited options + for key, value in kwargs.items(): + key_underscored = key.replace("-", "_") + key_asterisk_removed = key_underscored.rstrip("*") + setattr(self, key_asterisk_removed, value) + + +@dataclass(repr=False) +class Hooks( + SkipDefaultFieldsReprMixin, +): + """tmux hooks data structure.""" + + # --- Tmux normal hooks --- + # Run when a window has activity. See monitor-activity. + alert_activity: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window has received a bell. See monitor-bell. + alert_bell: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window has been silent. See monitor-silence. + alert_silence: SparseArray[str] = field(default_factory=SparseArray) + # Run when a client becomes the latest active client of its session. + client_active: SparseArray[str] = field(default_factory=SparseArray) + # Run when a client is attached. + client_attached: SparseArray[str] = field(default_factory=SparseArray) + # Run when a client is detached. + client_detached: SparseArray[str] = field(default_factory=SparseArray) + # Run when focus enters a client. + client_focus_in: SparseArray[str] = field(default_factory=SparseArray) + # Run when focus exits a client. + client_focus_out: SparseArray[str] = field(default_factory=SparseArray) + # Run when a client is resized. + client_resized: SparseArray[str] = field(default_factory=SparseArray) + # Run when a client's attached session is changed. + client_session_changed: SparseArray[str] = field(default_factory=SparseArray) + # Run when the program running in a pane exits, but remain-on-exit is on so the pane + # has not closed. + pane_died: SparseArray[str] = field(default_factory=SparseArray) + # Run when the program running in a pane exits. + pane_exited: SparseArray[str] = field(default_factory=SparseArray) + # Run when the focus enters a pane, if the focus-events option is on. + pane_focus_in: SparseArray[str] = field(default_factory=SparseArray) + # Run when the focus exits a pane, if the focus-events option is on. + pane_focus_out: SparseArray[str] = field(default_factory=SparseArray) + # Run when the terminal clipboard is set using the xterm(1) escape sequence. + pane_set_clipboard: SparseArray[str] = field(default_factory=SparseArray) + # Run when a new session created. + session_created: SparseArray[str] = field(default_factory=SparseArray) + # Run when a session closed. + session_closed: SparseArray[str] = field(default_factory=SparseArray) + # Run when a session is renamed. + session_renamed: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window is linked into a session. + window_linked: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window is renamed. + window_renamed: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window is resized. This may be after the client-resized hook is run. + window_resized: SparseArray[str] = field(default_factory=SparseArray) + # Run when a window is unlinked from a session. + window_unlinked: SparseArray[str] = field(default_factory=SparseArray) + + # --- Tmux control mode hooks --- + # The client has detached. + client_detached_control: SparseArray[str] = field(default_factory=SparseArray) + # The client is now attached to the session with ID session-id, which is named name. + client_session_changed_control: SparseArray[str] = field( + default_factory=SparseArray, + ) + # An error has happened in a configuration file. + config_error: SparseArray[str] = field(default_factory=SparseArray) + # The pane has been continued after being paused (if the pause-after flag is set, + # see refresh-client -A). + continue_control: SparseArray[str] = field(default_factory=SparseArray) + # The tmux client is exiting immediately, either because it is not attached to any + # session or an error occurred. + exit_control: SparseArray[str] = field(default_factory=SparseArray) + # New form of %output sent when the pause-after flag is set. + extended_output: SparseArray[str] = field(default_factory=SparseArray) + # The layout of a window with ID window-id changed. + layout_change: SparseArray[str] = field(default_factory=SparseArray) + # A message sent with the display-message command. + message_control: SparseArray[str] = field(default_factory=SparseArray) + # A window pane produced output. + output: SparseArray[str] = field(default_factory=SparseArray) + # The pane with ID pane-id has changed mode. + pane_mode_changed: SparseArray[str] = field(default_factory=SparseArray) + # Paste buffer name has been changed. + paste_buffer_changed: SparseArray[str] = field(default_factory=SparseArray) + # Paste buffer name has been deleted. + paste_buffer_deleted: SparseArray[str] = field(default_factory=SparseArray) + # The pane has been paused (if the pause-after flag is set). + pause_control: SparseArray[str] = field(default_factory=SparseArray) + # The client is now attached to the session with ID session-id, which is named name. + session_changed_control: SparseArray[str] = field(default_factory=SparseArray) + # The current session was renamed to name. + session_renamed_control: SparseArray[str] = field(default_factory=SparseArray) + # The session with ID session-id changed its active window to the window with ID + # window-id. + session_window_changed: SparseArray[str] = field(default_factory=SparseArray) + # A session was created or destroyed. + sessions_changed: SparseArray[str] = field(default_factory=SparseArray) + # The value of the format associated with subscription name has changed to value. + subscription_changed: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id was created but is not linked to the current session. + unlinked_window_add: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id, which is not linked to the current session, was + # closed. + unlinked_window_close: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id, which is not linked to the current session, was + # renamed. + unlinked_window_renamed: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id was linked to the current session. + window_add: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id closed. + window_close: SparseArray[str] = field(default_factory=SparseArray) + # The layout of a window with ID window-id changed. The new layout is window-layout. + # The window's visible layout is window-visible-layout and the window flags are + # window-flags. + window_layout_changed: SparseArray[str] = field(default_factory=SparseArray) + # The active pane in the window with ID window-id changed to the pane with ID + # pane-id. + window_pane_changed: SparseArray[str] = field(default_factory=SparseArray) + # The window with ID window-id was renamed to name. + window_renamed_control: SparseArray[str] = field(default_factory=SparseArray) + + # --- After hooks - Run after specific tmux commands complete --- + # Runs after 'bind-key' completes + after_bind_key: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'capture-pane' completes + after_capture_pane: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'copy-mode' completes + after_copy_mode: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'display-message' completes + after_display_message: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'display-panes' completes + after_display_panes: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'kill-pane' completes + after_kill_pane: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-buffers' completes + after_list_buffers: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-clients' completes + after_list_clients: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-keys' completes + after_list_keys: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-panes' completes + after_list_panes: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-sessions' completes + after_list_sessions: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'list-windows' completes + after_list_windows: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'load-buffer' completes + after_load_buffer: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'lock-server' completes + after_lock_server: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'new-session' completes + after_new_session: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'new-window' completes + after_new_window: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'paste-buffer' completes + after_paste_buffer: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'pipe-pane' completes + after_pipe_pane: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'queue' command is processed + after_queue: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'refresh-client' completes + after_refresh_client: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'rename-session' completes + after_rename_session: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'rename-window' completes + after_rename_window: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'resize-pane' completes + after_resize_pane: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'resize-window' completes + after_resize_window: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'save-buffer' completes + after_save_buffer: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'select-layout' completes + after_select_layout: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'select-pane' completes + after_select_pane: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'select-window' completes + after_select_window: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'send-keys' completes + after_send_keys: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'set-buffer' completes + after_set_buffer: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'set-environment' completes + after_set_environment: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'set-hook' completes + after_set_hook: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'set-option' completes + after_set_option: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'show-environment' completes + after_show_environment: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'show-messages' completes + after_show_messages: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'show-options' completes + after_show_options: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'split-window' completes + after_split_window: SparseArray[str] = field(default_factory=SparseArray) + # Runs after 'unbind-key' completes + after_unbind_key: SparseArray[str] = field(default_factory=SparseArray) + + @classmethod + def from_stdout(cls, value: list[str]) -> Hooks: + from libtmux.options import ( + explode_arrays, + explode_complex, + parse_options_to_dict, + ) + + output_exploded = explode_complex( + explode_arrays( + parse_options_to_dict( + io.StringIO("\n".join(value)), + ), + force_array=True, + ), + ) + + assert is_sparse_array_list(output_exploded) + + output_renamed: HookArray = { + k.lstrip("%").replace("-", "_"): v for k, v in output_exploded.items() + } + + return cls(**output_renamed) diff --git a/src/libtmux/_internal/retry_extended.py b/src/libtmux/_internal/retry_extended.py new file mode 100644 index 000000000..6d76ef998 --- /dev/null +++ b/src/libtmux/_internal/retry_extended.py @@ -0,0 +1,65 @@ +"""Extended retry functionality for libtmux.""" + +from __future__ import annotations + +import logging +import time +import typing as t + +from libtmux.exc import WaitTimeout +from libtmux.test.constants import ( + RETRY_INTERVAL_SECONDS, + RETRY_TIMEOUT_SECONDS, +) + +logger = logging.getLogger(__name__) + +if t.TYPE_CHECKING: + from collections.abc import Callable + + +def retry_until_extended( + fun: Callable[[], bool], + seconds: float = RETRY_TIMEOUT_SECONDS, + *, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool | None = True, +) -> tuple[bool, Exception | None]: + """ + Retry a function until a condition meets or the specified time passes. + + Extended version that returns both success state and exception. + + Parameters + ---------- + fun : callable + A function that will be called repeatedly until it returns ``True`` or + the specified time passes. + seconds : float + Seconds to retry. Defaults to ``8``, which is configurable via + ``RETRY_TIMEOUT_SECONDS`` environment variables. + interval : float + Time in seconds to wait between calls. Defaults to ``0.05`` and is + configurable via ``RETRY_INTERVAL_SECONDS`` environment variable. + raises : bool + Whether or not to raise an exception on timeout. Defaults to ``True``. + + Returns + ------- + tuple[bool, Exception | None] + Tuple containing (success, exception). If successful, the exception will + be None. + """ + ini = time.time() + exception = None + + while not fun(): + end = time.time() + if end - ini >= seconds: + timeout_msg = f"Timed out after {seconds} seconds" + exception = WaitTimeout(timeout_msg) + if raises: + raise exception + return False, exception + time.sleep(interval) + return True, None diff --git a/src/libtmux/_internal/sparse_array.py b/src/libtmux/_internal/sparse_array.py new file mode 100644 index 000000000..e15c0e60e --- /dev/null +++ b/src/libtmux/_internal/sparse_array.py @@ -0,0 +1,56 @@ +"""Sparse array for libtmux options and hooks.""" + +from __future__ import annotations + +import typing as t + +if t.TYPE_CHECKING: + from typing_extensions import TypeAlias, TypeGuard + + from libtmux.options import ExplodedComplexUntypedOptionsDict + + +T = t.TypeVar("T") +HookArray: TypeAlias = "dict[str, SparseArray[str]]" + + +def is_sparse_array_list( + items: ExplodedComplexUntypedOptionsDict, +) -> TypeGuard[HookArray]: + return all( + isinstance( + v, + SparseArray, + ) + for k, v in items.items() + ) + + +class SparseArray(dict[int, T], t.Generic[T]): + """Support non-sequential indexes while maintaining :class:`list`-like behavior. + + A normal :class:`list` would raise :exc:`IndexError`. + + There are no native sparse arrays in python that contain non-sequential indexes and + maintain list-like behavior. This is useful for handling libtmux options and hooks: + + ``command-alias[1] split-pane=split-window`` to + ``{'command-alias[1]': {'split-pane=split-window'}}`` + + :class:`list` would lose indice info, and :class:`dict` would lose list-like + behavior. + """ + + def add(self, index: int, value: T) -> None: + self[index] = value + + def append(self, value: T) -> None: + index = max(self.keys()) + 1 + self[index] = value + + def iter_values(self) -> t.Iterator[T]: + for index in sorted(self.keys()): + yield self[index] + + def as_list(self) -> list[T]: + return [self[index] for index in sorted(self.keys())] diff --git a/src/libtmux/_internal/waiter.py b/src/libtmux/_internal/waiter.py new file mode 100644 index 000000000..eb687917f --- /dev/null +++ b/src/libtmux/_internal/waiter.py @@ -0,0 +1,1806 @@ +"""Terminal content waiting utility for libtmux tests. + +This module provides functions to wait for specific content to appear in tmux panes, +making it easier to write reliable tests that interact with terminal output. +""" + +from __future__ import annotations + +import logging +import re +import time +import typing as t +from dataclasses import dataclass +from enum import Enum, auto + +from libtmux._internal.retry_extended import retry_until_extended +from libtmux.exc import WaitTimeout +from libtmux.test.constants import ( + RETRY_INTERVAL_SECONDS, + RETRY_TIMEOUT_SECONDS, +) +from libtmux.test.retry import retry_until + +if t.TYPE_CHECKING: + from collections.abc import Callable + + from libtmux.pane import Pane + from libtmux.server import Server + from libtmux.session import Session + from libtmux.window import Window + +logger = logging.getLogger(__name__) + + +class ContentMatchType(Enum): + """Type of content matching to use when waiting for pane content. + + Examples + -------- + >>> # Using content match types with their intended patterns + >>> ContentMatchType.EXACT + + >>> ContentMatchType.CONTAINS + + >>> ContentMatchType.REGEX + + >>> ContentMatchType.PREDICATE + + + >>> # These match types are used to specify how to match content in wait functions + >>> def demo_match_types(): + ... # For exact matching (entire content must exactly match) + ... exact_type = ContentMatchType.EXACT + ... # For substring matching (content contains the specified string) + ... contains_type = ContentMatchType.CONTAINS + ... # For regex pattern matching + ... regex_type = ContentMatchType.REGEX + ... # For custom predicate functions + ... predicate_type = ContentMatchType.PREDICATE + ... return [exact_type, contains_type, regex_type, predicate_type] + >>> match_types = demo_match_types() + >>> len(match_types) + 4 + """ + + EXACT = auto() # Full exact match of content + CONTAINS = auto() # Content contains the specified string + REGEX = auto() # Content matches the specified regex pattern + PREDICATE = auto() # Custom predicate function returns True + + +@dataclass +class WaitResult: + """Result from a wait operation. + + Attributes + ---------- + success : bool + Whether the wait operation succeeded + content : list[str] | None + The content of the pane at the time of the match + matched_content : str | list[str] | None + The content that matched the pattern + match_line : int | None + The line number of the match (0-indexed) + elapsed_time : float | None + Time taken for the wait operation + error : str | None + Error message if the wait operation failed + matched_pattern_index : int | None + Index of the pattern that matched (only for wait_for_any_content) + + Examples + -------- + >>> # Create a successful wait result + >>> result = WaitResult( + ... success=True, + ... content=["line 1", "hello world", "line 3"], + ... matched_content="hello world", + ... match_line=1, + ... elapsed_time=0.5, + ... ) + >>> result.success + True + >>> result.matched_content + 'hello world' + >>> result.match_line + 1 + + >>> # Create a failed wait result with an error message + >>> error_result = WaitResult( + ... success=False, + ... error="Timed out waiting for 'pattern' after 5.0 seconds", + ... ) + >>> error_result.success + False + >>> error_result.error + "Timed out waiting for 'pattern' after 5.0 seconds" + >>> error_result.content is None + True + + >>> # Wait result with matched_pattern_index (from wait_for_any_content) + >>> multi_pattern = WaitResult( + ... success=True, + ... content=["command output", "success: operation completed", "more output"], + ... matched_content="success: operation completed", + ... match_line=1, + ... matched_pattern_index=2, + ... ) + >>> multi_pattern.matched_pattern_index + 2 + """ + + success: bool + content: list[str] | None = None + matched_content: str | list[str] | None = None + match_line: int | None = None + elapsed_time: float | None = None + error: str | None = None + matched_pattern_index: int | None = None + + +# Error messages as constants +ERR_PREDICATE_TYPE = "content_pattern must be callable when match_type is PREDICATE" +ERR_EXACT_TYPE = "content_pattern must be a string when match_type is EXACT" +ERR_CONTAINS_TYPE = "content_pattern must be a string when match_type is CONTAINS" +ERR_REGEX_TYPE = ( + "content_pattern must be a string or regex pattern when match_type is REGEX" +) + + +class PaneContentWaiter: + r"""Fluent interface for waiting on pane content. + + This class provides a more fluent API for waiting on pane content, + allowing method chaining for better readability. + + Examples + -------- + >>> # Basic usage - assuming pane is a fixture from conftest.py + >>> waiter = PaneContentWaiter(pane) + >>> isinstance(waiter, PaneContentWaiter) + True + + >>> # Method chaining to configure options + >>> waiter = ( + ... PaneContentWaiter(pane) + ... .with_timeout(10.0) + ... .with_interval(0.5) + ... .without_raising() + ... ) + >>> waiter.timeout + 10.0 + >>> waiter.interval + 0.5 + >>> waiter.raises + False + + >>> # Configure line range for capture + >>> waiter = PaneContentWaiter(pane).with_line_range(0, 10) + >>> waiter.start_line + 0 + >>> waiter.end_line + 10 + + >>> # Create a checker for demonstration + >>> import re + >>> def is_ready(content): + ... return any("ready" in line.lower() for line in content) + + >>> # Methods available for different match types + >>> hasattr(waiter, 'wait_for_text') + True + >>> hasattr(waiter, 'wait_for_exact_text') + True + >>> hasattr(waiter, 'wait_for_regex') + True + >>> hasattr(waiter, 'wait_for_predicate') + True + >>> hasattr(waiter, 'wait_until_ready') + True + + A functional example: send text to the pane and wait for it: + + >>> # First, send "hello world" to the pane + >>> pane.send_keys("echo 'hello world'", enter=True) + >>> + >>> # Then wait for it to appear in the pane content + >>> result = PaneContentWaiter(pane).wait_for_text("hello world") + >>> result.success + True + >>> "hello world" in result.matched_content + True + >>> + + With options: + + >>> result = ( + ... PaneContentWaiter(pane) + ... .with_timeout(5.0) + ... .wait_for_text("hello world") + ... ) + + Wait for text with a longer timeout: + + >>> pane.send_keys("echo 'Operation completed'", enter=True) + >>> try: + ... result = ( + ... expect(pane) + ... .with_timeout(1.0) # Reduce timeout for faster doctest execution + ... .wait_for_text("Operation completed") + ... ) + ... print(f"Result success: {result.success}") + ... except Exception as e: + ... print(f"Caught exception: {type(e).__name__}: {e}") + Result success: True + + Wait for regex pattern: + + >>> pane.send_keys("echo 'Process 0 completed.'", enter=True) + >>> try: + ... result = ( + ... PaneContentWaiter(pane) + ... .with_timeout(1.0) # Reduce timeout for faster doctest execution + ... .wait_for_regex(r"Process \d+ completed") + ... ) + ... # Print debug info about the result for doctest + ... print(f"Result success: {result.success}") + ... except Exception as e: + ... print(f"Caught exception: {type(e).__name__}: {e}") + Result success: True + + Custom predicate: + + >>> pane.send_keys("echo 'We are ready!'", enter=True) + >>> def is_ready(content): + ... return any("ready" in line.lower() for line in content) + >>> result = PaneContentWaiter(pane).wait_for_predicate(is_ready) + + Timeout: + + >>> try: + ... result = ( + ... PaneContentWaiter(pane) + ... .with_timeout(0.01) + ... .wait_for_exact_text("hello world") + ... ) + ... except WaitTimeout: + ... print('No exact match') + No exact match + """ + + def __init__(self, pane: Pane) -> None: + """Initialize with a tmux pane. + + Parameters + ---------- + pane : Pane + The tmux pane to check + """ + self.pane = pane + self.timeout: float = RETRY_TIMEOUT_SECONDS + self.interval: float = RETRY_INTERVAL_SECONDS + self.raises: bool = True + self.start_line: t.Literal["-"] | int | None = None + self.end_line: t.Literal["-"] | int | None = None + + def with_timeout(self, timeout: float) -> PaneContentWaiter: + """Set the timeout for waiting. + + Parameters + ---------- + timeout : float + Maximum time to wait in seconds + + Returns + ------- + PaneContentWaiter + Self for method chaining + """ + self.timeout = timeout + return self + + def with_interval(self, interval: float) -> PaneContentWaiter: + """Set the interval between checks. + + Parameters + ---------- + interval : float + Time between checks in seconds + + Returns + ------- + PaneContentWaiter + Self for method chaining + """ + self.interval = interval + return self + + def without_raising(self) -> PaneContentWaiter: + """Disable raising exceptions on timeout. + + Returns + ------- + PaneContentWaiter + Self for method chaining + """ + self.raises = False + return self + + def with_line_range( + self, + start: t.Literal["-"] | int | None, + end: t.Literal["-"] | int | None, + ) -> PaneContentWaiter: + """Specify lines to capture from the pane. + + Parameters + ---------- + start : int | "-" | None + Starting line for capture_pane (passed to pane.capture_pane) + end : int | "-" | None + End line for capture_pane (passed to pane.capture_pane) + + Returns + ------- + PaneContentWaiter + Self for method chaining + """ + self.start_line = start + self.end_line = end + return self + + def wait_for_text(self, text: str) -> WaitResult: + """Wait for text to appear in the pane (contains match). + + Parameters + ---------- + text : str + Text to wait for (contains match) + + Returns + ------- + WaitResult + Result of the wait operation + """ + return wait_for_pane_content( + pane=self.pane, + content_pattern=text, + match_type=ContentMatchType.CONTAINS, + timeout=self.timeout, + interval=self.interval, + start=self.start_line, + end=self.end_line, + raises=self.raises, + ) + + def wait_for_exact_text(self, text: str) -> WaitResult: + """Wait for exact text to appear in the pane. + + Parameters + ---------- + text : str + Text to wait for (exact match) + + Returns + ------- + WaitResult + Result of the wait operation + """ + return wait_for_pane_content( + pane=self.pane, + content_pattern=text, + match_type=ContentMatchType.EXACT, + timeout=self.timeout, + interval=self.interval, + start=self.start_line, + end=self.end_line, + raises=self.raises, + ) + + def wait_for_regex(self, pattern: str | re.Pattern[str]) -> WaitResult: + """Wait for text matching a regex pattern. + + Parameters + ---------- + pattern : str | re.Pattern + Regex pattern to match + + Returns + ------- + WaitResult + Result of the wait operation + """ + return wait_for_pane_content( + pane=self.pane, + content_pattern=pattern, + match_type=ContentMatchType.REGEX, + timeout=self.timeout, + interval=self.interval, + start=self.start_line, + end=self.end_line, + raises=self.raises, + ) + + def wait_for_predicate(self, predicate: Callable[[list[str]], bool]) -> WaitResult: + """Wait for a custom predicate function to return True. + + Parameters + ---------- + predicate : callable + Function that takes pane content lines and returns boolean + + Returns + ------- + WaitResult + Result of the wait operation + """ + return wait_for_pane_content( + pane=self.pane, + content_pattern=predicate, + match_type=ContentMatchType.PREDICATE, + timeout=self.timeout, + interval=self.interval, + start=self.start_line, + end=self.end_line, + raises=self.raises, + ) + + def wait_until_ready( + self, + shell_prompt: str | re.Pattern[str] | None = None, + ) -> WaitResult: + """Wait until the pane is ready with a shell prompt. + + Parameters + ---------- + shell_prompt : str | re.Pattern | None + The shell prompt pattern to look for, or None to auto-detect + + Returns + ------- + WaitResult + Result of the wait operation + """ + return wait_until_pane_ready( + pane=self.pane, + shell_prompt=shell_prompt, + timeout=self.timeout, + interval=self.interval, + raises=self.raises, + ) + + +def expect(pane: Pane) -> PaneContentWaiter: + r"""Fluent interface for waiting on pane content. + + This function provides a more fluent API for waiting on pane content, + allowing method chaining for better readability. + + Examples + -------- + Basic usage with pane fixture: + + >>> waiter = expect(pane) + >>> isinstance(waiter, PaneContentWaiter) + True + + Method chaining to configure the waiter: + + >>> configured_waiter = expect(pane).with_timeout(15.0).without_raising() + >>> configured_waiter.timeout + 15.0 + >>> configured_waiter.raises + False + + Equivalent to :class:`PaneContentWaiter` but with a more expressive name: + + >>> expect(pane) is not PaneContentWaiter(pane) # Different instances + True + >>> type(expect(pane)) == type(PaneContentWaiter(pane)) # Same class + True + + A functional example showing actual usage: + + >>> # Send a command to the pane + >>> pane.send_keys("echo 'testing expect'", enter=True) + >>> + >>> # Wait for the output using the expect function + >>> result = expect(pane).wait_for_text("testing expect") + >>> result.success + True + >>> + + Wait for text with a longer timeout: + + >>> pane.send_keys("echo 'Operation completed'", enter=True) + >>> try: + ... result = ( + ... expect(pane) + ... .with_timeout(1.0) # Reduce timeout for faster doctest execution + ... .without_raising() # Don't raise exceptions + ... .wait_for_text("Operation completed") + ... ) + ... print(f"Result success: {result.success}") + ... except Exception as e: + ... print(f"Caught exception: {type(e).__name__}: {e}") + Result success: True + + Wait for a regex match without raising exceptions on timeout: + >>> pane.send_keys("echo 'Process 19 completed'", enter=True) + >>> try: + ... result = ( + ... expect(pane) + ... .with_timeout(1.0) # Reduce timeout for faster doctest execution + ... .without_raising() # Don't raise exceptions + ... .wait_for_regex(r"Process \d+ completed") + ... ) + ... print(f"Result success: {result.success}") + ... except Exception as e: + ... print(f"Caught exception: {type(e).__name__}: {e}") + Result success: True + """ + return PaneContentWaiter(pane) + + +def wait_for_pane_content( + pane: Pane, + content_pattern: str | re.Pattern[str] | Callable[[list[str]], bool], + match_type: ContentMatchType = ContentMatchType.CONTAINS, + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + start: t.Literal["-"] | int | None = None, + end: t.Literal["-"] | int | None = None, + raises: bool = True, +) -> WaitResult: + r"""Wait for specific content to appear in a pane. + + Parameters + ---------- + pane : Pane + The tmux pane to wait for content in + content_pattern : str | re.Pattern | callable + Content to wait for. This can be: + - A string to match exactly or check if contained (based on match_type) + - A compiled regex pattern to match against + - A predicate function that takes the pane content lines and returns a boolean + match_type : ContentMatchType + How to match the content_pattern against pane content + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + start : int | "-" | None + Starting line for capture_pane (passed to pane.capture_pane) + end : int | "-" | None + End line for capture_pane (passed to pane.capture_pane) + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + WaitResult + Result object with success status and matched content information + + Raises + ------ + WaitTimeout + If raises=True and the timeout is reached before content is found + + Examples + -------- + Wait with contains match (default), for testing purposes with a small timeout + and no raises: + + >>> result = wait_for_pane_content( + ... pane=pane, + ... content_pattern=r"$", # Look for shell prompt + ... timeout=0.5, + ... raises=False + ... ) + >>> isinstance(result, WaitResult) + True + + Using exact match: + + >>> result_exact = wait_for_pane_content( + ... pane=pane, + ... content_pattern="exact text to match", + ... match_type=ContentMatchType.EXACT, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_exact, WaitResult) + True + + Using regex pattern: + + >>> import re + >>> pattern = re.compile(r"\$|%|>") # Common shell prompts + >>> result_regex = wait_for_pane_content( + ... pane=pane, + ... content_pattern=pattern, + ... match_type=ContentMatchType.REGEX, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_regex, WaitResult) + True + + Using predicate function: + + >>> def has_at_least_1_line(content): + ... return len(content) >= 1 + >>> result_pred = wait_for_pane_content( + ... pane=pane, + ... content_pattern=has_at_least_1_line, + ... match_type=ContentMatchType.PREDICATE, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_pred, WaitResult) + True + + Wait for a `$` written on the screen (unsubmitted): + + >>> pane.send_keys("$") + >>> result = wait_for_pane_content(pane, "$", ContentMatchType.CONTAINS) + + Wait for exact text (unsubmitted, and fails): + + >>> try: + ... pane.send_keys("echo 'Success'") + ... result = wait_for_pane_content( + ... pane, + ... "Success", + ... ContentMatchType.EXACT, + ... timeout=0.01 + ... ) + ... except WaitTimeout: + ... print("No exact match.") + No exact match. + + Use regex pattern matching: + + >>> import re + >>> pane.send_keys("echo 'Error: There was a problem.'") + >>> result = wait_for_pane_content( + ... pane, + ... re.compile(r"Error: .*"), + ... ContentMatchType.REGEX + ... ) + + Use custom predicate function: + + >>> def has_at_least_3_lines(content): + ... return len(content) >= 3 + + >>> for _ in range(5): + ... pane.send_keys("echo 'A line'", enter=True) + >>> result = wait_for_pane_content( + ... pane, + ... has_at_least_3_lines, + ... ContentMatchType.PREDICATE + ... ) + """ + result = WaitResult(success=False) + + def check_content() -> bool: + """Check if the content pattern is in the pane.""" + content = pane.capture_pane(start=start, end=end) + if isinstance(content, str): + content = [content] + + result.content = content + + # Handle predicate match type + if match_type == ContentMatchType.PREDICATE: + if not callable(content_pattern): + raise TypeError(ERR_PREDICATE_TYPE) + # For predicate, we pass the list of content lines + matched = content_pattern(content) + if matched: + result.matched_content = "\n".join(content) + return True + return False + + # Handle exact match type + if match_type == ContentMatchType.EXACT: + if not isinstance(content_pattern, str): + raise TypeError(ERR_EXACT_TYPE) + matched = "\n".join(content) == content_pattern + if matched: + result.matched_content = content_pattern + return True + return False + + # Handle contains match type + if match_type == ContentMatchType.CONTAINS: + if not isinstance(content_pattern, str): + raise TypeError(ERR_CONTAINS_TYPE) + content_str = "\n".join(content) + if content_pattern in content_str: + result.matched_content = content_pattern + # Find which line contains the match + for i, line in enumerate(content): + if content_pattern in line: + result.match_line = i + break + return True + return False + + # Handle regex match type + if match_type == ContentMatchType.REGEX: + if isinstance(content_pattern, (str, re.Pattern)): + pattern = ( + content_pattern + if isinstance(content_pattern, re.Pattern) + else re.compile(content_pattern) + ) + content_str = "\n".join(content) + match = pattern.search(content_str) + if match: + result.matched_content = match.group(0) + # Try to find which line contains the match + for i, line in enumerate(content): + if pattern.search(line): + result.match_line = i + break + return True + return False + raise TypeError(ERR_REGEX_TYPE) + return None + + try: + success, exception = retry_until_extended( + check_content, + timeout, + interval=interval, + raises=raises, + ) + if exception: + if raises: + raise + result.error = str(exception) + return result + result.success = success + except WaitTimeout as e: + if raises: + raise + result.error = str(e) + return result + + +def wait_until_pane_ready( + pane: Pane, + shell_prompt: str | re.Pattern[str] | Callable[[list[str]], bool] | None = None, + match_type: ContentMatchType = ContentMatchType.CONTAINS, + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool = True, +) -> WaitResult: + r"""Wait until pane is ready with shell prompt. + + This is a convenience function for the common case of waiting for a shell prompt. + + Parameters + ---------- + pane : Pane + The tmux pane to check + shell_prompt : str | re.Pattern | callable + The shell prompt pattern to look for, or None to auto-detect + match_type : ContentMatchType + How to match the shell_prompt + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + WaitResult + Result of the wait operation + + Examples + -------- + Basic usage - auto-detecting shell prompt: + + >>> result = wait_until_pane_ready( + ... pane=pane, + ... timeout=0.5, + ... raises=False + ... ) + >>> isinstance(result, WaitResult) + True + + Wait with specific prompt pattern: + + >>> result_prompt = wait_until_pane_ready( + ... pane=pane, + ... shell_prompt=r"$", + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_prompt, WaitResult) + True + + Using regex pattern: + + >>> import re + >>> pattern = re.compile(r"[$%#>]") + >>> result_regex = wait_until_pane_ready( + ... pane=pane, + ... shell_prompt=pattern, + ... match_type=ContentMatchType.REGEX, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_regex, WaitResult) + True + + Using custom predicate function: + + >>> def has_prompt(content): + ... return any(line.endswith("$") for line in content) + >>> result_predicate = wait_until_pane_ready( + ... pane=pane, + ... shell_prompt=has_prompt, + ... match_type=ContentMatchType.PREDICATE, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result_predicate, WaitResult) + True + """ + if shell_prompt is None: + # Default to checking for common shell prompts + def check_for_prompt(lines: list[str]) -> bool: + content = "\n".join(lines) + return "$" in content or "%" in content or "#" in content + + shell_prompt = check_for_prompt + match_type = ContentMatchType.PREDICATE + + return wait_for_pane_content( + pane=pane, + content_pattern=shell_prompt, + match_type=match_type, + timeout=timeout, + interval=interval, + raises=raises, + ) + + +def wait_for_server_condition( + server: Server, + condition: Callable[[Server], bool], + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool = True, +) -> bool: + """Wait for a condition on the server to be true. + + Parameters + ---------- + server : Server + The tmux server to check + condition : callable + A function that takes the server and returns a boolean + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + bool + True if the condition was met, False if timed out (and raises=False) + + Examples + -------- + Basic usage with a simple condition: + + >>> def has_sessions(server): + ... return len(server.sessions) > 0 + + Assuming server has at least one session: + + >>> result = wait_for_server_condition( + ... server, + ... has_sessions, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Using a lambda for a simple condition: + + >>> result = wait_for_server_condition( + ... server, + ... lambda s: len(s.sessions) >= 1, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Condition that checks for a specific session: + + >>> def has_specific_session(server): + ... return any(s.name == "specific_name" for s in server.sessions) + + This will likely timeout since we haven't created that session: + + >>> result = wait_for_server_condition( + ... server, + ... has_specific_session, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + """ + + def check_condition() -> bool: + return condition(server) + + return retry_until(check_condition, timeout, interval=interval, raises=raises) + + +def wait_for_session_condition( + session: Session, + condition: Callable[[Session], bool], + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool = True, +) -> bool: + """Wait for a condition on the session to be true. + + Parameters + ---------- + session : Session + The tmux session to check + condition : callable + A function that takes the session and returns a boolean + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + bool + True if the condition was met, False if timed out (and raises=False) + + Examples + -------- + Basic usage with a simple condition: + + >>> def has_windows(session): + ... return len(session.windows) > 0 + + Assuming session has at least one window: + + >>> result = wait_for_session_condition( + ... session, + ... has_windows, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Using a lambda for a simple condition: + + >>> result = wait_for_session_condition( + ... session, + ... lambda s: len(s.windows) >= 1, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Condition that checks for a specific window: + + >>> def has_specific_window(session): + ... return any(w.name == "specific_window" for w in session.windows) + + This will likely timeout since we haven't created that window: + + >>> result = wait_for_session_condition( + ... session, + ... has_specific_window, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + """ + + def check_condition() -> bool: + return condition(session) + + return retry_until(check_condition, timeout, interval=interval, raises=raises) + + +def wait_for_window_condition( + window: Window, + condition: Callable[[Window], bool], + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool = True, +) -> bool: + """Wait for a condition on the window to be true. + + Parameters + ---------- + window : Window + The tmux window to check + condition : callable + A function that takes the window and returns a boolean + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + bool + True if the condition was met, False if timed out (and raises=False) + + Examples + -------- + Basic usage with a simple condition: + + >>> def has_panes(window): + ... return len(window.panes) > 0 + + Assuming window has at least one pane: + + >>> result = wait_for_window_condition( + ... window, + ... has_panes, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Using a lambda for a simple condition: + + >>> result = wait_for_window_condition( + ... window, + ... lambda w: len(w.panes) >= 1, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Condition that checks window layout: + + >>> def is_tiled_layout(window): + ... return window.window_layout == "tiled" + + Check for a specific layout: + + >>> result = wait_for_window_condition( + ... window, + ... is_tiled_layout, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + """ + + def check_condition() -> bool: + return condition(window) + + return retry_until(check_condition, timeout, interval=interval, raises=raises) + + +def wait_for_window_panes( + window: Window, + expected_count: int, + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + raises: bool = True, +) -> bool: + """Wait until window has a specific number of panes. + + Parameters + ---------- + window : Window + The tmux window to check + expected_count : int + The number of panes to wait for + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + bool + True if the condition was met, False if timed out (and raises=False) + + Examples + -------- + Basic usage - wait for a window to have exactly 1 pane: + + >>> result = wait_for_window_panes( + ... window, + ... expected_count=1, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + Wait for a window to have 2 panes (will likely timeout in this example): + + >>> result = wait_for_window_panes( + ... window, + ... expected_count=2, + ... timeout=0.1, + ... raises=False + ... ) + >>> isinstance(result, bool) + True + + In a real test, you might split the window first: + + >>> # window.split_window() # Create a new pane + >>> # Then wait for the pane count to update: + >>> # result = wait_for_window_panes(window, 2) + """ + + def check_pane_count() -> bool: + # Force refresh window panes list + panes = window.panes + return len(panes) == expected_count + + return retry_until(check_pane_count, timeout, interval=interval, raises=raises) + + +def wait_for_any_content( + pane: Pane, + content_patterns: list[str | re.Pattern[str] | Callable[[list[str]], bool]], + match_types: list[ContentMatchType] | ContentMatchType, + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + start: t.Literal["-"] | int | None = None, + end: t.Literal["-"] | int | None = None, + raises: bool = True, +) -> WaitResult: + """Wait for any of the specified content patterns to appear in a pane. + + This is useful for handling alternative expected outputs. + + Parameters + ---------- + pane : Pane + The tmux pane to check + content_patterns : list[str | re.Pattern | callable] + List of content patterns to wait for, any of which can match + match_types : list[ContentMatchType] | ContentMatchType + How to match each content_pattern against pane content + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + start : int | "-" | None + Starting line for capture_pane (passed to pane.capture_pane) + end : int | "-" | None + End line for capture_pane (passed to pane.capture_pane) + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + WaitResult + Result object with success status and matched pattern information + + Raises + ------ + WaitTimeout + If raises=True and the timeout is reached before any pattern is found + TypeError + If a match type is incompatible with the specified pattern + ValueError + If match_types list has a different length than content_patterns + + Examples + -------- + Wait for any of the specified patterns: + + >>> pane.send_keys("echo 'pattern2'", enter=True) + >>> result = wait_for_any_content( + ... pane, + ... ["pattern1", "pattern2"], + ... ContentMatchType.CONTAINS + ... ) + + Wait for any of the specified regex patterns: + + >>> import re + >>> pane.send_keys("echo 'Error: this did not do the trick'", enter=True) + >>> pane.send_keys("echo 'Success: But subsequently this worked'", enter=True) + >>> result = wait_for_any_content( + ... pane, + ... [re.compile(r"Error: .*"), re.compile(r"Success: .*")], + ... ContentMatchType.REGEX + ... ) + + Wait for any of the specified predicate functions: + + >>> def has_at_least_3_lines(content): + ... return len(content) >= 3 + >>> + >>> def has_at_least_5_lines(content): + ... return len(content) >= 5 + >>> + >>> for _ in range(5): + ... pane.send_keys("echo 'A line'", enter=True) + >>> result = wait_for_any_content( + ... pane, + ... [has_at_least_3_lines, has_at_least_5_lines], + ... ContentMatchType.PREDICATE + ... ) + """ + if not content_patterns: + msg = "At least one content pattern must be provided" + raise ValueError(msg) + + # If match_types is a single value, convert to a list of the same value + if not isinstance(match_types, list): + match_types = [match_types] * len(content_patterns) + elif len(match_types) != len(content_patterns): + msg = ( + f"match_types list ({len(match_types)}) " + f"doesn't match patterns ({len(content_patterns)})" + ) + raise ValueError(msg) + + result = WaitResult(success=False) + start_time = time.time() + + def check_any_content() -> bool: + """Try to match any of the specified patterns.""" + content = pane.capture_pane(start=start, end=end) + if isinstance(content, str): + content = [content] + + result.content = content + + for i, (pattern, match_type) in enumerate( + zip(content_patterns, match_types), + ): + # Handle predicate match + if match_type == ContentMatchType.PREDICATE: + if not callable(pattern): + msg = f"Pattern at index {i}: {ERR_PREDICATE_TYPE}" + raise TypeError(msg) + # For predicate, we pass the list of content lines + if pattern(content): + result.matched_content = "\n".join(content) + result.matched_pattern_index = i + return True + continue # Try next pattern + + # Handle exact match + if match_type == ContentMatchType.EXACT: + if not isinstance(pattern, str): + msg = f"Pattern at index {i}: {ERR_EXACT_TYPE}" + raise TypeError(msg) + if "\n".join(content) == pattern: + result.matched_content = pattern + result.matched_pattern_index = i + return True + continue # Try next pattern + + # Handle contains match + if match_type == ContentMatchType.CONTAINS: + if not isinstance(pattern, str): + msg = f"Pattern at index {i}: {ERR_CONTAINS_TYPE}" + raise TypeError(msg) + content_str = "\n".join(content) + if pattern in content_str: + result.matched_content = pattern + result.matched_pattern_index = i + # Find which line contains the match + for i, line in enumerate(content): + if pattern in line: + result.match_line = i + break + return True + continue # Try next pattern + + # Handle regex match + if match_type == ContentMatchType.REGEX: + if isinstance(pattern, (str, re.Pattern)): + regex = ( + pattern + if isinstance(pattern, re.Pattern) + else re.compile(pattern) + ) + content_str = "\n".join(content) + match = regex.search(content_str) + if match: + result.matched_content = match.group(0) + result.matched_pattern_index = i + # Try to find which line contains the match + for i, line in enumerate(content): + if regex.search(line): + result.match_line = i + break + return True + continue # Try next pattern + msg = f"Pattern at index {i}: {ERR_REGEX_TYPE}" + raise TypeError(msg) + + # None of the patterns matched + return False + + try: + success, exception = retry_until_extended( + check_any_content, + timeout, + interval=interval, + raises=raises, + ) + if exception: + if raises: + raise + result.error = str(exception) + return result + result.success = success + result.elapsed_time = time.time() - start_time + except WaitTimeout as e: + if raises: + raise + result.error = str(e) + result.elapsed_time = time.time() - start_time + return result + + +def wait_for_all_content( + pane: Pane, + content_patterns: list[str | re.Pattern[str] | Callable[[list[str]], bool]], + match_types: list[ContentMatchType] | ContentMatchType, + timeout: float = RETRY_TIMEOUT_SECONDS, + interval: float = RETRY_INTERVAL_SECONDS, + start: t.Literal["-"] | int | None = None, + end: t.Literal["-"] | int | None = None, + raises: bool = True, +) -> WaitResult: + """Wait for all patterns to appear in a pane. + + This function waits until all specified patterns are found in a pane. + It supports mixed match types, allowing different patterns to be matched + in different ways. + + Parameters + ---------- + pane : Pane + The tmux pane to check + content_patterns : list[str | re.Pattern | callable] + List of patterns to wait for + match_types : list[ContentMatchType] | ContentMatchType + How to match each pattern. Either a single match type for all patterns, + or a list of match types, one for each pattern. + timeout : float + Maximum time to wait in seconds + interval : float + Time between checks in seconds + start : int | "-" | None + Starting line for capture_pane (passed to pane.capture_pane) + end : int | "-" | None + End line for capture_pane (passed to pane.capture_pane) + raises : bool + Whether to raise an exception on timeout + + Returns + ------- + WaitResult + Result object with status and match information + + Raises + ------ + WaitTimeout + If raises=True and the timeout is reached before all patterns are found + TypeError + If match types and patterns are incompatible + ValueError + If match_types list has a different length than content_patterns + + Examples + -------- + Wait for all of the specified patterns: + + >>> # Send some text to the pane that will match both patterns + >>> pane.send_keys("echo 'pattern1 pattern2'", enter=True) + >>> + >>> result = wait_for_all_content( + ... pane, + ... ["pattern1", "pattern2"], + ... ContentMatchType.CONTAINS, + ... timeout=0.5, + ... raises=False + ... ) + >>> isinstance(result, WaitResult) + True + >>> result.success + True + + Using regex patterns: + + >>> import re + >>> # Send content that matches both regex patterns + >>> pane.send_keys("echo 'Error: something went wrong'", enter=True) + >>> pane.send_keys("echo 'Success: but we fixed it'", enter=True) + >>> + >>> result = wait_for_all_content( + ... pane, + ... [re.compile(r"Error: .*"), re.compile(r"Success: .*")], + ... ContentMatchType.REGEX, + ... timeout=0.5, + ... raises=False + ... ) + >>> isinstance(result, WaitResult) + True + + Using predicate functions: + + >>> def has_at_least_3_lines(content): + ... return len(content) >= 3 + >>> + >>> def has_at_least_5_lines(content): + ... return len(content) >= 5 + >>> + >>> # Send enough lines to satisfy both predicates + >>> for _ in range(5): + ... pane.send_keys("echo 'Adding a line'", enter=True) + >>> + >>> result = wait_for_all_content( + ... pane, + ... [has_at_least_3_lines, has_at_least_5_lines], + ... ContentMatchType.PREDICATE, + ... timeout=0.5, + ... raises=False + ... ) + >>> isinstance(result, WaitResult) + True + """ + if not content_patterns: + msg = "At least one content pattern must be provided" + raise ValueError(msg) + + # Convert single match_type to list of same type + if not isinstance(match_types, list): + match_types = [match_types] * len(content_patterns) + elif len(match_types) != len(content_patterns): + msg = ( + f"match_types list ({len(match_types)}) " + f"doesn't match patterns ({len(content_patterns)})" + ) + raise ValueError(msg) + + result = WaitResult(success=False) + matched_patterns: list[str] = [] + start_time = time.time() + + def check_all_content() -> bool: + content = pane.capture_pane(start=start, end=end) + if isinstance(content, str): + content = [content] + + result.content = content + matched_patterns.clear() + + for i, (pattern, match_type) in enumerate( + zip(content_patterns, match_types), + ): + # Handle predicate match + if match_type == ContentMatchType.PREDICATE: + if not callable(pattern): + msg = f"Pattern at index {i}: {ERR_PREDICATE_TYPE}" + raise TypeError(msg) + # For predicate, we pass the list of content lines + if not pattern(content): + return False + matched_patterns.append(f"predicate_function_{i}") + continue # Pattern matched, check next + + # Handle exact match + if match_type == ContentMatchType.EXACT: + if not isinstance(pattern, str): + msg = f"Pattern at index {i}: {ERR_EXACT_TYPE}" + raise TypeError(msg) + if "\n".join(content) != pattern: + return False + matched_patterns.append(pattern) + continue # Pattern matched, check next + + # Handle contains match + if match_type == ContentMatchType.CONTAINS: + if not isinstance(pattern, str): + msg = f"Pattern at index {i}: {ERR_CONTAINS_TYPE}" + raise TypeError(msg) + content_str = "\n".join(content) + if pattern not in content_str: + return False + matched_patterns.append(pattern) + continue # Pattern matched, check next + + # Handle regex match + if match_type == ContentMatchType.REGEX: + if isinstance(pattern, (str, re.Pattern)): + regex = ( + pattern + if isinstance(pattern, re.Pattern) + else re.compile(pattern) + ) + content_str = "\n".join(content) + match = regex.search(content_str) + if not match: + return False + matched_patterns.append( + pattern if isinstance(pattern, str) else pattern.pattern, + ) + continue # Pattern matched, check next + msg = f"Pattern at index {i}: {ERR_REGEX_TYPE}" + raise TypeError(msg) + + # All patterns matched + result.matched_content = matched_patterns + return True + + try: + success, exception = retry_until_extended( + check_all_content, + timeout, + interval=interval, + raises=raises, + ) + if exception: + if raises: + raise + result.error = str(exception) + return result + result.success = success + result.elapsed_time = time.time() - start_time + except WaitTimeout as e: + if raises: + raise + result.error = str(e) + result.elapsed_time = time.time() - start_time + return result + + +def _contains_match( + content: list[str], + pattern: str, +) -> tuple[bool, str | None, int | None]: + r"""Check if content contains the pattern. + + Parameters + ---------- + content : list[str] + Lines of content to check + pattern : str + String to check for in content + + Returns + ------- + tuple[bool, str | None, int | None] + (matched, matched_content, match_line) + + Examples + -------- + Pattern found in content: + + >>> content = ["line 1", "hello world", "line 3"] + >>> matched, matched_text, line_num = _contains_match(content, "hello") + >>> matched + True + >>> matched_text + 'hello' + >>> line_num + 1 + + Pattern not found: + + >>> matched, matched_text, line_num = _contains_match(content, "not found") + >>> matched + False + >>> matched_text is None + True + >>> line_num is None + True + + Pattern spans multiple lines (in the combined content): + + >>> multi_line = ["first part", "second part"] + >>> content_str = "\n".join(multi_line) # "first part\nsecond part" + >>> # A pattern that spans the line boundary can be matched + >>> "part\nsec" in content_str + True + >>> matched, _, _ = _contains_match(multi_line, "part\nsec") + >>> matched + True + """ + content_str = "\n".join(content) + if pattern in content_str: + # Find which line contains the match + return next( + ((True, pattern, i) for i, line in enumerate(content) if pattern in line), + (True, pattern, None), + ) + + return False, None, None + + +def _regex_match( + content: list[str], + pattern: str | re.Pattern[str], +) -> tuple[bool, str | None, int | None]: + r"""Check if content matches the regex pattern. + + Parameters + ---------- + content : list[str] + Lines of content to check + pattern : str | re.Pattern + Regular expression pattern to match against content + + Returns + ------- + tuple[bool, str | None, int | None] + (matched, matched_content, match_line) + + Examples + -------- + Using string pattern: + + >>> content = ["line 1", "hello world 123", "line 3"] + >>> matched, matched_text, line_num = _regex_match(content, r"world \d+") + >>> matched + True + >>> matched_text + 'world 123' + >>> line_num + 1 + + Using compiled pattern: + + >>> import re + >>> pattern = re.compile(r"line \d") + >>> matched, matched_text, line_num = _regex_match(content, pattern) + >>> matched + True + >>> matched_text + 'line 1' + >>> line_num + 0 + + Pattern not found: + + >>> matched, matched_text, line_num = _regex_match(content, r"not found") + >>> matched + False + >>> matched_text is None + True + >>> line_num is None + True + + Matching groups in pattern: + + >>> content = ["user: john", "email: john@example.com"] + >>> pattern = re.compile(r"email: ([\w.@]+)") + >>> matched, matched_text, line_num = _regex_match(content, pattern) + >>> matched + True + >>> matched_text + 'email: john@example.com' + >>> line_num + 1 + """ + content_str = "\n".join(content) + regex = pattern if isinstance(pattern, re.Pattern) else re.compile(pattern) + + if match := regex.search(content_str): + matched_text = match.group(0) + # Try to find which line contains the match + return next( + ( + (True, matched_text, i) + for i, line in enumerate(content) + if regex.search(line) + ), + (True, matched_text, None), + ) + + return False, None, None + + +def _match_regex_across_lines( + content: list[str], + pattern: re.Pattern[str], +) -> tuple[bool, str | None, int | None]: + r"""Try to match a regex across multiple lines. + + Args: + content: List of content lines + pattern: Regex pattern to match + + Returns + ------- + (matched, matched_content, match_line) + + Examples + -------- + Pattern that spans multiple lines: + + >>> import re + >>> content = ["start of", "multi-line", "content"] + >>> pattern = re.compile(r"of\nmulti", re.DOTALL) + >>> matched, matched_text, line_num = _match_regex_across_lines(content, pattern) + >>> matched + True + >>> matched_text + 'of\nmulti' + >>> line_num + 0 + + Pattern that spans multiple lines but isn't found: + + >>> pattern = re.compile(r"not\nfound", re.DOTALL) + >>> matched, matched_text, line_num = _match_regex_across_lines(content, pattern) + >>> matched + False + >>> matched_text is None + True + >>> line_num is None + True + + Complex multi-line pattern with groups: + + >>> content = ["user: john", "email: john@example.com", "status: active"] + >>> pattern = re.compile(r"email: ([\w.@]+)\nstatus: (\w+)", re.DOTALL) + >>> matched, matched_text, line_num = _match_regex_across_lines(content, pattern) + >>> matched + True + >>> matched_text + 'email: john@example.com\nstatus: active' + >>> line_num + 1 + """ + content_str = "\n".join(content) + regex = pattern if isinstance(pattern, re.Pattern) else re.compile(pattern) + + if match := regex.search(content_str): + matched_text = match.group(0) + + # Find the starting position of the match in the joined string + start_pos = match.start() + + # Count newlines before the match to determine the starting line + newlines_before_match = content_str[:start_pos].count("\n") + return True, matched_text, newlines_before_match + + return False, None, None diff --git a/src/libtmux/common.py b/src/libtmux/common.py index db0b4151f..16c78cd06 100644 --- a/src/libtmux/common.py +++ b/src/libtmux/common.py @@ -35,6 +35,20 @@ PaneDict = dict[str, t.Any] +class CmdProtocol(t.Protocol): + """Command protocol for tmux command.""" + + def __call__(self, cmd: str, *args: t.Any, **kwargs: t.Any) -> tmux_cmd: + """Wrap tmux_cmd.""" + ... + + +class CmdMixin: + """Command mixin for tmux command.""" + + cmd: CmdProtocol + + class EnvironmentMixin: """Mixin for manager session and server level environment variables in tmux.""" @@ -450,42 +464,6 @@ def session_check_name(session_name: str | None) -> None: raise exc.BadSessionName(reason="contains colons", session_name=session_name) -def handle_option_error(error: str) -> type[exc.OptionError]: - """Raise exception if error in option command found. - - In tmux 3.0, show-option and show-window-option return invalid option instead of - unknown option. See https://github.com/tmux/tmux/blob/3.0/cmd-show-options.c. - - In tmux >2.4, there are 3 different types of option errors: - - - unknown option - - invalid option - - ambiguous option - - In tmux <2.4, unknown option was the only option. - - All errors raised will have the base error of :exc:`exc.OptionError`. So to - catch any option error, use ``except exc.OptionError``. - - Parameters - ---------- - error : str - Error response from subprocess call. - - Raises - ------ - :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, :exc:`exc.InvalidOption`, - :exc:`exc.AmbiguousOption` - """ - if "unknown option" in error: - raise exc.UnknownOption(error) - if "invalid option" in error: - raise exc.InvalidOption(error) - if "ambiguous option" in error: - raise exc.AmbiguousOption(error) - raise exc.OptionError(error) # Raise generic option error - - def get_libtmux_version() -> LooseVersion: """Return libtmux version is a PEP386 compliant format. diff --git a/src/libtmux/constants.py b/src/libtmux/constants.py index b4c23ee64..3b23895b9 100644 --- a/src/libtmux/constants.py +++ b/src/libtmux/constants.py @@ -51,3 +51,35 @@ class PaneDirection(enum.Enum): PaneDirection.Right: ["-h"], PaneDirection.Left: ["-h", "-b"], } + + +class _DefaultOptionScope: + # Sentinel value for default scope + ... + + +DEFAULT_OPTION_SCOPE: _DefaultOptionScope = _DefaultOptionScope() + + +class OptionScope(enum.Enum): + """Scope used with ``set-option`` and ``show-option(s)`` commands.""" + + Server = "SERVER" + Session = "SESSION" + Window = "WINDOW" + Pane = "PANE" + + +OPTION_SCOPE_FLAG_MAP: dict[OptionScope, str] = { + OptionScope.Server: "-s", + OptionScope.Session: "", + OptionScope.Window: "-w", + OptionScope.Pane: "-p", +} + +HOOK_SCOPE_FLAG_MAP: dict[OptionScope, str] = { + OptionScope.Server: "", + OptionScope.Session: "", + OptionScope.Window: "-w", + OptionScope.Pane: "-p", +} diff --git a/src/libtmux/hooks.py b/src/libtmux/hooks.py new file mode 100644 index 000000000..759b9fe3c --- /dev/null +++ b/src/libtmux/hooks.py @@ -0,0 +1,346 @@ +"""Helpers for tmux hooks.""" + +from __future__ import annotations + +import logging +import shlex +import typing as t +import warnings + +from libtmux._internal.constants import ( + Hooks, +) +from libtmux.common import CmdMixin, has_lt_version +from libtmux.constants import ( + DEFAULT_OPTION_SCOPE, + HOOK_SCOPE_FLAG_MAP, + OptionScope, + _DefaultOptionScope, +) +from libtmux.options import handle_option_error + +if t.TYPE_CHECKING: + from typing_extensions import Self + +HookDict = dict[str, t.Any] + +logger = logging.getLogger(__name__) + + +class HooksMixin(CmdMixin): + """Mixin for manager scoped hooks in tmux. + + Require tmux 3.1+. For older versions, use raw commands. + """ + + default_hook_scope: OptionScope | None + hooks: Hooks + + def __init__(self, default_hook_scope: OptionScope | None) -> None: + """When not a user (custom) hook, scope can be implied.""" + self.default_hook_scope = default_hook_scope + self.hooks = Hooks() + + def run_hook( + self, + hook: str, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ) -> Self: + """Run a hook immediately. Useful for testing.""" + if scope is DEFAULT_OPTION_SCOPE: + scope = self.default_hook_scope + + flags: list[str] = ["-R"] + + if scope is not None and not isinstance(scope, _DefaultOptionScope): + assert scope in HOOK_SCOPE_FLAG_MAP + + flag = HOOK_SCOPE_FLAG_MAP[scope] + if flag in {"-p", "-w"} and has_lt_version("3.2"): + warnings.warn( + "Scope flag '-w' and '-p' requires tmux 3.2+. Ignoring.", + stacklevel=2, + ) + else: + flags += (flag,) + + cmd = self.cmd( + "set-hook", + *flags, + hook, + ) + + if isinstance(cmd.stderr, list) and len(cmd.stderr): + handle_option_error(cmd.stderr[0]) + + return self + + def set_hook( + self, + hook: str, + value: int | str, + _format: bool | None = None, + unset: bool | None = None, + run: bool | None = None, + prevent_overwrite: bool | None = None, + ignore_errors: bool | None = None, + append: bool | None = None, + g: bool | None = None, + _global: bool | None = None, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ) -> Self: + """Set hook for tmux target. + + Wraps ``$ tmux set-hook ``. + + Parameters + ---------- + hook : str + hook to set, e.g. 'aggressive-resize' + value : str + hook command. + + Raises + ------ + :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, + :exc:`exc.InvalidOption`, :exc:`exc.AmbiguousOption` + """ + if scope is DEFAULT_OPTION_SCOPE: + scope = self.default_hook_scope + + flags: list[str] = [] + + if unset is not None and unset: + assert isinstance(unset, bool) + flags.append("-u") + + if run is not None and run: + assert isinstance(run, bool) + flags.append("-R") + + if _format is not None and _format: + assert isinstance(_format, bool) + flags.append("-F") + + if prevent_overwrite is not None and prevent_overwrite: + assert isinstance(prevent_overwrite, bool) + flags.append("-o") + + if ignore_errors is not None and ignore_errors: + assert isinstance(ignore_errors, bool) + flags.append("-q") + + if append is not None and append: + assert isinstance(append, bool) + flags.append("-a") + + if _global is not None and _global: + assert isinstance(_global, bool) + flags.append("-g") + + if scope is not None and not isinstance(scope, _DefaultOptionScope): + assert scope in HOOK_SCOPE_FLAG_MAP + + flag = HOOK_SCOPE_FLAG_MAP[scope] + if flag in {"-p", "-w"} and has_lt_version("3.2"): + warnings.warn( + "Scope flag '-w' and '-p' requires tmux 3.2+. Ignoring.", + stacklevel=2, + ) + else: + flags += (flag,) + + cmd = self.cmd( + "set-hook", + *flags, + hook, + value, + ) + + if isinstance(cmd.stderr, list) and len(cmd.stderr): + handle_option_error(cmd.stderr[0]) + + return self + + def unset_hook( + self, + hook: str, + _global: bool | None = None, + ignore_errors: bool | None = None, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ) -> Self: + """Unset hook for tmux target. + + Wraps ``$ tmux set-hook -u `` / ``$ tmux set-hook -U `` + + Parameters + ---------- + hook : str + hook to unset, e.g. 'after-show-environment' + + Raises + ------ + :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, + :exc:`exc.InvalidOption`, :exc:`exc.AmbiguousOption` + """ + if scope is DEFAULT_OPTION_SCOPE: + scope = self.default_hook_scope + + flags: list[str] = ["-u"] + + if ignore_errors is not None and ignore_errors: + assert isinstance(ignore_errors, bool) + flags.append("-q") + + if _global is not None and _global: + assert isinstance(_global, bool) + flags.append("-g") + + if scope is not None and not isinstance(scope, _DefaultOptionScope): + assert scope in HOOK_SCOPE_FLAG_MAP + + flag = HOOK_SCOPE_FLAG_MAP[scope] + if flag in {"-p", "-w"} and has_lt_version("3.2"): + warnings.warn( + "Scope flag '-w' and '-p' requires tmux 3.2+. Ignoring.", + stacklevel=2, + ) + else: + flags += (flag,) + + cmd = self.cmd( + "set-hook", + *flags, + hook, + ) + + if isinstance(cmd.stderr, list) and len(cmd.stderr): + handle_option_error(cmd.stderr[0]) + + return self + + def show_hooks( + self, + _global: bool | None = False, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ignore_errors: bool | None = None, + ) -> HookDict: + """Return a dict of hooks for the target.""" + if scope is DEFAULT_OPTION_SCOPE: + scope = self.default_hook_scope + + flags: tuple[str, ...] = () + + if _global: + flags += ("-g",) + + if scope is not None and not isinstance(scope, _DefaultOptionScope): + assert scope in HOOK_SCOPE_FLAG_MAP + + flag = HOOK_SCOPE_FLAG_MAP[scope] + if flag in {"-p", "-w"} and has_lt_version("3.2"): + warnings.warn( + "Scope flag '-w' and '-p' requires tmux 3.2+. Ignoring.", + stacklevel=2, + ) + else: + flags += (flag,) + + if ignore_errors is not None and ignore_errors: + assert isinstance(ignore_errors, bool) + flags += ("-q",) + + cmd = self.cmd("show-hooks", *flags) + output = cmd.stdout + hooks: HookDict = {} + for item in output: + try: + key, val = shlex.split(item) + except ValueError: + logger.warning(f"Error extracting hook: {item}") + key, val = item, None + assert isinstance(key, str) + assert isinstance(val, str) or val is None + + if isinstance(val, str) and val.isdigit(): + hooks[key] = int(val) + + return hooks + + def _show_hook( + self, + hook: str, + _global: bool = False, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ignore_errors: bool | None = None, + ) -> list[str] | None: + """Return value for the hook. + + Parameters + ---------- + hook : str + + Raises + ------ + :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, + :exc:`exc.InvalidOption`, :exc:`exc.AmbiguousOption` + """ + if scope is DEFAULT_OPTION_SCOPE: + scope = self.default_hook_scope + + flags: tuple[str | int, ...] = () + + if _global: + flags += ("-g",) + + if scope is not None and not isinstance(scope, _DefaultOptionScope): + assert scope in HOOK_SCOPE_FLAG_MAP + + flag = HOOK_SCOPE_FLAG_MAP[scope] + if flag in {"-p", "-w"} and has_lt_version("3.2"): + warnings.warn( + "Scope flag '-w' and '-p' requires tmux 3.2+. Ignoring.", + stacklevel=2, + ) + else: + flags += (flag,) + + if ignore_errors is not None and ignore_errors: + flags += ("-q",) + + flags += (hook,) + + cmd = self.cmd("show-hooks", *flags) + + if len(cmd.stderr): + handle_option_error(cmd.stderr[0]) + + return cmd.stdout + + def show_hook( + self, + hook: str, + _global: bool = False, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ignore_errors: bool | None = None, + ) -> str | int | None: + """Return value for the hook. + + Parameters + ---------- + hook : str + + Raises + ------ + :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, + :exc:`exc.InvalidOption`, :exc:`exc.AmbiguousOption` + """ + hooks_output = self._show_hook( + hook=hook, + scope=scope, + ignore_errors=ignore_errors, + ) + if hooks_output is None: + return None + hooks = Hooks.from_stdout(hooks_output) + return getattr(hooks, hook.replace("-", "_"), None) diff --git a/src/libtmux/options.py b/src/libtmux/options.py new file mode 100644 index 000000000..61c48c622 --- /dev/null +++ b/src/libtmux/options.py @@ -0,0 +1,1155 @@ +# ruff: NOQA: E501 +"""Helpers for tmux options. + +Option parsing function trade testability and clarity for performance. + +Tmux options +------------ + +Options in tmux consist of empty values, strings, integers, arrays, and complex shapes. + +Marshalling types from text: + +Integers: ``buffer-limit 50`` to ``{'buffer-limit': 50}`` +Booleans: ``exit-unattached on`` to ``{'exit-unattached': True}`` + +Exploding arrays: + +``command-alias[1] split-pane=split-window`` to +``{'command-alias[1]': {'split-pane=split-window'}}`` + +However, there is no equivalent to the above type of object in Python (a sparse array), +so a SparseArray is used. + +Exploding complex shapes: + +``"choose-session=choose-tree -s"`` to ``{'choose-session': 'choose-tree -s'}`` + +Finally, we need to convert hyphenated keys to underscored attribute names and assign +values, as python does not allow hyphens in attribute names. + +``command-alias`` is ``command_alias`` in python. + +Options object +-------------- +Dataclasses are used to provide typed access to tmux' option shape. + +Extra data gleaned from the options, such as user options (custom data) and an option +being inherited, + +User options +------------ +There are also custom user options, preceded with @, which exist are stored to +`Options.context.user_options` as a dictionary. + +> tmux set-option -w my-custom-variable my-value +invalid option: my-custom-option + +> tmux set-option -w @my-custom-option my-value +> tmux show-option -w +@my-custom-optione my-value + +Inherited options +----------------- + +`tmux show-options` -A can include inherited options. The raw output of an inherited +option is detected by the key having a *: + +``` +visual-activity* on +visual-bell* off +``` + +A list of options that are inherited is kept at `Options.context._inherited_options` and +`Options.context.inherited_options`. + +They are mixed with the normal options, +to differentiate them, run `show_options()` without ``include_inherited=True``. +""" + +from __future__ import annotations + +import io +import logging +import re +import shlex +import typing as t +import warnings + +from libtmux._internal.sparse_array import SparseArray +from libtmux.common import CmdMixin +from libtmux.constants import ( + DEFAULT_OPTION_SCOPE, + OPTION_SCOPE_FLAG_MAP, + OptionScope, + _DefaultOptionScope, +) + +from . import exc + +if t.TYPE_CHECKING: + from typing_extensions import Self, TypeAlias + + from libtmux._internal.constants import TerminalFeatures + from libtmux.common import tmux_cmd + + +TerminalOverride = dict[str, t.Optional[str]] +TerminalOverrides = dict[str, TerminalOverride] +CommandAliases = dict[str, str] + +OptionDict: TypeAlias = dict[str, t.Any] +UntypedOptionsDict: TypeAlias = dict[str, t.Optional[str]] +ExplodedUntypedOptionsDict: TypeAlias = dict[ + str, + t.Union[ + str, + int, + list[str], + dict[ + str, + list[str], + ], + ], +] +ExplodedComplexUntypedOptionsDict: TypeAlias = dict[ + str, + t.Optional[ + t.Union[ + str, + int, + list[t.Union[str, int]], + dict[str, list[t.Union[str, int]]], + SparseArray[t.Union[str, int]], + ] + ], +] + +logger = logging.getLogger(__name__) + + +def handle_option_error(error: str) -> type[exc.OptionError]: + """Raise exception if error in option command found. + + In tmux 3.0, show-option and show-window-option return invalid option instead of + unknown option. See https://github.com/tmux/tmux/blob/3.0/cmd-show-options.c. + + In tmux >2.4, there are 3 different types of option errors: + + - unknown option + - invalid option + - ambiguous option + + In tmux <2.4, unknown option was the only option. + + All errors raised will have the base error of :exc:`exc.OptionError`. So to + catch any option error, use ``except exc.OptionError``. + + Parameters + ---------- + error : str + Error response from subprocess call. + + Raises + ------ + :exc:`exc.OptionError`, :exc:`exc.UnknownOption`, :exc:`exc.InvalidOption`, + :exc:`exc.AmbiguousOption` + + Examples + -------- + >>> result = server.cmd( + ... 'set-option', + ... 'unknown-option-name', + ... ) + + >>> bool(isinstance(result.stderr, list) and len(result.stderr)) + True + + >>> import pytest + >>> from libtmux import exc + + >>> with pytest.raises(exc.OptionError): + ... handle_option_error(result.stderr[0]) + """ + if "unknown option" in error: + raise exc.UnknownOption(error) + if "invalid option" in error: + raise exc.InvalidOption(error) + if "ambiguous option" in error: + raise exc.AmbiguousOption(error) + raise exc.OptionError(error) # Raise generic option error + + +_V = t.TypeVar("_V") +ConvertedValue: TypeAlias = t.Union[str, int, bool, None] +ConvertedValues: TypeAlias = t.Union[ + ConvertedValue, + list[ConvertedValue], + dict[str, ConvertedValue], + SparseArray[ConvertedValue], +] + + +def convert_value( + value: _V | None, +) -> ConvertedValue | _V | None: + """Convert raw option strings to python types. + + Examples + -------- + >>> convert_value("on") + True + >>> convert_value("off") + False + + >>> convert_value("1") + 1 + >>> convert_value("50") + 50 + + >>> convert_value("%50") + '%50' + """ + if not isinstance(value, str): + return value + + if value.isdigit(): + return int(value) + + if value == "on": + return True + + if value == "off": + return False + + return value + + +def convert_values( + value: _V | None, +) -> ConvertedValues | _V | None: + """Recursively convert values to python types via :func:`convert_value`. + + >>> convert_values(None) + + >>> convert_values("on") + True + >>> convert_values("off") + False + + >>> convert_values(["on"]) + [True] + >>> convert_values(["off"]) + [False] + + >>> convert_values({"window_index": "1"}) + {'window_index': 1} + + >>> convert_values({"visual-bell": "on"}) + {'visual-bell': True} + """ + if value is None: + return None + if isinstance(value, dict): + for k, v in value.items(): + value[k] = convert_value(v) + return value + if isinstance(value, SparseArray): + for v in value.iter_values(): + value[v] = convert_value(v) + return value + if isinstance(value, list): + for idx, v in enumerate(value): + value[idx] = convert_value(v) + return value + return convert_value(value) + + +def parse_options_to_dict( + stdout: t.IO[str], +) -> UntypedOptionsDict: + r"""Process subprocess.stdout options or hook output to flat, naive, untyped dict. + + Does not explode arrays or deep values. + + Examples + -------- + >>> import io + + >>> raw_options = io.StringIO("status-keys vi") + >>> parse_options_to_dict(raw_options) == {"status-keys": "vi"} + True + + >>> int_options = io.StringIO("message-limit 50") + >>> parse_options_to_dict(int_options) == {"message-limit": "50"} + True + + >>> empty_option = io.StringIO("user-keys") + >>> parse_options_to_dict(empty_option) == {"user-keys": None} + True + + >>> array_option = io.StringIO("command-alias[0] split-pane=split-window") + >>> parse_options_to_dict(array_option) == { + ... "command-alias[0]": "split-pane=split-window"} + True + + >>> array_option = io.StringIO("command-alias[40] split-pane=split-window") + >>> parse_options_to_dict(array_option) == { + ... "command-alias[40]": "split-pane=split-window"} + True + + >>> many_options = io.StringIO(r'''status-keys + ... command-alias[0] split-pane=split-window + ... ''') + >>> parse_options_to_dict(many_options) == { + ... "command-alias[0]": "split-pane=split-window", + ... "status-keys": None,} + True + + >>> many_more_options = io.StringIO(r''' + ... terminal-features[0] xterm*:clipboard:ccolour:cstyle:focus + ... terminal-features[1] screen*:title + ... ''') + >>> parse_options_to_dict(many_more_options) == { + ... "terminal-features[0]": "xterm*:clipboard:ccolour:cstyle:focus", + ... "terminal-features[1]": "screen*:title",} + True + + >>> quoted_option = io.StringIO(r''' + ... command-alias[0] "choose-session=choose-tree -s" + ... ''') + >>> parse_options_to_dict(quoted_option) == { + ... "command-alias[0]": "choose-session=choose-tree -s", + ... } + True + """ + output: UntypedOptionsDict = {} + + val: ConvertedValue | None = None + + for item in stdout.readlines(): + if " " in item: + try: + key, val = shlex.split(item) + except ValueError: + key, val = item.split(" ", maxsplit=1) + else: + key, val = item, None + key = key.strip() + + if key: + if isinstance(val, str) and val.endswith("\n"): + val = val.rstrip("\n") + + output[key] = val + return output + + +def explode_arrays( + _dict: UntypedOptionsDict, + force_array: bool = False, +) -> ExplodedUntypedOptionsDict: + """Explode flat, naive options dict's option arrays. + + Examples + -------- + >>> import io + + >>> many_more_options = io.StringIO(r''' + ... terminal-features[0] xterm*:clipboard:ccolour:cstyle:focus + ... terminal-features[1] screen*:title + ... ''') + >>> many_more_flat_dict = parse_options_to_dict(many_more_options) + >>> many_more_flat_dict == { + ... "terminal-features[0]": "xterm*:clipboard:ccolour:cstyle:focus", + ... "terminal-features[1]": "screen*:title",} + True + >>> explode_arrays(many_more_flat_dict) == { + ... "terminal-features": {0: "xterm*:clipboard:ccolour:cstyle:focus", + ... 1: "screen*:title"}} + True + + tmux arrays allow non-sequential indexes, so we need to support that: + + >>> explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... terminal-features[0] xterm*:clipboard:ccolour:cstyle:focus + ... terminal-features[5] screen*:title + ... '''))) == { + ... "terminal-features": {0: "xterm*:clipboard:ccolour:cstyle:focus", + ... 5: "screen*:title"}} + True + """ + options: dict[str, t.Any] = {} + for key, val in _dict.items(): + Default: type[dict[t.Any, t.Any] | SparseArray[str | int | bool | None]] = ( + dict if isinstance(key, str) and key == "terminal-features" else SparseArray + ) + if "[" not in key: + if force_array: + options[key] = Default() + if val is not None: + options[key][0] = val + else: + options[key] = val + continue + + try: + matchgroup = re.match( + r"(?P[\w-]+)(\[(?P\d+)\])?", + key, + ) + if matchgroup is not None: + match = matchgroup.groupdict() + if match.get("hook") and match.get("index"): + key = match["hook"] + index = int(match["index"]) + + if options.get(key) is None: + options[key] = Default() + options[key][index] = val + except Exception: + if force_array and val: + options[key] = Default() + if isinstance(options[key], SparseArray): + options[key][0] = val + else: + options[key] = val + logger.exception("Error parsing options") + return options + + +def explode_complex( + _dict: ExplodedUntypedOptionsDict, +) -> ExplodedComplexUntypedOptionsDict: + r"""Explode arrayed option's complex values. + + Examples + -------- + >>> import io + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... terminal-features[0] xterm*:clipboard:ccolour:cstyle:focus + ... terminal-features[5] screen*:title + ... ''')))) + {'terminal-features': {'xterm*': ['clipboard', 'ccolour', 'cstyle', 'focus'], 'screen*': ['title']}} + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... terminal-features[0] xterm*:clipboard:ccolour:cstyle:focus + ... terminal-features[5] screen*:title + ... ''')))) == { + ... "terminal-features": {"xterm*": ["clipboard", "ccolour", "cstyle", "focus"], + ... "screen*": ["title"]}} + True + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... command-alias[0] split-pane=split-window + ... command-alias[1] splitp=split-window + ... command-alias[2] "server-info=show-messages -JT" + ... ''')))) == { + ... "command-alias": {"split-pane": "split-window", + ... "splitp": "split-window", + ... "server-info": "show-messages -JT"}} + True + + >>> explode_complex(explode_arrays({"terminal-features": {0: "xterm*:clipboard:ccolour:cstyle:focus", + ... 1: "screen*:title"}})) + {'terminal-features': {0: 'xterm*:clipboard:ccolour:cstyle:focus', 1: 'screen*:title'}} + + >>> explode_complex(explode_arrays({"terminal-features": {0: "xterm*:clipboard:ccolour:cstyle:focus", + ... 8: "screen*:title"}})) == SparseArray({'terminal-features': {0: + ... 'xterm*:clipboard:ccolour:cstyle:focus', 8: 'screen*:title'}}) + True + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... terminal-overrides[0] xterm-256color:Tc + ... terminal-overrides[1] *:U8=0 + ... ''')))) == { + ... "terminal-overrides": {"xterm-256color": {"Tc": None}, + ... "*": {"U8": 0}}} + True + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... user-keys[100] "\e[test" + ... user-keys[6] "\e\n" + ... user-keys[0] "\e[5;30012~" + ... ''')))) == { + ... "user-keys": {0: "\\e[5;30012~", + ... 6: "\\e\\n", + ... 100: "\\e[test"}} + True + + >>> explode_complex(explode_arrays(parse_options_to_dict(io.StringIO(r''' + ... status-format[0] "#[align=left range=left #{E:status-left-style}]#[push-default]#{T;=/#{status-left-length}:status-left}#[pop-default]#[norange default]#[list=on align=#{status-justify}]#[list=left-marker]<#[list=right-marker]>#[list=on]#{W:#[range=window|#{window_index} #{E:window-status-style}#{?#{&&:#{window_last_flag},#{!=:#{E:window-status-last-style},default}}, #{E:window-status-last-style},}#{?#{&&:#{window_bell_flag},#{!=:#{E:window-status-bell-style},default}}, #{E:window-status-bell-style},#{?#{&&:#{||:#{window_activity_flag},#{window_silence_flag}},#{!=:#{E:window-status-activity-style},default}}, #{E:window-status-activity-style},}}]#[push-default]#{T:window-status-format}#[pop-default]#[norange default]#{?window_end_flag,,#{window-status-separator}},#[range=window|#{window_index} list=focus #{?#{!=:#{E:window-status-current-style},default},#{E:window-status-current-style},#{E:window-status-style}}#{?#{&&:#{window_last_flag},#{!=:#{E:window-status-last-style},default}}, #{E:window-status-last-style},}#{?#{&&:#{window_bell_flag},#{!=:#{E:window-status-bell-style},default}}, #{E:window-status-bell-style},#{?#{&&:#{||:#{window_activity_flag},#{window_silence_flag}},#{!=:#{E:window-status-activity-style},default}}, #{E:window-status-activity-style},}}]#[push-default]#{T:window-status-current-format}#[pop-default]#[norange list=on default]#{?window_end_flag,,#{window-status-separator}}}#[nolist align=right range=right #{E:status-right-style}]#[push-default]#{T;=/#{status-right-length}:status-right}#[pop-default]#[norange default]" + ... status-format[1] "#[align=centre]#{P:#{?pane_active,#[reverse],}#{pane_index}[#{pane_width}x#{pane_height}]#[default] }" + ... ''')))) == { + ... "status-format": {0: "#[align=left range=left #{E:status-left-style}]#[push-default]#{T;=/#{status-left-length}:status-left}#[pop-default]#[norange default]#[list=on align=#{status-justify}]#[list=left-marker]<#[list=right-marker]>#[list=on]#{W:#[range=window|#{window_index} #{E:window-status-style}#{?#{&&:#{window_last_flag},#{!=:#{E:window-status-last-style},default}}, #{E:window-status-last-style},}#{?#{&&:#{window_bell_flag},#{!=:#{E:window-status-bell-style},default}}, #{E:window-status-bell-style},#{?#{&&:#{||:#{window_activity_flag},#{window_silence_flag}},#{!=:#{E:window-status-activity-style},default}}, #{E:window-status-activity-style},}}]#[push-default]#{T:window-status-format}#[pop-default]#[norange default]#{?window_end_flag,,#{window-status-separator}},#[range=window|#{window_index} list=focus #{?#{!=:#{E:window-status-current-style},default},#{E:window-status-current-style},#{E:window-status-style}}#{?#{&&:#{window_last_flag},#{!=:#{E:window-status-last-style},default}}, #{E:window-status-last-style},}#{?#{&&:#{window_bell_flag},#{!=:#{E:window-status-bell-style},default}}, #{E:window-status-bell-style},#{?#{&&:#{||:#{window_activity_flag},#{window_silence_flag}},#{!=:#{E:window-status-activity-style},default}}, #{E:window-status-activity-style},}}]#[push-default]#{T:window-status-current-format}#[pop-default]#[norange list=on default]#{?window_end_flag,,#{window-status-separator}}}#[nolist align=right range=right #{E:status-right-style}]#[push-default]#{T;=/#{status-right-length}:status-right}#[pop-default]#[norange default]", + ... 1: "#[align=centre]#{P:#{?pane_active,#[reverse],}#{pane_index}[#{pane_width}x#{pane_height}]#[default] }", + ... }} + True + """ + options: dict[str, t.Any] = {} + for key, val in _dict.items(): + try: + if isinstance(val, SparseArray) and key == "terminal-features": + new_val: TerminalFeatures = {} + + for item in val.iter_values(): + try: + term, features = item.split(":", maxsplit=1) + new_val[term] = features.split(":") + except Exception: # NOQA: PERF203 + logger.exception("Error parsing options") + options[key] = new_val + continue + if isinstance(val, SparseArray) and key == "terminal-overrides": + new_overrides: TerminalOverrides = {} + + for item in val.iter_values(): + try: + term, features = item.split(":", maxsplit=1) + if term not in new_overrides: + new_overrides[term] = {} + if features and "=" in features: + k, v = features.split("=") + + if v.isdigit(): + v = int(v) + + new_overrides[term][k] = v + else: + new_overrides[term][features] = None + except Exception: # NOQA: PERF203 + logger.exception("Error parsing options") + options[key] = new_overrides + continue + if isinstance(val, SparseArray) and key == "command-alias": + new_aliases: CommandAliases = {} + + for item in val.iter_values(): + try: + alias, command = item.split("=", maxsplit=1) + if options.get(key) is None or not isinstance( + options.get(key), + dict, + ): + options[key] = {} + new_aliases[alias] = command + except Exception: # NOQA: PERF203 + logger.exception("Error parsing options") + options[key] = new_aliases + continue + options[key] = val + continue + + except Exception: + options[key] = val + logger.exception("Error parsing options") + return options + + +class OptionsMixin(CmdMixin): + """Mixin for managing tmux options based on scope.""" + + default_option_scope: OptionScope | None + + def __init__(self, default_option_scope: OptionScope | None = None) -> None: + """When not a user (custom) option, scope can be implied.""" + if default_option_scope is not None: + self.default_option_scope = default_option_scope + + def set_option( + self, + option: str, + value: int | str, + _format: bool | None = None, + prevent_overwrite: bool | None = None, + ignore_errors: bool | None = None, + suppress_warnings: bool | None = None, + append: bool | None = None, + g: bool | None = None, + global_: bool | None = None, + scope: OptionScope | _DefaultOptionScope | None = DEFAULT_OPTION_SCOPE, + ) -> Self: + """Set option for tmux target. + + Wraps ``$ tmux set-option