b%ddlZddlZddlZddlZddlZddlmZmZmZm Z m Z m Z m Z m Z mZmZmZmZmZmZmZmZmZmZddlmZmZmZmZddlmZddlm Z ddl!m"Z"m#Z#m$Z$m%Z%m&Z&ddl'm(Z(m)Z)m*Z*m+Z+ddl,m-Z-m.Z.dd l/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6dd l7m8Z8m9Z9m:Z:dd l;me>fge dfZ?ed e: Z@ede:e9ZAedd ZBedee8eCf ZDerddlEmFZFGdde-ZGGdde-ZHeeGeHfZIGdde$e)e(e*ZJeJZKGdde.ZLGddZMGddZNGdde-ZOGd d!e-ZPeeOePfZQeeee>eRfd"fee>effZSe eSZTGd#d$eJZUy)%N) TYPE_CHECKINGAny AsyncIterator AwaitableCallableDictIterableListMappingMutableMappingNoReturnOptionalSetTupleTypeTypeVarUnioncast) ConnectionConnectionPool SSLConnectionUnixDomainSocketConnectionLock)Retry)EMPTY_RESPONSE NEVER_DECODE AbstractRedisCaseInsensitiveDictbool_ok)AsyncCoreCommandsAsyncRedisModuleCommandsAsyncSentinelCommands list_or_args)Protocol TypedDict)ConnectionErrorExecAbortError PubSubError RedisError ResponseError TimeoutError WatchError)ChannelT EncodableTKeyT)safe_str str_if_bytes_KeyT)bound_ArgT_RedisTRedis_NormalizeKeysT)ScriptceZdZdefdZy)ResponseCallbackProtocolresponsec yNselfr<kwargss 6/usr/lib/python3/dist-packages/redis/asyncio/client.py__call__z!ResponseCallbackProtocol.__call__G N__name__ __module__ __qualname__rrDr?rFrCr;r;Fs  rFr;ceZdZdefdZy)AsyncResponseCallbackProtocolr<c Kywr>r?r@s rCrDz&AsyncResponseCallbackProtocol.__call__L  NrGr?rFrCrLrLKs s rFrLc;eZdZUdZeeeefefe d<e defdZ ddddddddddd d d d dd ddd ddd dd ddddd dddede deee fde ede ede ede ede ee ee effde ede ededededede edede ed e ed!ed"e ed#e ed$ed%e e d&ed'e d(e ed)e ed*e ed+ef:d,Zd-Zd.Zd/ed0efd1Zd2ed3efd4Zd5Zd6Zd7Z d[d8ed9e ed0d:fd;Zdd dd<d=ed:geee effd>e!d9e ed?ed@e ef dAZ" d\dBe!dCe edDedEe edFe e#e$dGed0e$fdHZ%d]dIZ&d^dJZ'd_dKZ(d/ed0efdLZ)dMZ*dNZ+e,fdOed0dfdPZ-d`dQe ed0dfdRZ.dSZ/dTe0dUe1fdVZ2dWZ3dXe0dYeeeffdZZ4y)ar7a Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Pipelines derive from this, implementing how the commands are sent and received to the Redis server. Based on configuration, an instance will either use a ConnectionPool, or Connection object to talk to redis. response_callbacksurlc @tj|fi|}||S)a Return a Redis client object configured from the given URL For example:: redis://[[username]:[password]]@localhost:6379/0 rediss://[[username]:[password]]@localhost:6379/0 unix://[[username]:[password]]@/path/to/socket.sock?db=0 Three URL schemes are supported: - `redis://` creates a TCP socket connection. See more at: - `rediss://` creates a SSL wrapped TCP socket connection. See more at: - ``unix://``: creates a Unix Domain Socket connection. The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters. There are several ways to specify a database number. The first value found will be used: 1. A ``db`` querystring option, e.g. redis://localhost?db=0 2. If using the redis:// or rediss:// schemes, the path argument of the url, e.g. redis://localhost/0 3. A ``db`` keyword argument to this function. If none of these options are specified, the default db=0 is used. All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values "True"/"False" or "Yes"/"No". Values that cannot be properly cast cause a ``ValueError`` to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ``ConnectionPool``'s class initializer. In the case of conflicting arguments, querystring arguments always win. )connection_pool)rfrom_url)clsrRrBrTs rCrUzRedis.from_urlds%R)11#@@?33rF localhostirNzutf-8strictFrequiredT)hostportdbpasswordsocket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsrTunix_socket_pathencodingencoding_errorsdecode_responsesretry_on_timeoutretry_on_errorssl ssl_keyfile ssl_certfile ssl_cert_reqs ssl_ca_certs ssl_ca_datassl_check_hostnamemax_connectionssingle_connection_clienthealth_check_interval client_nameusernameretryauto_close_connection_poolredis_connect_funcrZr[r\r]r^r_r`rarTrbrcrdrerfrgrhrirjrkrlrmrnrorprqrrrsrtruc| |nd|_| s|sg}|dur|jt||||| | | ||tj|||||d}| |j | t dn6|j |||||d|r|j t||||||dtdi|} | |_ ||_ d|_ t|jj|_y) a4 Initialize a new Redis client. To specify a retry policy for specific errors, first set `retry_on_error` to a list of the error/s to retry on, then set `retry` to a valid `Retry` object. To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. NFT)r\rsr]r^rcrdrerfrgrtrorqrrrv)pathconnection_class)rZr[r_r`ra)ryrirjrkrlrmrnr?)ruappendr,copydeepcopyupdaterrrrTrp connectionr __class__RESPONSE_CALLBACKSrQ) rArZr[r\r]r^r_r`rarTrbrcrdrerfrgrhrirjrkrlrmrnrorprqrrrsrtrurvrBs rC__init__zRedis.__init__s^+:*A &u '!!#4'%%l3$$"0$#2$4$4"0u-#2)>*&8F" + 0,F $ $2H,<4L MM0=+6,8-:,8+62D -6v6O.(@%04"5dnn6W6W"XrFcN|jjd|jdS)N<>)rrHrTrAs rC__repr__zRedis.__repr__s'..))*!D,@,@+C1EErFc>|jjSr>) initialize __await__rs rCrzRedis.__await__s **,,rFrAreturncK|jr4|j(|jjdd{|_|S7 w)N_)rpr~rTget_connectionrs rCrzRedis.initializes>  ( (T__-D$($8$8$G$G$LLDO Ms7AA Acommandcallbackc"||j|<y)zSet a custom Response CallbackN)rQ)rArrs rCset_response_callbackzRedis.set_response_callbacks+3(rFc6|jjS)z!Get the connection pool's encoder)rT get_encoderrs rCrzRedis.get_encoder s##//11rFc.|jjS)z'Get the connection's key-word arguments)rTconnection_kwargsrs rCget_connection_kwargszRedis.get_connection_kwargss##555rFct|||y)a This function can be used to add externally defined redis modules, and their namespaces to the redis client. funcname - A string containing the name of the function to create func - The function, being added to this class. ex: Assume that one has a custom redis module named foomod that creates command named 'foo.dothing' and 'foo.anotherthing' in redis. To load function functions into this namespace: from redis import Redis from foomodule import F r = Redis() r.load_external_module("foo", F) r.foo().dothing('your', 'arguments') For a concrete example see the reimport of the redisjson module in tests/test_connection.py::test_loading_external_modules N)setattr)rAfuncnamefuncs rCload_external_modulezRedis.load_external_modules* h%rF transaction shard_hintPipelinecFt|j|j||S)a_ Return a new pipeline object that can queue multiple commands for later execution. ``transaction`` indicates whether all commands should be executed atomically. Apart from making a group of operations atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server. )rrTrQ)rArrs rCpipelinezRedis.pipeline+s&  $"9"9;   rF)rvalue_from_callable watch_delayrwatchesrrcK|jd|4d{} |r|j|d{||}tj|r |d{}|j d{}|r|n| cdddd{S7v7]787"7 #t $r(|#|dkDrt j|d{7YwxYw#1d{7swYyxYww)z Convenience method for executing the callable `func` as a transaction while watching all keys specified in `watches`. The 'func' callable should expect a single argument which is a Pipeline object. TNr)rwatchinspect isawaitableexecuter-asynciosleep) rArrrrrpipe func_value exec_values rCrzRedis.transaction9s==z2  d (djj'222!%dJ**:6+5%5 '+||~!5J)<:*L   3&6!5 "".;?%mmK888   sC#BC#CBB&BBB3B4 B> C# B C#BBBC#(C CC C C  CC C C C#nametimeoutrblocking_timeout lock_class thread_localc.|t}|||||||S)a Return a new Lock object using key ``name`` that mimics the behavior of threading.Lock. If specified, ``timeout`` indicates a maximum life for the lock. By default, it will remain locked until release() is called. ``sleep`` indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock. ``blocking_timeout`` indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of ``None`` indicates continue trying forever. ``blocking_timeout`` can be specified as a float or integer, both representing the number of seconds to wait. ``lock_class`` forces the specified lock implementation. Note that as of redis-py 3.0, the only lock class we implement is ``Lock`` (which is a Lua-based lock). So, it's unlikely you'll need this parameter, unless you have created your own custom lock class. ``thread_local`` indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline: time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. thread-1 sets the token to "abc" time: 1, thread-2 blocks trying to acquire `my-lock` using the Lock instance. time: 5, thread-1 has not yet completed. redis expires the lock key. time: 5, thread-2 acquired `my-lock` now that it's available. thread-2 sets the token to "xyz" time: 6, thread-1 finishes its work and calls release(). if the token is *not* stored in thread local storage, then thread-1 would see the token value as "xyz" and would be able to successfully release the thread-2's lock. In some use cases it's necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn't disabled in this case, the worker thread won't see the token set by the thread that acquired the lock. Our assumption is that these cases aren't common and as such default to using thread local storage.)rrrrr)rArrrrrrs rClockz Redis.lockVs/n  J  -%   rFc .t|jfi|S)z Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them. )PubSubrT)rArBs rCpubsubz Redis.pubsubs d**5f55rFc,t|jSr>)MonitorrTrs rCmonitorz Redis.monitorst++,,rFc<|j|jdS)NT)rTrp)rrTrs rCclientz Redis.clients#~~ 004  rFc>K|jd{S7wr>)rrs rC __aenter__zRedis.__aenter__s__&&&&s c@K|jd{y7wr>)closerAexc_type exc_value tracebacks rC __aexit__zRedis.__aexit__jjl zUnclosed Redis client _warningsc|jN|jd|t|||jd}t j j |yy)NzUnclosed client session )source)rmessage)r~warnResourceWarning _DEL_MESSAGErget_event_loopcall_exception_handler)rArcontexts rC__del__z Redis.__del__sY ?? & NN*4(3_T  "&$2C2CDG  " " $ ; ;G D 'rFclose_connection_poolcK|j}|r*d|_|jj|d{|s|0|jr#|jj d{yyy797 w)aB Closes Redis client connection :param close_connection_pool: decides whether to close the connection pool used by this Redis client, overriding Redis.auto_close_connection_pool. By default, let Redis.auto_close_connection_pool decide whether to close the connection pool. N)r~rTreleaseru disconnect)rArconns rCrz Redis.closesr "DO&&..t4 4 4 ! )d.M.M&&113 3 3/N ) 5 4s!4A4A01A4(A2)A42A4cxK|j|d{|j||fi|d{S7 7w)z7 Send a command and parse the response N) send_commandparse_response)rAr command_nameargsoptionss rC_send_command_parse_responsez"Redis._send_command_parse_responsesD d&&&(T((|GwGGG 'Gs:6:8::rerrorcK|jd{|j!t|t|jdur|y74w)z Close the connection and raise an exception if retry_on_timeout is not set or the error is not a TimeoutError NF)rrg isinstancetuplerArrs rC_disconnect_raisezRedis._disconnect_raisesN oo    '%t':':!;<EKF sA A 5A cKjd{j}djxs|jfid{ jj fdfdd{js|j d{SS77\7.7 #js|j d{7wwxYww)z.Execute a command and return a parsed responseNrc2jgiSr>rrrrrrAsrCz'Redis.execute_command..s*999,)-18rFc(j|Sr>)rrrrAs rCrz'Redis.execute_command..sd44T5ArF)rrTr~rrtcall_with_retryr)rArrpoolrrs``` @@rCexecute_commandzRedis.execute_commandsoo##Aw T(;(;(;L(TG(T"T )33B ??ll4(((# #U)??ll4(((#siC$B09C$B2C$*B8B4B8 C$)B6*C$2C$4B86C$8!C!CC!!C$r~rc~K t|vr|jdd{}n|jd{}||jvrGt t |}|j||fi|}tj|r |d{S|S|S7t7]#t$rt|vr |tcYSwxYw7-w)z'Parses a response from the Redis serverT)disable_decodingN) r read_responser+rrQrstrrr)rAr~rrr<retvals rCrzRedis.parse_responses w&!+!9!94!9!PP!+!9!9!;; 422 2\2L:T,,\:8OwOF#*#6#6v#>< JF JQ; (~..   sTB=BBBBBAB= B;B=BBB84B=6B88B=)TN)Ng?NNT)rr)rr)rr7r>)5rHrIrJ__doc__r rrbytesResponseCallbackT__annotations__ classmethodrUintrfloatboolr rlistrrrrr6rrrrrrrrrr0rrrrrrrrrrwarningsrrrr Exceptionrrrr?rFrCr7r7Ss5 'uS%Z'8:K'KLL)43)4)4\ "&*.26+/NR48*.'!&!&)-%)&*'&*%)#()-).%&%)"&!%+/AkYkY kY #s(O kY 3- kY!kY!)kY#4.kY#+73c5j8I3I+J"KkY".1kY#3-kYkYkYkY !kY"!#kY$%kY&c]'kY(sm)kY*+kY,sm-kY.c]/kY0!1kY2"#3kY4#'5kY6 #7kY8c]9kY:3-;kY<=kY>%)?kYZF-w7 4S4%??@SM  "  e_ @$(,0+/!@ @ %@  @ #5/ @ T$Z( @ @  @ D6- 'w'7'+L'/EEDE4$444$H J y )$$49#u*4ErFcJeZdZUeed<eed<eed<eed<eed<eed<y)MonitorCommandInfotimer\client_address client_port client_typerN)rHrIrJrrrrr?rFrCrr s# K G LrFrceZdZdZej dZej dZdefdZ dZ dZ dZ d e fd Zd ee fd Zy ) rz Monitor is useful for handling the MONITOR command to the redis server. next_command() method returns one command from monitor listen() method yields commands from monitor. z\[(\d+) (.*)\] (.*)z"(.*?)(?)rTr~)rArTs rCrzMonitor.__init__s.04rFczK|j)|jjdd{|_yy7 w)NMONITOR)r~rTrrs rCconnectzMonitor.connect s3 ?? "$($8$8$G$G $RRDO #Rs +;9 ;cK|jd{|jjdd{|jjd{}t |st d||S7d7C7#w)NrzMONITOR failed: )r r~rrr r*rAr<s rCrzMonitor.__aenter__$sollnoo**95556688x /z:; ; 58s1BA;"BA=!BA? B=B?BcK|jjd{|jj|jd{y727wr>)r~rrTr)rArs rCrzMonitor.__aexit__-sCoo((***""**4??;;; +;s!AA,A AAArcK|jd{|jjd{}t|tr'|jj j |d}|jdd\}}|jj|}|j\}}}dj|jj|}|jdd}|dk(rd}d } d} n2|jd r d }|d d} d } n|j!d d\}} d } t#|t%||| | |dS777w)z)Parse the response from a monitor commandNTforce z\""luaunix:tcp)rr\rrrr)r r~rrrencoderdecodesplit monitor_rematchgroupsjoin command_refindallreplace startswithrsplitrr) rAr< command_time command_datamdb_id client_inforrrrs rC next_commandzMonitor.next_command1sClln6688 h &..55hd5KH%-^^C%;" l OO ! !, /&'hhj#{G((4??227;<//%- % "NKK  # #F +#N%ab/K K+6*<*?,rFrc^eZdZdZdZdZdZ d-dedee de fd Z d Z d Z d Zd ZdeefdZdefdZedZdefdZdZdZdZd.de defdZdZdedefdZ de!de"fdZ#de!defd Z$de!de%fd!Z&defd"Z'de(fd#Z) d/de defd$Z*d0defd%Z+d1d&Z,dd'd(d)ed*d+eddfd,Z-y)2ra* PubSub provides publish, subscribe and listen support to Redis channels. After subscribing to one or more channels, the listen() method will block until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again. )rpmessage) unsubscribe punsubscribezredis-py-health-checkNrTrignore_subscribe_messagesc||_||_||_d|_||_|j|jj |_|jj rd|jg|_n,d|jj|jg|_i|_ t|_ i|_ t|_tj |_y)Npongspong)rTrr3r~rrreHEALTH_CHECK_MESSAGEhealth_check_responseencodechannelssetpending_unsubscribe_channelspatternspending_unsubscribe_patternsrr_lock)rArTrr3rs rCrzPubSub.__init__hs /$)B& << //;;=DL << ( ())GD &  ##D$=$=>*D & ,/E) ,/E)\\^ rFcK|Swr>r?rs rCrzPubSub.__aenter__  c@K|jd{y7wr>resetrs rCrzPubSub.__aexit__rrcR|jr|jjyyr>)r~clear_connect_callbacksrs rCrzPubSub.__del__s ?? OO 3 3 5 rFcK|j4d{|jrp|jjd{|jj|jj |jd{d|_i|_t|_i|_ t|_ dddd{y777L7 #1d{7swYyxYwwr>) r>r~rrFrTrr9r:r;r<r=rs rCrDz PubSub.resets:: 6 6oo00222779**224??CCC"&DM03D -DM03D - 6 6 62C 6 6 6 6siC.CC.*CCACC 7C C. C C.CCC.C+C" C+'C.rc"|jSr>rCrs rCrz PubSub.closeszz|rFr~c&K|jj|jj|jr^i}|jj D]%\}}|||j j |d<'|jdi|d{|jr_i}|jj D]%\}}|||j j |d<'|jdi|d{yy7p7w)zBRe-subscribe to any channels and patterns previously subscribed toTrNr?) r;clearr=r9itemsrr subscriber< psubscribe)rAr~r9kvr<s rC on_connectzPubSub.on_connects ))//1 ))//1 ==H ++- A1?@,,Qd,;< A $..,8, , , ==H ++- A1?@,,Qd,;< A!$//-H- - -  - .s%BDD A)DDDDcHt|jxs |jS)z@Indicates if there are subscriptions to any channels or patterns)rr9r<rs rC subscribedzPubSub.subscribedsDMM2T]]33rFrcK|jd{|j}d|j i}|j||jg|i|d{y7I7w)z#Execute a publish/subscribe commandN check_health)r r~rR_executer)rArr~rBs rCrzPubSub.execute_commandsclln__  doo"56dmmJ (?(?Q$Q&QQQ  Rs"A$A AA$A"A$"A$c"K|jY|jjd|jd{|_|jj |j y|jj d{y7R7w)z5 Ensure that the PubSub is connected Nr)r~rTrrregister_connect_callbackrPr rs rCr zPubSub.connectsq ?? "$($8$8$G$G$//%DO OO 5 5doo F//))+ + + ,s"6BB A BB B BcK|jd{|jrt|ts||j d{y7;7w)z Close the connection and raise an exception if retry_on_timeout is not set or the error is not a TimeoutError. Otherwise, try to reconnect N)rrfrr,r rs rC_disconnect_raise_connectz PubSub._disconnect_raise_connectsI oo%%*UL*IKlln  s!AA5A A AAcrKjjfdfdd{S7w)aU Connect manually upon disconnection. If the Redis server is down, this will fail and raise a ConnectionError as desired. After reconnection, the ``on_connect`` callback should have been called by the # connection to resubscribe us to any channels and patterns we were previously listening to ciSr>r?)rrrBsrCrz!PubSub._execute..sGT,V,rFc(j|Sr>)rYrs rCrz!PubSub._execute..s$88uErFN)rtr)rArrrrBs`````rCrUzPubSub._executes0ZZ// , E    s )757blockrcNK|j}| td|jd{|s'|j||j|d{sy|j||j d{}|j r||jk(ry|S7o7I7&w)z3Parse the response from a publish/subscribe commandNNpubsub connection not set: did you forget to call subscribe() or psubscribe()?)r)r~ RuntimeErrorrTrUcan_readrrqr7)rAr]rrr<s rCrzPubSub.parse_responses <F  !!!4==t}}g=#VVVtT-?-?@@  % %(d6P6P*P "V@s3-B%B'B%B!$B%<B#=#B%!B%#B%cK|j}| td|jrWtjj |j kDr'|jd|jdd{yyy7w)Nr_PINGF)rT) r~r`rqrrrnext_health_checkrr6)rArs rCrTzPubSub.check_healths <F   & &&&(--/$2H2HH##11$  I ' sA6B8B9Bdatac |jj}|jj}|jDcic]\}}||||c}}Scc}}w)z normalize channel/pattern names to be either bytes or strings based on whether responses are automatically decoded. this saves us from coercing the value for each message coming in. )rr8rrK)rArer8rrNrOs rC_normalize_keyszPubSub._normalize_keyssN $$$$15>Avay!1$>>>sArBclK|rt|df|ddn|}tj|}|j||jdg|j d{}|j |}|jj||jj||S7Mw)aE Subscribe to channel patterns. Patterns supplied as keyword arguments expect a pattern name as the key and a callable as the value. A pattern's callable will be invoked automatically when a message is received on that pattern rather than producing a message via ``listen()``. rrN PSUBSCRIBE) r$dictfromkeysr}rkeysrgr<r=difference_update)rArrB parsed_args new_patternsret_vals rCrMzPubSub.psubscribes=AlDG:tABx8d 6:mmK6P F#,,,\PLE--++GI,>E..w7!'***G$ V #)D,J,J+sEF  F!'F g?)exception_handler poll_timeoutrPSWorkerThreadExcHandlerTrc2K|jjD]\}}| td|d|jjD]\}}| td|d|j d{ |j d|d{t jdd{:7?7$#t j$rt$r6}||||}tj|r |d{7Yd}~od}~wwxYw7^w)aProcess pub/sub messages using registered callbacks. This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in redis-py, but it is a coroutine. To launch it as a separate task, use ``asyncio.create_task``: >>> task = asyncio.create_task(pubsub.run()) To shut it down, use asyncio cancellation: >>> task.cancel() >>> await task Nz Channel: 'z' has no handler registeredz Pattern: 'T)r3rr) r9rKr)r<r r|rCancelledError BaseExceptionrrr)rArrrrreress rCrunz PubSub.runs6&!% 3 3 5 U GW!Jwi7R"STT U!% 3 3 5 U GW!Jwi7R"STT Ulln &&.2L'--" " " ))   $,'40&&s+II   #sp#D2D#D<B<=DCB>CD6D7D>CD&D DD D DD)NFN)Tr)Fgr>)F).rHrIrJrrrr6rrrrrrrrrDrr rrrPpropertyrRr/rr rYrUrrrTr8rgr. PubSubHandlerrMr2rrLr1rr,r|rrzrr?rFrCrr[s4 ?2 %)*/ $'$SM$$( $@6 6y*.:.$44 R: R ,   $( ?O??h-* B( By BX* AI AmIL )- @E 5I5<BDH! *#$$?@*# *#  *#rFrceZdZdedefdZy)PubsubWorkerExceptionHandlerrrcyr>r?rArrs rCrDz%PubsubWorkerExceptionHandler.__call__rErFNrHrIrJrrrDr?rFrCrrs -  rFrceZdZdedefdZy)!AsyncPubsubWorkerExceptionHandlerrrc Kywr>r?rs rCrDz*AsyncPubsubWorkerExceptionHandler.__call__rNrONrr?rFrCrrs  v rFr.c peZdZdZhdZdedeeee fe fde de efdZ ded efd Zd Zd Zd ZdZdZdZdZdZd ededffdZdZdZdZdedefdZdedede fdZ dede!e"fdZ#de$de%d e!e&d d!fd"Z'ded#eee fffd$ Z(d%Z)d&ed'e$fd(Z*d.de fd)Z+d*Z,d+e-fd,Z.d-Z/xZ0S)/ra_ Pipelines provide a way to transmit multiple commands to the Redis server in one transmission. This is convenient for batch processing, such as saving all the values in a list to Redis. All commands executed within a pipeline are wrapped with MULTI and EXEC calls. This guarantees all commands executed in the pipeline will be executed atomically. Any command raising an exception does *not* halt the execution of subsequent commands in the pipeline. Instead, the exception is caught and its instance is placed into the response list returned by execute(). Code iterating over the response list should be able to deal with an instance of an exception as a potential value. In general, these will be ResponseError exceptions, such as those raised when issuing a command on a key of a different datatype. >EXECDISCARDUNWATCHrTrQrrc||_d|_||_||_||_d|_g|_t|_d|_ y)NF) rTr~rQis_transactionrwatching command_stackr:scriptsexplicit_transaction)rArTrQrrs rCrzPipeline.__init__sJ /"4)$ ,.&)e $)!rFrArcK|Swr>r?rs rCrzPipeline.__aenter__ r@rAc@K|jd{y7wr>rCrs rCrzPipeline.__aexit__#rrc>|jjSr>) _async_selfrrs rCrzPipeline.__await__&s!++--rFzUnclosed Pipeline clientc,t|jSr>)lenrrs rC__len__zPipeline.__len__+s4%%&&rFcy)z1Pipeline instances should always evaluate to TrueTr?rs rC__bool__zPipeline.__bool__.srFcK|Swr>r?rs rCrzPipeline._async_self2r@rAcKg|_t|_|jrR|jrF |jj dd{|jj d{d|_d|_ |jr5|jj|jd{d|_yy7v7V#t$r2|jr#|jjd{7YwxYw7Ow)NrF) rr:rrr~rrr'rrrTrrs rCrDzPipeline.reset5su  ==T__ 7oo229===oo33555  $)! ??&&..t? ? ?"DO >5" 7??//44666 7 @se/D C C!C 2C 3C 7AD 9D : D C C 2D=D>DD DD cn|jr td|jr tdd|_y)z Start a transactional block of the pipeline after WATCH commands are issued. End the transactional block with `execute`. z"Cannot issue nested calls to MULTIz:Commands without an initial WATCH have already been issuedTN)rr*rrs rCmultizPipeline.multiMs<  $ $AB B   O %)!rFc|js|ddk(r|js|j|i|S|j|i|S)NrWATCH)rrimmediate_execute_commandpipeline_execute_command)rArrBs rCrzPipeline.execute_commandZsN MMT!W/9R9R14114B6B B,t,,d=f==rFcK|jd{|jr#|jd{td|jrt |t s|jd{y7i7G7 w)z Close the connection, reset watching state and raise an exception if we were watching, retry_on_timeout is not set, or the error is not a TimeoutError N=A ConnectionError occurred on while watching one or more keys)rrrDr-rfrr,rs rC_disconnect_reset_raisez Pipeline._disconnect_reset_raiseas}oo ==**,  R  %%*UL*I**,   +J  s1BB#BB>B9B:BBBc Kdjs5jjjd{_jj fdfdd{S7:7w)z Execute a command immediately, but don't auto-retry on a ConnectionError if we're already WATCHing a variable. Used when issuing WATCH or subsequent commands retrieving their values but before MULTI is called. rNc2jgiSr>rrsrCrz4Pipeline.immediate_execute_command..s*5D55l%)-4rFc(j|Sr>)rrs rCrz4Pipeline.immediate_execute_command..s$66tUCrF)r~rTrrrtr)rArrrrs```@@rCrz"Pipeline.immediate_execute_commandws}Aw --<<dooD#DOZZ//  D      s#=BA?5B:B;BBc@|jj||f|S)ar Stage a command to be executed when execute() is next called Returns the current Pipeline object back so commands can be chained together, such as: pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') At some other point, you can then run: pipe.execute(), which will execute all commands queued in the pipe. )rrz)rArrs rCrz!Pipeline.pipeline_execute_commands! !!4/2 rFr~commandscKdif}dif}|g||}|jd|D}|j|d{g} |j|dd{t |D]I\} } t | dvr|j | | dt f/ |j|dd{K |j|dd{} d|_ | tdd|D]\} } | j| | t| t|k7r:|jr"|jjd{td d|r|j|| g}t!| |D]s\}}t#|t$sM|\}}|d}||j&vr5|j&||fi|}t)j*|r |d{}|j |u|S77#t$r} |j d| fYd} ~ d} ~ wwxYw7n#t$r7} |j| | dz| d|j | | fYd} ~ d} ~ wwxYw7#t$r} |r |dd| d} ~ wwxYw7A7w) N)MULTI)rc38K|]\}}t|vs|ywr>)r).0rrs rC z0Pipeline._execute_transaction..s , "T7nG.KD, srrrFzWatched variable changed.z6Wrong number of response items from pipeline execution) pack_commandssend_packed_commandrr+rz enumeraterannotate_exceptionr(rr-insertrr~rraise_first_errorziprrrQrr)rAr~rraise_on_errorprepostcmdsall_cmdserrorserrirr<rrercmdrrrs rC_execute_transactionzPipeline._execute_transactions$R(#R%h%%++, &*,  ,,X666  $%%j#6 6 6 $H- ,JAw+ q'!*^"<=>,--j#>>>  , !00SAAH  89t C "DAq OOAq ! " x=CM )oo00222H    " "8X 6(H- FAsa+ # g#Aw 4#:#::=// =aK7KA**1-"#G KKN   7 7 $ MM1c( # # $?$,++CQ CMM1c(++, B Qil+  "3$$s=J G/J G5G2G5!:J H"1H2H"6J 9I(I%I(A1J JBJ JJ 2G55 H>HJ HJ H"" I"+,IJ I""J %I(( J1 I==JJ J rcK|j|Dcgc]\}}| c}}}|j|d{g}|D]4\}} |j|j||dfi|d{6|r|j |||Scc}}w7[7(#t$r} |j| Yd} ~ vd} ~ wwxYww)Nr)rrrzrr+r) rAr~rrrrrr<rrs rC_execute_pipelinezPipeline._execute_pipelines++,JgdAT,JK,,X666% #MD' #-$--j$q'MWMM #   " "8X 6-K6 N  #"" #sUC B CBC %B0B 1B9CB B?$B:5C:B??Cr<ct|D]3\}}t|ts|j||dz||d|y)Nrr)rrr+r)rArr<rrs rCrzPipeline.raise_first_errorsGh' DAq!]+''1q5(1+a.A rF exceptionnumberrNcdjtt|}d|d|d|j}|f|jddz|_y)Nrz Command # z (z) of pipeline caused error: r)rmapr1r)rArrrrmsgs rCrzPipeline.annotate_exceptionsPhhs8W-.6("SE)EinnEUV).."44 rFrcKt|||fi|d{}||jvr d|_|S|dk(rd|_|S7)w)NFrT)superrUNWATCH_COMMANDSr)rAr~rrresultrs rCrzPipeline.parse_responsesZw-j,R'RR 400 0!DM W $ DM SsAA*Ac:Kt|j}|j}|Dcgc]}|j}}|dg|d{}t |s8t ||D](\}}|r |d|j d{|_*yycc}w7M7w)Nz SCRIPT EXISTSz SCRIPT LOAD)rrrshaallrscript)rAr immediatesshasexistsexists rC load_scriptszPipeline.load_scriptsst||$22 &'!''!84886{0 E5"+M188"DDAE E (9Es3&BBB B #B.BBBBrrcK|jd{|jr td|jrt |t s|j d{y7Q7w)z Close the connection, raise an exception if we were watching, and raise an exception if retry_on_timeout is not set, or the error is not a TimeoutError Nr)rrr-rfrr,rDrs rC_disconnect_raise_resetz Pipeline._disconnect_raise_resetsh oo ==R  %%*UL*I**,   +J  s"A,A(A A,!A*"A,*A,cvKjsjsgSjrjd{js j r j n jjs5jjdjd{_tt jjfdfdd{j!d{S77d7 7 #j!d{7wxYww)z0Execute all the commands in the current pipelineNrcSr>r?)rrrstacksrCrz"Pipeline.execute..Ise^<rFc(j|Sr>)rrs rCrz"Pipeline.execute..Jsd::4GrF)rrrrrrrrr~rTrrrrrtrrD)rArrrrs``@@@rCrzPipeline.execute3s""T]]I <<##% % %   $";";//G,,G--<O+OP* * SM *"w7..L'#0 )> z9Z00 1>, . H$H0=HT$0=OS(-8C= 5"5,/5:B6:J5 5$49#u*4E E*Y(D8. ;$; OrFr)Vrr{rr-rtypingrrrrrrr r r r r rrrrrrrredis.asyncio.connectionrrrrredis.asyncio.lockrredis.asyncio.retryr redis.clientrrrrr redis.commandsr!r"r#r$ redis.compatr%r&redis.exceptionsr'r(r)r*r+r,r- redis.typingr.r/r0 redis.utilsr1r2rrr3r5r6rr8redis.commands.corer9r;rLrr7 StrictRedisrrrrrrrCommandTrrr?rFrCr s  * $% -43.$sCx.)9T?:; t$z* )7 ++78V;K3LM* x  H 24QQRp+->@Upf  F,F,RI#I#X  8   " "CC uS%Z(#-.S0AA BX cOucOrF