ϪfA2dZddlmZddlmZddlmZddlmZddlm Z ddl m Z m Z m Z mZmZmZmZmZmZddlmZmZdd lmZmZmZmZdd lmZdd lmZGd d eZ GddeZ!GddZ"GddeZ#GddeZ$GddeZ%y)z( Tests for L{twisted.internet.testing}. ) annotations)Callable) verifyObject)Protocol) IPv4Address) IAddress IConnector IConsumerIListeningPort IPushProducer IReactorSSL IReactorTCP IReactorUNIX ITransport) ClientFactoryFactory) MemoryReactorNonStreamingProducerRaisingMemoryReactorStringTransport)namedAny)TestCaseceZdZdZddZddZddZddZddZddZ ddZ dd Z dd Z dd Z dd Zdd ZddZddZddZddZddZddZddZy)StringTransportTestsz@ Tests for L{twisted.internet.testing.StringTransport}. c"t|_yN)r transportselfs D/usr/lib/python3/dist-packages/twisted/internet/test/test_testing.pysetUpzStringTransportTests.setUp+s (*c|jtt|j|jtt|j|jtt |jy)zq L{StringTransport} instances provide L{ITransport}, L{IPushProducer}, and L{IConsumer}. N) assertTruerrrr r rs r test_interfacesz$StringTransportTests.test_interfaces.sI  Z@A  ]DNNCD  Y?@r"ct}t}|jj|||j|jj||j|jj |y)zz L{StringTransport.registerProducer} records the arguments supplied to it as instance attributes. N)objectrregisterProducerassertIsproducer streamingrr*r+s r test_registerProducerz*StringTransportTests.test_registerProducer7sV 8H  '')< dnn--x8 dnn.. :r"cNt}|jj|d|jt|jjtd|j |jj ||j|jjy)zy L{StringTransport.registerProducer} raises L{RuntimeError} if a producer is already registered. TFN) r'rr( assertRaises RuntimeErrorr)r*r$r+)rr*s r test_disallowedRegisterProducerz4StringTransportTests.test_disallowedRegisterProducerBsr 8 ''$7  $..9968U  dnn--x8 001r"ct}t}|jj|d|jj|j |jj |jj|d|j |jj ||j|jjy)z L{StringTransport.unregisterProducer} causes the transport to forget about the registered producer and makes it possible to register a new one. FTN) r'rr(unregisterProducer assertIsNoner*r)r$r+)r oldProducer newProducers r test_unregisterProducerz,StringTransportTests.test_unregisterProducerOs h h  '' U; ))+ $..112 '' T: dnn--{; 001r"cX|jt|jjy)z L{StringTransport.unregisterProducer} raises L{RuntimeError} if called when no producer is registered. N)r/r0rr3rs r test_invalidUnregisterProducerz3StringTransportTests.test_invalidUnregisterProducer^s ,(I(IJr"cP|j|jjdy)zO L{StringTransport.producerState} is initially C{'producing'}. producingN) assertEqualr producerStaters r test_initialProducerStatez.StringTransportTests.test_initialProducerStatees 55{Cr"c|jj|j|jjdy)zy L{StringTransport.pauseProducing} changes the C{producerState} of the transport to C{'paused'}. pausedN)rpauseProducingr<r=rs r test_pauseProducingz(StringTransportTests.test_pauseProducingks. %%' 55x@r"c|jj|jj|j|jjdy)z} L{StringTransport.resumeProducing} changes the C{producerState} of the transport to C{'producing'}. r;N)rrAresumeProducingr<r=rs r test_resumeProducingz)StringTransportTests.test_resumeProducingss> %%' &&( 55{Cr"c|jj|j|jjdy)z{ L{StringTransport.stopProducing} changes the C{'producerState'} of the transport to C{'stopped'}. stoppedN)r stopProducingr<r=rs r test_stopProducingz'StringTransportTests.test_stopProducing|s. $$& 55yAr"c|jj|jt|jjy)zu L{StringTransport.pauseProducing} raises L{RuntimeError} if the transport has been stopped. N)rrHr/r0rArs r test_stoppedTransportCannotPausez5StringTransportTests.test_stoppedTransportCannotPauses. $$& ,(E(EFr"c|jj|jt|jjy)zv L{StringTransport.resumeProducing} raises L{RuntimeError} if the transport has been stopped. N)rrHr/r0rDrs r !test_stoppedTransportCannotResumez6StringTransportTests.test_stoppedTransportCannotResumes. $$& ,(F(FGr"c|jj|jt|jjy)zz L{StringTransport.pauseProducing} raises L{RuntimeError} if the transport is being disconnected. N)rloseConnectionr/r0rArs r &test_disconnectingTransportCannotPausez;StringTransportTests.test_disconnectingTransportCannotPauses. %%' ,(E(EFr"c|jj|jt|jjy)z{ L{StringTransport.resumeProducing} raises L{RuntimeError} if the transport is being disconnected. N)rrOr/r0rDrs r 'test_disconnectingTransportCannotResumezrBrErIrKrMrPrRrVr[r_rbrer"r rr&sr+A ; 2 2KD ADBGHGH6CO44r"rc@eZdZdZd dZd dZd dZd dZd dZd dZ y) ReactorTestszA Tests for L{MemoryReactor} and L{RaisingMemoryReactor}. cxt}tt|tt|tt|y)zt L{MemoryReactor} provides all of the attributes described by the interfaces it advertises. N)rrrr r)r memoryReactors r test_memoryReactorProvidesz'ReactorTests.test_memoryReactorProvidess) & [-0[-0\=1r"cxt}tt|tt|tt|y)z{ L{RaisingMemoryReactor} provides all of the attributes described by the interfaces it advertises. N)rrrr r)rraisingReactors r test_raisingReactorProvidesz(ReactorTests.test_raisingReactorProvidess* ./[.1[.1\>2r"c,t}|jddt|jddtdfD]j}t t ||j }t t||j|jd|j|jdl|jdt}t t ||j }t t||j|jdy)a L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and L{MemoryReactor.connectUNIX} will return an L{IConnector} whose C{getDestination} method returns an L{IAddress} with attributes which reflect the values passed. ztest.example.comi Ns /fake/path) r connectTCPr connectSSLrr getDestinationrr<hostport connectUNIXname)rrq connectorrZs r test_connectDestinationz$ReactorTests.test_connectDestinations&  $ $%7} O  $ $%7}PT U  1I Y /..0G 7 +   W\\+= >   W\\4 0 1"--m]_M Z+**,Xw' }5r"c(t}|jdt|jdtdfD]j}t t ||j }t t||j|jd|j|jdl|jdt}t t ||j }t t||j|jdy)a L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL} will have a default host of C{'0.0.0.0'}, and a port that reflects the value passed, and C{listenUNIX} will have a name that reflects the path passed. i2 Nz0.0.0.0s/path/to/socket) r listenTCPr listenSSLrr rXrr<rzr{ listenUNIXr})rrqr{rZs r test_listenDefaultHostz#ReactorTests.test_listenDefaultHosts&  # #D') 4  # #D')T :  1D  .llnG 7 +   W\\9 5   W\\4 0 1''(:GIF^T*,,.Xw' '9:r"ct}t}|j||j||j|j |g|j ||j|j gy)z> Adding, removing, and listing readers works. N)r'r addReaderr< getReaders removeReader)rreaderreactors r test_readerszReactorTests.test_readersp/&!&! ++-x8V$ ++-r2r"ct}t}|j||j||j|j |g|j ||j|j gy)z> Adding, removing, and listing writers works. N)r'r addWriterr< getWriters removeWriter)rwriterrs r test_writerszReactorTests.test_writers'rr"Nrf) rirjrkrlrrrurrrrrmr"r roros%236.;23 3r"roc0eZdZdZddZddZddZd dZy) TestConsumerzP A very basic test consumer for use with the NonStreamingProducerTests. c.g|_d|_d|_yr)writesr*producerStreamingrs r __init__zTestConsumer.__init__=s#% $ .2r"c ||_||_y)z Registers a single producer with this consumer. Just keeps track of it. @param producer: The producer to register. @param streaming: Whether the producer is a streaming one or not. Nr*rr,s r r(zTestConsumer.registerProducerBs! !*r"c d|_d|_y)zC Forget the producer we had previously registered. Nrrs r r3zTestConsumer.unregisterProducerLs !%r"c:|jj|y)zz Some data was written to the consumer: stores it for later use. @param data: The data to write. N)rappend)rdatas r writezTestConsumer.writeSs 4 r"Nrf)r*r'r+boolrgrh)rbytesrgrh)rirjrkrlrr(r3rrmr"r rr8s3 +&!r"rc eZdZdZddZddZy)NonStreamingProducerTestszF Tests for the L{NonStreamingProducer} to validate behaviour. cZt}t|}|j|d|j|j||j|j ||j |jtdD]}|jgd}|j|j|j|j|j|j |j|j||jt|jy)z When the L{NonStreamingProducer} has resumeProducing called 10 times, it writes the counter each time and then fails. F ) 0123456789N)rrr(r)r*consumerrTrrangerDr4r<rr/r0)rrr*_expectedWritess r test_producesOnly10Timesz2NonStreamingProducerTests.test_producesOnly10Timesas  >'1!!(E2 h''2 h''2 334r 'A  $ $ & 'V (++, (445 (++, .9 ,(@(@Ar"ct}t|}|j|d|j|j t |j y)zb When the L{NonStreamingProducer} is paused, it raises a L{RuntimeError}. FN)rrr(rDr/r0rA)rrr*s r test_cannotPauseProductionz4NonStreamingProducerTests.test_cannotPauseProduction{sI  >'1!!(E2   " ,(?(?@r"Nrf)rirjrkrlrrrmr"r rr\sB4 Ar"rceZdZeddZy) _SupportsNamecyrrmrs r riz_SupportsName.__name__s r"N)rgstr)rirjrkpropertyrmr"r rrs   r"rceZdZdZddZddZddZddZddZddZ ddZ dd Z dd Z dd Z dd Zdd ZddZddZddZy)DeprecationTestsz8 Deprecations in L{twisted.test.proto_helpers}. cd|j}|j|g}|jt|dd|jdt ||j ||dd|j |t|y)Nztwisted.internet.testing.rcategorymessage)ri flushWarningsr<DeprecationWarninglenassertInr)r)rtestobjnew_pathwarningss r helperzDeprecationTests.helpers|.s||n=%%tf- +Xa[-DE CM* h I 67 c8H-.r"cHddlm}|j|j|y)Nr)AccumulatingProtocol)twisted.test.proto_helpersrrtest_accumulatingProtocol)rrs r rz*DeprecationTests.test_accumulatingProtocolC D224HIr"cHddlm}|j|j|y)Nr)LineSendingProtocol)rrrtest_lineSendingProtocol)rrs r rz)DeprecationTests.test_lineSendingProtocolsB D113FGr"cHddlm}|j|j|y)Nr)FakeDatagramTransport)rrrtest_fakeDatagramTransport)rrs r rz+DeprecationTests.test_fakeDatagramTransportsD D335JKr"cHddlm}|j|j|y)Nr)r)rrrtest_stringTransport)rrs r rz%DeprecationTests.test_stringTransports> D--?r"cHddlm}|j|j|y)Nr) StringTransportWithDisconnection)rrr%test_stringTransportWithDisconnection)rrs r rz6DeprecationTests.test_stringTransportWithDisconnectionsO  6 68X r"cHddlm}|j|j|y)Nr)StringIOWithoutClosing)rrrtest_stringIOWithoutClosing)rrs r rz,DeprecationTests.test_stringIOWithoutClosingsE D446LMr"cHddlm}|j|j|y)Nr)_FakeConnector)rrrtest__fakeConnector)rrs r rz$DeprecationTests.test__fakeConnectors= D,,n=r"cHddlm}|j|j|y)Nr) _FakePort)rrrtest__fakePort)rrs r rzDeprecationTests.test__fakePorts8 D''3r"cHddlm}|j|j|y)Nr)r)rrrtest_memoryReactor)rrs r rz#DeprecationTests.test_memoryReactors< D++];r"cHddlm}|j|j|y)Nr)MemoryReactorClock)rrrtest_memoryReactorClock)rrs r rz(DeprecationTests.test_memoryReactorClocksA D002DEr"cHddlm}|j|j|y)Nr)r)rrrtest_raisingMemoryReactor)rrs r rz*DeprecationTests.test_raisingMemoryReactorrr"cHddlm}|j|j|y)Nr)r)rrrtest_nonStreamingProducer)rrs r rz*DeprecationTests.test_nonStreamingProducerrr"cHddlm}|j|j|y)Nr)waitUntilAllDisconnected)rrrtest_waitUntilAllDisconnected)rrs r rz.DeprecationTests.test_waitUntilAllDisconnectedsG D668PQr"cHddlm}|j|j|y)Nr)EventLoggingObserver)rrrtest_eventLoggingObserver)rrs r rz*DeprecationTests.test_eventLoggingObserverrr"N)rzCallable[..., object]rrrgrhrf)rirjrkrlrrrrrrrrrrrrrrrrmr"r rrs\/J H L @  N > 4 < F J J R Jr"rN)&rl __future__rtypingrzope.interface.verifyrrtwisted.internet.addressrtwisted.internet.interfacesrr r r r r rrrtwisted.internet.protocolrrtwisted.internet.testingrrrrtwisted.python.reflectrtwisted.trial.unittestrrrorrrrrmr"r rs#.0   = ,+e48e4Pg38g3T!!!!H+A+A\ H SJxSJr"