|
13 | 13 |
|
14 | 14 | package com.rabbitmq.stream.impl;
|
15 | 15 |
|
| 16 | +import static java.lang.String.format; |
16 | 17 | import static java.util.concurrent.TimeUnit.SECONDS;
|
17 | 18 | import static org.assertj.core.api.Assertions.assertThat;
|
18 | 19 | import static org.junit.jupiter.api.Assertions.fail;
|
|
64 | 65 | import java.util.concurrent.atomic.AtomicReference;
|
65 | 66 | import java.util.function.Consumer;
|
66 | 67 | import java.util.function.Function;
|
| 68 | +import java.util.function.Predicate; |
67 | 69 | import java.util.function.Supplier;
|
68 | 70 | import java.util.stream.IntStream;
|
69 | 71 | import org.assertj.core.api.AssertDelegateTarget;
|
@@ -336,7 +338,7 @@ private static String streamName(ExtensionContext context) {
|
336 | 338 |
|
337 | 339 | private static String streamName(Class<?> testClass, Method testMethod) {
|
338 | 340 | String uuid = UUID.randomUUID().toString();
|
339 |
| - return String.format( |
| 341 | + return format( |
340 | 342 | "%s_%s%s",
|
341 | 343 | testClass.getSimpleName(), testMethod.getName(), uuid.substring(uuid.length() / 2));
|
342 | 344 | }
|
@@ -480,6 +482,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) {
|
480 | 482 | @ExtendWith(DisabledIfAmqp10NotEnabledCondition.class)
|
481 | 483 | @interface DisabledIfAmqp10NotEnabled {}
|
482 | 484 |
|
| 485 | + @Target({ElementType.TYPE, ElementType.METHOD}) |
| 486 | + @Retention(RetentionPolicy.RUNTIME) |
| 487 | + @Documented |
| 488 | + @ExtendWith(DisabledIfAuthMechanismSslNotEnabledCondition.class) |
| 489 | + @interface DisabledIfAuthMechanismSslNotEnabled {} |
| 490 | + |
483 | 491 | @Target({ElementType.TYPE, ElementType.METHOD})
|
484 | 492 | @Retention(RetentionPolicy.RUNTIME)
|
485 | 493 | @Documented
|
@@ -707,75 +715,72 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
|
707 | 715 | }
|
708 | 716 | }
|
709 | 717 |
|
710 |
| - static class DisabledIfMqttNotEnabledCondition implements ExecutionCondition { |
| 718 | + abstract static class DisabledIfPluginNotEnabledCondition implements ExecutionCondition { |
| 719 | + |
| 720 | + private final String pluginLabel; |
| 721 | + private final Predicate<String> condition; |
| 722 | + |
| 723 | + DisabledIfPluginNotEnabledCondition(String pluginLabel, Predicate<String> condition) { |
| 724 | + this.pluginLabel = pluginLabel; |
| 725 | + this.condition = condition; |
| 726 | + } |
711 | 727 |
|
712 | 728 | @Override
|
713 | 729 | public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
|
714 | 730 | if (Host.rabbitmqctlCommand() == null) {
|
715 | 731 | return ConditionEvaluationResult.disabled(
|
716 |
| - "rabbitmqctl.bin system property not set, cannot check if MQTT plugin is enabled"); |
| 732 | + format( |
| 733 | + "rabbitmqctl.bin system property not set, cannot check if %s plugin is enabled", |
| 734 | + pluginLabel)); |
717 | 735 | } else {
|
718 | 736 | try {
|
719 | 737 | Process process = Host.rabbitmqctl("status");
|
720 | 738 | String output = capture(process.getInputStream());
|
721 |
| - if (output.contains("rabbitmq_mqtt") && output.contains("protocol: mqtt")) { |
722 |
| - return ConditionEvaluationResult.enabled("MQTT plugin enabled"); |
| 739 | + if (condition.test(output)) { |
| 740 | + return ConditionEvaluationResult.enabled(format("%s plugin enabled", pluginLabel)); |
723 | 741 | } else {
|
724 |
| - return ConditionEvaluationResult.disabled("MQTT plugin disabled"); |
| 742 | + return ConditionEvaluationResult.disabled(format("%s plugin disabled", pluginLabel)); |
725 | 743 | }
|
726 | 744 | } catch (Exception e) {
|
727 | 745 | return ConditionEvaluationResult.disabled(
|
728 |
| - "Error while trying to detect MQTT plugin: " + e.getMessage()); |
| 746 | + format("Error while trying to detect %s plugin: " + e.getMessage(), pluginLabel)); |
729 | 747 | }
|
730 | 748 | }
|
731 | 749 | }
|
732 | 750 | }
|
733 | 751 |
|
734 |
| - static class DisabledIfStompNotEnabledCondition implements ExecutionCondition { |
| 752 | + static class DisabledIfMqttNotEnabledCondition extends DisabledIfPluginNotEnabledCondition { |
735 | 753 |
|
736 |
| - @Override |
737 |
| - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { |
738 |
| - if (Host.rabbitmqctlCommand() == null) { |
739 |
| - return ConditionEvaluationResult.disabled( |
740 |
| - "rabbitmqctl.bin system property not set, cannot check if STOMP plugin is enabled"); |
741 |
| - } else { |
742 |
| - try { |
743 |
| - Process process = Host.rabbitmqctl("status"); |
744 |
| - String output = capture(process.getInputStream()); |
745 |
| - if (output.contains("rabbitmq_stomp") && output.contains("protocol: stomp")) { |
746 |
| - return ConditionEvaluationResult.enabled("STOMP plugin enabled"); |
747 |
| - } else { |
748 |
| - return ConditionEvaluationResult.disabled("STOMP plugin disabled"); |
749 |
| - } |
750 |
| - } catch (Exception e) { |
751 |
| - return ConditionEvaluationResult.disabled( |
752 |
| - "Error while trying to detect STOMP plugin: " + e.getMessage()); |
753 |
| - } |
754 |
| - } |
| 754 | + DisabledIfMqttNotEnabledCondition() { |
| 755 | + super( |
| 756 | + "MQTT", output -> output.contains("rabbitmq_mqtt") && output.contains("protocol: mqtt")); |
755 | 757 | }
|
756 | 758 | }
|
757 | 759 |
|
758 |
| - static class DisabledIfAmqp10NotEnabledCondition implements ExecutionCondition { |
| 760 | + static class DisabledIfStompNotEnabledCondition extends DisabledIfPluginNotEnabledCondition { |
759 | 761 |
|
760 |
| - @Override |
761 |
| - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { |
762 |
| - if (Host.rabbitmqctlCommand() == null) { |
763 |
| - return ConditionEvaluationResult.disabled( |
764 |
| - "rabbitmqctl.bin system property not set, cannot check if STOMP plugin is enabled"); |
765 |
| - } else { |
766 |
| - try { |
767 |
| - Process process = Host.rabbitmqctl("status"); |
768 |
| - String output = capture(process.getInputStream()); |
769 |
| - if (output.contains("rabbitmq_amqp1_0") && output.contains("AMQP 1.0")) { |
770 |
| - return ConditionEvaluationResult.enabled("STOMP plugin enabled"); |
771 |
| - } else { |
772 |
| - return ConditionEvaluationResult.disabled("STOMP plugin disabled"); |
773 |
| - } |
774 |
| - } catch (Exception e) { |
775 |
| - return ConditionEvaluationResult.disabled( |
776 |
| - "Error while trying to detect STOMP plugin: " + e.getMessage()); |
777 |
| - } |
778 |
| - } |
| 762 | + DisabledIfStompNotEnabledCondition() { |
| 763 | + super( |
| 764 | + "STOMP", |
| 765 | + output -> output.contains("rabbitmq_stomp") && output.contains("protocol: stomp")); |
| 766 | + } |
| 767 | + } |
| 768 | + |
| 769 | + static class DisabledIfAuthMechanismSslNotEnabledCondition |
| 770 | + extends DisabledIfPluginNotEnabledCondition { |
| 771 | + |
| 772 | + DisabledIfAuthMechanismSslNotEnabledCondition() { |
| 773 | + super( |
| 774 | + "X509 authentication mechanism", |
| 775 | + output -> output.contains("rabbitmq_auth_mechanism_ssl")); |
| 776 | + } |
| 777 | + } |
| 778 | + |
| 779 | + static class DisabledIfAmqp10NotEnabledCondition extends DisabledIfPluginNotEnabledCondition { |
| 780 | + |
| 781 | + DisabledIfAmqp10NotEnabledCondition() { |
| 782 | + super( |
| 783 | + "AMQP 1.0", output -> output.contains("rabbitmq_amqp1_0") && output.contains("AMQP 1.0")); |
779 | 784 | }
|
780 | 785 | }
|
781 | 786 |
|
|
0 commit comments