# Source: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/net_transport.py
"""Networking transport helpers for urllib.

This module provides a small abstraction on top of urllib.request so that
callers can keep using urllib.request.Request, but routing of connections can
be customized:

- hostname resolution is handled in user code;
- selected IP may be randomized or chosen using any complex logic;
- for HTTPS: connects to a chosen IP but keeps correct SNI and certificate
  hostname validation for the original hostname (NOT the IP).

Examples:

Default behavior (plain urllib):

    from defence360agent.utils.net_transport import UrlTransport

    transport = UrlTransport()
    req = urllib.request.Request(
        "https://files.imunify360.com/static/sigs/v1/description.json"
    )
    with transport.open(req, timeout=10) as resp:
        body = resp.read()

Randomize target IP on each connection (A/AAAA -> random choice):

    from defence360agent.utils.net_transport import UrlTransport, RandomIpChooser

    chooser = RandomIpChooser()
    transport = UrlTransport(ip_chooser=chooser)
    req = urllib.request.Request(
        "https://files.imunify360.com/static/sigs/v1/description.json"
    )
    with transport.open(req, timeout=10) as resp:
        body = resp.read()

Notes:
- HTTPS: connects to the chosen IP but keeps SNI/cert checks against original
  hostname.
- HTTP: Host header stays original hostname because urllib builds it from the
  URL.
"""

import ipaddress
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Dict, Optional, Tuple, TYPE_CHECKING


def _is_ipv4(ip: str) -> bool:
    """Return True if *ip* is an IPv4 address string.

    Implementation relies solely on ipaddress.ip_address for correctness.
    """
    try:
        return isinstance(ipaddress.ip_address(ip), ipaddress.IPv4Address)
    except ValueError:
        return False
<<c J e Zd ZdZedededefd ZdededefdZdS ) IpChooserzSelect an IP address to connect to for a given hostname and port. Implementations may be stateful and can keep caches/metrics inside. hostnameportr c t )z6Return an IP address (v4 or v6) for *hostname*:*port*.)NotImplementedErrorselfr r s r choosezIpChooser.chooseT s "! c . | || S N)r r s r __call__zIpChooser.__call__Y s {{8T***r N) __name__ __module____qualname____doc__r strintr r r r r r N s| "s "# "# " " " ^"+ +C +C + + + + + +r r c X e Zd ZdZej eddedefdZ de dedee d f fd ZdS )DnsCacheResolverzDNS cache for socket.getaddrinfo() results. It caches per (hostname, port, family). This is intentionally small and local: it is meant only to avoid excessive getaddrinfo() calls. )familyttl_secondsr* r+ c ` || _ || _ i | _ t j | _ d S r )_family_ttl_seconds_cache threadingLock_lock)r r* r+ s r __init__zDnsCacheResolver.__init__d s4 ' ^%% r r r r .c , ||| j f}t j }| j 5 | j | }|;|\ }}||k r0t d||| j |cd d d S d d d n# 1 swxY w Y t d||| j t j ||| j t j }g }|D ])\ } } } } } | d }||vr| | *|s#t d || t | }| j 5 || j z |f| j |<