o
    M,hda                     @   s   d dl Z d dlZd dlZd dlmZmZmZ d dlmZm	Z	m
Z
mZmZ d dlmZmZ d dlmZ d dlmZmZ 	 G dd de jZed	krNe   dS dS )
    N)Mockpatch	MagicMock)SSLErrorSSLEOFErrorSSLWantReadErrorSSLWantWriteErrorHAVE_SSL)_ssl_socket_wrap_sni_socket)WebSocketException)recvsendc                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Zd3d4 Zd5d6 Zd7d8 Zd9d: Zd;S )<SSLEdgeCasesTestc                 C   s   t s	| d d S d S )NzSSL not available)r	   skipTest)self r   o/var/www/www-root/data/www/bot.pdev.uz/venv/lib/python3.10/site-packages/websocket/tests/test_ssl_edge_cases.pysetUp'   s   zSSLEdgeCasesTest.setUpc              	   C   s   t  }td?}t  }||_td|j_dtji}| 	tj t
||d W d   n1 s1w   Y  W d   dS W d   dS 1 sIw   Y  dS )z$Test SSL handshake failure scenariosssl.SSLContextzSSL handshake timeout	cert_reqsexample.comN)r   r   return_valuesockettimeoutwrap_socketside_effectsslCERT_REQUIREDassertRaisesr
   r   	mock_sockmock_ssl_contextmock_contextssloptr   r   r   test_ssl_handshake_failure+   s   

	"z+SSLEdgeCasesTest.test_ssl_handshake_failurec              	   C      t  }td@}t  }||_td|j_tjdd}| tj t	||d W d   n1 s2w   Y  W d   dS W d   dS 1 sJw   Y  dS )z;Test various SSL certificate verification failure scenariosr   zCertificate verification failedTr   check_hostnamezbadssl.exampleN
r   r   r   r   SSLCertVerificationErrorr   r   r   r   r
   r    r   r   r   *test_ssl_certificate_verification_failures<   s   
	"z;SSLEdgeCasesTest.test_ssl_certificate_verification_failuresc                 C   sh   t  }td#}t  }t  |j_||_d|i}t||d |j  W d   dS 1 s-w   Y  dS )z6Test SSL context configuration with various edge casesr   contextr   N)r   r   r   r   r
   assert_called_once)r   r!   r"   existing_contextr$   r   r   r   )test_ssl_context_configuration_edge_casesM   s   

"z:SSLEdgeCasesTest.test_ssl_context_configuration_edge_casesc                 C   s  t  }tdddi` tdddJ tddd4 td }t  }||_t  |j_i }t||d	 |j  W d
   n1 sAw   Y  W d
   n1 sPw   Y  W d
   n1 s_w   Y  W d
   n1 snw   Y  tdddil tdddM tddd7 td#}t  }||_t  |j_i }t||d	 |jjd
dd W d
   n1 sw   Y  W d
   n1 sw   Y  W d
   n1 sw   Y  W d
   d
S W d
   d
S 1 sw   Y  d
S )z.Test CA bundle environment variable edge cases
os.environWEBSOCKET_CLIENT_CA_BUNDLEz/nonexistent/ca-bundle.crtzos.path.isfileF)r   zos.path.isdirr   r   Nz/etc/ssl/certsT)cafilecapath)	r   r   dictr   r   r
   load_verify_locationsassert_not_calledassert_called_withr    r   r   r   )test_ssl_ca_bundle_environment_edge_cases_   sP   



"z:SSLEdgeCasesTest.test_ssl_ca_bundle_environment_edge_casesc              	   C   s   t  }tdB}t  }||_td|j_t  |j_ddi}| t	 t
||d W d   n1 s4w   Y  W d   dS W d   dS 1 sLw   Y  dS )z(Test SSL cipher configuration edge casesr   zNo cipher can be selectedciphersINVALID_CIPHERr   N)r   r   r   r   r   set_ciphersr   r   r   r   r
   r    r   r   r   (test_ssl_cipher_configuration_edge_cases   s   


"z9SSLEdgeCasesTest.test_ssl_cipher_configuration_edge_casesc              	   C      t  }tdA}t  }||_td|j_t  |j_ddi}| t t	||d W d   n1 s3w   Y  W d   dS W d   dS 1 sKw   Y  dS )z(Test ECDH curve configuration edge casesr   zunknown curve name
ecdh_curveinvalid_curver   N)
r   r   r   
ValueErrorset_ecdh_curver   r   r   r   r
   r    r   r   r   test_ssl_ecdh_curve_edge_cases      

"z/SSLEdgeCasesTest.test_ssl_ecdh_curve_edge_casesc              	   C   r=   )z0Test client certificate configuration edge casesr   zNo such filecertfilez/nonexistent/client.crtr   N)
r   r   r   FileNotFoundErrorload_cert_chainr   r   r   r   r
   r    r   r   r   &test_ssl_client_certificate_edge_cases   rC   z7SSLEdgeCasesTest.test_ssl_client_certificate_edge_casesc                    s   t  }dg  fdd}||j_d|j_td1}t  }||_dg|j_t|d}| |d |  d d	 |j	  |j	  W d
   d
S 1 sLw   Y  d
S )z)Test SSL want read/write retry edge casesr   c                    s8    d  d7  <  d dkrt d d dkrdS dS )Nr      The operation did not complete      data after retries    r   bufsizeread_attemptsr   r   	mock_recv   s   zMSSLEdgeCasesTest.test_ssl_want_read_write_retry_edge_cases.<locals>.mock_recv      >@selectors.DefaultSelectorTd   rK   rJ   N)
r   r   r   
gettimeoutr   r   selectassertEqualregisterassert_calledr   r!   rR   mock_selector_classmock_selectorresultr   rP   r   )test_ssl_want_read_write_retry_edge_cases   s   	



"z:SSLEdgeCasesTest.test_ssl_want_read_write_retry_edge_casesc                    s   t  }dg  fdd}||j_d|j_td'}t  }||_dg|j_t|d}| |d |  d d	 W d
   d
S 1 sBw   Y  d
S )z$Test SSL want write retry edge casesr   c                    s<    d  d7  <  d dkrt d d dkrt| S dS )Nr   rH   rI   rJ   )r   len)datawrite_attemptsr   r   	mock_send   s   zHSSLEdgeCasesTest.test_ssl_want_write_retry_edge_cases.<locals>.mock_sendrS   rT   T	   test data	   rJ   N)r   r   r   rV   r   r   rW   rX   )r   r!   rd   r\   r]   r^   r   rb   r   $test_ssl_want_write_retry_edge_cases   s   	


"z5SSLEdgeCasesTest.test_ssl_want_write_retry_edge_casesc                 C   s`   t  }td|j_d|j_ddlm} | | t|d W d   dS 1 s)w   Y  dS )zTest SSL EOF error edge caseszSSL connection has been closedrS   r   "WebSocketConnectionClosedExceptionre   N)	r   r   r   r   rV   r   websocket._exceptionsri   r   r   r!   ri   r   r   r   test_ssl_eof_error_edge_cases   s   "z.SSLEdgeCasesTest.test_ssl_eof_error_edge_casesc                 C   st   ddl m} ddlm} t }d|j_t|d}t |_||j_||d}||t }| 	||g |j
  dS )zTest SSL pending data scenariosr   )SSLDispatcher)WebSocketApp   )specg      @N)websocket._dispatcherrm   websocket._apprn   r   pendingr   sockrW   rX   r-   )r   rm   rn   mock_ssl_sockmock_app
dispatcherr^   r   r   r    test_ssl_pending_data_edge_cases  s   

z1SSLEdgeCasesTest.test_ssl_pending_data_edge_casesc                    s   t  }d  fdd}||j_d|j_td%}t  }||_dg|j_t|d}| |d |  d	 W d
   d
S 1 s?w   Y  d
S )z Test SSL renegotiation scenariosr   c                    s    d7   dkrt ddS )NrH   zSSL renegotiation required   data after renegotiationrM   rN   
call_countr   r   rR     s   zESSLEdgeCasesTest.test_ssl_renegotiation_edge_cases.<locals>.mock_recvrS   rT   TrU   ry   rJ   N)r   r   r   rV   r   r   rW   rX   r[   r   rz   r   !test_ssl_renegotiation_edge_cases  s   


"z2SSLEdgeCasesTest.test_ssl_renegotiation_edge_casesc                 C   sr   t  }td(}t  }||_t  |j_ddi}t||d |jj|dddd W d   dS 1 s2w   Y  dS )z+Test SSL server hostname override scenariosr   server_hostnamezoverride.example.comzoriginal.example.comTdo_handshake_on_connectsuppress_ragged_eofsr}   Nr   r   r   r   r
   r7   r    r   r   r   !test_ssl_server_hostname_override3  s   

"z2SSLEdgeCasesTest.test_ssl_server_hostname_overridec                 C   s   t  }td2}t  }||_t  |j_ttdr1dtji}t||d |tj W d   dS W d   dS 1 s<w   Y  dS )z$Test SSL protocol version edge casesr   PROTOCOL_TLSssl_versionr   N)	r   r   r   r   hasattrr   r   r
   r7   r    r   r   r   $test_ssl_protocol_version_edge_casesH  s   



"z5SSLEdgeCasesTest.test_ssl_protocol_version_edge_casesc              	   C   s   t  }tdddi? td"}t  }||_t  |j_i }t||d | |jd W d   n1 s5w   Y  W d   dS W d   dS 1 sMw   Y  dS )z-Test SSL keylog file configuration edge casesr0   SSLKEYLOGFILEz/tmp/ssl_keys.logr   r   N)r   r   r4   r   r   r
   rX   keylog_filenamer    r   r   r   test_ssl_keylog_file_edge_casesY  s   

"z0SSLEdgeCasesTest.test_ssl_keylog_file_edge_casesc           
   
   C   s   t  }tjdtjdftjdtjdftjdtjdfg}|D ]U\}}}}| j||d@ td,}t  }||_t  |j_||d}	t||	d | 	|j
| | 	|j| W d   n1 s]w   Y  W d   n1 slw   Y  qdS )z1Test different SSL verification mode combinationsFTr'   r   r   N)r   r   	CERT_NONEr   subTestr   r   r   r
   rX   verify_moder(   )
r   r!   
test_casesr   r(   expected_verifyexpected_checkr"   r#   r$   r   r   r   #test_ssl_context_verification_modesj  s(   


z4SSLEdgeCasesTest.test_ssl_context_verification_modesc                 C   s^   ddl m} t }td|j_| }||_d|_z|  W dS  ty.   | 	d Y dS w )z#Test SSL socket shutdown edge casesr   )	WebSocketzSSL shutdown failedTz/SSL shutdown error should be handled gracefullyN)
websocket._corer   r   r   shutdownr   rt   	connectedclosefail)r   r   ru   wsr   r   r   #test_ssl_socket_shutdown_edge_cases  s   z4SSLEdgeCasesTest.test_ssl_socket_shutdown_edge_casesc                 C   sd   t  }td|j_d|j_ddlm} | t|f t|d W d   dS 1 s+w   Y  dS )z6Test SSL socket being closed during ongoing operationsz+SSL connection has been closed unexpectedlyrS   r   rh   rU   N)	r   r   r   r   rV   r   rj   ri   r   rk   r   r   r   &test_ssl_socket_close_during_operation  s   "z7SSLEdgeCasesTest.test_ssl_socket_close_during_operationc              	   C   s   t  }td2}t  }||_t  |j_ddi}zt||d W n	 ty(   Y n	w W d   dS W d   dS 1 s<w   Y  dS )z-Test SSL compression configuration edge casesr   compressionFr   N)r   r   r   r   r
   AttributeErrorr    r   r   r   test_ssl_compression_edge_cases  s   

"z0SSLEdgeCasesTest.test_ssl_compression_edge_casesc                 C   sp   t  }td'}t  }||_t  }||j_d|_d|_t|i d}| | W d   dS 1 s1w   Y  dS )z Test SSL session reuse scenariosr   mock_sessionTr   N)r   r   r   r   sessionsession_reusedr
   assertIsNotNone)r   r!   r"   r#   ru   r^   r   r   r   !test_ssl_session_reuse_edge_cases  s   
"z2SSLEdgeCasesTest.test_ssl_session_reuse_edge_casesc                 C   sl   t  }td%}t  }||_t  |j_dddgi}t||d}| | W d   dS 1 s/w   Y  dS )zATest SSL ALPN (Application Layer Protocol Negotiation) edge casesr   alpn_protocolszhttp/1.1h2r   N)r   r   r   r   r
   r   )r   r!   r"   r#   r$   r^   r   r   r   !test_ssl_alpn_protocol_edge_cases  s   

"z2SSLEdgeCasesTest.test_ssl_alpn_protocol_edge_casesc                 C   sn   t  }td&}t  }||_t  |j_d}t|i | |jj|dd|d W d   dS 1 s0w   Y  dS )z0Test SSL SNI (Server Name Indication) edge casesr   z2001:db8::1Tr~   Nr   )r   r!   r"   r#   ipv6_hostnamer   r   r   test_ssl_sni_edge_cases  s   

"z(SSLEdgeCasesTest.test_ssl_sni_edge_casesc                    s\   t   dd }| j_d j_ddlm} | fdddd	}|d
}| t	|d dS )z'Test SSL buffer size related edge casesc                 S   s   | dkrt ddt| d S )N @  z"[SSL: BAD_LENGTH] buffer too large   Aro   )r   minrN   r   r   r   rR     s   zCSSLEdgeCasesTest.test_ssl_buffer_size_edge_cases.<locals>.mock_recvrS   r   )frame_bufferc                    s
   t  | S )N)r   )sizer!   r   r   <lambda>  s   
 zBSSLEdgeCasesTest.test_ssl_buffer_size_edge_cases.<locals>.<lambda>T)skip_utf8_validationr   N)
r   r   r   rV   r   websocket._abnfr   recv_strictassertGreaterr`   )r   rR   r   fbr^   r   r   r   test_ssl_buffer_size_edge_cases  s   
z0SSLEdgeCasesTest.test_ssl_buffer_size_edge_casesc              	   C   s   t  }td?}t  }||_td|j_dtji}| tj t	||d W d   n1 s1w   Y  W d   dS W d   dS 1 sIw   Y  dS )z&Test SSL protocol downgrade protectionr   SSLV3_ALERT_HANDSHAKE_FAILUREr   r   N)
r   r   r   r   r   r   r   PROTOCOL_TLS_CLIENTr   r
   r    r   r   r   &test_ssl_protocol_downgrade_protection  s   


"z7SSLEdgeCasesTest.test_ssl_protocol_downgrade_protectionc              	   C   r&   )z0Test SSL certificate chain validation edge casesr   z2certificate verify failed: certificate has expiredTr'   zexpired.badssl.comNr)   r    r   r   r   %test_ssl_certificate_chain_validation  s   
"z6SSLEdgeCasesTest.test_ssl_certificate_chain_validationc              	   C   s   t  }td>}t  }||_td|j_ddi}| tj t||d W d   n1 s0w   Y  W d   dS W d   dS 1 sHw   Y  dS )z(Test SSL weak cipher rejection scenariosr   zno shared cipherr9   zRC4-MD5r   N)	r   r   r   r   r   r   r   r   r
   r    r   r   r   test_ssl_weak_cipher_rejection+  s   
"z/SSLEdgeCasesTest.test_ssl_weak_cipher_rejectionc           	      C   s   t  }g d}|D ]\}}| j||dq td]}t  }||_||krYd|v rYtd| d| d|j_tjdd	}| 	tj t
||| W d
   n1 sSw   Y  nt  |j_tjdd	}t
|||}| | W d
   n1 syw   Y  W d
   n1 sw   Y  q	d
S )z)Test SSL hostname verification edge cases))*.example.comzsubdomain.example.com)r   zsub.subdomain.example.com)r   zwww.example.com)certhostnamer   zsub.subdomainz
hostname 'z' doesn't match ''Tr'   N)r   r   r   r   r   r*   r   r   r   r   r
   r   )	r   r!   r   cert_hostnameconnect_hostnamer"   r#   r$   r^   r   r   r   )test_ssl_hostname_verification_edge_cases:  s>   


z:SSLEdgeCasesTest.test_ssl_hostname_verification_edge_casesc              	   C   s   t  }z;ddl}t|dr<td }t  }||_t  |j_t|i d |j  W d   W dS 1 s4w   Y  W dS W dS  tt	fyO   | 
d Y dS w )zTest SSL memory BIO edge casesr   N	MemoryBIOr   r   zSSL MemoryBIO not available)r   r   r   r   r   r   r
   r-   ImportErrorr   r   )r   r!   r   r"   r#   r   r   r   test_ssl_memory_bio_edge_casesf  s   


&z/SSLEdgeCasesTest.test_ssl_memory_bio_edge_casesN) __name__
__module____qualname__r   r%   r+   r/   r8   r<   rB   rG   r_   rg   rl   rx   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   %   s<    & ,r   __main__)unittestr   r   unittest.mockr   r   r   websocket._ssl_compatr   r   r   r   r	   websocket._httpr
   r   rj   r   websocket._socketr   r   TestCaser   r   mainr   r   r   r   <module>   s"       \