From 7c8df37cfd78de9d35b57710d0cff3adfa27cfaf Mon Sep 17 00:00:00 2001 From: Fred VanEijk Date: Tue, 22 Oct 2024 16:38:14 -0400 Subject: [PATCH] add a pyhon script to build/read the cpm image --- CPM-sd-image/cpm_file_copier.py | 60 +++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/CPM-sd-image/cpm_file_copier.py b/CPM-sd-image/cpm_file_copier.py index b5e66f1..1b3a6f2 100644 --- a/CPM-sd-image/cpm_file_copier.py +++ b/CPM-sd-image/cpm_file_copier.py @@ -30,7 +30,7 @@ def get_drive_offset(drive_index: int) -> int: def read_directory(image_file: str, drive_index: int) -> List[Tuple[str, int, List[int], int, int]]: """ - Read directory entries, handling 16-bit block numbers. + Read directory entries, handling multiple extents. Returns list of tuples: (filename, user_number, block_list, extent, record_count) """ directory = [] @@ -67,9 +67,11 @@ def read_directory(image_file: str, drive_index: int) -> List[Tuple[str, int, Li def calculate_needed_blocks(content_length: int) -> int: """ Calculate number of blocks needed for a given content length. - Even empty files need at least one block in CP/M. + Each block is 4096 bytes. Even empty files need at least one block. """ - return max(1, (content_length + BLOCK_SIZE - 1) // BLOCK_SIZE) + if content_length == 0: + return 1 + return (content_length + BLOCK_SIZE - 1) // BLOCK_SIZE def find_free_blocks(image_file: str, drive_index: int, blocks_needed: int) -> List[int]: """Find consecutive free blocks for file allocation""" @@ -124,9 +126,12 @@ def write_file(image_file: str, drive_index: int, filename: str, user: int, cont """Write a file to the CP/M image, splitting across multiple extents if needed""" offset = get_drive_offset(drive_index) total_size = len(content) - bytes_per_extent = BLOCKS_PER_EXTENT * BLOCK_SIZE + bytes_per_extent = BLOCK_SIZE * BLOCKS_PER_EXTENT num_extents = (total_size + bytes_per_extent - 1) // bytes_per_extent + if num_extents > 1: + print(f"File {filename} requires {num_extents} extents") + name, ext = os.path.splitext(filename) name = name.ljust(8)[:8] ext = ext[1:].ljust(3)[:3] @@ -166,10 +171,19 @@ def write_file(image_file: str, drive_index: int, filename: str, user: int, cont f.write(b'\x00') # S1 # Calculate record count for this extent - if len(extent_content) == 0: - record_count = 0 + # CP/M records are 128 bytes each, and record count must fit in one byte (0-255) + if extent_num == num_extents - 1: # Last extent + if len(extent_content) == 0: + record_count = 0 + else: + # Calculate actual records in the last extent + record_count = (len(extent_content) + CPM_SECTOR_SIZE - 1) // CPM_SECTOR_SIZE + # Ensure record count doesn't exceed 128 (maximum records per extent) + record_count = min(record_count, 128) else: - record_count = min(128, (len(extent_content) + CPM_SECTOR_SIZE - 1) // CPM_SECTOR_SIZE) + # Full extent always has 128 records (16KB / 128 bytes = 128) + record_count = 128 + f.write(bytes([record_count])) # Write block allocation @@ -205,7 +219,7 @@ def extract_file(image_file: str, drive_index: int, filename: str, user: int, bl f.seek(block_offset) content += f.read(BLOCK_SIZE) - # Trim content based on record count + # For the last extent, trim based on record count if record_count > 0: return content[:record_count * CPM_SECTOR_SIZE] return content @@ -221,35 +235,47 @@ def copy_files_to_image(image_file: str, source_dir: str, drive_letter: str, dri write_file(image_file, drive_index, filename, user, content) def copy_files_from_image(image_file: str, target_dir: str): - """Modified to handle multiple extents per file""" + """Modified to properly handle multiple extents per file""" for drive_index, drive_letter in enumerate('ABCDEFGHIJKLMNOP'): - print(f"Extracting files from drive {drive_letter}...") + print(f"Processing drive {drive_letter}...") directory = read_directory(image_file, drive_index) - # Group entries by filename and user number + # Group files by name and user number files = {} - for filename, user, blocks, extent, record_count in directory: + for entry in directory: + filename, user, blocks, extent, record_count = entry key = (filename, user) if key not in files: files[key] = [] files[key].append((extent, blocks, record_count)) - # Process each file + # Process each unique file for (filename, user), extents in files.items(): + print(f" Extracting {filename} (user {user})") + # Sort extents by extent number extents.sort(key=lambda x: x[0]) - # Combine content from all extents + # Combine all extents into one file complete_content = b'' - for extent, blocks, record_count in extents: + for extent_num, (extent, blocks, record_count) in enumerate(extents): content = extract_file(image_file, drive_index, filename, user, blocks, extent, record_count) + + # Only trim the content for the last extent + if extent_num == len(extents) - 1 and record_count > 0: + content = content[:record_count * CPM_SECTOR_SIZE] complete_content += content - # Write the complete file + # Create user directory if it doesn't exist user_dir = os.path.join(target_dir, drive_letter, str(user)) os.makedirs(user_dir, exist_ok=True) - with open(os.path.join(user_dir, filename), 'wb') as f: + + # Write the complete file + output_path = os.path.join(user_dir, filename) + with open(output_path, 'wb') as f: f.write(complete_content) + + print(f" Wrote {len(complete_content)} bytes to {output_path}") print(f"Extracted {len(files)} files from drive {drive_letter}")