Network Programming

Overview

Python's socket library provides low-level network access for building port scanners, reverse shells, and custom protocol clients. Higher-level libraries like scapy enable packet crafting and sniffing, while paramiko provides SSH automation. This file covers practical network programming patterns used in security testing.

Socket Programming

TCP Client

# socket — low-level networking
# https://docs.python.org/3/library/socket.html

import socket

def tcp_connect(host, port, timeout=5):
    """Connect to a TCP service, send an HTTP HEAD probe, and return the reply.

    Args:
        host: Hostname or IPv4 address of the target.
        port: TCP port to connect to.
        timeout: Socket timeout in seconds, applied to connect, send and
            receive.

    Returns:
        Up to 4096 bytes of the first response, decoded as text with
        undecodable bytes replaced, or the string ``"Error: <reason>"`` when
        a socket operation fails.
    """
    # Use the real target in the Host header so name-based virtual hosts
    # answer for the site that was actually requested.
    probe = f"HEAD / HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((host, port))
            # sendall() keeps writing until the whole probe is out; send()
            # is allowed to transmit only part of the buffer.
            s.sendall(probe)
            # Only the first chunk is read — enough for a banner/status line.
            response = s.recv(4096)
            return response.decode(errors="replace")
    except OSError as e:  # socket.error is an alias of OSError
        return f"Error: {e}"

# Example: probe port 80 on 10.0.0.1 and print whatever tcp_connect returns
# (the first chunk of the reply, or an "Error: ..." string).
banner = tcp_connect("10.0.0.1", 80)
print(banner)

TCP Server (Listener)

# socket — TCP server
# https://docs.python.org/3/library/socket.html

import socket

def start_listener(bind_ip="0.0.0.0", bind_port=4444):
    """Start a TCP listener (e.g., for catching reverse shells).

    Accepts exactly one connection, echoes everything the peer sends to
    stdout until the peer closes the connection, then returns.

    Args:
        bind_ip: Local address to bind to; the default listens on all
            IPv4 interfaces.
        bind_port: Local TCP port to listen on.

    Raises:
        OSError: If the socket cannot be bound, or the connection fails
            while receiving.
    """
    import codecs  # local import keeps this snippet self-contained

    # An incremental decoder carries partial multi-byte sequences over from
    # one recv() chunk to the next instead of turning them into U+FFFD.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow quick restarts on the same port.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((bind_ip, bind_port))
        server.listen(1)
        print(f"[*] Listening on {bind_ip}:{bind_port}")

        client, addr = server.accept()
        print(f"[+] Connection from {addr[0]}:{addr[1]}")

        with client:
            while True:
                data = client.recv(4096)
                if not data:  # empty read: peer closed the connection
                    break
                # Flush so output without a trailing newline (for example a
                # shell prompt) shows up immediately.
                print(decoder.decode(data), end="", flush=True)

            # Emit whatever is left if the stream ended mid-character.
            tail = decoder.decode(b"", final=True)
            if tail:
                print(tail, end="", flush=True)

UDP Client

# socket — UDP communication
# https://docs.python.org/3/library/socket.html

import socket

def udp_send(host, port, data, timeout=3):
    """Fire one UDP datagram at (host, port) and wait for a single reply.

    Returns the reply payload (at most 4096 bytes), or None when nothing
    arrives before `timeout` seconds have elapsed.
    """
    destination = (host, port)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(data, destination)
        try:
            payload, _sender = sock.recvfrom(4096)
        except socket.timeout:
            # No answer within the timeout window.
            return None
        return payload

# UDP example: send data to a target on port 161 and keep the reply
# (None if nothing came back before the timeout).
# NOTE(review): these five bytes look like only the start of an SNMP message
# (a SEQUENCE header followed by an INTEGER field) — confirm whether a
# complete request was intended.
response = udp_send("10.0.0.1", 161, b"\x30\x26\x02\x01\x01")  # SNMP query bytes

Port Scanner

# socket — TCP port scanner
# https://docs.python.org/3/library/socket.html

import socket
from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_port(host, port, timeout=1):
    """Probe one TCP port and try to read a service banner.

    Returns a ``(port, is_open, banner)`` tuple. ``banner`` is the stripped
    text of the first bytes the service sends after a bare CRLF, or an empty
    string when the port is closed or nothing could be read.
    """
    closed = (port, False, "")
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex reports failure as a non-zero error code.
            if sock.connect_ex((host, port)) != 0:
                return closed

            # Port is open: nudge the service and see what it says.
            try:
                sock.send(b"\r\n")
                text = sock.recv(1024).decode(errors="replace").strip()
            except (socket.timeout, socket.error):
                text = ""
            return port, True, text
    except socket.error:
        return closed

def port_scan(host, ports, threads=50):
    """Run scan_port over `ports` in a thread pool and collect the open ones.

    Each open port is printed as soon as its probe finishes. The return
    value is a sorted list of ``(port, banner)`` tuples.
    """
    found = []
    with ThreadPoolExecutor(max_workers=threads) as pool:
        pending = {pool.submit(scan_port, host, p): p for p in ports}
        # Handle results in completion order, not submission order.
        for done in as_completed(pending):
            number, reachable, text = done.result()
            if not reachable:
                continue
            found.append((number, text))
            suffix = f" — {text}" if text else ""
            print(f"  [+] {number}/tcp open{suffix}")
    found.sort()
    return found

# Usage: scan ports 1-1024 on a single target with the default thread count,
# then report how many were found open.
target = "10.0.0.1"
print(f"Scanning {target}...")
results = port_scan(target, range(1, 1025))
print(f"\n{len(results)} open ports found")

Packet Crafting with Scapy

Basic Packet Construction

# Scapy
# https://scapy.net/

from scapy.all import IP, TCP, UDP, ICMP, sr1, sr, send, conf

# Disable verbose output
conf.verb = 0

# ICMP ping: send one ICMP packet and wait up to 2 seconds for a reply
pkt = IP(dst="10.0.0.1") / ICMP()
resp = sr1(pkt, timeout=2)
if resp:
    print(f"Host is up (TTL: {resp.ttl})")

# TCP SYN packet: probe port 80 and classify it from the flags of the reply
syn = IP(dst="10.0.0.1") / TCP(dport=80, flags="S")
resp = sr1(syn, timeout=2)
if resp and resp.haslayer(TCP):
    if resp[TCP].flags == "SA":  # SYN-ACK
        print("Port 80 is open")
    elif resp[TCP].flags == "RA":  # RST-ACK
        print("Port 80 is closed")
    # Any other flag combination, or no reply at all, prints nothing.

# UDP packet: two zero bytes sent to port 53; the reply (if any) is stored in
# resp but not inspected here
udp_pkt = IP(dst="10.0.0.1") / UDP(dport=53) / b"\x00\x00"
resp = sr1(udp_pkt, timeout=2)

SYN Scan

# Scapy — SYN scan
# https://scapy.net/

from scapy.all import IP, TCP, sr, conf

conf.verb = 0

def syn_scan(target, ports):
    """Perform a SYN scan using scapy.

    Args:
        target: Destination IP address or hostname.
        ports: Port number, or list of port numbers, to probe.

    Returns:
        Sorted list of the ports that answered with SYN-ACK.
    """
    # `send` is imported here because the import line that accompanies this
    # function only brings in IP, TCP, sr and conf.
    from scapy.all import send

    # Send SYN packets to all ports at once
    pkt = IP(dst=target) / TCP(dport=ports, flags="S")
    answered, _unanswered = sr(pkt, timeout=2)

    open_ports = []
    for sent, received in answered:
        if not received.haslayer(TCP):
            continue
        # Mask rather than compare exactly, so a SYN-ACK that carries extra
        # bits (e.g. ECN) is still recognised.
        if (int(received[TCP].flags) & 0x12) != 0x12:
            continue

        open_ports.append(received[TCP].sport)

        # Send RST to close the half-open connection. It must come from the
        # probe's source port and carry the sequence number the peer just
        # acknowledged, otherwise the peer will not accept it.
        rst = IP(dst=target) / TCP(
            sport=sent[TCP].sport,
            dport=received[TCP].sport,
            flags="R",
            seq=received[TCP].ack,
        )
        send(rst)

    return sorted(open_ports)

# Requires root privileges
# Scan ports 1-1024 and list the ones syn_scan returned.
results = syn_scan("10.0.0.1", list(range(1, 1025)))
for port in results:
    print(f"  {port}/tcp open")

ARP Scanning

# Scapy — ARP host discovery
# https://scapy.net/

from scapy.all import Ether, ARP, srp, conf

conf.verb = 0

def arp_scan(network):
    """Broadcast ARP requests for `network` and list the hosts that answer.

    Returns a list of ``{"ip": ..., "mac": ...}`` dicts, one per reply
    received within the 2-second timeout.
    """
    broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
    probe = broadcast / ARP(pdst=network)
    replies, _ = srp(probe, timeout=2)

    # Each entry pairs the request that was sent with the reply it triggered;
    # only the reply's ARP fields are reported.
    return [
        {"ip": reply[ARP].psrc, "mac": reply[ARP].hwsrc}
        for _request, reply in replies
    ]

# Requires root privileges
# Sweep 10.0.0.0/24 and print each responder's IP (padded to 15 columns) and MAC.
hosts = arp_scan("10.0.0.0/24")
for h in hosts:
    print(f"  {h['ip']:15s}  {h['mac']}")

Packet Sniffing

# Scapy — packet capture and filtering
# https://scapy.net/

from scapy.all import sniff, wrpcap, DNS, DNSQR, TCP, IP

def packet_callback(pkt):
    """Process each captured packet.

    Prints the queried name for DNS questions and a one-line summary for
    TCP packets whose only flag is SYN. Returns nothing.

    Args:
        pkt: A captured scapy packet.
    """
    # Responses also carry the question section, so check the qr bit
    # (0 = query) to avoid labelling answers as queries.
    if pkt.haslayer(DNS) and pkt.haslayer(DNSQR) and pkt[DNS].qr == 0:
        # Names come off the wire; replace undecodable bytes rather than
        # letting UnicodeDecodeError escape from the callback.
        query = pkt[DNSQR].qname.decode(errors="replace")
        print(f"DNS query: {query}")

    if pkt.haslayer(TCP) and pkt.haslayer(IP):
        src = pkt[IP].src
        dst = pkt[IP].dst
        dport = pkt[TCP].dport
        if pkt[TCP].flags == "S":
            print(f"SYN: {src} → {dst}:{dport}")

# Capture 100 packets on eth0 with a BPF filter
# Requires root privileges
# packet_callback is passed as prn; the value sniff returns is what gets
# written to the PCAP file below.
packets = sniff(
    iface="eth0",
    count=100,
    filter="tcp port 80 or udp port 53",
    prn=packet_callback
)

# Save captured packets to PCAP
wrpcap("capture.pcap", packets)

SSH Automation with Paramiko

# Paramiko
# https://www.paramiko.org/

import paramiko

def ssh_execute(host, username, password, command, port=22):
    """Run `command` on `host` over SSH with password authentication.

    Returns an ``(output, errors)`` tuple holding the decoded stdout and
    stderr of the command. The connection is closed before returning, and
    also when connecting or executing raises.
    """
    session = paramiko.SSHClient()
    # Hosts with unknown keys are handled by paramiko's AutoAddPolicy.
    session.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        session.connect(host, port=port, username=username, password=password)
        _stdin, out_stream, err_stream = session.exec_command(command)
        output = out_stream.read().decode()
        errors = err_stream.read().decode()
    finally:
        session.close()

    return output, errors

# Execute a command: run "id && hostname" on 10.0.0.1 and print its stdout.
# The stderr text is returned in `errors` but not printed here.
output, errors = ssh_execute(
    "10.0.0.1", "admin", "password123", "id && hostname"
)
print(output)

SSH Key Authentication

# Paramiko — key-based authentication
# https://www.paramiko.org/

import paramiko

def ssh_key_execute(host, username, key_path, command, port=22):
    """Execute a command over SSH using key authentication.

    Args:
        host: Target hostname or IP address.
        username: Account to log in as.
        key_path: Path to the private key file. Any key type paramiko
            supports may be used, not just RSA.
        command: Command line to run on the remote host.
        port: SSH port on the target.

    Returns:
        The command's stdout decoded as text.
    """
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        # Hand the path to connect() so paramiko works out the key type
        # itself; loading it through RSAKey rejected every non-RSA key.
        client.connect(
            host, port=port, username=username, key_filename=key_path
        )
        _stdin, stdout, _stderr = client.exec_command(command)
        return stdout.read().decode()
    finally:
        client.close()

# SFTP file transfer
def sftp_download(host, username, password, remote_path, local_path, port=22):
    """Download a file over SFTP.

    Args:
        host: Target hostname or IP address.
        username: Account to log in as.
        password: Password for the account.
        remote_path: Path of the file on the remote host.
        local_path: Destination path on the local machine.
        port: SSH port on the target.

    The transport is closed on every exit path, including a failed
    authentication or a failure to open the SFTP session.
    """
    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.get(remote_path, local_path)
            print(f"Downloaded: {remote_path} → {local_path}")
        finally:
            sftp.close()
    finally:
        # Reached even when connect() or from_transport() raised, so the
        # underlying connection is never left open.
        transport.close()

SSH Command on Multiple Hosts

# Paramiko — run commands on multiple hosts
# https://www.paramiko.org/

import paramiko
from concurrent.futures import ThreadPoolExecutor

def run_on_host(host, username, password, command):
    """Run a command on a single host and return results.

    Never raises: any failure is reported through the third tuple element.

    Returns:
        ``(host, output, None)`` on success, where ``output`` is the
        stripped stdout of the command, or ``(host, None, message)`` when
        connecting or executing failed.
    """
    client = None
    try:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(host, username=username, password=password, timeout=5)
        _stdin, stdout, _stderr = client.exec_command(command)
        output = stdout.read().decode().strip()
        return host, output, None
    except Exception as e:
        # Deliberately broad: one bad host must not abort a multi-host run.
        return host, None, str(e)
    finally:
        # Close on the failure paths too, so a failed connect or exec does
        # not leave the client open.
        if client is not None:
            client.close()

# Run 'id' on multiple hosts
hosts = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [
        executor.submit(run_on_host, h, "admin", "password", "id")
        for h in hosts
    ]
    # Iterate in submission order so results print in the same order as hosts.
    for future in futures:
        host, output, error = future.result()
        # Decide on `error`, not `output`: a command that succeeds with empty
        # output is still a success, not "ERROR — None".
        if error is None:
            print(f"  {host}: {output}")
        else:
            print(f"  {host}: ERROR — {error}")

DNS Resolution

# socket — DNS lookups
# https://docs.python.org/3/library/socket.html

import socket

# Forward lookup
try:
    ip = socket.gethostbyname("example.com")
    print(f"A record: {ip}")
except socket.gaierror:
    print("Resolution failed")

# Get all addresses
# Restricting the lookup to SOCK_STREAM yields one entry per address; without
# it the same address can be listed once for every socket type.
try:
    addrs = socket.getaddrinfo(
        "example.com", 443, socket.AF_INET, socket.SOCK_STREAM
    )
except socket.gaierror:
    # Same failure mode as the forward lookup above; do not crash the script.
    print("Resolution failed")
else:
    for addr in addrs:
        # addr is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP
        print(f"  {addr[4][0]}")

# Reverse lookup
try:
    hostname, aliases, ips = socket.gethostbyaddr("8.8.8.8")
    print(f"PTR: {hostname}")
except (socket.herror, socket.gaierror):
    # herror: no PTR record; gaierror: the resolver call itself failed.
    print("Reverse lookup failed")

References

Tools

Further Reading