|
3 | 3 | from __future__ import annotations
|
4 | 4 |
|
5 | 5 | import textwrap
|
| 6 | +import time |
6 | 7 | import typing as t
|
7 | 8 |
|
8 | 9 | if t.TYPE_CHECKING:
|
| 10 | + import pathlib |
| 11 | + |
9 | 12 | import pytest
|
10 | 13 |
|
| 14 | + from libtmux.server import Server |
| 15 | + |
11 | 16 |
|
12 | 17 | def test_plugin(
|
13 | 18 | pytester: pytest.Pytester,
|
@@ -71,3 +76,83 @@ def test_repo_git_remote_checkout(
|
71 | 76 | # Test
|
72 | 77 | result = pytester.runpytest(str(first_test_filename))
|
73 | 78 | result.assert_outcomes(passed=1)
|
| 79 | + |
| 80 | + |
| 81 | +def test_temp_server(TempServer: t.Callable[..., Server]) -> None: |
| 82 | + """Test TempServer creates and cleans up server.""" |
| 83 | + server = TempServer() |
| 84 | + assert server.is_alive() is False # Server not started yet |
| 85 | + |
| 86 | + session = server.new_session() |
| 87 | + assert server.is_alive() is True |
| 88 | + assert len(server.sessions) == 1 |
| 89 | + assert session.session_name is not None |
| 90 | + |
| 91 | + # Test socket name is unique |
| 92 | + assert server.socket_name is not None |
| 93 | + assert server.socket_name.startswith("libtmux_test") |
| 94 | + |
| 95 | + # Each call creates a new server with unique socket |
| 96 | + server2 = TempServer() |
| 97 | + assert server2.socket_name is not None |
| 98 | + assert server2.socket_name.startswith("libtmux_test") |
| 99 | + assert server2.socket_name != server.socket_name |
| 100 | + |
| 101 | + |
| 102 | +def test_temp_server_with_config( |
| 103 | + TempServer: t.Callable[..., Server], |
| 104 | + tmp_path: pathlib.Path, |
| 105 | +) -> None: |
| 106 | + """Test TempServer with config file.""" |
| 107 | + config_file = tmp_path / "tmux.conf" |
| 108 | + config_file.write_text("set -g status off", encoding="utf-8") |
| 109 | + |
| 110 | + server = TempServer(config_file=str(config_file)) |
| 111 | + session = server.new_session() |
| 112 | + |
| 113 | + # Verify config was loaded |
| 114 | + assert session.cmd("show-options", "-g", "status").stdout[0] == "status off" |
| 115 | + |
| 116 | + |
| 117 | +def test_temp_server_cleanup(TempServer: t.Callable[..., Server]) -> None: |
| 118 | + """Test TempServer properly cleans up after itself.""" |
| 119 | + server = TempServer() |
| 120 | + socket_name = server.socket_name |
| 121 | + assert socket_name is not None |
| 122 | + |
| 123 | + # Create multiple sessions |
| 124 | + server.new_session(session_name="test1") |
| 125 | + server.new_session(session_name="test2") |
| 126 | + assert len(server.sessions) == 2 |
| 127 | + |
| 128 | + # Verify server is alive |
| 129 | + assert server.is_alive() is True |
| 130 | + |
| 131 | + # Delete server and verify cleanup |
| 132 | + server.kill() |
| 133 | + time.sleep(0.1) # Give time for cleanup |
| 134 | + |
| 135 | + # Create new server to verify old one was cleaned up |
| 136 | + new_server = TempServer() |
| 137 | + assert new_server.is_alive() is False # Server not started yet |
| 138 | + new_server.new_session() # This should work if old server was cleaned up |
| 139 | + assert new_server.is_alive() is True |
| 140 | + |
| 141 | + |
| 142 | +def test_temp_server_multiple(TempServer: t.Callable[..., Server]) -> None: |
| 143 | + """Test multiple TempServer instances can coexist.""" |
| 144 | + server1 = TempServer() |
| 145 | + server2 = TempServer() |
| 146 | + |
| 147 | + # Each server should have a unique socket |
| 148 | + assert server1.socket_name != server2.socket_name |
| 149 | + |
| 150 | + # Create sessions in each server |
| 151 | + server1.new_session(session_name="test1") |
| 152 | + server2.new_session(session_name="test2") |
| 153 | + |
| 154 | + # Verify sessions are in correct servers |
| 155 | + assert any(s.session_name == "test1" for s in server1.sessions) |
| 156 | + assert any(s.session_name == "test2" for s in server2.sessions) |
| 157 | + assert not any(s.session_name == "test1" for s in server2.sessions) |
| 158 | + assert not any(s.session_name == "test2" for s in server1.sessions) |
0 commit comments