@@ -2382,6 +2382,26 @@ class CustomListener(logging.handlers.QueueListener):
2382
2382
class CustomQueue (queue .Queue ):
2383
2383
pass
2384
2384
2385
+ class CustomQueueProtocol :
2386
+ def __init__ (self , maxsize = 0 ):
2387
+ self .queue = queue .Queue (maxsize )
2388
+
2389
+ def __getattr__ (self , attribute ):
2390
+ queue = object .__getattribute__ (self , 'queue' )
2391
+ return getattr (queue , attribute )
2392
+
2393
+ class CustomQueueFakeProtocol (CustomQueueProtocol ):
2394
+ # An object implementing the Queue API (incorrect signatures).
2395
+ # The object will be considered a valid queue class since we
2396
+ # do not check the signatures (only callability of methods)
2397
+ # but will NOT be usable in production since a TypeError will
2398
+ # be raised due to a missing argument.
2399
+ def empty (self , x ):
2400
+ pass
2401
+
2402
+ class CustomQueueWrongProtocol (CustomQueueProtocol ):
2403
+ empty = None
2404
+
2385
2405
def queueMaker ():
2386
2406
return queue .Queue ()
2387
2407
@@ -3869,18 +3889,16 @@ def do_queuehandler_configuration(self, qspec, lspec):
3869
3889
@threading_helper .requires_working_threading ()
3870
3890
@support .requires_subprocess ()
3871
3891
def test_config_queue_handler (self ):
3872
- q = CustomQueue ()
3873
- dq = {
3874
- '()' : __name__ + '.CustomQueue' ,
3875
- 'maxsize' : 10
3876
- }
3892
+ qs = [CustomQueue (), CustomQueueProtocol ()]
3893
+ dqs = [{'()' : f'{ __name__ } .{ cls } ' , 'maxsize' : 10 }
3894
+ for cls in ['CustomQueue' , 'CustomQueueProtocol' ]]
3877
3895
dl = {
3878
3896
'()' : __name__ + '.listenerMaker' ,
3879
3897
'arg1' : None ,
3880
3898
'arg2' : None ,
3881
3899
'respect_handler_level' : True
3882
3900
}
3883
- qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , dq , q )
3901
+ qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , * dqs , * qs )
3884
3902
lvalues = (None , __name__ + '.CustomListener' , dl , CustomListener )
3885
3903
for qspec , lspec in itertools .product (qvalues , lvalues ):
3886
3904
self .do_queuehandler_configuration (qspec , lspec )
@@ -3900,15 +3918,21 @@ def test_config_queue_handler(self):
3900
3918
@support .requires_subprocess ()
3901
3919
@patch ("multiprocessing.Manager" )
3902
3920
def test_config_queue_handler_does_not_create_multiprocessing_manager (self , manager ):
3903
- # gh-120868
3921
+ # gh-120868, gh-121723
3904
3922
3905
3923
from multiprocessing import Queue as MQ
3906
3924
3907
3925
q1 = {"()" : "queue.Queue" , "maxsize" : - 1 }
3908
3926
q2 = MQ ()
3909
3927
q3 = queue .Queue ()
3910
-
3911
- for qspec in (q1 , q2 , q3 ):
3928
+ # CustomQueueFakeProtocol passes the checks but will not be usable
3929
+ # since the signatures are incompatible. Checking the Queue API
3930
+ # without testing the type of the actual queue is a trade-off
3931
+ # between usability and the work we need to do in order to safely
3932
+ # check that the queue object correctly implements the API.
3933
+ q4 = CustomQueueFakeProtocol ()
3934
+
3935
+ for qspec in (q1 , q2 , q3 , q4 ):
3912
3936
self .apply_config (
3913
3937
{
3914
3938
"version" : 1 ,
@@ -3924,21 +3948,62 @@ def test_config_queue_handler_does_not_create_multiprocessing_manager(self, mana
3924
3948
3925
3949
@patch ("multiprocessing.Manager" )
3926
3950
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager (self , manager ):
3927
- # gh-120868
3951
+ # gh-120868, gh-121723
3928
3952
3929
- with self .assertRaises (ValueError ):
3930
- self .apply_config (
3931
- {
3932
- "version" : 1 ,
3933
- "handlers" : {
3934
- "queue_listener" : {
3935
- "class" : "logging.handlers.QueueHandler" ,
3936
- "queue" : object (),
3953
+ for qspec in [object (), CustomQueueWrongProtocol ()]:
3954
+ with self .assertRaises (ValueError ):
3955
+ self .apply_config (
3956
+ {
3957
+ "version" : 1 ,
3958
+ "handlers" : {
3959
+ "queue_listener" : {
3960
+ "class" : "logging.handlers.QueueHandler" ,
3961
+ "queue" : qspec ,
3962
+ },
3937
3963
},
3938
- },
3964
+ }
3965
+ )
3966
+ manager .assert_not_called ()
3967
+
3968
+ @skip_if_tsan_fork
3969
+ @support .requires_subprocess ()
3970
+ @unittest .skipUnless (support .Py_DEBUG , "requires a debug build for testing"
3971
+ "assertions in multiprocessing" )
3972
+ def test_config_queue_handler_multiprocessing_context (self ):
3973
+ # regression test for gh-121723
3974
+ if support .MS_WINDOWS :
3975
+ start_methods = ['spawn' ]
3976
+ else :
3977
+ start_methods = ['spawn' , 'fork' , 'forkserver' ]
3978
+ for start_method in start_methods :
3979
+ with self .subTest (start_method = start_method ):
3980
+ ctx = multiprocessing .get_context (start_method )
3981
+ with ctx .Manager () as manager :
3982
+ q = manager .Queue ()
3983
+ records = []
3984
+ # use 1 process and 1 task per child to put 1 record
3985
+ with ctx .Pool (1 , initializer = self ._mpinit_issue121723 ,
3986
+ initargs = (q , "text" ), maxtasksperchild = 1 ):
3987
+ records .append (q .get (timeout = 60 ))
3988
+ self .assertTrue (q .empty ())
3989
+ self .assertEqual (len (records ), 1 )
3990
+
3991
+ @staticmethod
3992
+ def _mpinit_issue121723 (qspec , message_to_log ):
3993
+ # static method for pickling support
3994
+ logging .config .dictConfig ({
3995
+ 'version' : 1 ,
3996
+ 'disable_existing_loggers' : True ,
3997
+ 'handlers' : {
3998
+ 'log_to_parent' : {
3999
+ 'class' : 'logging.handlers.QueueHandler' ,
4000
+ 'queue' : qspec
3939
4001
}
3940
- )
3941
- manager .assert_not_called ()
4002
+ },
4003
+ 'root' : {'handlers' : ['log_to_parent' ], 'level' : 'DEBUG' }
4004
+ })
4005
+ # log a message (this creates a record put in the queue)
4006
+ logging .getLogger ().info (message_to_log )
3942
4007
3943
4008
@support .requires_subprocess ()
3944
4009
def test_multiprocessing_queues (self ):
0 commit comments