o
    M,h4                     @   s   d dl Z d dlZd dlmZmZmZ d dlZd dlZd dlZd dl	m
Z
mZmZmZ 	 G dd dZG dd dZG dd	 d	ZG d
d dejZedkrSe  dS dS )    N)Mockpatch	MagicMock)
DispatcherDispatcherBaseSSLDispatcherWrappedDispatcherc                   @   s   e Zd ZdZdd ZdS )MockAppzMock WebSocketApp for testingc                 C   s   d| _ t | _t | j_d S NT)keep_runningr   sockself r   k/var/www/www-root/data/www/bot.pdev.uz/venv/lib/python3.10/site-packages/websocket/tests/test_dispatcher.py__init__&   s   zMockApp.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r	   #   s    r	   c                   @   s    e Zd ZdZdd Zdd ZdS )
MockSocketzMock socket for testingc                 C   s
   d| _ d S NFpending_returnr   r   r   r   r   /   s   
zMockSocket.__init__c                 C   s   | j S Nr   r   r   r   r   pending2   s   zMockSocket.pendingN)r   r   r   r   r   r   r   r   r   r   r   ,   s    r   c                   @   s@   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dS )MockDispatcherz6Mock external dispatcher for WrappedDispatcher testingc                 C   s"   g | _ g | _g | _g | _g | _d S r   )signal_callsabort_calls
read_callsbuffwrite_callstimeout_callsr   r   r   r   r   9   s
   
zMockDispatcher.__init__c                 C      | j ||f d S r   )r   append)r   sighandlerr   r   r   signal@      zMockDispatcher.signalc                 C   s   | j d d S r
   )r   r#   r   r   r   r   abortC   s   zMockDispatcher.abortc                 C   r"   r   )r   r#   )r   r   callbackr   r   r   readF   r'   zMockDispatcher.readc                 C   s   | j ||||f d S r   )r    r#   )r   r   data	send_funcdisconnect_handlerr   r   r   	buffwriteI   s   zMockDispatcher.buffwritec                 G   s   | j |||f d S r   )r!   r#   )r   secondsr)   argsr   r   r   timeoutL   s   zMockDispatcher.timeoutN)
r   r   r   r   r   r&   r(   r*   r.   r1   r   r   r   r   r   6   s    r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#S )$DispatcherTestc                 C   s   t  | _d S r   )r	   appr   r   r   r   setUpQ   s   zDispatcherTest.setUpc                 C   s.   t | jd}| |j| j | |jd dS )z"Test DispatcherBase initialization      >@N)r   r3   assertEqualping_timeout)r   
dispatcherr   r   r   test_dispatcher_base_initT   s   z(DispatcherTest.test_dispatcher_base_initc                 C   sf   t | jd}t }|d| |  |  t }|d| t | }|  | |d dS )z"Test DispatcherBase timeout methodr5   Ng?g?)r   r3   r   r1   assert_called_once
reset_mocktimeassertGreaterEqual)r   r8   r)   
start_timeelapsedr   r   r   test_dispatcher_base_timeout[   s   z+DispatcherTest.test_dispatcher_base_timeoutc                 C   sx   t | jd}t }|d| |jdd |  td|_| t |d| W d   dS 1 s5w   Y  dS )z$Test DispatcherBase reconnect methodr5      T)reconnectingzUser interruptedN)	r   r3   r   	reconnectassert_called_once_withr;   KeyboardInterruptside_effectassertRaises)r   r8   reconnectorr   r   r   test_dispatcher_base_reconnectm   s   
"z-DispatcherTest.test_dispatcher_base_reconnectc                 C   sv   t | jd}t }d}td"}t||_|||}||| | |t| W d   dS 1 s4w   Y  dS )zTest DispatcherBase send methodr5   	   test datawebsocket._dispatcher.sendN)	r   r3   r   r   lenreturn_valuesendrD   r6   )r   r8   	mock_sock	test_data	mock_sendresultr   r   r   test_dispatcher_base_send}   s   

"z(DispatcherTest.test_dispatcher_base_sendc                    s   t  jd}tdd}t }t }td8}t }||_g |j_ fdd}||j_|||| |j	  |j
d |j	  |	  W d   dS 1 sPw   Y  dS )zTest Dispatcher read method      @TrM   selectors.DefaultSelectorc                        d j _g S r   r3   r   r0   r   r   r   rF         z8DispatcherTest.test_dispatcher_read.<locals>.side_effectN)r   r3   r   r   rM   selectrF   r*   registerassert_calledassert_called_withclose)r   r8   read_callbackcheck_callbackrO   mock_selector_classmock_selectorrF   r   r   r   test_dispatcher_read   s    




"z#DispatcherTest.test_dispatcher_readc                    s   t jd}tdd}t }t }td+}t }||_d  fdd}||j_|||| |  |  W d   dS 1 sCw   Y  dS )	z2Test Dispatcher read method when data is availablerT   TrU   rV   r   c                     s"    d7   dkrdgS dj _g S )NrA   TFrX   rY   
call_countr   r   r   select_side_effect   s
   zIDispatcherTest.test_dispatcher_read_with_data.<locals>.select_side_effectN)	r   r3   r   r   rM   r[   rF   r*   r]   )r   r8   r`   ra   rO   rb   rc   rg   r   re   r   test_dispatcher_read_with_data   s   

	
"z-DispatcherTest.test_dispatcher_read_with_datac                    s   t  jd}tdd}t }t }| jj_td-}t }||_g |j_ fdd}||j_|	d|| |j
  |  W d   dS 1 sJw   Y  dS )zTest SSLDispatcher read methodrT   TrU   rV   c                     rW   r   rX   rY   r   r   r   rF      rZ   z<DispatcherTest.test_ssl_dispatcher_read.<locals>.side_effectN)r   r3   r   r   r   r   rM   r[   rF   r*   r\   r]   )r   r8   r`   ra   mock_ssl_sockrb   rc   rF   r   r   r   test_ssl_dispatcher_read   s   




"z'DispatcherTest.test_ssl_dispatcher_readc                 C   sF   t | jd}t }d|_|| jj_t }|d|}| ||g dS )z2Test SSLDispatcher select method with pending datarT   TN)r   r3   r   r   r   r   r[   r6   r   r8   ri   rc   rR   r   r   r   'test_ssl_dispatcher_select_with_pending   s   
z6DispatcherTest.test_ssl_dispatcher_select_with_pendingc                 C   s^   t | jd}t }d|_|| jj_t }|dfg|j_|d|}| || |j	d dS )z5Test SSLDispatcher select method without pending datarT   FN)
r   r3   r   r   r   r   r[   rM   r6   r^   rk   r   r   r   *test_ssl_dispatcher_select_without_pending   s   
z9DispatcherTest.test_ssl_dispatcher_select_without_pendingc                 C   sJ   t | jd}t }d|_|| jj_t }g |j_|d|}| | dS )z0Test SSLDispatcher select method with no resultsrT   FN)	r   r3   r   r   r   r   r[   rM   assertIsNonerk   r   r   r   %test_ssl_dispatcher_select_no_results   s   
z4DispatcherTest.test_ssl_dispatcher_select_no_resultsc                 C   s   t  }t }t| jd||}| |j| j | |jd | |j| | |j| | t|j	d |j	d \}}| |d | ||j
 dS )z%Test WrappedDispatcher initialization      $@rA   r      N)r   r   r   r3   r6   r7   r8   handleDisconnectrL   r   r(   )r   mock_dispatcherhandle_disconnectwrappedr$   r%   r   r   r   test_wrapped_dispatcher_init  s   z+DispatcherTest.test_wrapped_dispatcher_initc                 C   s   t  }t }t| jd||}t }t }t }|||| | t|jd | |jd ||f | t|jd |jd }| |d d | |d | dS )z"Test WrappedDispatcher read methodrp   rA   r   N	r   r   r   r3   r*   r6   rL   r   r!   )r   rs   rt   ru   rO   r`   ra   timeout_callr   r   r   test_wrapped_dispatcher_read  s   
z+DispatcherTest.test_wrapped_dispatcher_readc                 C   sd   t  }t }t| jd||}t }t }t }|||| | t|jd | t|jd dS )z7Test WrappedDispatcher read method without ping timeoutNrA   r   rw   )r   rs   rt   ru   rO   r`   ra   r   r   r   ,test_wrapped_dispatcher_read_no_ping_timeout3  s   z;DispatcherTest.test_wrapped_dispatcher_read_no_ping_timeoutc           	      C   s   t  }t }t| jd||}t }d}tdE}|||}| t|jd |jd }| |d | | |d | | |d | | |d | | |t| W d   dS 1 s_w   Y  dS )	z"Test WrappedDispatcher send methodrp   rJ   rK   rA   r   rq      N)	r   r   r   r3   r   rN   r6   rL   r    )	r   rs   rt   ru   rO   rP   rQ   rR   callr   r   r   test_wrapped_dispatcher_sendE  s   

"z+DispatcherTest.test_wrapped_dispatcher_sendc                 C   s   t  }t }t| jd||}t }d}|jd|g|R   | t|jd |jd }| |d d | |d | | |d | dS )z%Test WrappedDispatcher timeout methodrp   )arg1arg2rT   rA   r   rq   N)r   r   r   r3   r1   r6   rL   r!   )r   rs   rt   ru   r)   r0   r|   r   r   r   test_wrapped_dispatcher_timeout\  s   
z.DispatcherTest.test_wrapped_dispatcher_timeoutc                 C   s~   t  }t }t| jd||}t }|d| | t|jd |jd }| |d d | |d | | |d d dS )z'Test WrappedDispatcher reconnect methodrp   r{   rA   r   rq   )TN)r   r   r   r3   rC   r6   rL   r!   )r   rs   rt   ru   rH   r|   r   r   r   !test_wrapped_dispatcher_reconnectn  s   
z0DispatcherTest.test_wrapped_dispatcher_reconnectN)r   r   r   r4   r9   r@   rI   rS   rd   rh   rj   rl   rm   ro   rv   ry   rz   r}   r   r   r   r   r   r   r2   P   s$    r2   __main__)socketunittestunittest.mockr   r   r   	threadingr<   	websocketwebsocket._dispatcherr   r   r   r   r	   r   r   TestCaser2   r   mainr   r   r   r   <module>   s"   	
  2