Ϫf&bdZddlZddlZddlmZmZmZmZmZm Z m Z m Z m Z ddl mZddlmZddlmZddlmZdd lmZd Zd Z ej$j3d r dd lmZmZd Zd Zej;ZeeeZ ejCGddeeZ"y#e$rYBwxYw)z/ Tests for L{twisted.internet.asyncioreactor}. N) AbstractEventLoopAbstractEventLoopPolicyDefaultEventLoopPolicyFutureSelectorEventLoopget_event_loopget_event_loop_policyset_event_loopset_event_loop_policy)skipIf)AsyncioSelectorReactor)platform)SynchronousTestCase)ReactorBuilderFwin32)WindowsProactorEventLoopPolicyWindowsSelectorEventLoopPolicyTc eZdZdZdZdedefdZee dje e e jje jj ej$dZee dje e e jje jj ej$dZee d je e e jje jj ej$d Zee d d Zee d dZee ddZee ddZdZdZdZdZy)AsyncioSelectorReactorTestsz* L{AsyncioSelectorReactor} tests. ct}gfd}|j||jd|jg|j d|jdgy)zn Ensure that C{reactor} has an event loop that works properly with L{asyncio.Future}. cdj|jjyN)appendresultstop)futurereactorrs K/usr/lib/python3/dist-packages/twisted/internet/test/test_asyncioreactor.py completedzRAsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuture..completed=s MM&--/ * LLNTr)timeoutN)radd_done_callback set_result assertEqual runReactor)selfrrr rs ` @r#assertReactorWorksWithAsyncioFuturez?AsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuture5sf     +$ $ + $(r!policyreturnctt|j|jfd}S)z Make a new asyncio loop from a policy for use with a reactor, and add appropriate cleanup to restore any global state. cRjttyr)closer r ) existingLoopexistingPolicyrsrcleanUpz4AsyncioSelectorReactorTests.newLoop..cleanUpQs LLN < ( !. 1r!)rr new_event_loop addCleanup)r'r)r0r.r/rs @@@rnewLoopz#AsyncioSelectorReactorTests.newLoopHs@ &' .0&&(  2  2  r!zLdefault event loop: {} is not of type SelectorEventLoop on Python {}.{} ({})c:t}|j|y)a L{AsyncioSelectorReactor} wraps the global policy's event loop by default. This ensures that L{asyncio.Future}s and coroutines created by library code that uses L{asyncio.get_event_loop} are bound to the same loop. N)r r(r'rs r-test_defaultSelectorEventLoopFromGlobalPolicyzIAsyncioSelectorReactorTests.test_defaultSelectorEventLoopFromGlobalPolicyYs")* 009r!c|jt}t|}t||j |y)z If we use the L{asyncio.DefaultLoopPolicy} to create a new event loop, and then pass that event loop to a new L{AsyncioSelectorReactor}, this reactor should work properly with L{asyncio.Future}. N)r3rr r r(r' event_looprs r3test_newSelectorEventLoopFromDefaultEventLoopPolicyzOAsyncioSelectorReactorTests.test_newSelectorEventLoopFromDefaultEventLoopPolicyms6 \\"8":; (4z" 009r!zHdefault event loop: {} is of type SelectorEventLoop on Python {}.{} ({})c8|jtty)ay On Windows Python 3.5 to 3.7, L{get_event_loop()} returns a L{WindowsSelectorEventLoop} by default. On Windows Python 3.8+, L{get_event_loop()} returns a L{WindowsProactorEventLoop} by default. L{AsyncioSelectorReactor} should raise a L{TypeError} if the default event loop is not a L{WindowsSelectorEventLoop}. N) assertRaises TypeErrorr r's r1test_defaultNotASelectorEventLoopFromGlobalPolicyzMAsyncioSelectorReactorTests.test_defaultNotASelectorEventLoopFromGlobalPolicys( )%;zQAsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicy.. 5d ;r!N)r rr2r<r=r r>s r#test_WindowsProactorEventLoopPolicyz?AsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicysF <>? ;<   y ) % " $ % % %s AAz.WindowsSelectorEventLoopPolicy only on Windowsctt|jdt}|j |y)zy L{AsyncioSelectorReactor} will work if if L{asyncio.WindowsSelectorEventLoopPolicy} is default. ctdSrrFrGr!rrHzQAsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicy..rIr!N)r rr2r r(r5s r#test_WindowsSelectorEventLoopPolicyz?AsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicys3 <>? ;<(* 009r!ctr%tt|jdt }|j }|j |d|j|dy)z0L{seconds} should return a plausible epoch time.ctdSrrFrGr!rrHz:AsyncioSelectorReactorTests.test_seconds.. $9$$?r!i ^l#G4N)!hasWindowsSelectorEventLoopPolicyr rr2r seconds assertGreater assertLess)r'rrs r test_secondsz(AsyncioSelectorReactorTests.test_secondssT , !"@"B C OO? @(*" 6:.  +r!ctr%tt|jdt dgfd}j }j d|}|jdj djj|jd|jd|z dy)zQ L{DelayedCall.reset()} properly reschedules timer to later time ctdSrrFrGr!rrHzJAsyncioSelectorReactorTests.test_delayedCallResetToLater..rPr!Nc,jd<yNrrRrtimer_called_atsron_timerzJAsyncioSelectorReactorTests.test_delayedCallResetToLater..on_timer!(!2OA r!r?r皙?) rQr rr2r rR callLaterresetrrunassertIsNotNonerS)r'r] start_timedcrr\s @@rtest_delayedCallResetToLaterz8AsyncioSelectorReactorTests.test_delayedCallResetToLaters - !"@"B C OO? @(*& 3__&   q( +  !W\\*  _Q/0 ?1- :C@r!cHtrtttdgfd}j }j d|}|j dj djddl}ddl m }|j}||5jddd|j|jd|jd|j!d|z dtr tdyy#1swYhxYw) zS L{DelayedCall.reset()} properly reschedules timer to earlier time Nc,jd<yrYrZr[srr]zLAsyncioSelectorReactorTests.test_delayedCallResetToEarlier..on_timerr^r!r_rr)redirect_stderrr`)rQr rr rRrarbrio contextlibrjStringIOrcr%getvaluerdrT) r'r]rerfrlrjstderrrr\s @@rtest_delayedCallResetToEarlierz:AsyncioSelectorReactorTests.test_delayedCallResetToEarliers - !"@"B C(*& 3__&   sH -  !W\\*. V $  KKM  *B/ _Q/0 *Z7= , !$ ' -   s DD!c4trtttj}tj  t tj}d}t}t|D]}|jdd|jt tj}|j||z |z d|rtj tr tdyy#|rtjwwxYw)zW L{AsyncioSelectorReactor.callLater()} doesn't leave cyclic references ircyrrGrGr!rrHzOAsyncioSelectorReactorTests.test_noCycleReferencesInCallLater..sr!rN)rQr rgc isenableddisablelen get_objectsr rangerarunUntilCurrentrTenable)r'gc_was_enabledobjects_before timer_countr_ objects_afters r!test_noCycleReferencesInCallLaterz=AsyncioSelectorReactorTests.test_noCycleReferencesInCallLaters - !"@"B C   !12NK,.G;' 3!!!\2 3  # # % 01M OO]^;{JA N , !$ ' - s BC>>DN) __name__ __module__ __qualname____doc__r(rrr3r _defaultEventLoopIsSelectorformattype_defaultEventLoopsys version_infomajorminorrgetTyper6r:r?!hasWindowsProactorEventLoopPolicyrArQrCrJrMrUrgrqrrGr!rrr0s)&5:K" '' %v " #    " "    " " H      : : '' %v " #    " "    " " H       :  : # %v " #    " "    " " H       =  = --/WII --2: : --8% % --8: : ,A0(>(r!r)#rrtrasynciorrrrrrr r r unittestr twisted.internet.asyncioreactorr twisted.python.runtimertwisted.trial.unittestr reactormixinsrrrQ startswithrr ImportErrorr1r isinstancerr-rrGr!rrs    B+6)$)!$)!   ||w' -1),0) +,;;=():