|
11 | 11 | # ANY KIND, either express or implied. See the License for the specific
|
12 | 12 | # language governing permissions and limitations under the License.
|
13 | 13 | """Functional test suite for aws_encryption_sdk.kms_thick_client"""
|
| 14 | +from __future__ import division |
| 15 | + |
14 | 16 | import io
|
| 17 | +import logging |
15 | 18 |
|
16 | 19 | import attr
|
17 | 20 | import botocore.client
|
|
28 | 31 | from aws_encryption_sdk.exceptions import CustomMaximumValueExceeded
|
29 | 32 | from aws_encryption_sdk.identifiers import Algorithm, EncryptionKeyType, WrappingAlgorithm
|
30 | 33 | from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
|
| 34 | +from aws_encryption_sdk.internal.defaults import LINE_LENGTH |
31 | 35 | from aws_encryption_sdk.internal.formatting.encryption_context import serialize_encryption_context
|
32 | 36 | from aws_encryption_sdk.key_providers.base import MasterKeyProviderConfig
|
33 | 37 | from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
|
@@ -498,12 +502,14 @@ def test_encryption_cycle_with_caching():
|
498 | 502 | def test_encrypt_source_length_enforcement():
|
499 | 503 | key_provider = fake_kms_key_provider()
|
500 | 504 | cmm = aws_encryption_sdk.DefaultCryptoMaterialsManager(key_provider)
|
| 505 | + plaintext = io.BytesIO(VALUES["plaintext_128"]) |
501 | 506 | with pytest.raises(CustomMaximumValueExceeded) as excinfo:
|
502 | 507 | aws_encryption_sdk.encrypt(
|
503 |
| - source=VALUES["plaintext_128"], materials_manager=cmm, source_length=int(len(VALUES["plaintext_128"]) / 2) |
| 508 | + source=plaintext, materials_manager=cmm, source_length=int(len(VALUES["plaintext_128"]) / 2) |
504 | 509 | )
|
505 | 510 |
|
506 | 511 | excinfo.match(r"Bytes encrypted has exceeded stated source length estimate:*")
|
| 512 | + assert repr(plaintext) not in excinfo.exconly() |
507 | 513 |
|
508 | 514 |
|
509 | 515 | def test_encrypt_source_length_enforcement_legacy_support():
|
@@ -669,3 +675,55 @@ def test_incomplete_read_stream_cycle(frame_length):
|
669 | 675 | )
|
670 | 676 |
|
671 | 677 | assert ciphertext != decrypted == plaintext
|
| 678 | + |
| 679 | + |
| 680 | +def _prep_plaintext_and_logs(log_catcher, plaintext_length): |
| 681 | + log_catcher.set_level(logging.DEBUG) |
| 682 | + key_provider = fake_kms_key_provider() |
| 683 | + plaintext = exact_length_plaintext(plaintext_length) |
| 684 | + return plaintext, key_provider |
| 685 | + |
| 686 | + |
| 687 | +def _look_in_logs(log_catcher, plaintext): |
| 688 | + logs = log_catcher.text |
| 689 | + # look for every possible 32-byte chunk |
| 690 | + start = 0 |
| 691 | + end = 32 |
| 692 | + plaintext_length = len(plaintext) |
| 693 | + while end <= plaintext_length: |
| 694 | + chunk_repr = repr(plaintext[start:end]) |
| 695 | + repr_body = chunk_repr[2:-1] |
| 696 | + assert repr_body not in logs |
| 697 | + start += 1 |
| 698 | + end += 1 |
| 699 | + |
| 700 | + |
| 701 | +@pytest.mark.parametrize("frame_size", (0, LINE_LENGTH // 2, LINE_LENGTH, LINE_LENGTH * 2)) |
| 702 | +@pytest.mark.parametrize( |
| 703 | + "plaintext_length", (1, LINE_LENGTH // 2, LINE_LENGTH, int(LINE_LENGTH * 1.5), LINE_LENGTH * 2) |
| 704 | +) |
| 705 | +def test_plaintext_logs_oneshot(caplog, plaintext_length, frame_size): |
| 706 | + plaintext, key_provider = _prep_plaintext_and_logs(caplog, plaintext_length) |
| 707 | + |
| 708 | + _ciphertext, _header = aws_encryption_sdk.encrypt( |
| 709 | + source=plaintext, key_provider=key_provider, frame_length=frame_size |
| 710 | + ) |
| 711 | + |
| 712 | + _look_in_logs(caplog, plaintext) |
| 713 | + |
| 714 | + |
| 715 | +@pytest.mark.parametrize("frame_size", (0, LINE_LENGTH // 2, LINE_LENGTH, LINE_LENGTH * 2)) |
| 716 | +@pytest.mark.parametrize( |
| 717 | + "plaintext_length", (1, LINE_LENGTH // 2, LINE_LENGTH, int(LINE_LENGTH * 1.5), LINE_LENGTH * 2) |
| 718 | +) |
| 719 | +def test_plaintext_logs_stream(caplog, plaintext_length, frame_size): |
| 720 | + plaintext, key_provider = _prep_plaintext_and_logs(caplog, plaintext_length) |
| 721 | + |
| 722 | + ciphertext = b"" |
| 723 | + with aws_encryption_sdk.stream( |
| 724 | + mode="encrypt", source=plaintext, key_provider=key_provider, frame_length=frame_size |
| 725 | + ) as encryptor: |
| 726 | + for line in encryptor: |
| 727 | + ciphertext += line |
| 728 | + |
| 729 | + _look_in_logs(caplog, plaintext) |
0 commit comments