Ϫf.dZddlZddlmZddlmZmZmZmZddl m Z ddl m Z ddl mZddlmZGd d Zee Gd d Zeej&ej(Gd dZdZdZefdZdZdZeej&ej(GddZGddej8ZGddej<ZddZ ddZ!y)zF Testing support for protocols -- loopback between client and server. N) implementer)defer interfacesmainprotocol)IAddress) deferLater)policies)failurec4eZdZdZdZdZdZdZdZeZ dZ y)_LoopbackQueuez Trivial wrapper around a list to give it an interface like a queue, which the addition of also sending notifications by way of a Deferred whenever the list has something added to it. NFcg|_yN)_queueselfs MH $ "" ,rr.cX|r(|j}|y|j||r'yy)z L{identityPumpPolicy} is a policy which delivers each chunk of data written to the given queue as-is to the target. This isn't a particularly realistic policy. @see: L{loopbackAsync} N)r# dataReceived)queuetargetr3s ridentityPumpPolicyrYjs-   = E" rcg}|r'|j}|n|j||r'|r!|jdj|yy)z L{collapsingPumpPolicy} is a policy which collapses all outstanding chunks into a single string and delivers it to the target. @see: L{loopbackAsync} Nr)r#rrVr:)rWrXr3chunks rcollapsingPumpPolicyr\zsP E   =  U   CHHUO, rct}t}|jt||jt|t|||||S)a Establish a connection between C{server} and C{client} then transfer data between them until the connection is closed. This is often useful for testing a protocol. @param server: The protocol instance representing the server-side of this connection. @param client: The protocol instance representing the client-side of this connection. @param pumpPolicy: When either C{server} or C{client} writes to its transport, the string passed in is added to a queue of data for the other protocol. Eventually, C{pumpPolicy} will be called with one such queue and the corresponding protocol object. The pump policy callable is responsible for emptying the queue and passing the strings it contains to the given protocol's C{dataReceived} method. The signature of C{pumpPolicy} is C{(queue, protocol)}. C{queue} is an object with a C{get} method which will return the next string written to the transport, or L{None} if the transport has been disconnected, and which evaluates to C{True} if and only if there are more items to be retrieved via C{get}. @return: A L{Deferred} which fires when the connection has been closed and both sides have received notification of this. )r makeConnectionr._loopbackAsyncBody)serverclient pumpPolicyserverToClientclientToServers r loopbackAsyncresT6$%N#%N ,^<= ,^<=   rc$fd} dx}x}}||||}||||}|s@|s>tj} | |_| |_| jt||||| S|j r d}||||n|j r d}|||||ry|j tjtj|j tjtjtjdS )ak Transfer bytes from the output queue of each protocol to the input of the other. @param server: The protocol instance representing the server-side of this connection. @param serverToClient: The L{_LoopbackQueue} holding the server's output. @param client: The protocol instance representing the client-side of this connection. @param clientToServer: The L{_LoopbackQueue} holding the client's output. @param pumpPolicy: See L{loopbackAsync}. @return: A L{Deferred} which fires when the connection has been closed and both sides have received notification of this. cbd}|r ||d}|r|s|jj|S)NFT) transportrK)sourcer0rXsentrbs rpumpz _loopbackAsyncBody..pumps8 q& !D     * * , rFTN) rDeferredr addCallback_loopbackAsyncContinuer(connectionLostr FailurerCONNECTION_DONEsucceed) r`rcrardrbrkr( clientSent serverSentrs ` rr_r_s (  /44 4Z*&.&9 &.&9 * A34N 034N 0 MM&  H  $ $J  0  & &J  0   ! !'//$2F2F"G H  ! !'//$2F2F"G H==& &I rc Vd|_d|_ddlm}t|dt|||||S)Nrreactor)rtwisted.internetrwr r_)ignoredr`rcrardrbrws rrnrns> ,0N(+/N( )    rcZeZdZdZdZdZdZddZdZdZ dZ dZ d Z d Z d Zd Zd Zy) LoopbackRelayrrNc ||_||_yr)rXlogFile)rrXr}s rrzLoopbackRelay.__init__s  rc|j|z|_|jr(|jjdt|zyy)Nzloopback writing %s )bufferr}r7reprr5s rr7zLoopbackRelay.writes:kkD( << LL  6dC D rcD|jdj|yr9)r7r:r;s rr=zLoopbackRelay.writeSequence"s 388E?#rc|jdk(ry|jr|jj|jrk|jr1|jj dt |jz|j}d|_|jj||jdk(rDd|_|jjtjtjyy)Nzloopback receiving %s r) shouldLoserIrSrr}r7rrXrVror rprrq)rrs r clearBufferzLoopbackRelay.clearBuffer%s ??b  == MM ) ) + ;;|| ""#>rr)r$r%r&rrrTrIrr7r=rr?rGrDrNrQrr*rrr{r{sJ FJMHE $N  !?rr{ceZdZdZdZdZy)LoopbackClientFactorycRd|_tj|_||_yr!) disconnectedrrldeferredr)rrs rrzLoopbackClientFactory.__init__Js(   rc|jSr)r)raddrs r buildProtocolz#LoopbackClientFactory.buildProtocolOs }}rcHd|_|jjdy)Nr)rrr)r connectorreasons rclientConnectionLostz*LoopbackClientFactory.clientConnectionLostRs t$rN)r$r%r&rrrr*rrrrIs! %rrceZdZdZdZy) _FireOnClosecxtjj|||tj|_yr)r ProtocolWrapperrrrlr)rrfactorys rrz_FireOnClose.__init__Xs(  ))$'B( rcztjj|||jj dyr)r rrorr)rrs rroz_FireOnClose.connectionLost\s*  //f= t$rN)r$r%r&rror*rrrrWs )%rrc ddlm}tjt j }t || ||_ fd|_|j||dt|}||_|jdjj||j}|j fd|jfd|S)zBRun session between server and client protocol instances over TCP.rrvcSrr*r serverWrappers rzloopbackTCP..h=rz 127.0.0.1) interfacecjSrrxrs rrzloopbackTCP..n M22rc$jSr stopListeningr serverPorts rrzloopbackTCP..oJ446r)rxrwr WrappingFactoryrFactoryrnoisyr listenTCPr connectTCPrGportrrm) r`rarrrwfclientFrrrs @@r loopbackTCPras(  !1!1!34A F+MAG0AO""4k"BJ#F+GGM {J$6$6$8$=$=wGAMM23MM67 Hrc tj}ddlm}t j t j}t|| ||_ fd|_ |j||t|}||_ |j|||j}|j fd|jfd|S)zJRun session between server and client protocol instances over UNIX socket.rrvcSrr*rs rrzloopbackUNIX..{rrcjSrrrs rrzloopbackUNIX..rrc$jSrrrs rrzloopbackUNIX..rr)tempfilemktemprxrwr rrrrrr listenUNIXr connectUNIXrrm) r`rarpathrwrrrrrs @@r loopbackUNIXrss ?? D(  !1!1!34A F+MAG0AO##D!,J#F+GGM g&AMM23MM67 Hr)rT)T)"r'rzope.interfacerrxrrrrtwisted.internet.interfacesrtwisted.internet.taskr twisted.protocolsr twisted.pythonr r r, ITransport IConsumerr.rYr\rer_rnr{ ClientFactoryrrrrrr*rrrs  &>>0,'"""8 X    Z " "J$8$89-,-,:-,` # -".@#LD'N4 Z " "J$8$893?3?:3?l %H22 %%8++% $ r