From 14041120cdf948b08ca1a3423e8a707e88dcbf2c Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sat, 30 Aug 2025 12:09:20 +0200 Subject: [PATCH] add helper telnet --- .gitignore | 1 + helper/remote-serial.py | 125 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 helper/remote-serial.py diff --git a/.gitignore b/.gitignore index 89cc49c..933c44f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .vscode/c_cpp_properties.json .vscode/launch.json .vscode/ipch +.venv/ \ No newline at end of file diff --git a/helper/remote-serial.py b/helper/remote-serial.py new file mode 100644 index 0000000..2931851 --- /dev/null +++ b/helper/remote-serial.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +Telnet-only tool to toggle DTR/RTS via ser2net admin console. + +New sequences (physical levels): + - restart: RTS HI -> DTR LO -> DTR HI + - download: RTS LO -> DTR LO -> DTR HI -> RTS HI + +Timing parameters tunable via args (ms). Use admin host/port and connection name (con). +Example: + python3 helper/remote-serial.py --host 192.168.1.100 --admin-port 7373 --con con1 --action restart -v +""" +import argparse +import socket +import errno +import time +import sys + +def send_admin_cmd(host, port, cmd, timeout=1.0, verbose=False): + if verbose: + print(f"> {cmd}") + try: + with socket.create_connection((host, port), timeout=timeout) as s: + s.settimeout(timeout) + try: + s.sendall(cmd.encode('ascii') + b"\n") + except Exception as e: + print(f"ERROR sending command: {e}", file=sys.stderr) + return None + # small pause to let server respond + time.sleep(0.05) + chunks = [] + while True: + try: + data = s.recv(4096) + if not data: + break + chunks.append(data) + # short sleep to allow more data to arrive + time.sleep(0.01) + except socket.timeout: + break + except OSError as e: + if getattr(e, 'errno', None) == errno.EWOULDBLOCK: + break + break + resp = b"".join(chunks) + if resp: + try: + txt = resp.decode(errors="ignore") + except Exception: + txt = str(resp) + if verbose: + print(txt.strip()) + return txt + return "" + except Exception as e: + print(f"ERROR: cannot connect to {host}:{port} -> {e}", file=sys.stderr) + return None + +def set_port_control(host, admin_port, con, line, hi_or_lo, timeout, verbose): + # hi_or_lo must be 'HI' or 'LO' + cmd = f"setportcontrol {con} {line}{hi_or_lo}" + return send_admin_cmd(host, admin_port, cmd, timeout=timeout, verbose=verbose) + +def run_sequence(host, admin_port, con, seq, timeout, verbose): + # seq: list of (line, state, sleep_seconds) + for line, state, sleep_s in seq: + if verbose: + print(f"-> {line}{state} (wait {sleep_s:.3f}s)") + res = set_port_control(host, admin_port, con, line, state, timeout, verbose) + # continue even if response is None; user sees errors in stderr + if sleep_s and sleep_s > 0: + time.sleep(sleep_s) + +def main(): + p = argparse.ArgumentParser(description="ser2net admin: restart / download sequences (explicit HI/LO)") + p.add_argument("--host", required=True, help="ser2net admin host/ip") + p.add_argument("--admin-port", type=int, required=True, help="ser2net admin port") + p.add_argument("--con", required=True, help="ser2net connection name (e.g. con1)") + p.add_argument("--action", choices=["restart", "download", "bootloader"], default="restart", + help="restart or download (bootloader alias)") + p.add_argument("--pulse-ms", type=float, default=100.0, help="DTR pulse length in milliseconds") + p.add_argument("--step-delay-ms", type=float, default=20.0, help="short delay between steps in milliseconds") + p.add_argument("--tail-delay-ms", type=float, default=50.0, help="extra delay after sequence in milliseconds") + p.add_argument("--timeout", type=float, default=1.0, help="telnet connect/read timeout seconds") + p.add_argument("-v", "--verbose", action="store_true") + args = p.parse_args() + + pulse_s = max(0.001, args.pulse_ms / 1000.0) + step_delay_s = max(0.0, args.step_delay_ms / 1000.0) + tail_delay_s = max(0.0, args.tail_delay_ms / 1000.0) + + if args.verbose: + print(f"Admin {args.host}:{args.admin_port} con={args.con} action={args.action}") + + # sequences use explicit physical HI/LO as requested + if args.action == "restart": + seq = [ + ("RTS", "HI", step_delay_s), # RTS HI + ("DTR", "LO", pulse_s), # DTR LO (pulse start) + ("DTR", "HI", tail_delay_s), # DTR HI (pulse end), then tail delay + ] + else: # download / bootloader + # Ensure known starting state (RESET released), then enter download: + # 1) ensure DTR=HI (reset released) + # 2) RTS=LO (IO0 low) + # 3) DTR=LO -> wait pulse_ms + # 4) DTR=HI -> small delay + # 5) RTS=HI -> release IO0 (enter bootloader) + seq = [ + ("DTR", "HI", step_delay_s), # ensure reset released + ("RTS", "LO", step_delay_s), # RTS LO (IO0 low) + ("DTR", "LO", pulse_s), # DTR LO (reset asserted) + ("DTR", "HI", step_delay_s), # DTR HI (reset released) + ("RTS", "HI", tail_delay_s), # RTS HI (release IO0) + ] + + run_sequence(args.host, args.admin_port, args.con, seq, args.timeout, args.verbose) + + if args.verbose: + print("Sequence finished.") + +if __name__ == "__main__": + main() \ No newline at end of file