ϪfdZddlmZddlZddlmZddlmZ ddlm Z m Z dZ d dZ d d ZGd d eZy#e $rdZ YwxYw)z. Tests for L{twisted.internet.kqueuereactor}. ) annotationsN) implementer)TestCase) KQueueReactor_IKQueuezKQueue not available.cy)z Do nothing. N)argskwargss J/usr/lib/python3/dist-packages/twisted/internet/test/test_kqueuereactor.py _fakeKEventr scRttGfdd}|S)a Create a fake that implements L{_IKQueue}. @param testKQueue: Something that acts like L{select.kqueue}. @param testKEvent: Something that acts like L{select.kevent}. @return: An implementation of L{_IKQueue} that includes C{testKQueue} and C{testKEvent}. ceZdZWZWZy)"makeFakeKQueue..FakeKQueueN)__name__ __module__ __qualname__kqueuekevent) testKEvent testKQueuesr FakeKQueuer's rr)rr)rrrs`` r makeFakeKQueuers+ <rceZdZdZeZddZy) KQueueTestsz These are tests for L{KQueueReactor}'s implementation, not its real world behaviour. For that, look at L{twisted.internet.test.reactormixins.ReactorBuilder}. clGdd}tt|t}|jdy)zV L{KQueueReactor} handles L{errno.EINTR} in C{doKEvent} by returning. ceZdZdZddZy)*KQueueTests.test_EINTR..FakeKQueuez A fake KQueue that raises L{errno.EINTR} when C{control} is called, like a real KQueue would if it was interrupted. c6ttjd)N Interrupted)OSErrorerrnoEINTR)selfr r s r controlz2KQueueTests.test_EINTR..FakeKQueue.controlCsekk=99rNr objectr r(returnNone)rrr__doc__r&r rr rr=s    :rrrN)rrr doKEvent)r%rreactors r test_EINTRzKQueueTests.test_EINTR8s/  : : z; GH rN)r)r*)rrrr+ kqueueSkipskipr.r rr rr/s Drrr')rr(rr(r)r)r+ __future__rr#zope.interfacertwisted.trial.unittestrtwisted.internet.kqreactorrrr/ ImportErrorr rrr rr r6sQ# &+)BJ  $(9)(J)s 8AA