Merge branch 'feature/add-wl-support-fatfs' into 'master'

Add wear levelling support for fatfs partition generator

Closes IDF-4043 and IDF-2053

See merge request espressif/esp-idf!15798
This commit is contained in:
Roland Dobai
2021-12-01 14:21:39 +00:00
24 changed files with 649 additions and 175 deletions

View File

@@ -2,16 +2,14 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import uuid
from typing import Any, List, Optional
from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct
from fatfsgen_utils.fat import FAT
from fatfsgen_utils.fatfs_state import FATFSState
from fatfsgen_utils.fs_object import Directory
from fatfsgen_utils.utils import pad_string
from fatfsgen_utils.utils import generate_4bytes_random, get_args_for_partition_generator, pad_string
class FATFS:
@@ -61,7 +59,6 @@ class FATFS:
hidden_sectors: int = 0,
long_names_enabled: bool = False,
entry_size: int = 32,
wl_sectors: int = 0,
num_heads: int = 0xff,
oem_name: str = 'MSDOS5.0',
sec_per_track: int = 0x3f,
@@ -84,7 +81,6 @@ class FATFS:
sec_per_track=sec_per_track,
long_names_enabled=long_names_enabled,
volume_label=volume_label,
wl_sectors=wl_sectors,
oem_name=oem_name)
binary_image = bytearray(
self.read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs())
@@ -119,7 +115,7 @@ class FATFS:
def create_empty_fatfs(self) -> Any:
sectors_count = self.state.size // self.state.sector_size
volume_uuid = uuid.uuid4().int & 0xFFFFFFFF
volume_uuid = generate_4bytes_random()
return (
FATFS.BOOT_SECTOR_HEADER.build(
dict(BS_OEMName=pad_string(self.state.oem_name, size=FATFS.MAX_OEM_NAME_SIZE),
@@ -195,22 +191,12 @@ class FATFS:
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create a FAT filesystem and populate it with directory content')
parser.add_argument('input_directory',
help='Path to the directory that will be encoded into fatfs image')
parser.add_argument('--output_file',
default='fatfs_image.img',
help='Filename of the generated fatfs image')
parser.add_argument('--partition_size',
default=1024 * 1024,
help='Size of the partition in bytes')
args = parser.parse_args()
args = get_args_for_partition_generator('Create a FAT filesystem and populate it with directory content')
input_dir = args.input_directory
try:
partition_size = eval(args.partition_size)
except ValueError:
partition_size = args.partition_size
fatfs = FATFS(size=partition_size)
partition_size = int(str(args.partition_size), 0)
sector_size_bytes = int(str(args.sector_size), 0)
fatfs = FATFS(size=partition_size, sector_size=sector_size_bytes)
fatfs.generate(input_dir)
fatfs.write_filesystem(args.output_file)

View File

@@ -29,5 +29,12 @@ class TooLongNameException(Exception):
pass
class WLNotInitialized(Exception):
"""
Exception is raised when the user tries to write fatfs not initialized with wear levelling
"""
pass
class FatalError(Exception):
pass

View File

@@ -28,12 +28,10 @@ class FATFSState:
num_heads: int,
hidden_sectors: int,
file_sys_type: str,
wl_sectors: int,
long_names_enabled: bool = False):
self._binary_image: bytearray = bytearray(b'')
self.fat_tables_cnt: int = fat_tables_cnt
self.oem_name: str = oem_name
self.wl_sectors_cnt: int = wl_sectors
self.file_sys_type: str = file_sys_type
self.sec_per_track: int = sec_per_track
self.hidden_sectors: int = hidden_sectors
@@ -70,7 +68,7 @@ class FATFSState:
@property
def non_data_sectors(self) -> int:
return self.reserved_sectors_cnt + self.sectors_per_fat_cnt + self.root_dir_sectors_cnt + self.wl_sectors_cnt
return self.reserved_sectors_cnt + self.sectors_per_fat_cnt + self.root_dir_sectors_cnt
@property
def data_region_start(self) -> int:

View File

@@ -1,23 +1,38 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import argparse
import binascii
import os
import typing
import uuid
from typing import List, Optional, Tuple
from construct import Int16ul
def crc32(input_values: List[int], crc: int) -> int:
"""
Name Polynomial Reversed? Init-value XOR-out
crc32 0x104C11DB7 True 4294967295 (UINT32_MAX) 0xFFFFFFFF
"""
return binascii.crc32(bytearray(input_values), crc)
def required_clusters_count(cluster_size: int, content: str) -> int:
# compute number of required clusters for file text
return (len(content) + cluster_size - 1) // cluster_size
def pad_string(content: str, size: typing.Optional[int] = None, pad: int = 0x20) -> str:
def generate_4bytes_random() -> int:
return uuid.uuid4().int & 0xFFFFFFFF
def pad_string(content: str, size: Optional[int] = None, pad: int = 0x20) -> str:
# cut string if longer and fill with pad character if shorter than size
return content.ljust(size or len(content), chr(pad))[:size]
def split_to_name_and_extension(full_name: str) -> typing.Tuple[str, str]:
def split_to_name_and_extension(full_name: str) -> Tuple[str, str]:
name, extension = os.path.splitext(full_name)
return name, extension.replace('.', '')
@@ -26,7 +41,7 @@ def is_valid_fatfs_name(string: str) -> bool:
return string == string.upper()
def split_by_half_byte_12_bit_little_endian(value: int) -> typing.Tuple[int, int, int]:
def split_by_half_byte_12_bit_little_endian(value: int) -> Tuple[int, int, int]:
value_as_bytes = Int16ul.build(value)
return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f
@@ -51,10 +66,30 @@ def clean_second_half_byte(bytes_array: bytearray, address: int) -> None:
bytes_array[address] &= 0x0f
def split_content_into_sectors(content: str, sector_size: int) -> typing.List[str]:
def split_content_into_sectors(content: str, sector_size: int) -> List[str]:
result = []
clusters_cnt = required_clusters_count(cluster_size=sector_size, content=content)
for i in range(clusters_cnt):
result.append(content[sector_size * i:(i + 1) * sector_size])
return result
def get_args_for_partition_generator(desc: str) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=desc)
parser.add_argument('input_directory',
help='Path to the directory that will be encoded into fatfs image')
parser.add_argument('--output_file',
default='fatfs_image.img',
help='Filename of the generated fatfs image')
parser.add_argument('--partition_size',
default=1024 * 1024,
help='Size of the partition in bytes')
parser.add_argument('--sector_size',
default=4096,
help='Size of the partition in bytes')
args = parser.parse_args()
if not os.path.isdir(args.input_directory):
raise NotADirectoryError(f'The target directory `{args.input_directory}` does not exist!')
return args

View File

@@ -3,16 +3,20 @@
# Create a fatfs image of the specified directory on the host during build and optionally
# have the created image flashed using `idf.py flash`
function(fatfs_create_partition_image partition base_dir)
set(options FLASH_IN_PROJECT)
set(options FLASH_IN_PROJECT WL_INIT)
cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}")
idf_build_get_property(idf_path IDF_PATH)
idf_build_get_property(python PYTHON)
set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py)
if(arg_WL_INIT)
set(fatfsgen_py ${python} ${idf_path}/components/fatfs/wl_fatfsgen.py)
else()
set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py)
endif()
get_filename_component(base_dir_full_path ${base_dir} ABSOLUTE)
partition_table_get_partition_info(size "--partition-name ${partition}" "size")
partition_table_get_partition_info(offset "--partition-name ${partition}" "offset")
@@ -38,7 +42,6 @@ function(fatfs_create_partition_image partition base_dir)
esptool_py_flash_to_partition(${partition}-flash "${partition}" "${image_file}")
add_dependencies(${partition}-flash fatfs_${partition}_bin)
if(arg_FLASH_IN_PROJECT)
esptool_py_flash_to_partition(flash "${partition}" "${image_file}")
add_dependencies(flash fatfs_${partition}_bin)
@@ -49,3 +52,24 @@ function(fatfs_create_partition_image partition base_dir)
fail_at_build_time(fatfs_${partition}_bin "${message}")
endif()
endfunction()
function(fatfs_create_rawflash_image partition base_dir)
set(options FLASH_IN_PROJECT)
cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}")
if(arg_FLASH_IN_PROJECT)
fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT)
else()
fatfs_create_partition_image(${partition} ${base_dir})
endif()
endfunction()
function(fatfs_create_spiflash_image partition base_dir)
set(options FLASH_IN_PROJECT)
cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}")
if(arg_FLASH_IN_PROJECT)
fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT WL_INIT)
else()
fatfs_create_partition_image(${partition} ${base_dir} WL_INIT)
endif()
endfunction()

View File

@@ -6,7 +6,8 @@ import os
import shutil
import sys
import unittest
from typing import Any, Dict, Union
from test_utils import CFG, generate_test_dir_1, generate_test_dir_2
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import fatfsgen # noqa E402
@@ -15,62 +16,29 @@ from fatfsgen_utils.exceptions import LowerCaseException, NoFreeClusterException
class FatFSGen(unittest.TestCase):
CFG = dict(
sector_size=4096,
entry_size=32,
fat_start=0x1000,
data_start=0x7000,
root_start=0x2000,
output_file=os.path.join('output_data', 'tmp_file.img'),
test_dir=os.path.join('output_data', 'test'),
test_dir2=os.path.join('output_data', 'tst_str'),
) # type: Union[Dict[str, Any]]
def setUp(self) -> None:
os.makedirs('output_data')
self.generate_test_dir_1()
self.generate_test_dir_2()
generate_test_dir_1()
generate_test_dir_2()
def tearDown(self) -> None:
shutil.rmtree('output_data')
@staticmethod
def generate_test_dir_1() -> None:
os.makedirs(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test'))
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'test', 'lastfile'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(FatFSGen.CFG['test_dir'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(FatFSGen.CFG['test_dir'], 'testfile'), 'w') as file:
file.write('ahoj\n')
@staticmethod
def generate_test_dir_2() -> None:
os.makedirs(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test'))
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'test', 'lastfile.txt'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(FatFSGen.CFG['test_dir2'], 'testfile'), 'w') as file:
file.write('ahoj\n')
def test_empty_file_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('TESTFILE')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTFILE \x20') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
def test_directory_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
self.assertEqual(file_system[0x6000:0x600c], b'. \x10') # reference to itself
@@ -79,9 +47,8 @@ class FatFSGen(unittest.TestCase):
def test_empty_file_with_extension_sn_fat12(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('TESTF', extension='TXT')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x2000:0x200c], b'TESTF TXT\x20') # check entry name and type
self.assertEqual(file_system[0x1000:0x1006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
@@ -90,9 +57,8 @@ class FatFSGen(unittest.TestCase):
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content='testcontent')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x2000:0x200c], b'WRITEF TXT\x20') # check entry name and type
self.assertEqual(file_system[0x201a:0x2020], b'\x02\x00\x0b\x00\x00\x00') # check size and cluster ref
@@ -104,9 +70,8 @@ class FatFSGen(unittest.TestCase):
fatfs.create_directory('TESTFOLD')
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'WRITEF.TXT'], content='testcontent')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x2000:0x200c], b'TESTFOLD \x10')
self.assertEqual(
@@ -126,9 +91,8 @@ class FatFSGen(unittest.TestCase):
fatfs.fat.clusters[2].set_in_fat(1000)
fatfs.fat.clusters[3].set_in_fat(4)
fatfs.fat.clusters[4].set_in_fat(5)
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(
file_system[0x1000:0x1010],
b'\xf8\xff\xff\xe8\x43\x00\x05\xf0\xff\xff\x0f\x00\x00\x00\x00\x00')
@@ -136,34 +100,31 @@ class FatFSGen(unittest.TestCase):
def test_full_sector_file(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=CFG['sector_size'] * 'a')
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\xff\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x6000: 0x7000], FatFSGen.CFG['sector_size'] * b'a')
self.assertEqual(file_system[0x6000: 0x7000], CFG['sector_size'] * b'a')
def test_file_chaining(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_file('WRITEF', extension='TXT')
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=FatFSGen.CFG['sector_size'] * 'a' + 'a')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_content(path_from_root=['WRITEF.TXT'], content=CFG['sector_size'] * 'a' + 'a')
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x1000: 0x100e], b'\xf8\xff\xff\x03\xf0\xff\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x7000: 0x8000], b'a' + (FatFSGen.CFG['sector_size'] - 1) * b'\x00')
self.assertEqual(file_system[0x7000: 0x8000], b'a' + (CFG['sector_size'] - 1) * b'\x00')
def test_full_sector_folder(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
for i in range(FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
for i in range(CFG['sector_size'] // CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'A0'], content='first')
fatfs.write_content(path_from_root=['TESTFOLD', 'A126'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x1000: 0x10d0],
b'\xf8\xff\xff\x82\xf0\xff' + 192 * b'\xff' + 10 * b'\x00')
self.assertEqual(file_system[0x85000:0x85005], b'later')
@@ -187,7 +148,7 @@ class FatFSGen(unittest.TestCase):
def create_too_many_files() -> None:
fatfs = fatfsgen.FATFS()
fatfs.create_directory('TESTFOLD')
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
for i in range(2 * CFG['sector_size'] // CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
def test_too_many_files(self) -> None:
@@ -197,13 +158,12 @@ class FatFSGen(unittest.TestCase):
fatfs = fatfsgen.FATFS(size=2 * 1024 * 1024)
fatfs.create_directory('TESTFOLD')
for i in range(2 * FatFSGen.CFG['sector_size'] // FatFSGen.CFG['entry_size']):
for i in range(2 * CFG['sector_size'] // CFG['entry_size']):
fatfs.create_file(f'A{str(i).upper()}', path_from_root=['TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'A253'], content='later')
fatfs.write_content(path_from_root=['TESTFOLD', 'A255'], content='last')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x105000:0x105010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x108000:0x108010], b'last\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
@@ -233,9 +193,8 @@ class FatFSGen(unittest.TestCase):
fatfs.create_directory('TESTFOLO', path_from_root=['TESTFOLD', 'TESTFOLL'])
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO'])
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLL', 'TESTFOLO', 'WRITEF.TXT'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x9000:0x9010], b'later\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
@@ -246,9 +205,8 @@ class FatFSGen(unittest.TestCase):
fatfs.create_directory('TESTFOLD', path_from_root=['TESTFOLD', 'TESTFOLD'])
fatfs.create_file('WRITEF', extension='TXT', path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD'])
fatfs.write_content(path_from_root=['TESTFOLD', 'TESTFOLD', 'TESTFOLD', 'WRITEF.TXT'], content='later')
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x2000:0x2010], b'TESTFOLD \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x2010:0x2020], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x02\x00\x00\x00\x00\x00')
@@ -261,10 +219,9 @@ class FatFSGen(unittest.TestCase):
def test_e2e_deep_folder_into_image(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.generate(FatFSGen.CFG['test_dir'])
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
with open(FatFSGen.CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
fatfs.generate(CFG['test_dir'])
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')
self.assertEqual(file_system[0x6070:0x6080], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x05\x00\x0b\x00\x00\x00')
self.assertEqual(file_system[0x7040:0x7050], b'LASTFILE \x00\x00\x01\x00')
@@ -274,9 +231,9 @@ class FatFSGen(unittest.TestCase):
def test_e2e_deep_folder_into_image_ext(self) -> None:
fatfs = fatfsgen.FATFS()
fatfs.generate(FatFSGen.CFG['test_dir2'])
fatfs.write_filesystem(FatFSGen.CFG['output_file'])
file_system = fatfs.read_filesystem(FatFSGen.CFG['output_file'])
fatfs.generate(CFG['test_dir2'])
fatfs.write_filesystem(CFG['output_file'])
file_system = fatfs.read_filesystem(CFG['output_file'])
self.assertEqual(file_system[0x2020:0x2030], b'TESTFILE \x00\x00\x01\x00')
self.assertEqual(file_system[0x6060:0x6070], b'TESTFIL2 \x00\x00\x01\x00')

View File

@@ -0,0 +1,35 @@
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
from typing import Any, Dict, Union
CFG = dict(
sector_size=4096,
entry_size=32,
fat_start=0x1000,
data_start=0x7000,
root_start=0x2000,
output_file=os.path.join('output_data', 'tmp_file.img'),
test_dir=os.path.join('output_data', 'test'),
test_dir2=os.path.join('output_data', 'tst_str'),
) # type: Union[Dict[str, Any]]
def generate_test_dir_1() -> None:
os.makedirs(os.path.join(CFG['test_dir'], 'test', 'test'))
with open(os.path.join(CFG['test_dir'], 'test', 'test', 'lastfile'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(CFG['test_dir'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(CFG['test_dir'], 'testfile'), 'w') as file:
file.write('ahoj\n')
def generate_test_dir_2() -> None:
os.makedirs(os.path.join(CFG['test_dir2'], 'test', 'test'))
with open(os.path.join(CFG['test_dir2'], 'test', 'test', 'lastfile.txt'), 'w') as file:
file.write('deeptest\n')
with open(os.path.join(CFG['test_dir2'], 'test', 'testfil2'), 'w') as file:
file.write('thisistest\n')
with open(os.path.join(CFG['test_dir2'], 'testfile'), 'w') as file:
file.write('ahoj\n')

View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import shutil
import sys
import unittest
from test_utils import CFG, generate_test_dir_1, generate_test_dir_2
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import wl_fatfsgen # noqa E402 # pylint: disable=C0413
from fatfsgen_utils.exceptions import WLNotInitialized # noqa E402 # pylint: disable=C0413
class WLFatFSGen(unittest.TestCase):
def setUp(self) -> None:
os.makedirs('output_data')
generate_test_dir_1()
generate_test_dir_2()
def tearDown(self) -> None:
shutil.rmtree('output_data')
def test_empty_file_sn_fat12(self) -> None:
fatfs = wl_fatfsgen.WLFATFS()
fatfs.wl_create_file('TESTFILE')
fatfs.init_wl()
fatfs.wl_write_filesystem(CFG['output_file'])
with open(CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
self.assertEqual(file_system[0x3000:0x300c], b'TESTFILE \x20') # check entry name and type
self.assertEqual(file_system[0x2000:0x2006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
def test_directory_sn_fat12(self) -> None:
fatfs = wl_fatfsgen.WLFATFS(device_id=3750448905)
fatfs.wl_create_directory('TESTFOLD')
fatfs.init_wl()
fatfs.wl_write_filesystem(CFG['output_file'])
with open(CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
# boot sector
self.assertEqual(file_system[0x1000:0x1010], b'\xeb\xfe\x90MSDOS5.0\x00\x10\x01\x01\x00')
self.assertEqual(file_system[0x1010:0x1020], b'\x01\x00\x02\xfa\x00\xf8\x01\x00?\x00\xff\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x102b:0x1034], b'Espressif')
self.assertEqual(file_system[0x3000:0x300c], b'TESTFOLD \x10') # check entry name and type
self.assertEqual(file_system[0x2000:0x2006], b'\xf8\xff\xff\xff\x0f\x00') # check fat
self.assertEqual(file_system[0x7000:0x700c], b'. \x10') # reference to itself
self.assertEqual(file_system[0x7020:0x702c], b'.. \x10') # reference to parent
# check state1
self.assertEqual(file_system[0xfb000:0xfb00f], b'\x00\x00\x00\x00\xfb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xfb010:0xfb020], b'\x10\x00\x00\x00\x00\x10\x00\x00\x02\x00\x00\x00\tO\x8b\xdf')
self.assertEqual(file_system[0xfb020:0xfb02f], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xfb031:0xfb040], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\xa1\x94i')
# check state2
self.assertEqual(file_system[0xfd000:0xfd00f], b'\x00\x00\x00\x00\xfb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xfd010:0xfd020], b'\x10\x00\x00\x00\x00\x10\x00\x00\x02\x00\x00\x00\tO\x8b\xdf')
self.assertEqual(file_system[0xfd020:0xfd02f], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xfd031:0xfd040], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\xa1\x94i')
# check config
self.assertEqual(file_system[0xff001:0xff010], b'\x00\x00\x00\x00\x00\x10\x00\x00\x10\x00\x00\x00\x10\x00\x00')
self.assertEqual(file_system[0xff010:0xff01f], b'\x10\x00\x00\x00\x10\x00\x00\x00\x02\x00\x00\x00 \x00\x00')
self.assertEqual(file_system[0xff020:0xff030], b'\xe0b\xb5O\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xff030:0xff03f], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff')
def test_directory_sn_fat122mb(self) -> None:
fatfs = wl_fatfsgen.WLFATFS(device_id=3750448905, size=2 * 1024 * 1024)
fatfs.wl_create_directory('TESTFOLD')
fatfs.init_wl()
fatfs.wl_write_filesystem(CFG['output_file'])
with open(CFG['output_file'], 'rb') as fs_file:
file_system = fs_file.read()
# check state1
self.assertEqual(file_system[0x1f9000:0x1f900e], b'\x00\x00\x00\x00\xf9\x01\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x1f9010:0x1f9020],
b'\x10\x00\x00\x00\x00\x10\x00\x00\x02\x00\x00\x00\tO\x8b\xdf')
self.assertEqual(file_system[0x1f9020:0x1f902e], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x1f9030:0x1f9040], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00j5\xbdp')
# check state2
self.assertEqual(file_system[0x1fc000:0x1fc00e], b'\x00\x00\x00\x00\xf9\x01\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x1fc010:0x1fc020],
b'\x10\x00\x00\x00\x00\x10\x00\x00\x02\x00\x00\x00\tO\x8b\xdf')
self.assertEqual(file_system[0x1fc020:0x1fc02e], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x1fc030:0x1fc040], b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00j5\xbdp')
# check config
self.assertEqual(file_system[0x1ff000:0x1ff00f], b'\x00\x00\x00\x00\x00\x00 \x00\x00\x10\x00\x00\x00\x10\x00')
self.assertEqual(file_system[0x1ff010:0x1ff01f], b'\x10\x00\x00\x00\x10\x00\x00\x00\x02\x00\x00\x00 \x00\x00')
self.assertEqual(file_system[0x1ff020:0x1ff030], b')\x892j\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0x1ff030:0x1ff03e], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff')
def test_write_not_initialized_wlfatfs(self) -> None:
fatfs = wl_fatfsgen.WLFATFS()
fatfs.wl_create_directory('TESTFOLD')
self.assertRaises(WLNotInitialized, fatfs.wl_write_filesystem, CFG['output_file'])
def test_wrong_sector_size(self) -> None:
self.assertRaises(NotImplementedError, wl_fatfsgen.WLFATFS, sector_size=1024)
def test_e2e_deep_folder_into_image_ext(self) -> None:
fatfs = wl_fatfsgen.WLFATFS()
fatfs.wl_generate(CFG['test_dir2'])
fatfs.init_wl()
fatfs.wl_write_filesystem(CFG['output_file'])
with open(CFG['output_file'], 'rb') as fs_file:
file_system = bytearray(fs_file.read())
self.assertEqual(file_system[0x3020:0x3030], b'TESTFILE \x00\x00\x01\x00')
self.assertEqual(file_system[0x7060:0x7070], b'TESTFIL2 \x00\x00\x01\x00')
self.assertEqual(file_system[0x8000:0x8010], b'. \x10\x00\x00\x01\x00')
self.assertEqual(file_system[0x8040:0x8050], b'LASTFILETXT \x00\x00\x01\x00')
self.assertEqual(file_system[0x9000:0x9010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xa000:0xa010], b'thisistest\n\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xb000:0xb010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xc000:0xc009], b'\xff\xff\xff\xff\xff\xff\xff\xff\xff')
def test_e2e_deep_folder_into_image(self) -> None:
fatfs = wl_fatfsgen.WLFATFS()
fatfs.wl_generate(CFG['test_dir'])
fatfs.init_wl()
fatfs.wl_write_filesystem(CFG['output_file'])
with open(CFG['output_file'], 'rb') as fs_file:
file_system = bytearray(fs_file.read())
self.assertEqual(file_system[0x7060:0x7070], b'TESTFIL2 \x00\x00\x01\x00')
self.assertEqual(file_system[0x7070:0x7080], b'!\x00\x00\x00\x00\x00\x01\x00\x01\x00\x05\x00\x0b\x00\x00\x00')
self.assertEqual(file_system[0x8040:0x8050], b'LASTFILE \x00\x00\x01\x00')
self.assertEqual(file_system[0x9000:0x9010], b'deeptest\n\x00\x00\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xa000:0xa010], b'thisistest\n\x00\x00\x00\x00\x00')
self.assertEqual(file_system[0xb000:0xb010], b'ahoj\n\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
if __name__ == '__main__':
unittest.main()

198
components/fatfs/wl_fatfsgen.py Executable file
View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import List, Optional
from construct import Const, Int32ul, Struct
from fatfsgen import FATFS
from fatfsgen_utils.exceptions import WLNotInitialized
from fatfsgen_utils.utils import crc32, generate_4bytes_random, get_args_for_partition_generator
class WLFATFS:
# pylint: disable=too-many-instance-attributes
CFG_SECTORS_COUNT = 1
DUMMY_SECTORS_COUNT = 1
WL_CONFIG_HEADER_SIZE = 48
WL_STATE_RECORD_SIZE = 16
WL_STATE_HEADER_SIZE = 64
WL_STATE_COPY_COUNT = 2
UINT32_MAX = 4294967295
WL_SECTOR_SIZE = 0x1000
WL_STATE_T_DATA = Struct(
'pos' / Int32ul,
'max_pos' / Int32ul,
'move_count' / Int32ul,
'access_count' / Int32ul,
'max_count' / Int32ul,
'block_size' / Int32ul,
'version' / Int32ul,
'device_id' / Int32ul,
'reserved' / Const(28 * b'\x00')
)
WL_CONFIG_T_DATA = Struct(
'start_addr' / Int32ul,
'full_mem_size' / Int32ul,
'page_size' / Int32ul,
'sector_size' / Int32ul,
'updaterate' / Int32ul,
'wr_size' / Int32ul,
'version' / Int32ul,
'temp_buff_size' / Int32ul
)
WL_CONFIG_T_HEADER_SIZE = 48
def __init__(self,
size: int = 1024 * 1024,
reserved_sectors_cnt: int = 1,
fat_tables_cnt: int = 1,
sectors_per_cluster: int = 1,
sector_size: int = 0x1000,
sectors_per_fat: int = 1,
root_dir_sectors_cnt: int = 4,
hidden_sectors: int = 0,
long_names_enabled: bool = False,
entry_size: int = 32,
num_heads: int = 0xff,
oem_name: str = 'MSDOS5.0',
sec_per_track: int = 0x3f,
volume_label: str = 'Espressif',
file_sys_type: str = 'FAT',
version: int = 2,
temp_buff_size: int = 32,
updaterate: int = 16,
device_id: int = None,
media_type: int = 0xf8) -> None:
if sector_size != WLFATFS.WL_SECTOR_SIZE:
raise NotImplementedError(f'The only supported sector size is currently {WLFATFS.WL_SECTOR_SIZE}')
self._initialized = False
self.sector_size = sector_size
self._version = version
self._temp_buff_size = temp_buff_size
self._device_id = device_id
self._updaterate = updaterate
self.partition_size = size
self.total_sectors = self.partition_size // self.sector_size
self.wl_state_size = WLFATFS.WL_STATE_HEADER_SIZE + WLFATFS.WL_STATE_RECORD_SIZE * self.total_sectors
# determine the number of required sectors (roundup to sector size)
self.wl_state_sectors = (self.wl_state_size + self.sector_size - 1) // self.sector_size
self.boot_sector_start = self.sector_size # shift by one "dummy" sector
self.fat_table_start = self.boot_sector_start + reserved_sectors_cnt * self.sector_size
wl_sectors = WLFATFS.DUMMY_SECTORS_COUNT + WLFATFS.CFG_SECTORS_COUNT + self.wl_state_sectors * 2
self.plain_fat_sectors = self.total_sectors - wl_sectors
self.plain_fatfs = FATFS(
size=self.plain_fat_sectors * self.sector_size,
reserved_sectors_cnt=reserved_sectors_cnt,
fat_tables_cnt=fat_tables_cnt,
sectors_per_cluster=sectors_per_cluster,
sector_size=sector_size,
sectors_per_fat=sectors_per_fat,
root_dir_sectors_cnt=root_dir_sectors_cnt,
hidden_sectors=hidden_sectors,
long_names_enabled=long_names_enabled,
entry_size=entry_size,
num_heads=num_heads,
oem_name=oem_name,
sec_per_track=sec_per_track,
volume_label=volume_label,
file_sys_type=file_sys_type,
media_type=media_type
)
self.fatfs_binary_image = self.plain_fatfs.state.binary_image
def init_wl(self) -> None:
self.fatfs_binary_image = self.plain_fatfs.state.binary_image
self._add_dummy_sector()
# config must be added after state, do not change the order of these two calls!
self._add_state_sectors()
self._add_config_sector()
self._initialized = True
def _add_dummy_sector(self) -> None:
self.fatfs_binary_image = self.sector_size * b'\xff' + self.fatfs_binary_image
def _add_config_sector(self) -> None:
wl_config_data = WLFATFS.WL_CONFIG_T_DATA.build(
dict(
start_addr=0,
full_mem_size=self.partition_size,
page_size=self.sector_size,
sector_size=self.sector_size,
updaterate=self._updaterate,
wr_size=16,
version=self._version,
temp_buff_size=self._temp_buff_size
)
)
crc = crc32(list(wl_config_data), WLFATFS.UINT32_MAX)
wl_config_crc = Int32ul.build(crc)
# adding three 4 byte zeros to align the structure
wl_config = wl_config_data + wl_config_crc + Int32ul.build(0) + Int32ul.build(0) + Int32ul.build(0)
self.fatfs_binary_image += (wl_config + (self.sector_size - WLFATFS.WL_CONFIG_HEADER_SIZE) * b'\xff')
def _add_state_sectors(self) -> None:
wl_state_data = WLFATFS.WL_STATE_T_DATA.build(
dict(
pos=0,
max_pos=self.plain_fat_sectors + WLFATFS.DUMMY_SECTORS_COUNT,
move_count=0,
access_count=0,
max_count=self._updaterate,
block_size=self.sector_size,
version=self._version,
device_id=self._device_id or generate_4bytes_random(),
)
)
crc = crc32(list(wl_state_data), WLFATFS.UINT32_MAX)
wl_state_crc = Int32ul.build(crc)
wl_state = wl_state_data + wl_state_crc
self.fatfs_binary_image += WLFATFS.WL_STATE_COPY_COUNT * (
(wl_state + (self.sector_size - WLFATFS.WL_STATE_HEADER_SIZE) * b'\xff') + (
self.wl_state_sectors - 1) * self.sector_size * b'\xff')
def wl_write_filesystem(self, output_path: str) -> None:
if not self._initialized:
raise WLNotInitialized('FATFS is not initialized with WL. First call method WLFATFS.init_wl!')
with open(output_path, 'wb') as output:
output.write(bytearray(self.fatfs_binary_image))
def wl_generate(self, input_directory: str) -> None:
"""
Normalize path to folder and recursively encode folder to binary image
"""
self.plain_fatfs.generate(input_directory=input_directory)
def wl_create_file(self, name: str, extension: str = '', path_from_root: Optional[List[str]] = None) -> None:
self.plain_fatfs.create_file(name, extension, path_from_root)
def wl_create_directory(self, name: str, path_from_root: Optional[List[str]] = None) -> None:
self.plain_fatfs.create_directory(name, path_from_root)
def wl_write_content(self, path_from_root: List[str], content: str) -> None:
self.plain_fatfs.write_content(path_from_root, content)
if __name__ == '__main__':
desc = 'Create a FAT filesystem with support for wear levelling and populate it with directory content'
args = get_args_for_partition_generator(desc)
input_dir = args.input_directory
partition_size = int(str(args.partition_size), 0)
sector_size_bytes = int(str(args.sector_size), 0)
wl_fatfs = WLFATFS(size=partition_size, sector_size=sector_size_bytes)
wl_fatfs.wl_generate(input_dir)
wl_fatfs.init_wl()
wl_fatfs.wl_write_filesystem(args.output_file)

View File

@@ -1,16 +1,8 @@
// Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
* SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef _WL_Config_H_
#define _WL_Config_H_
@@ -36,7 +28,7 @@ typedef struct ALIGNED_(16) WL_Config_s { /*!< Size of wl_config_t structure sho
uint32_t sector_size; /*!< size of flash memory sector that will be erased and stored at once (erase)*/
uint32_t updaterate; /*!< Amount of accesses before block will be moved*/
uint32_t wr_size; /*!< Minimum amount of bytes per one block at write operation: 1...*/
uint32_t version; /*!< A version of current implementatioon. To erase and reallocate complete memory this ID must be different from id before.*/
uint32_t version; /*!< A version of current implementation. To erase and reallocate complete memory this ID must be different from id before.*/
size_t temp_buff_size; /*!< Size of temporary allocated buffer to copy from one flash area to another. The best way, if this value will be equal to sector size.*/
uint32_t crc; /*!< CRC for this config*/
} wl_config_t;

View File

@@ -1,16 +1,8 @@
// Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
* SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef _WL_State_H_
#define _WL_State_H_
#include "esp_err.h"
@@ -35,7 +27,7 @@ public:
uint32_t access_count; /*!< current access count*/
uint32_t max_count; /*!< max access count when block will be moved*/
uint32_t block_size; /*!< size of move block*/
uint32_t version; /*!< state id used to identify the version of current libary implementaion*/
uint32_t version; /*!< state id used to identify the version of current library implementation*/
uint32_t device_id; /*!< ID of current WL instance*/
uint32_t reserved[7]; /*!< Reserved space for future use*/
uint32_t crc; /*!< CRC of structure*/