|
1 | 1 | # Copyright (C) 2016 Adrien Vergé
|
| 2 | +# Copyright (C) 2023–2025 Jason Yundt |
2 | 3 | #
|
3 | 4 | # This program is free software: you can redistribute it and/or modify
|
4 | 5 | # it under the terms of the GNU General Public License as published by
|
|
13 | 14 | # You should have received a copy of the GNU General Public License
|
14 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>.
|
15 | 16 |
|
| 17 | +import codecs |
16 | 18 | import contextlib
|
17 | 19 | from io import StringIO
|
18 | 20 | import os
|
19 | 21 | import shutil
|
20 | 22 | import sys
|
21 | 23 | import tempfile
|
22 | 24 | import unittest
|
| 25 | +import warnings |
| 26 | +from codecs import CodecInfo |
23 | 27 |
|
24 | 28 | import yaml
|
25 | 29 |
|
26 | 30 | from yamllint import linter
|
27 | 31 | from yamllint.config import YamlLintConfig
|
28 | 32 |
|
29 | 33 |
|
| 34 | +# Encoding related stuff: |
# Names of every UTF codec the tests exercise, one family per line.
UTF_CODECS = (
    'utf_32_be', 'utf_32_be_sig',
    'utf_32_le', 'utf_32_le_sig',
    'utf_16_be', 'utf_16_be_sig',
    'utf_16_le', 'utf_16_le_sig',
    'utf_8', 'utf_8_sig',
)
| 47 | + |
| 48 | + |
def encode_utf_32_be_sig(obj, errors='strict'):
    """
    Encode obj as big-endian UTF-32 with a leading byte order mark

    The result is codecs.BOM_UTF32_BE followed by obj encoded with the
    built-in utf_32_be codec.

    errors names the error handler to use. It is accepted because the codec
    machinery passes it as a second argument whenever the caller asks for a
    specific handler, e.g. text.encode('utf_32_be_sig', 'replace').

    Returns a (bytes, length of obj) tuple.
    """
    encoded = codecs.encode(obj, 'utf_32_be', errors)
    return (codecs.BOM_UTF32_BE + encoded, len(obj))
| 54 | + |
| 55 | + |
def encode_utf_32_le_sig(obj, errors='strict'):
    """
    Encode obj as little-endian UTF-32 with a leading byte order mark

    The result is codecs.BOM_UTF32_LE followed by obj encoded with the
    built-in utf_32_le codec.

    errors names the error handler to use. It is accepted because the codec
    machinery passes it as a second argument whenever the caller asks for a
    specific handler, e.g. text.encode('utf_32_le_sig', 'replace').

    Returns a (bytes, length of obj) tuple.
    """
    encoded = codecs.encode(obj, 'utf_32_le', errors)
    return (codecs.BOM_UTF32_LE + encoded, len(obj))
| 61 | + |
| 62 | + |
def encode_utf_16_be_sig(obj, errors='strict'):
    """
    Encode obj as big-endian UTF-16 with a leading byte order mark

    The result is codecs.BOM_UTF16_BE followed by obj encoded with the
    built-in utf_16_be codec.

    errors names the error handler to use. It is accepted because the codec
    machinery passes it as a second argument whenever the caller asks for a
    specific handler, e.g. text.encode('utf_16_be_sig', 'replace').

    Returns a (bytes, length of obj) tuple.
    """
    encoded = codecs.encode(obj, 'utf_16_be', errors)
    return (codecs.BOM_UTF16_BE + encoded, len(obj))
| 68 | + |
| 69 | + |
def encode_utf_16_le_sig(obj, errors='strict'):
    """
    Encode obj as little-endian UTF-16 with a leading byte order mark

    The result is codecs.BOM_UTF16_LE followed by obj encoded with the
    built-in utf_16_le codec.

    errors names the error handler to use. It is accepted because the codec
    machinery passes it as a second argument whenever the caller asks for a
    specific handler, e.g. text.encode('utf_16_le_sig', 'replace').

    Returns a (bytes, length of obj) tuple.
    """
    encoded = codecs.encode(obj, 'utf_16_le', errors)
    return (codecs.BOM_UTF16_LE + encoded, len(obj))
| 75 | + |
| 76 | + |
# Maps the name of each test-only codec to its CodecInfo. Every entry pairs
# the matching BOM-prepending encoder with the built-in decoder of the same
# code unit width.
test_codec_infos = dict(
    utf_32_be_sig=CodecInfo(
        encode=encode_utf_32_be_sig,
        decode=codecs.getdecoder('utf_32'),
    ),
    utf_32_le_sig=CodecInfo(
        encode=encode_utf_32_le_sig,
        decode=codecs.getdecoder('utf_32'),
    ),
    utf_16_be_sig=CodecInfo(
        encode=encode_utf_16_be_sig,
        decode=codecs.getdecoder('utf_16'),
    ),
    utf_16_le_sig=CodecInfo(
        encode=encode_utf_16_le_sig,
        decode=codecs.getdecoder('utf_16'),
    ),
)
| 87 | + |
| 88 | + |
def _search_test_codec(encoding_name):
    """
    Codec search function backed by test_codec_infos

    Returns the CodecInfo registered under encoding_name, or None when the
    name is not one of the test codecs.
    """
    return test_codec_infos.get(encoding_name)


def register_test_codecs():
    """Make the codecs in test_codec_infos available to codecs.lookup()."""
    # Register a named module-level function rather than
    # test_codec_infos.get: each access to that attribute builds a new
    # bound-method object, and codecs.unregister() only removes the exact
    # object that was registered.
    codecs.register(_search_test_codec)


def unregister_test_codecs():
    """
    Undo register_test_codecs() where the running Python supports it

    codecs.unregister() only exists on Python 3.10 and newer. On older
    versions a warning is emitted and the codecs stay registered.
    """
    if sys.version_info >= (3, 10, 0):
        codecs.unregister(_search_test_codec)
    else:
        warnings.warn(
            "This version of Python doesn’t allow us to unregister codecs.",
            stacklevel=1
        )
| 101 | + |
| 102 | + |
def is_test_codec(codec):
    """Tell whether codec is one of the names in test_codec_infos."""
    return codec in test_codec_infos
| 105 | + |
| 106 | + |
def test_codec_built_in_equivalent(test_codec):
    """
    Return test_codec with every '_sig', '_be' and '_le' removed

    For example, 'utf_32_be_sig' becomes 'utf_32'.
    """
    return_value = test_codec
    for suffix in ('_sig', '_be', '_le'):
        return_value = return_value.replace(suffix, '')
    return return_value


# Despite its "test_" prefix this is a helper, not a test case. Without this
# marker pytest collects it in any test module that imports it and then
# errors out because there is no fixture named "test_codec".
test_codec_built_in_equivalent.__test__ = False
| 112 | + |
| 113 | + |
def uses_bom(codec):
    """Tell whether the codec name ends with '_32', '_16' or '_sig'."""
    return codec.endswith(('_32', '_16', '_sig'))
| 119 | + |
| 120 | + |
def encoding_detectable(string, codec):
    """
    Tell whether the encoding can be detected once string is encoded

    Detection needs either a BOM or an ASCII first character. See the
    docstring of yamllint.decoder.auto_decode().
    """
    if uses_bom(codec):
        return True
    if len(string) == 0:
        # No BOM and no first character to inspect.
        return False
    return string[0].isascii()
| 129 | + |
| 130 | + |
| 131 | +# Workspace related stuff: |
class Blob:
    """Text paired with the name of the encoding to store it in."""

    def __init__(self, text, encoding):
        self.text, self.encoding = text, encoding
| 136 | + |
| 137 | + |
def build_temp_workspace(files):
    """
    Create a temporary directory populated as described by files

    files maps paths (relative to the new directory) to their content:

    - a list creates a directory at that path;
    - a str starting with 'symlink://' creates a symlink whose target is
      the rest of the string;
    - a Blob is encoded with its own encoding and written as bytes;
    - bytes are written as-is in binary mode;
    - anything else is written in text mode.

    Missing parent directories are created as needed. Returns the path of
    the temporary directory, which the caller must remove. If populating it
    fails, the directory is removed before the exception propagates.
    """
    tempdir = tempfile.mkdtemp(prefix='yamllint-tests-')

    try:
        for path, content in files.items():
            path = os.fsencode(os.path.join(tempdir, path))
            os.makedirs(os.path.dirname(path), exist_ok=True)

            if isinstance(content, list):
                # The directory may already exist because it was created as
                # the parent of an earlier entry.
                os.makedirs(path, exist_ok=True)
            elif isinstance(content, str) and content.startswith('symlink://'):
                os.symlink(content[10:], path)
            else:
                if isinstance(content, Blob):
                    content = content.text.encode(content.encoding)
                mode = 'wb' if isinstance(content, bytes) else 'w'
                with open(path, mode) as f:
                    f.write(content)
    except BaseException:
        # Don't leave a half-built workspace behind.
        shutil.rmtree(tempdir, ignore_errors=True)
        raise

    return tempdir
| 158 | + |
| 159 | + |
@contextlib.contextmanager
def temp_workspace(files):
    """Run the with-block inside a throwaway workspace built from files."""
    original_cwd = os.getcwd()
    workspace_dir = build_temp_workspace(files)

    try:
        os.chdir(workspace_dir)
        yield
    finally:
        # Leave the workspace before deleting it.
        os.chdir(original_cwd)
        shutil.rmtree(workspace_dir)
| 172 | + |
| 173 | + |
def temp_workspace_with_files_in_many_codecs(path_template, text):
    """
    Build a workspace description holding text once per usable codec

    For every codec in UTF_CODECS for which encoding_detectable(text, codec)
    is true, the returned dict maps path_template.format(codec) to a Blob of
    text in that codec.
    """
    return {
        path_template.format(codec): Blob(text, codec)
        for codec in UTF_CODECS
        if encoding_detectable(text, codec)
    }
| 180 | + |
| 181 | + |
| 182 | +# Miscellaneous stuff: |
30 | 183 | class RuleTestCase(unittest.TestCase):
|
31 | 184 | def build_fake_config(self, conf):
|
32 | 185 | if conf is None:
|
@@ -81,37 +234,3 @@ def __exit__(self, *exc_info):
|
@property
def returncode(self):
    # The ``code`` attribute of the exception held by ``self._raises_ctx``.
    return self._raises_ctx.exception.code
|
84 |
| - |
85 |
| - |
86 |
| -def build_temp_workspace(files): |
87 |
| - tempdir = tempfile.mkdtemp(prefix='yamllint-tests-') |
88 |
| - |
89 |
| - for path, content in files.items(): |
90 |
| - path = os.fsencode(os.path.join(tempdir, path)) |
91 |
| - if not os.path.exists(os.path.dirname(path)): |
92 |
| - os.makedirs(os.path.dirname(path)) |
93 |
| - |
94 |
| - if isinstance(content, list): |
95 |
| - os.mkdir(path) |
96 |
| - elif isinstance(content, str) and content.startswith('symlink://'): |
97 |
| - os.symlink(content[10:], path) |
98 |
| - else: |
99 |
| - mode = 'wb' if isinstance(content, bytes) else 'w' |
100 |
| - with open(path, mode) as f: |
101 |
| - f.write(content) |
102 |
| - |
103 |
| - return tempdir |
104 |
| - |
105 |
| - |
106 |
| -@contextlib.contextmanager |
107 |
| -def temp_workspace(files): |
108 |
| - """Provide a temporary workspace that is automatically cleaned up.""" |
109 |
| - backup_wd = os.getcwd() |
110 |
| - wd = build_temp_workspace(files) |
111 |
| - |
112 |
| - try: |
113 |
| - os.chdir(wd) |
114 |
| - yield |
115 |
| - finally: |
116 |
| - os.chdir(backup_wd) |
117 |
| - shutil.rmtree(wd) |
|
0 commit comments