355 lines
14 KiB
Python
Executable File
Vendored
355 lines
14 KiB
Python
Executable File
Vendored
#!/usr/bin/env python3
|
|
"""
|
|
NetFS — Binary File Server for Sharp MZ BASIC Network Drive
|
|
|
|
Serves MZF files from a local directory over TCP using a simple binary protocol
|
|
designed for easy Z80 assembly parsing.
|
|
|
|
Binary Protocol:
|
|
All requests start with 1-byte command. All responses start with 1-byte status.
|
|
Status: 0x00=OK, 0xFF=error, 0xFE=end of data.
|
|
|
|
DIR (0x01):
|
|
Request: [0x01] [unit:1]
|
|
Response: [0x00] [count:1] [entry0:32] ... [entryN:32]
|
|
Each entry is 32 bytes: atrb(1) + name(17) + size(2LE) + dtadr(2LE) + exadr(2LE) + pad(8)
|
|
Returns up to 63 entries from the directory mapped to unit (1-7). Max 63 (2KB RX buffer).
|
|
|
|
INFO (0x02):
|
|
Request: [0x02] [filename:17]
|
|
Response: [0x00] [entry:32] (or [0xFF] if not found)
|
|
|
|
READ (0x03):
|
|
Request: [0x03] [unit:1] [filename:17] [offset:2LE] [length:2LE]
|
|
Response: [0x00] [actual_len:2LE] [data:actual_len] (or [0xFF] if not found)
|
|
|
|
WRITE (0x04):
|
|
Request: [0x04] [unit:1] [filename:17] [atrb:1] [size:2LE] [dtadr:2LE] [exadr:2LE] [data:size]
|
|
Response: [0x00] (or [0xFF] on error)
|
|
|
|
CLOSE (0x05):
|
|
Request: [0x05]
|
|
Response: [0x00] (server closes connection)
|
|
|
|
DELETE (0x06):
|
|
Request: [0x06] [filename:17]
|
|
Response: [0x00] (or [0xFF] if not found)
|
|
|
|
DIR, READ and WRITE include a unit byte (1-7) selecting the server directory; INFO and DELETE carry no unit byte and always operate on the unit 1 directory.
|
|
|
|
Usage:
|
|
python3 netfs.py [--port 6800] [--dir ./mzf_files] [--dir2 path] ... [--dir7 path]
|
|
|
|
--dir maps to NET1: (or NET: with no digit), --dir2 maps to NET2:, etc.
|
|
Units without a --dirN argument are not configured: DIR returns an empty listing, READ reports not found, and WRITE is rejected with an error.
|
|
|
|
(c) 2026 Philip Smart <philip.smart@net2net.org>
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import sys
|
|
|
|
# Command-line defaults.
DEFAULT_PORT = 6800      # TCP port listened on when --port is not given
DEFAULT_DIR = "."        # directory served as unit 1 when --dir is not given
MZF_HEADER_SIZE = 128    # length in bytes of the header at the start of an MZF file

# Protocol constants
# Command codes: the first byte of every client request.
CMD_DIR = 0x01           # list the files of a unit
CMD_INFO = 0x02          # return the directory entry of one file
CMD_READ = 0x03          # read a byte range of a file's data
CMD_WRITE = 0x04         # store a file (header fields + data)
CMD_CLOSE = 0x05         # end the session
CMD_DELETE = 0x06        # remove a file
# Status codes: the first byte of every server response.
STATUS_OK = 0x00
STATUS_ERR = 0xFF
STATUS_END = 0xFE        # "end of data"; not referenced by the handlers visible in this file
|
|
|
|
def parse_mzf_header(data):
    """Decode the leading fields of an MZF header.

    Args:
        data: Bytes read from the start of an MZF file.

    Returns:
        None when fewer than MZF_HEADER_SIZE bytes were supplied; otherwise a
        dict with the keys 'atrb', 'name', 'name_raw', 'size', 'dtadr' and
        'exadr'.
    """
    if len(data) < MZF_HEADER_SIZE:
        return None

    raw_name = data[1:18]
    # Bytes 18..23 hold three consecutive little-endian 16-bit words.
    size, dtadr, exadr = struct.unpack('<HHH', data[18:24])

    # Remap RB/BSD/BRD types to BTX (type 2) so BASIC can LOAD them.
    file_type = data[0]
    atrb = 0x02 if file_type in (0x03, 0x04, 0x05) else file_type

    # The name field is terminated/padded with CR (0x0D) and NUL bytes.
    text_name = raw_name.rstrip(b'\x00\x0d').decode('ascii', errors='replace')

    return {
        'atrb': atrb,
        'name': text_name,
        'name_raw': raw_name,
        'size': size,
        'dtadr': dtadr,
        'exadr': exadr,
    }
|
|
|
|
def make_dir_entry(atrb, name_raw, size, dtadr, exadr):
    """Build one 32-byte binary directory entry (standard MZF field layout).

    Layout: atrb(1) + name(17) + size(2, LE) + dtadr(2, LE) + exadr(2, LE)
    followed by 8 zero bytes of padding.  Numeric arguments are truncated to
    the width of their field; the name is cut or NUL-padded to 17 bytes.
    The Z80 handler remaps fields as needed for the target BASIC.
    """
    name_field = bytes(name_raw[:17].ljust(17, b'\x00'))
    fields = struct.pack(
        '<B17sHHH',
        atrb & 0xFF,
        name_field,
        size & 0xFFFF,
        dtadr & 0xFFFF,
        exadr & 0xFFFF,
    )
    # 24 bytes of fields + 8 bytes of zero padding = 32 bytes.
    return fields + bytes(8)
|
|
|
|
def scan_directory(dirpath):
    """Scan a directory and describe every servable file in it.

    Files ending in .mzf/.mzt/.m12 are described by their own MZF header;
    files shorter than a full header are skipped.  Files ending in
    .bin/.rom/.dat have no header, so an entry is synthesised from the file
    name and size with type 0x01 and load/exec address 0x1200.

    A file that cannot be read or stat'ed (for example because it vanished
    between the listing and the access) is reported with a warning and
    skipped, so one bad file never aborts the whole scan.

    Args:
        dirpath: Path of the directory to scan (not recursive).

    Returns:
        A list of entry dicts sorted by file name.  Each dict has the keys
        'atrb', 'name', 'name_raw', 'size', 'dtadr', 'exadr', 'filepath' and
        'data_offset' (offset of the payload inside the file).

    Raises:
        OSError: If the directory itself cannot be listed.
    """
    entries = []
    for fname in sorted(os.listdir(dirpath)):
        fpath = os.path.join(dirpath, fname)
        if not os.path.isfile(fpath):
            continue
        stem, ext = os.path.splitext(fname)
        ext = ext.lower()
        try:
            if ext in ('.mzf', '.mzt', '.m12'):
                with open(fpath, 'rb') as f:
                    hdr_data = f.read(MZF_HEADER_SIZE)
                hdr = parse_mzf_header(hdr_data)
                if hdr:
                    hdr['filepath'] = fpath
                    hdr['data_offset'] = MZF_HEADER_SIZE
                    entries.append(hdr)
            elif ext in ('.bin', '.rom', '.dat'):
                fsize = os.path.getsize(fpath)
                name_clean = stem[:17]
                name_raw = name_clean.encode('ascii', errors='replace').ljust(17, b'\x00')
                entries.append({
                    'atrb': 0x01, 'name': name_clean, 'name_raw': name_raw,
                    # The size field is 16 bits wide; larger sizes keep only the low 16 bits.
                    'size': fsize & 0xFFFF, 'dtadr': 0x1200, 'exadr': 0x1200,
                    'filepath': fpath, 'data_offset': 0
                })
        except Exception as e:
            # Best effort: report the problem file and carry on with the rest.
            print(f" Warning: {fname}: {e}")
    return entries
|
|
|
|
def find_file(entries, name_bytes):
    """Look up a directory entry by the name field of a request.

    The requested name has trailing NUL/CR padding removed, is decoded as
    ASCII (undecodable bytes become the replacement character) and is then
    compared with each entry's 'name' ignoring case and surrounding
    whitespace.

    Args:
        entries: Iterable of entry dicts, each with a 'name' string.
        name_bytes: Raw name field as received from the client.

    Returns:
        The first matching entry, or None when nothing matches.
    """
    wanted = name_bytes.rstrip(b'\x00\x0d').decode('ascii', errors='replace').strip().upper()
    matches = (candidate for candidate in entries
               if candidate['name'].strip().upper() == wanted)
    return next(matches, None)
|
|
|
|
def _recv_exact(conn, count):
    """Read exactly *count* bytes from *conn*.

    TCP is a byte stream, so a single recv() may return fewer bytes than
    requested even though the rest is still on its way.  Keep reading until
    the full amount has arrived.  A result shorter than *count* therefore
    means the peer closed the connection.
    """
    buf = bytearray()
    while len(buf) < count:
        chunk = conn.recv(min(count - len(buf), 4096))
        if not chunk:
            break
        buf.extend(chunk)
    return bytes(buf)


def _safe_mzf_filename(name_str):
    """Map a client-supplied name to a file name that stays inside the unit directory.

    Spaces, NUL and path separators (plus ':' on Windows, which would select
    a drive) are replaced by '_' so the result can never address a different
    directory.  Returns None for an empty name.
    """
    unsafe = {' ', '\x00', os.sep}
    if os.altsep:
        unsafe.add(os.altsep)
    if os.name == 'nt':
        unsafe.add(':')
    stem = ''.join('_' if ch in unsafe else ch for ch in name_str)
    return stem + '.mzf' if stem else None


def handle_client(conn, addr, dirs):
    """Serve one client connection using the binary NetFS protocol.

    Commands are read and answered in a loop until the client sends CLOSE,
    closes the connection, or stays silent for 30 seconds.  The socket is
    always closed on exit.

    Args:
        conn: Connected client socket.
        addr: Peer address, used only for log output.
        dirs: dict mapping unit number (1-7) to directory path.
    """
    conn.settimeout(30)  # 30s timeout — recovers if host crashes mid-transfer.
    print(f"[{addr}] Connected")
    try:
        while True:
            # Read 1-byte command
            cmd_data = conn.recv(1)
            if not cmd_data:
                break
            cmd = cmd_data[0]

            if cmd == CMD_DIR:
                # Read 1-byte unit number.
                unit_data = conn.recv(1)
                if not unit_data:
                    break
                unit = unit_data[0]
                dp = dirs.get(unit)
                entries = scan_directory(dp) if dp else []
                # The count field is one byte and the client RX buffer is 2KB: cap at 63.
                count = min(len(entries), 63)
                resp = bytearray([STATUS_OK, count])
                for ent in entries[:count]:
                    resp.extend(make_dir_entry(ent['atrb'], ent['name_raw'], ent['size'], ent['dtadr'], ent['exadr']))
                conn.sendall(bytes(resp))
                print(f"[{addr}] DIR unit={unit} ({dp or 'not configured'}) → {count}/{len(entries)} entries ({len(resp)} bytes)")

            elif cmd == CMD_INFO:
                # Read 17-byte filename; a short result means the peer went away.
                name_bytes = _recv_exact(conn, 17)
                if len(name_bytes) < 17:
                    break
                # INFO searches unit 1 directory (no unit byte in protocol).
                dp = dirs.get(1)
                entries = scan_directory(dp) if dp else []
                found = find_file(entries, name_bytes)
                if found:
                    entry = make_dir_entry(found['atrb'], found['name_raw'], found['size'], found['dtadr'], found['exadr'])
                    conn.sendall(bytes([STATUS_OK]) + entry)
                    print(f"[{addr}] INFO '{found['name']}' → size={found['size']}")
                else:
                    conn.sendall(bytes([STATUS_ERR]))
                    name_str = name_bytes.rstrip(b'\x00').decode('ascii', errors='replace')
                    print(f"[{addr}] INFO '{name_str}' → NOT FOUND")

            elif cmd == CMD_READ:
                # [unit:1][name:17][offset:2][length:2] = 22 bytes.
                params = _recv_exact(conn, 22)
                if len(params) < 22:
                    break
                unit = params[0]
                name_bytes = params[1:18]
                offset, length = struct.unpack('<HH', params[18:22])
                dp = dirs.get(unit)
                entries = scan_directory(dp) if dp else []
                found = find_file(entries, name_bytes) if entries else None
                if found:
                    # Never read past the size recorded in the directory entry.
                    max_read = found['size'] - offset if offset < found['size'] else 0
                    try:
                        with open(found['filepath'], 'rb') as f:
                            f.seek(found['data_offset'] + offset)
                            data = f.read(min(length, max_read))
                    except OSError as e:
                        # Report the failure to the client instead of dropping the session.
                        conn.sendall(bytes([STATUS_ERR]))
                        print(f"[{addr}] READ unit={unit} '{found['name']}' FAILED: {e}")
                        continue
                    actual_len = len(data)
                    resp = bytearray([STATUS_OK])
                    resp.extend(struct.pack('<H', actual_len))
                    resp.extend(data)
                    conn.sendall(bytes(resp))
                    print(f"[{addr}] READ unit={unit} '{found['name']}' ofs={offset} len={length} → {actual_len} bytes")
                else:
                    conn.sendall(bytes([STATUS_ERR]))
                    name_str = name_bytes.rstrip(b'\x00').decode('ascii', errors='replace')
                    print(f"[{addr}] READ unit={unit} '{name_str}' → NOT FOUND")

            elif cmd == CMD_WRITE:
                # [unit:1][name:17][atrb:1][size:2][dtadr:2][exadr:2] = 25 bytes.
                params = _recv_exact(conn, 25)
                if len(params) < 25:
                    break
                unit = params[0]
                name_bytes = params[1:18]
                atrb = params[18]
                size, dtadr, exadr = struct.unpack('<HHH', params[19:25])
                name_str = name_bytes.rstrip(b'\x00\x0d').decode('ascii', errors='replace').strip()

                # Always consume the payload first so the command stream stays
                # in sync even when the request is rejected below.
                data = _recv_exact(conn, size)
                if len(data) < size:
                    # Peer vanished mid-transfer: do not store a truncated file.
                    print(f"[{addr}] WRITE unit={unit} '{name_str}' → connection closed after {len(data)}/{size} bytes")
                    break

                dp = dirs.get(unit)
                if not dp:
                    conn.sendall(bytes([STATUS_ERR]))
                    print(f"[{addr}] WRITE unit={unit} '{name_str}' → unit not configured")
                    continue

                # The name comes from the network: keep the file inside the unit directory.
                fname = _safe_mzf_filename(name_str)
                if fname is None:
                    conn.sendall(bytes([STATUS_ERR]))
                    print(f"[{addr}] WRITE unit={unit} '{name_str}' → invalid file name")
                    continue
                fpath = os.path.join(dp, fname)

                # Build MZF file: 128-byte header followed by the data.
                hdr = bytearray(MZF_HEADER_SIZE)
                hdr[0] = atrb
                hdr[1:18] = name_bytes[:17]
                struct.pack_into('<HHH', hdr, 18, size, dtadr, exadr)
                try:
                    with open(fpath, 'wb') as f:
                        f.write(bytes(hdr))
                        f.write(data)
                except OSError as e:
                    conn.sendall(bytes([STATUS_ERR]))
                    print(f"[{addr}] WRITE unit={unit} '{name_str}' FAILED: {e}")
                    continue
                conn.sendall(bytes([STATUS_OK]))
                print(f"[{addr}] WRITE unit={unit} '{name_str}' atrb={atrb:02X} size={size} → {fpath}")

            elif cmd == CMD_DELETE:
                # Read 17-byte filename (no unit byte — uses unit 1 dir).
                name_bytes = _recv_exact(conn, 17)
                if len(name_bytes) < 17:
                    break
                dp = dirs.get(1)
                entries = scan_directory(dp) if dp else []
                found = find_file(entries, name_bytes)
                if found:
                    try:
                        os.remove(found['filepath'])
                    except OSError as e:
                        conn.sendall(bytes([STATUS_ERR]))
                        print(f"[{addr}] DELETE '{found['name']}' FAILED: {e}")
                    else:
                        conn.sendall(bytes([STATUS_OK]))
                        print(f"[{addr}] DELETE '{found['name']}' → {found['filepath']}")
                else:
                    conn.sendall(bytes([STATUS_ERR]))
                    name_str = name_bytes.rstrip(b'\x00').decode('ascii', errors='replace')
                    print(f"[{addr}] DELETE '{name_str}' → NOT FOUND")

            elif cmd == CMD_CLOSE:
                conn.sendall(bytes([STATUS_OK]))
                print(f"[{addr}] CLOSE")
                break

            else:
                conn.sendall(bytes([STATUS_ERR]))
                print(f"[{addr}] Unknown cmd: 0x{cmd:02X}")

    except (ConnectionResetError, BrokenPipeError):
        # The peer dropped the connection; the disconnect is logged below.
        pass
    except socket.timeout:
        print(f"[{addr}] Timeout (host crash recovery)")
    except Exception as e:
        # Thread boundary: log anything unexpected rather than dying silently.
        print(f"[{addr}] Error: {e}")
    finally:
        print(f"[{addr}] Disconnected")
        conn.close()
|
|
|
|
def main():
    """Parse the command line, validate the unit directories and serve clients.

    Unit 1 is always mapped to --dir; units 2-7 are mapped only when the
    matching --dirN option is given.  Exits with status 1 if a configured
    directory does not exist.  Each accepted connection is handled on its own
    daemon thread until the process is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(description="NetFS — Binary File Server for Sharp MZ BASIC")
    parser.add_argument('--port', '-p', type=int, default=DEFAULT_PORT, help=f"TCP port (default: {DEFAULT_PORT})")
    parser.add_argument('--dir', '-d', type=str, default=DEFAULT_DIR, help="Directory for NET1:/NET: (default: .)")
    for i in range(2, 8):
        parser.add_argument(f'--dir{i}', type=str, default=None, help=f"Directory for NET{i}:")
    args = parser.parse_args()

    # Build unit → directory mapping.
    dirs = {1: os.path.abspath(args.dir)}
    for i in range(2, 8):
        d = getattr(args, f'dir{i}', None)
        if d:
            dirs[i] = os.path.abspath(d)

    # Validate all directories exist.
    for unit, path in dirs.items():
        if not os.path.isdir(path):
            print(f"Error: directory '{path}' (NET{unit}:) does not exist")
            sys.exit(1)

    print("NetFS v4.0 — Sharp MZ BASIC Network File Server")
    for unit in sorted(dirs):
        entries = scan_directory(dirs[unit])
        print(f" NET{unit}: {dirs[unit]} ({len(entries)} files)")
    print(f"Listening on port {args.port}")
    print()

    # The with-block closes the listening socket on every exit path,
    # including a failing bind() (e.g. port already in use).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('0.0.0.0', args.port))
        sock.listen(5)
        try:
            while True:
                conn, addr = sock.accept()
                # Daemon threads do not keep the process alive after shutdown.
                t = threading.Thread(target=handle_client, args=(conn, addr, dirs), daemon=True)
                t.start()
        except KeyboardInterrupt:
            print("\nShutting down...")
|
|
|
|
# Start the server only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|