
    gm                         d Z ddlZddlZddlZddlZddlZddlZddlm	Z	  ej                  e      ZdZ G d de      Z G d de      Z G d	 d
e      Zd Z G d de      Z G d de      Zy)z%Bi-directional streaming RPC helpers.    N)
exceptionsz!Thread-ConsumeBidirectionalStreamc                   $    e Zd ZdZddZd Zd Zy)_RequestQueueGeneratora1
  A helper for sending requests to a gRPC stream from a Queue.

    This generator takes requests off a given queue and yields them to gRPC.

    This helper is useful when you have an indeterminate, indefinite, or
    otherwise open-ended set of requests to send through a request-streaming
    (or bidirectional) RPC.

    The reason this is necessary is because gRPC takes an iterator as the
    request for request-streaming RPCs. gRPC consumes this iterator in another
    thread to allow it to block while generating requests for the stream.
    However, if the generator blocks indefinitely gRPC will not be able to
    clean up the thread as it'll be blocked on `next(iterator)` and not be able
    to check the channel status to stop iterating. This helper mitigates that
    by waiting on the queue with a timeout and checking the RPC state before
    yielding.

    Finally, it allows for retrying without swapping queues because if it does
    pull an item off the queue when the RPC is inactive, it'll immediately put
    it back and then exit. This is necessary because yielding the item in this
    case will cause gRPC to discard it. In practice, this means that the order
    of messages is not guaranteed. If such a thing is necessary it would be
    easy to use a priority queue.

    Example::

        requests = request_queue_generator(q)
        call = stub.StreamingRequest(iter(requests))
        requests.call = call

        for response in call:
            print(response)
            q.put(...)

    Note that it is possible to accomplish this behavior without "spinning"
    (using a queue timeout). One possible way would be to use more threads to
    multiplex the grpc end event with the queue, another possible way is to
    use selectors and a custom event/queue object. Both of these approaches
    are significant from an engineering perspective for small benefit - the
    CPU consumed by spinning is pretty minuscule.

    Args:
        queue (queue_module.Queue): The request queue.
        period (float): The number of seconds to wait for items from the queue
            before checking if the RPC is cancelled. In practice, this
            determines the maximum amount of time the request consumption
            thread will live after the RPC is cancelled.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is done independently of the request queue to allow fo
            easily restarting streams that require some initial configuration
            request.
    Nc                 <    || _         || _        || _        d | _        y N)_queue_period_initial_requestcall)selfqueueperiodinitial_requests       q/var/www/html/FastMealFinder_FlaskServer-InitialRelease/venv/lib/python3.12/site-packages/google/api_core/bidi.py__init__z_RequestQueueGenerator.__init__U   s     /	    c                 V    | j                   d u xs | j                   j                         S r   r   	is_activer   s    r   
_is_activez!_RequestQueueGenerator._is_active[   s%    
 yyD 9DII$7$7$99r   c              #     K   | j                   6t        | j                         r| j                          n| j                    	 	 | j                  j                  | j                        }|t        j                  d       y | j                         s1| j                  j                  |       t        j                  d       y | # t
        j                  $ r* | j                         st        j                  d       Y y Y w xY ww)N)timeoutz9Empty queue and inactive call, exiting request generator.z"Cleanly exiting request generator.zEInactive call, replacing item on queue and exiting request generator.)r
   callabler   getr	   queue_moduleEmptyr   _LOGGERdebugput)r   items     r   __iter__z_RequestQueueGenerator.__iter__b   s       ,--.++--+++
{{t||< |BC??$ %) J;   %% (MMV  s+   AD&C -AD8DDDD)   N)__name__
__module____qualname____doc__r   r   r"    r   r   r   r      s    4l:$r   r   c                   (    e Zd ZdZd Zd Zd Zd Zy)	_Throttlea  A context manager limiting the total entries in a sliding time window.

    If more than ``access_limit`` attempts are made to enter the context manager
    instance in the last ``time window`` interval, the exceeding requests block
    until enough time elapses.

    The context manager instances are thread-safe and can be shared between
    multiple threads. If multiple requests are blocked and waiting to enter,
    the exact order in which they are allowed to proceed is not determined.

    Example::

        max_three_per_second = _Throttle(
            access_limit=3, time_window=datetime.timedelta(seconds=1)
        )

        for i in range(5):
            with max_three_per_second as time_waited:
                print("{}: Waited {} seconds to enter".format(i, time_waited))

    Args:
        access_limit (int): the maximum number of entries allowed in the time window
        time_window (datetime.timedelta): the width of the sliding time window
    c                     |dk  rt        d      |t        j                  d      k  rt        d      || _        || _        t        j                  |      | _        t        j                         | _
        y )Nr#   z&access_limit argument must be positiver   z1time_window argument must be a positive timedelta)maxlen)
ValueErrordatetime	timedelta_time_window_access_limitcollectionsdeque_past_entries	threadingLock_entry_lock)r   access_limittime_windows      r   r   z_Throttle.__init__   sl    !EFF(,,Q//PQQ')(..
 %>>+r   c                    | j                   5  t        j                  j                         | j                  z
  }| j                  rK| j                  d   |k  r9| j                  j                          | j                  r| j                  d   |k  r9t        | j                        | j                  k  rA| j                  j                  t        j                  j                                	 d d d        y| j                  d   |z
  j                         }t        j                  |       | j                  j                  t        j                  j                                |cd d d        S # 1 sw Y   y xY w)Nr   g        )r7   r.   nowr0   r4   popleftlenr1   appendtotal_secondstimesleep)r   cutoff_timeto_waits      r   	__enter__z_Throttle.__enter__   s(    	"++//1D4E4EEK $$););A)>)L""**, $$););A)>)L 4%%&););;""))(*;*;*?*?*AB	 	 ))!,{:IIKGJJw%%h&7&7&;&;&=>	 	 	s   BE*AE*3A-E**E3c                      y r   r(   )r   _s     r   __exit__z_Throttle.__exit__   s    r   c                     dj                  | j                  j                  | j                  t	        | j
                              S )Nz#{}(access_limit={}, time_window={}))format	__class__r$   r1   reprr0   r   s    r   __repr__z_Throttle.__repr__   s7    4;;NN##T%7%7d>O>O9P
 	
r   N)r$   r%   r&   r'   r   rD   rG   rL   r(   r   r   r*   r*      s    2,$
r   r*   c                   \    e Zd ZdZddZd Zd Zd Zd Zd Z	d	 Z
ed
        Zed        Zy)BidiRpca#  A helper for consuming a bi-directional streaming RPC.

    This maps gRPC's built-in interface which uses a request iterator and a
    response iterator into a socket-like :func:`send` and :func:`recv`. This
    is a more useful pattern for long-running or asymmetric streams (streams
    where there is not a direct correlation between the requests and
    responses).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = BidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        rpc.open()

        while rpc.is_active():
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    Nc                     || _         || _        || _        t        j                         | _        d | _        d| _        g | _        d | _	        y NF)

_start_rpcr
   _rpc_metadatar   Queue_request_queue_request_generatorr   
_callbacksr   )r   	start_rpcr   metadatas       r   r   zBidiRpc.__init__   sG    # /%*002"&	r   c                 :    | j                   j                  |       y)a{  Adds a callback that will be called when the RPC terminates.

        This occurs when the RPC errors or is successfully terminated.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It will be provided with the same gRPC future as the underlying
                stream which will also be a :class:`grpc.Call`.
        N)rV   r>   )r   callbacks     r   add_done_callbackzBidiRpc.add_done_callback   s     	x(r   c                 6    | j                   D ]
  } ||        y r   )rV   )r   futurerZ   s      r   _on_call_donezBidiRpc._on_call_done	  s    
  	HV	r   c                    | j                   rt        d      t        | j                  | j                        }	 | j                  t        |      | j                        }||_        t        |d      r&|j                  j                  | j                         n|j                  | j                         || _        || _        y# t        j                  $ r!}| j                  |j                          d}~ww xY w)zOpens the stream.z$Can not open an already open stream.)r   )rX   N_wrapped)r   r-   r   rT   r
   rQ   iterrR   r   GoogleAPICallErrorr^   responser   hasattrr`   r[   rU   )r   request_generatorr   excs       r   openzBidiRpc.open  s    >>CDD21F1F
	??4(9#:TEWEW?XD "& 4$MM++D,>,>?""4#5#56"3	! ,, 	 s||,		s   &C C7C22C7c                     | j                   y| j                  j                  d       | j                   j                          d| _        d| _        g | _        y)zCloses the stream.N)r   rT   r    cancelrU   r
   rV   r   s    r   closezBidiRpc.close-  sJ    99%		"& $r   c                     | j                   t        d      | j                   j                         r| j                  j	                  |       yt        | j                          y)zQueue a message to be sent on the stream.

        Send is non-blocking.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (protobuf.Message): The request to send.
        N6Can not send() on an RPC that has never been open()ed.)r   r-   r   rT   r    nextr   requests     r   sendzBidiRpc.send:  sM     99UVV 99 ##G, Or   c                 Z    | j                   t        d      t        | j                         S )zWait for a message to be returned from the stream.

        Recv is blocking.

        If the underlying RPC has been closed, this will raise.

        Returns:
            protobuf.Message: The received message.
        6Can not recv() on an RPC that has never been open()ed.)r   r-   rm   r   s    r   recvzBidiRpc.recvO  s(     99UVVDIIr   c                 V    | j                   duxr | j                   j                         S z7bool: True if this stream is currently open and active.Nr   r   s    r   r   zBidiRpc.is_active^  s%     yy$>)<)<)>>r   c                 6    | j                   j                         S )z:int: Returns an estimate of the number of queued requests.)rT   qsizer   s    r   pending_requestszBidiRpc.pending_requestsc  s     ""((**r   )NN)r$   r%   r&   r'   r   r[   r^   rg   rj   rp   rs   propertyr   rx   r(   r   r   rN   rN      sS    $L
)8	* ? ? + +r   rN   c                      y)z-By default, no errors cause BiDi termination.Fr(   )future_or_errors    r   _never_terminater|   i  s    r   c                   v     e Zd ZdZedddf fd	Zd Zd Zd Zd Z	d	 Z
d
 Zd Zd Z fdZed        Z xZS )ResumableBidiRpca5  A :class:`BidiRpc` that can automatically resume the stream on errors.

    It uses the ``should_recover`` arg to determine if it should re-establish
    the stream on error.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        metadata = [('header_name', 'value')]

        rpc = ResumableBidiRpc(
            stub.StreamingRpc,
            should_recover=should_recover,
            initial_request=initial_request,
            metadata=metadata
        )

        rpc.open()

        while rpc.is_active():
            print(rpc.recv())
            rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

    Args:
        start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[None, protobuf.Message]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        should_recover (Callable[[Exception], bool]): A function that returns
            True if the stream should be recovered. This will be called
            whenever an error is encountered on the stream.
        should_terminate (Callable[[Exception], bool]): A function that returns
            True if the stream should be terminated. This will be called
            whenever an error is encountered on the stream.
        metadata Sequence[Tuple(str, str)]: RPC metadata to include in
            the request.
        throttle_reopen (bool): If ``True``, throttling will be applied to
            stream reopen calls. Defaults to ``False``.
    NFc                    t         t        |   |||       || _        || _        t        j                         | _        d| _        t        j                         | _
        |r't        dt        j                  d            | _        y d | _        y )NF   
   )seconds)r8   r9   )superr~   r   _should_recover_should_terminater5   RLock_operational_lock
_finalizedr6   _finalize_lockr*   r.   r/   _reopen_throttle)r   rW   should_recovershould_terminater   rX   throttle_reopenrJ   s          r   r   zResumableBidiRpc.__init__  sx     	.y/8T-!1!*!2'nn.$-H,>,>r,J%D! %)D!r   c                     | j                   5  | j                  r
	 d d d        y | j                  D ]
  } ||        d| _        d d d        y # 1 sw Y   y xY w)NT)r   r   rV   )r   resultrZ   s      r   	_finalizezResumableBidiRpc._finalize  sZ       	#	# 	# !OO ! ! #DO	# 	# 	#s   A AAc                    | j                   5  | j                  |      r| j                  |       nH| j                  |      s| j                  |       n%t        j                  d       | j                          d d d        y # 1 sw Y   y xY w)Nz%Re-opening stream from gRPC callback.)r   r   r   r   r   r   _reopenr   r]   s     r   r^   zResumableBidiRpc._on_call_done  sl     ## 	%%f-v&))&1v&EF	 	 	s   A,BBc                 &   | j                   5  | j                  9| j                  j                         rt        j	                  d       	 d d d        y d | _        d | _        	 | j                  r&| j                  5  | j                          d d d        n| j                          t        j                  d       d d d        y # 1 sw Y   'xY w# t        $ r-}t        j	                  d|       | j                  |        d }~ww xY w# 1 sw Y   y xY w)Nz"Stream was already re-established.z"Failed to re-open stream due to %szRe-established stream)r   r   r   r   r   rU   r   rg   	Exceptionr   info)r   rf   s     r   r   zResumableBidiRpc._reopen  s    ##  	2yy$)<)<)>BC	 	2  	2 DI '+D#((.. $		$ $ IIK LL01A 	2  	2*$ $  BCHs#7 	2  	2sM   =DD"C:CC$DC	C	D(C??DDDc                 J   	 	  ||i |S # t         $ r}| j                  5  t        j                  d||       | j	                  |      rF| j                          t        j                  d||       | j                  |       	 ddd       Y d}~y| j                  |      s:| j                          t        j                  d||       | j                  |       |t        j                  d|       | j                          ddd       n# 1 sw Y   nxY wY d}~nd}~ww xY w#)aW  Wraps a method to recover the stream and retry on error.

        If a retryable error occurs while making the call, then the stream will
        be re-opened and the method will be retried. This happens indefinitely
        so long as the error is a retryable one. If an error occurs while
        re-opening the stream, then this method will raise immediately and
        trigger finalization of this object.

        Args:
            method (Callable[..., Any]): The method to call.
            args: The args to pass to the method.
            kwargs: The kwargs to pass to the method.
        zCall to retryable %r caused %s.zTerminating %r due to %s.NzNot retrying %r due to %s.z$Re-opening stream from retryable %r.)	r   r   r   r   r   rj   r   r   r   )r   methodargskwargsrf   s        r   _recoverablezResumableBidiRpc._recoverable  s     #t.v.. #++ #MM"CVSQ--c2

&A63Os+# #  //4

&BFCPs+!	MM"H&QLLN!# # ##	 s9    
D DA"D
DA1D
	D
D	DD c                     | j                   5  | j                  }d d d        t        d      |j                         r| j                  j                  |       y t        |       y # 1 sw Y   NxY w)Nrl   )r   r   r-   r   rT   r    rm   )r   ro   r   s      r   _sendzResumableBidiRpc._send  sj     ## 	99D	 <UVV >>##G, J	 	s   A''A0c                 :    | j                  | j                  |      S r   )r   r   rn   s     r   rp   zResumableBidiRpc.send.  s      W55r   c                     | j                   5  | j                  }d d d        t        d      t        |      S # 1 sw Y   !xY w)Nrr   )r   r   r-   rm   )r   r   s     r   _recvzResumableBidiRpc._recv1  sF    ## 	99D	 <UVVDz	 	s	   :Ac                 8    | j                  | j                        S r   )r   r   r   s    r   rs   zResumableBidiRpc.recv:  s      ,,r   c                 L    | j                  d        t        t        |           y r   )r   r   r~   rj   )r   rJ   s    r   rj   zResumableBidiRpc.close=  s    t+-r   c                     | j                   5  | j                  duxr | j                   cddd       S # 1 sw Y   yxY wru   )r   r   r   r   s    r   r   zResumableBidiRpc.is_activeA  s>     ## 	A99D(@-@	A 	A 	As   4=)r$   r%   r&   r'   r|   r   r   r^   r   r   r   rp   r   rs   rj   ry   r   __classcell__)rJ   s   @r   r~   r~   n  sa    0l *).#!2F##J.6-. 
A 
Ar   r~   c                   Z    e Zd ZdZd Zd Zd Zd Zd Ze	d        Z
d Zd	 Ze	d
        Zy)BackgroundConsumera  A bi-directional stream consumer that runs in a separate thread.

    This maps the consumption of a stream into a callback-based model. It also
    provides :func:`pause` and :func:`resume` to allow for flow-control.

    Example::

        def should_recover(exc):
            return (
                isinstance(exc, grpc.RpcError) and
                exc.code() == grpc.StatusCode.UNAVAILABLE)

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')

        rpc = ResumeableBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            should_recover=should_recover)

        def on_response(response):
            print(response)

        consumer = BackgroundConsumer(rpc, on_response)
        consumer.start()

    Note that error handling *must* be done by using the provided
    ``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
    whenever the RPC itself exits and will not provide any error details.

    Args:
        bidi_rpc (BidiRpc): The RPC to consume. Should not have been
            ``open()``ed yet.
        on_response (Callable[[protobuf.Message], None]): The callback to
            be called for every response on the stream.
    c                     || _         || _        d| _        t        j                         | _        d | _        t        j                         | _        y rP   )		_bidi_rpc_on_response_pausedr5   	Condition_wake_threadr6   r   )r   bidi_rpcon_responses      r   r   zBackgroundConsumer.__init__u  s>    !'((*
!*!1r   c                 $    | j                          y r   )resumer   s     r   r^   z BackgroundConsumer._on_call_done}  s     	r   c                    	 |j                          | j                  j                  | j                         | j                  j	                          | j                  j
                  r| j                  5  | j                  rQt        j                  d       | j                  j                          t        j                  d       | j                  rQd d d        t        j                  d       | j                  j                         }t        j                  d       | j                  |       | j                  j
                  rt        j%                  d	t               y # 1 sw Y   xY w# t        j                  $ r'}t        j                  dt        |d       Y d }~\d }~wt         $ r%}t        j#                  dt        |       Y d }~d }~ww xY w)
Nzpaused, waiting for waking.zwoken.zwaiting for recv.zrecved response.z%s caught error %s and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.T)exc_infoz0%s caught unexpected exception %s and will exit.z
%s exiting)setr   r[   r^   rg   r   r   r   r   r   waitrs   r   r   rb   _BIDIRECTIONAL_CONSUMER_NAMEr   	exceptionr   )r   readyrc   rf   s       r   _thread_mainzBackgroundConsumer._thread_main  s[   )	IIKNN,,T-?-?@NN!..** ZZ 0,,&CD

)h/ ,,0 12>>..001!!(+' ..**L 	\#?@90 0 ,, 	MM0 -     	B, 	s>   A1E, 3AE A3E,  E)%E, ,G?F!!G-GGc                 X   | j                   5  t        j                         }t        j                  t        | j
                  |f      }d|_        |j                          |j                          || _	        t        j                  d|j                         ddd       y# 1 sw Y   yxY w)z;Start the background thread and begin consuming the thread.)nametargetr   TzStarted helper thread %sN)r   r5   EventThreadr   r   daemonstartr   r   r   r   r   )r   r   threads      r   r   zBackgroundConsumer.start  s    ## 	COO%E%%1((XF
 !FMLLN
 JJL!DLMM4fkkB	C 	C 	Cs   B
B  B)c                 b   | j                   5  | j                  j                          | j                  Z| j	                          | j                  j                  d       | j                  j                         rt        j                  d       d| _        d| _	        ddd       y# 1 sw Y   yxY w)z=Stop consuming the stream and shutdown the background thread.Ng      ?zBackground thread did not exit.)
r   r   rj   r   r   joinis_aliver   warningr   r   s    r   stopzBackgroundConsumer.stop  s    ## 	%NN  "||' !!#&<<((*OO$EFDL $D	% 	% 	%s   BB%%B.c                 V    | j                   duxr | j                   j                         S )z.bool: True if the background thread is active.N)r   r   r   s    r   r   zBackgroundConsumer.is_active  s%     ||4'CDLL,A,A,CCr   c                 T    | j                   5  d| _        ddd       y# 1 sw Y   yxY w)zWPauses the response stream.

        This does *not* pause the request stream.
        TN)r   r   r   s    r   pausezBackgroundConsumer.pause  s'    
 ZZ 	 DL	  	  	 s   'c                     | j                   5  d| _        | j                   j                          ddd       y# 1 sw Y   yxY w)zResumes the response stream.FN)r   r   
notify_allr   s    r   r   zBackgroundConsumer.resume  s5    ZZ 	$ DLJJ!!#	$ 	$ 	$s	   "8Ac                     | j                   S )z,bool: True if the response stream is paused.)r   r   s    r   	is_pausedzBackgroundConsumer.is_paused  s     ||r   N)r$   r%   r&   r'   r   r^   r   r   r   ry   r   r   r   r   r(   r   r   r   r   O  sX    #J2
,A\C&%" D D $  r   r   )r'   r2   r.   loggingr   r   r5   r@   google.api_corer   	getLoggerr$   r   r   objectr   r*   rN   r|   r~   r   r(   r   r   <module>r      s    ,       &
'

H
%B hV hV@
 @
FZ+f Z+z
^Aw ^AB[ [r   