ϪfVXdZddlZddlZddlZddlZddlZddlmZmZddl m Z m Z m Z m Z ddlmZGddZe j"eGdd ej$ZGd d ej$ZGd d e j*ZGddZGddej$Zy)z( Tests for L{twisted.python.threadpool} N)TeamcreateMemoryWorker)contextfailure threadable threadpool)unittestc"eZdZdZdZdZdgZy)Synchronizationrc`||_||_tj|_g|_yN)Nwaiting threadingLocklockruns)selfrrs >/usr/lib/python3/dist-packages/twisted/test/test_threadpool.py__init__zSynchronization.__init__s% NN$  c |jjdrHt|jdzst j d|jj n|xjdz c_|jj|jjdt|j|jk(r|jj |jj y)NFg-C6*?) racquirelenrtimesleepreleasefailuresappendrrrs rrunzSynchronization.runs 99  U #tyy>A% 6" II    MMQ M   tyy>TVV # LL " rr#N)__name__ __module__ __qualname__r rr# synchronizedrrr r sH 07Lrr ceZdZdZdZdZdZdZdZdZ dZ d Z d Z d Z d Zd ZdZdZdZdZdZdZddZy)ThreadPoolTestsz Test threadpools. cy)zD Return number of seconds to wait before giving up. rr(r"s r getTimeoutzThreadPoolTests.getTimeout@rctd}|D]*}|jdrytjd,|j dy)Ni@BFgh㈵>z%A long time passed without succeeding)rangerrrfail)rritemsis r _waitForLockzThreadPoolTests._waitForLockFsDg ?A||E" JJt  ? II= >rctjdd}|j|jd|j|jdy)zy L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to L{ThreadPool.__init__}. N)r ThreadPool assertEqualminmaxrpools rtest_attributeszThreadPoolTests.test_attributesOs> $$R, 2& 2&rctjdd}|j|j|j|j t |jdtjdd}|j t |jd|j|j|j|j t |jdy)zV L{ThreadPool.start} creates the minimum number of threads specified. rr N)rr7start addCleanupstopr8rthreadsr;s r test_startzThreadPoolTests.test_startXs$$Q*   " T\\*A.$$Q+ T\\*A.   " T\\*A.rctjdd}|j|j|j d|j t |jdy)z L{ThreadPool.adjustPoolsize} only modifies the pool size and does not start new workers while the pool is not running. rrN)rr7rArCadjustPoolsizer8rrDr;s rtest_adjustingWhenPoolStoppedz-ThreadPoolTests.test_adjustingWhenPoolStoppedgsO $$Q*   A T\\*A.rc~tjdd}|j|j|j|j |j gd}Gdd}|}tj|}tj|}|j||tj}|j|j|j|j~~tj |j#||j#|y)z Test that creating threads in the threadpool with application-level objects as arguments doesn't results in those objects never being freed, with the thread maintaining a reference to them as long as it exists. rrcyr r()args rworkerz.workers rc eZdZy):ThreadPoolTests.test_threadCreationArguments..DumbNr$r%r&r(rrDumbrO rrQN)rr7rArBrCr8rDweakrefref callInThreadrEventsetwaitr,gccollect assertIsNone)rtprMrQunique workerRef uniqueRefevents rtest_threadCreationArgumentsz,ThreadPoolTests.test_threadCreationArgumentsrs " "1a (     R(   KK' KK'  '!  " 4??$%    )+& )+&rc tjdd}|jj|jj |j gi tjtjg  fd}fd}Gdd}tj|}tj| tj| |j||||~~jjjtj j# j# ~tj j#|j# dj t% j'ddgy) ze As C{test_threadCreationArguments} above, but for callInThreadWithCallback. rrctjjjd<d<j j t j|y)Nr^r_)rYrZrXr,rWr!rSrT) successresult onResultDone onResultWaitrefdict resultRefrr_r^s ronResultzVThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback..onResults^ JJL   doo/ 0#,;GK #,;GK       W[[0 1rcSr r()rLtestrQs rrMzTThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback..workers 6Mrc eZdZy)RThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallback..DumbNrPr(rrrQrnrRrrQ)rlN)rr7rArBrCr8rDrrVrSrTcallInThreadWithCallbackrWrXr,rYrZr[listvalues) rr\rjrMr] onResultRefrQrfrgrhrir_r^s ` @@@@@@@r4test_threadCreationArgumentsCallInThreadWithCallbackzDThreadPoolTests.test_threadCreationArgumentsCallInThreadWithCallbacksw  " "1a (     R( (  (   2 2   kk(+ KK' KK'  ##Hff6#J   $//+,  )+& )+&   +-( ,)A,.) gnn./$>rc`tjdd}|j|jd|j|jdt j t j|}|j|jd|j|jdy)z Threadpools can be pickled and unpickled, which should preserve the number of threads and other parameters. N)rr7r8r9r:pickleloadsdumps)rr<copys rtest_persistencez ThreadPoolTests.test_persistences $$Q+ 1% 2&||FLL./ 1% 2&rcd}tj}|j|j|jt j }|jt||}t|D] }||| |j||j|jd|jdy)z Test synchronization of calls made with C{method}, which should be one of the mechanisms of the threadpool to execute work in threads. r@zrun() re-entered z timesN) rr7rArBrCrrrr r/r3 assertFalser )rmethodrr\ractorr2s r_threadpoolTestzThreadPoolTests._threadpoolTests   " " $    .."7+q A 2u   '" +z3ThreadPoolTests.test_callInThread..sbooeii6Pr)rr"s rtest_callInThreadz!ThreadPoolTests.test_callInThreads##$PQQrcGddtfd}tjdd}|j||j |j |j }|jt|dy)zi L{ThreadPool.callInThread} logs exceptions raised by the callable it is passed. c eZdZy).NewErrorNrPr(rrNewErrorrrRrrcr r(rsr raiseErrorz>ThreadPoolTests.test_callInThreadException..raiseError * rrrN) Exceptionrr7rUrArCflushLoggedErrorsr8r)rrr\errorsrs @rtest_callInThreadExceptionz*ThreadPoolTests.test_callInThreadExceptionsj  y   " "1a (  #    ''1 Va(rctjjgfd}tjdd}|j |d|j  |j|j|jd|jddy#|jwxYw)z L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a two-tuple of C{(True, result)} where C{result} is the value returned by the callable supplied. cjjj|j|yr rr!rdreresultswaiters rrjz?ThreadPoolTests.test_callInThreadWithCallback..onResult5% NN  NN7 # NN6 "rrrcy)Nrlr(r(rrrz?ThreadPoolTests.test_callInThreadWithCallback..;rrlN) rrrrr7rorAr3rC assertTruer8)rrjr\rrs @@rtest_callInThreadWithCallbackz-ThreadPoolTests.test_callInThreadWithCallback*s ! #  " "1a ( ##Hn=      f % GGI  # V, GGIs )B44Cc&Gddtfd}tjjgfd}t j dd}|j |||j |j|j|jd|jdtj|jtdj y#|jwxYw)z L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a two-tuple of C{(False, failure)} where C{failure} represents the exception raised by the callable supplied. c eZdZy)RThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback..NewErrorNrPr(rrrrMrRrrcr r(rsrrzTThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback..raiseErrorPrrcjjj|j|yr rrs rrjzRThreadPoolTests.test_callInThreadWithCallbackExceptionInCallback..onResultXrrrrN)rrrrrr7rorAr3rCr}assertIsInstancerFailurer issubclasstype)rrrjr\rrrs @@@r0test_callInThreadWithCallbackExceptionInCallbackz@ThreadPoolTests.test_callInThreadWithCallbackExceptionInCallbackFs y  ! #  " "1a ( ##Hj9      f % GGI $ gaj'//:  71:??H=> GGIs =C>>Dc<Gddttj}|jgfd}t j dd}|j |d|j|j|j |j||j|j}|jt|d|jd|j!dy#|jwxYw)zj L{ThreadPool.callInThreadWithCallback} logs the exception raised by C{onResult}. c eZdZy)RThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult..NewErrorNrPr(rrrrprRrrcVj|j|r )r!)rdrerrs rrjzRThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult..onResultxs# NN7 # NN6 "* rrrcyr r(r(rrrzRThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResult..~rrN)rrrrrr7rorUrrAr3rCrr8rrr[)rrrjr\rrrs @@r0test_callInThreadWithCallbackExceptionInOnResultz@ThreadPoolTests.test_callInThreadWithCallbackExceptionInOnResultjs  y !   " "1a ( ##Hl; '      f % GGI''1 Va(  # '!*% GGIs D Dcgtjfd}fd}tjdd}|j |||j |j |jj|j|jtd|jddy)z L{ThreadPool.callInThreadWithCallback} calls the function it is given and the C{onResult} callback in the same thread. cjtjjj yr )r!rcurrent_threadidentrW)rdrer` threadIdss rrjz5ThreadPoolTests.test_callbackThread..onResults)   Y557== > IIKrc`jtjjyr )r!rrr)rsrfuncz1ThreadPoolTests.test_callbackThread..funcs   Y557== >rrrrGN) rrVrr7rorArBrCrXr,r8r)rrjrr\r`rs @@rtest_callbackThreadz#ThreadPoolTests.test_callbackThreads  !  ? " "1a ( ##Hd3     4??$% Y+ 1y|4rctjjjd}d|d<gt j fd}fd}t jdd}|j|||j|j|jj|j|jtd|j|d|j|dy ) z The context L{ThreadPool.callInThreadWithCallback} is invoked in is shared by the context the callable and C{onResult} callback are invoked in. zthis must be presenttestingctjjjd}j |j yNr)rtheContextTrackercurrentContextcontextsr!rW)rdrectxrr`s rrjz6ThreadPoolTests.test_callbackContext..onResults7++::<EEbIC OOC IIKrc~tjjjd}j |yr)rrrrr!)rrs rrz2ThreadPoolTests.test_callbackContext..funcs.++::<EEbIC OOC rrrrGN)rrrrrrVrr7rorArBrCrXr,r8r)rmyctxrjrr\rr`s @@rtest_callbackContextz$ThreadPoolTests.test_callbackContexts ))88:CCBG1i!   ! " "1a ( ##Hd3     4??$% X*  ,  ,rc<tj}|jtjdd}|j |j |j |j||jy#|jwxYw)z Work added to the threadpool before its start should be executed once the threadpool is started: this is ensured by trying to release a lock previously acquired. rrN) rrrrr7rUrrAr3rC)rrr\s rtest_existingWorkz!ThreadPoolTests.test_existingWorksk !  " "1a ( '      f % GGIBGGIs 'B Bctjdd}|j|j|j|j |j d|j t|jd|j t|jdtjtjfd}|j|jd|j |j d|j t|jd|j t|jdjt|js+tj dt|js+|j t|jd|j t|jdy)z{ As the worker receives and completes work, it transitions between the working and waiting states. rrcHjjdy)Nr@)rWrX) threadFinish threadWorkingsr_threadz;ThreadPoolTests.test_workerStateTransition.._threads       b !rr@gMb@?N)rr7rArBrCr8workersrwaitersworkingrrVrUrXrWrr)rr<rrrs @@rtest_workerStateTransitionz*ThreadPoolTests.test_workerStateTransitionsc $$Q*   " q) T\\*A. T\\*A.")  (  " '"2 q) T\\*A. T\\*A. dll# JJv dll# T\\*A. T\\*A.rNctjdd}|j|jj dy)zB There is a property '_queue' for legacy purposes rrN)rr7r8_queueqsizer;s rtest_qzThreadPoolTests.test_qs3$$Q* **,a0r)returnN)r$r%r&__doc__r,r3r=rErIrarsr{rrrrrrrrrrrr(rrr*r*;so ?' / /('TH?T' U4R )(-8"?H!&F52-B$#/J1rr*ceZdZdZdZdZy)RaceConditionTestsctjdd_tj_jj fd}j |y)Nrr@c>jj`yr )rrCr"srdonez&RaceConditionTests.setUp..done s OO "r)rr7rrVr`rArB)rrs` rsetUpzRaceConditionTests.setUpsH$//26__&    rcy)z= A reasonable number of seconds to time out. rr(r"s rr,zRaceConditionTests.getTimeoutr-rc|j}|jj|jj|jj ||jj tdD]1}|jj|jj 3|jj|jj|jj ||jjs,|jj |jdyy)a If multiple threads are waiting on an event (via blocking on something in a callable passed to L{threadpool.ThreadPool.callInThread}), and there is spare capacity in the threadpool, sending another callable which will cause those to un-block to L{threadpool.ThreadPool.callInThread} will reliably run that callable and un-block the blocked threads promptly. @note: This is not really a unit test, it is a stress-test. You may need to run it with C{trial -u} to fail reliably if there is a problem. It is very hard to regression-test for this particular bug - one where the thread pool may consider itself as having "enough capacity" when it really needs to spin up a new thread if it possibly can - in a deterministic way, since the bug can only be provoked by subtle race conditions. r?z9'set' did not run in thread; timed out waiting on 'wait'.N) r,rrUr`rWrXclearr/isSetr0)rtimeoutr2s rtest_synchronizationz'RaceConditionTests.test_synchronizations"//# $$TZZ^^4   q :A OO ( ( 9 : $$TZZ^^4  zz! JJNN  IIQ R"rN)r$r%r&rr,rr(rrrrs  SrrceZdZdZdZdZy) MemoryPoolz A deterministic threadpool that uses in-memory data structures to queue work rather than threads to execute work. ct||_||_||_tjj |g|i|y)a Initialize this L{MemoryPool} with a test case. @param coordinator: a worker used to coordinate work in the L{Team} underlying this threadpool. @type coordinator: L{twisted._threads.IExclusiveWorker} @param failTest: A 1-argument callable taking an exception and raising a test-failure exception. @type failTest: 1-argument callable taking (L{Failure}) and raising L{unittest.FailTest}. @param newWorker: a 0-argument callable that produces a new L{twisted._threads.IWorker} provider on each invocation. @type newWorker: 0-argument callable returning L{twisted._threads.IWorker}. N) _coordinator _failTest _newWorkerrr7r)r coordinatorfailTest newWorkerargskwargss rrzMemoryPool.__init__<s8$(!#&&t=d=f=rc^fd}tj|jS)a Override testing hook to create a deterministic threadpool. @param currentLimit: A 1-argument callable which returns the current threadpool size limit. @param threadFactory: ignored in this invocation; a 0-argument callable that would produce a thread. @return: a L{Team} backed by the coordinator and worker passed to L{MemoryPool.__init__}. cj}|j|jzk\ryjSr ) statisticsbusyWorkerCountidleWorkerCountr)stats currentLimitrteams r respectLimitz&MemoryPool._pool..respectLimitas<OO%E%%(=(==,.P??$ $r)r createWorker logException)rrr)rr threadFactoryrrs`` @r_poolzMemoryPool._poolSs. %))%   rN)r$r%r&rrrr(rrrr6s >.rrceZdZdZdZdZy) PoolHelpera A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually use threads, by using the internal interfaces in L{twisted._threads}. @ivar performCoordination: a 0-argument callable that will perform one unit of "coordination" - work involved in delegating work to other threads - and return L{True} if it did any work, L{False} otherwise. @ivar workers: the workers which represent the threads within the pool - the workers other than the coordinator. @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where C{workPerformer} is a 0-argument callable like C{performCoordination}. @ivar threadpool: a modified L{threadpool.ThreadPool} to test. @type threadpool: L{MemoryPool} ct\}_g_fd}t||j|g|i|_y)z Create a L{PoolHelper}. @param testCase: a test case attached to this helper. @type args: The arguments passed to a L{threadpool.ThreadPool}. @type kwargs: The arguments passed to a L{threadpool.ThreadPool} cnjjtjddS)Nrr)rr!rr"srrz&PoolHelper.__init__..newWorkers- LL   2 4 5<<#A& &rN)rperformCoordinationrrr0r)rtestCaserrrrs` rrzPoolHelper.__init__sK1C0D- T-  '%   48 .rrN) rr/rrUrr8rrArrhelpernxs rtest_workBeforeStartingz)MemoryBackedTests.test_workBeforeStartings D!R( q 9A    * *< 8 9%%' ,!%%' V^^,a0rct|dd}d}t|D]}|jjd |j |j |j g|jj|j |j t|j |jjy)z If the amount of work before starting exceeds the maximum number of threads allowed to the threadpool, only the maximum count will be started. rr@2cyr r(r(rrrzBMemoryBackedTests.test_tooMuchWorkBeforeStarting..rrN) rr/rrUrr8rrArr:rs rtest_tooMuchWorkBeforeStartingz0MemoryBackedTests.test_tooMuchWorkBeforeStartings D!R( q 9A    * *< 8 9%%' ,!%%' V^^,f.?.?.C.CDrN)r$r%r&rrr r(rrrrs 1 Err)rrYrwrrrStwisted._threadsrrtwisted.pythonrrrr twisted.trialr r synchronizeSynchronousTestCaser*rr7rrrr(rrrs   5CC"!!H 'H1h22H1V-S55-S`9&&9x--`$E44$Er