mirror of
https://github.com/MiSTer-devel/Distribution_MiSTer.git
synced 2026-04-19 03:04:16 +00:00
1227 lines
45 KiB
Python
Executable File
1227 lines
45 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Copyright (c) 2022-2025 José Manuel Barroso Galindo <theypsilon@gmail.com>
|
|
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Any, Dict, Generator, Iterator, List, Optional, Set, Tuple
|
|
from pathlib import Path
|
|
import xml.etree.ElementTree as ET
|
|
import io
|
|
import re
|
|
import os
|
|
import json
|
|
import hashlib
|
|
import shlex
|
|
import tempfile
|
|
import csv
|
|
from urllib.parse import urlparse
|
|
from argparse import ArgumentParser
|
|
from zipfile import ZipFile, ZIP_DEFLATED
|
|
from dataclasses import dataclass
|
|
|
|
def main() -> None:
    """Parse the command line and dispatch to the requested use case.

    Two sub-commands are supported:

    * ``build [source_dir]`` builds the database from ``source_dir``
      (``delme`` when the argument is omitted).
    * ``compare left_db right_db`` compares two databases.

    The elapsed wall-clock time is printed once the command has finished.

    Raises:
        ValueError: If the parsed command is neither ``build`` nor ``compare``.
    """
    start = time.time()

    parser = ArgumentParser()
    subparsers = parser.add_subparsers(dest='command', required=True)

    build_parser = subparsers.add_parser('build')
    # nargs='?' makes the positional optional. Without it argparse always
    # demands the argument, so the declared default could never take effect.
    build_parser.add_argument('source_dir', nargs='?', default='delme', help="Folder with the content that will be in the Database")

    compare_parser = subparsers.add_parser('compare')
    compare_parser.add_argument('left_db', help="Address pointing to Database")
    compare_parser.add_argument('right_db', help="Address pointing to another Database")
    args = parser.parse_args()

    if args.command == 'build':
        build_database(args.source_dir)
    elif args.command == 'compare':
        compare_databases(args.left_db, args.right_db)
    else:
        raise ValueError(args.command)

    print()
    print("Time:")
    end = time.time()
    print(end - start)
    print()
|
|
|
|
# Entrypoints for the different Use Cases:
|
|
|
|
def build_database(source_dir: str) -> None:
    """Build the database from ``source_dir`` and save it when it changed.

    Steps, in order: read the build variables, switch to the source folder,
    collect internal files (minus ignored entries) and external files, tag
    every file and folder, apply URLs / linux update / zips, and finally
    persist the result only when it differs from the previous database.

    Args:
        source_dir: Folder with the content that will be in the database.

    Raises:
        ValueError: If the ``DB_ID`` variable is empty.
    """
    print('Building database...')
    print()
    build_vars = BuildVars()

    # Never echo the GitHub token into the build log; everything else is
    # printed exactly as before.
    printable_vars = dict(build_vars.__dict__)
    if printable_vars.get('github_token'):
        printable_vars['github_token'] = '<redacted>'
    print('BuildVars:', json.dumps(printable_vars, indent=True))

    if build_vars.db_id == '':
        raise ValueError('Variable "DB_ID" is missing!')

    set_source_dir(source_dir)

    finder = Finder('.')
    finder.ignore('./.git')
    finder.ignore('./.github')
    for ignore_entry in build_vars.finder_ignore.split():
        finder.ignore(ignore_entry)
    internal_files = finder.find_all()
    external_files = ExternalFilesReader(build_vars.external_files).read_external_files()

    tags = Tags(try_read_json(build_vars.download_metadata_json), build_vars.broken_mras_ignore)
    tags.init_aliases(initial_filter_aliases)

    # Files are registered before folders: tag indexes are handed out in
    # first-use order, so this sequence must stay stable.
    builder = DatabaseBuilder(tags)
    for file in internal_files:
        builder.add_file(file)
    for file, description, filter_terms in external_files:
        builder.add_external_file(file, description, filter_terms)
    for file in internal_files:
        builder.add_parent_folders(file)
    for file, _d, _f in external_files:
        builder.add_parent_folders(file)

    db = builder.build(db_id=build_vars.db_id)

    transformer = DatabaseTransformer(db, build_vars)
    transformer.apply_urls()
    transformer.apply_linux_update()
    transformer.apply_zips()

    persistence = DatabasePersistence(db, build_vars)
    if persistence.needs_save():
        print()
        print('Changes detected. Proceeding to save new db...')
        persistence.save()
        save_report_terms_in_readme(tags.get_report_terms())
        print()
        print('Saving complete.')
    else:
        print()
        print('No changes detected.')
|
|
|
|
def compare_databases(left_path: str, right_path: str) -> None:
    """Fetch both databases and print whether they are equal.

    Args:
        left_path: Address pointing to the first database.
        right_path: Address pointing to the second database.
    """
    left_db = get_url_db(left_path)
    right_db = get_url_db(right_path)
    identical = mut_diff_db(left_db, right_db)

    print()
    print('No changes.' if identical else 'Databases are different.')
|
|
|
|
# build_database domain:
|
|
|
|
@dataclass
class BuildVars:
    """Build settings taken from environment variables.

    Each default is computed by the ``os.getenv`` call written on its line,
    i.e. when the class body is executed, and every value is stripped of
    surrounding whitespace.
    """

    # GITHUB_TOKEN: handed to get_linux_latest_release_url by the transformer.
    github_token: str = os.getenv("GITHUB_TOKEN", '').strip()
    # DB_ID: identifier stored in the database; build_database rejects ''.
    db_id: str = os.getenv("DB_ID", '').strip()
    # DB_URL: stored as 'db_url' when set; fallback address of the previous db.
    db_url: str = os.getenv('DB_URL', '').strip()
    # TEST_DB_URL: preferred address of the previous db for change detection.
    test_db_url: str = os.getenv('TEST_DB_URL', '').strip()
    # DB_JSON_NAME: output file name; the default name enables indented JSON.
    db_json_name: str = os.getenv('DB_JSON_NAME', 'dbresult.json').strip()
    # BASE_FILES_URL: %-format template that is filled in with the commit SHA.
    base_files_url: str = os.getenv('BASE_FILES_URL', '').strip()
    # LINUX_GITHUB_REPOSITORY: when set, the latest linux release is added.
    linux_github_repository: str = os.getenv('LINUX_GITHUB_REPOSITORY', '').strip()
    # ZIPS_CONFIG: path of the JSON file describing the zips; '' disables zips.
    zips_config: str = os.getenv('ZIPS_CONFIG', '').strip()
    # DOWNLOAD_METADATA_JSON: path of the JSON metadata given to Tags.
    download_metadata_json: str = os.getenv('DOWNLOAD_METADATA_JSON', '/tmp/download_metadata.json').strip()
    # FINDER_IGNORE: whitespace-separated entries that the Finder skips.
    finder_ignore: str = os.getenv('FINDER_IGNORE', '').strip()
    # BROKEN_MRAS_IGNORE: 'true' (case-insensitive) tolerates unparsable files.
    broken_mras_ignore: bool = os.getenv('BROKEN_MRAS_IGNORE', 'false').strip().lower() == 'true'
    # EXTERNAL_FILES: whitespace-separated CSV paths listing external files.
    external_files: str = os.getenv("EXTERNAL_FILES", '').strip()
|
|
|
|
class Finder:
    """Recursively lists the files below a directory, skipping ignored entries."""

    def __init__(self, dir: str):
        """Create a finder rooted at ``dir`` with an empty ignore set."""
        self._dir = dir
        # Despite its name, this holds the normalised paths that ARE ignored.
        # A set keeps the membership test in `_scan` O(1).
        self._not_in_ignore: Set[str] = set()

    @property
    def dir(self) -> str:
        """Root directory given at construction."""
        return self._dir

    def ignore(self, entry_path: str) -> None:
        """Exclude ``entry_path`` (file or directory) from later scans.

        The path is normalised through ``Path`` so that e.g. ``./.git`` and
        ``.git`` designate the same entry.
        """
        ignored_entry = str(Path(entry_path))
        print('Ignored: %s' % ignored_entry)
        self._not_in_ignore.add(ignored_entry)

    def find_all(self) -> List[Path]:
        """Return every non-ignored file, sorted case-insensitively by path."""
        return sorted(self._scan(self._dir), key=lambda file: str(file).lower())

    def _scan(self, directory: str) -> Generator[Path, None, None]:
        """Yield the files below ``directory``, descending into real sub-directories.

        Symlinked directories are not followed; they are yielded as entries.
        """
        # The context manager closes the scandir handle deterministically,
        # also when iteration stops early because of an exception.
        with os.scandir(directory) as entries:
            for entry in entries:
                if str(Path(entry.path)) in self._not_in_ignore:
                    continue
                if entry.is_dir(follow_symlinks=False):
                    yield from self._scan(entry.path)
                else:
                    yield Path(entry.path)
|
|
|
|
class ExternalFilesReader:
    """Reads external file entries from whitespace-separated CSV paths.

    Row columns, in order: path, url, size, md5 hash, filter terms, extra
    fields. Only path and url are mandatory; a missing size or hash is
    obtained by downloading the file.
    """

    def __init__(self, strpath: str):
        """Store the whitespace-separated list of CSV paths (may be '')."""
        self._strpath = strpath

    def read_external_files(self) -> List[Tuple[Path, Dict[str, Any], List[str]]]:
        """Return ``(path, description, filter_terms)`` for every valid CSV row."""
        if self._strpath == '':
            return []

        result: List[Tuple[Path, Dict[str, Any], List[str]]] = []
        for strpath in self._strpath.split():
            data = self._read_csv_data(strpath)
            if not data:
                continue

            print(f"Parsing CSV '{strpath}' to extract external files.")
            for row in data:
                self._parse_data_row(row, result)

        return result

    def _parse_data_row(self, row, result: List[Tuple[Path, Dict[str, Any], List[str]]]) -> None:
        """Validate one CSV row and append its entry to ``result``.

        Invalid rows are reported on stdout and skipped.
        """
        if len(row) < 2:
            print('Not enough columns in this row, skipping it.', row)
            return
        if len(row) == 2:
            print('Hash and size columns are missing.', row)
        elif len(row) == 3:
            print('Hash column is missing.', row)

        path, url = row[0].strip(), row[1].strip()
        size = row[2].strip() if len(row) > 2 else ''
        md5hash = row[3].strip().lower() if len(row) > 3 else ''

        # Check path and URL first so that an invalid row never triggers a
        # download attempt.
        if not is_valid_path(path):
            print(f"Invalid path in this row: {path}, skipping it.", row)
            return
        if not is_valid_url(url):
            print(f"Invalid URL in this row: {url}, skipping it.", row)
            return

        if size == '' or md5hash == '':
            size, md5hash = self._read_size_and_md5hash_from_real_file(url, size, md5hash)

        if not is_valid_size(size):
            print(f"Invalid size in this row: {size}, skipping it.", row)
            return
        if not is_valid_md5hash(md5hash):
            print(f"Invalid MD5 hash in this row: {md5hash}, skipping it.", row)
            return

        description = {"url": url, "size": int(size), "hash": md5hash}

        filter_terms = self._extract_filter_terms(row)

        for field_name, field_value in self._extract_extra_fields(row):
            description[field_name] = field_value

        result.append((Path(path), description, filter_terms))

    def _read_size_and_md5hash_from_real_file(self, url: str, size: str, md5hash: str) -> Tuple[str, str]:
        """Download ``url`` and return the real ``(size, md5hash)`` as strings.

        Annotated values that disagree with the real ones are only reported;
        the real values always win.
        """
        with tempfile.NamedTemporaryFile() as tmp_file:
            download_file(url, tmp_file.name)
            new_size, new_md5hash = str(file_size(tmp_file.name)), file_hash(tmp_file.name)
            if size != '' and size != new_size:
                print(f'Real size {new_size} is different than anotated size {size}')
            if md5hash != '' and md5hash != new_md5hash:
                print(f'Real MD5 Hash {new_md5hash} is different than anotated MD5 Hash {md5hash}')
            return new_size, new_md5hash

    @staticmethod
    def _extract_filter_terms(row: List[str]) -> List[str]:
        """Return the whitespace-separated terms of the fifth column, if any."""
        if len(row) >= 5:
            return row[4].strip().split()
        return []

    @staticmethod
    def _extract_extra_fields(row: List[str]) -> List[Tuple[str, Any]]:
        """Parse the ``name:value`` tuples of the sixth column.

        Only the boolean fields ``overwrite`` and ``reboot`` are accepted;
        anything else is reported and skipped.
        """
        if len(row) < 6:
            return []

        result = []
        for extra_field in row[5].strip().split():
            if not is_valid_field_tuple(extra_field):
                print(f"Invalid field tuple: {extra_field}. in row:", row)
                continue

            field_parts = extra_field.split(':')
            field_name = field_parts[0].lower()
            field_value = field_parts[1]

            if field_name not in ['overwrite', 'reboot']:
                print(f"Invalid field name: {field_name}. in row:", row)
                continue

            actual_value = parse_boolean(field_value)
            if actual_value is None:
                print(f"Invalid field value: {field_value}, for field name: {field_name}, in row:", row)
                continue

            result.append((field_name, actual_value))

        return result

    @staticmethod
    def _read_csv_data(strpath) -> Optional[List[List[str]]]:
        """Return the rows of the CSV at ``strpath`` without its header row.

        Returns None (after reporting the error) when the file cannot be read.
        """
        try:
            with open(strpath, newline='') as csvfile:
                csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
                # The first row is the header and is dropped.
                return list(csv_reader)[1:]
        except Exception as e:
            # Deliberately best-effort: an unreadable CSV is reported and skipped.
            print('csv file not opened: ' + strpath)
            print(e)
            return None
|
|
|
|
def is_valid_url(url: str) -> bool:
    """Tell whether ``url`` parses with both a scheme and a network location."""
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
        return False
    return bool(parsed.scheme) and bool(parsed.netloc)
|
|
|
|
def is_valid_path(path: str) -> bool:
    """Tell whether ``path`` is an acceptable relative path.

    Rejected: absolute paths, paths shorter than 3 characters, and paths
    with a ``..`` or ``.`` component. Any unexpected error is printed and
    treated as "invalid".
    """
    try:
        candidate = Path(path)
        if len(path) < 3 or candidate.is_absolute():
            return False
        return not any(part in ('..', '.') for part in candidate.parts)
    except Exception as e:
        print(e)
        return False
|
|
|
|
def is_valid_size(size: str) -> bool:
    """Tell whether ``size`` is the decimal text of a positive integer.

    ``str.isdecimal`` is used rather than ``str.isdigit`` because the latter
    also accepts characters such as superscript digits, which ``int()``
    cannot convert and would make this check raise ``ValueError``.
    """
    return size.isdecimal() and int(size) > 0
|
|
|
|
def is_valid_md5hash(md5hash: str) -> bool:
    """Tell whether ``md5hash`` is exactly 32 hexadecimal characters.

    ``re.fullmatch`` is used because ``$`` with ``re.match`` also matches
    just before a trailing newline, which would let ``"<hash>\\n"`` through.
    """
    return re.fullmatch(r'[a-fA-F0-9]{32}', md5hash) is not None
|
|
|
|
def is_valid_field_tuple(s: str) -> bool:
    """Tell whether ``s`` has the form ``name:value`` made of word characters.

    ``re.fullmatch`` is used so that a trailing newline is not silently
    accepted, which ``$`` combined with ``re.match`` would allow.
    """
    return re.fullmatch(r'\w+:\w+', s) is not None
|
|
|
|
def parse_boolean(s):
    """Map a textual boolean (case-insensitive) to True/False, else None."""
    known_values = {
        'true': True, 'yes': True, 'y': True, '1': True,
        'false': False, 'no': False, 'n': False, '0': False,
    }
    return known_values.get(s.lower())
|
|
|
|
# Built-in alias groups handed to Tags.init_aliases by build_database.
# All the terms of one inner list end up sharing the same tag index.
initial_filter_aliases = [
    # Consoles
    ['nes', 'famicom', 'nintendo'],
    ['snes', 'sufami', 'supernes', 'supernintendo', 'superfamicom'],
    ['pcengine', 'tgfx16', 'turbografx16', 'turbografx'],
    ['pcenginecd', 'tgfx16cd', 'turbografx16cd', 'turbografxcd'],
    ['megadrive', 'genesis'],
    ['megacd', 'segacd'],
    ['sms', 'mastersystem', 'segamark3'],
    ['gb', 'gameboy'],
    ['gbc', 'gameboycolor'],
    ['sgb', 'supergameboy'],
    ['gba', 'gameboyadvance'],

    # General
    ['console-cores', 'console'],
    ['arcade-cores', 'arcade'],
    ['computer-cores', 'computer'],
    ['other-cores', 'other'],
    ['service-cores', 'utility'],
]
|
|
|
|
class Tags:
    """Maps terms to integer tag indexes and computes the tags of files and folders.

    Terms are normalised by `_clean_term`. Terms registered together through
    `init_aliases` share one index. New indexes are handed out incrementally
    in first-use order, so the order of the `_use_*` calls below matters.
    """

    # Matches a single allowed character; `_clean_term` applies it per character.
    filter_part_regex = re.compile("[-_a-z0-9.]$", )

    def __init__(self, metadata_props: Optional[Dict[str, Any]], broken_mras_ignore: bool) -> None:
        """Set up empty tag state.

        Args:
            metadata_props: Properties for `Metadata`; None selects `Metadata.new_props()`.
            broken_mras_ignore: When True, parse errors of .mra/.mgl files are not re-raised.
        """
        self._metadata = Metadata(metadata_props if metadata_props is not None else Metadata.new_props())
        self._broken_mras_ignore = broken_mras_ignore
        # Clean term -> tag index.
        self._dict: Dict[str, int] = {}
        # Lower-cased alternatives subfolder (first character removed) -> rbf names seen in it.
        self._alternatives: Dict[str, Set[str]] = {}
        # Next index to hand out.
        self._index: int = 0
        # Filtered terms (before '-'/'_' removal) kept for the report.
        self._report_set: Set[str] = set()
        # Indexes that were actually requested through `_use_from_dict`.
        self._used: Set[int] = set()
        self._init: bool = False

    def init_aliases(self, aliases: List[List[str]]) -> None:
        """Register alias groups: metadata aliases first, then ``aliases``.

        Raises:
            Exception: If called more than once.
        """
        if self._init:
            raise Exception("Can only be initialised once.")
        self._init = True

        for alias_list in [*self._metadata.aliases(), *aliases]:
            clean_terms = [self._clean_term(alias) for alias in alias_list]
            index = self._matching_dict_index(clean_terms)
            for term in clean_terms:
                self._dict[term] = index

    def _matching_dict_index(self, clean_terms: List[str]) -> int:
        """Return the index already shared by ``clean_terms``, or allocate a new one.

        Raises:
            Exception: If the known terms of the group map to different indexes.
        """
        result = None
        for term in clean_terms:
            if term not in self._dict:
                continue

            if result is None:
                result = self._dict[term]
            elif result != self._dict[term]:
                raise Exception(f'Aliases with different indexes should not happen. {str(result)} != {str(self._dict[term])}')

        if result is not None:
            return result

        result = self._index
        self._index += 1
        return result

    def get_tags_for_file(self, path: Path) -> List[int]:
        """Return the sorted tag indexes for the file at ``path``."""
        return sorted(self._impl_tags_for_file(path))

    def _impl_tags_for_file(self, path: Path) -> List[int]:
        """Collect the (unsorted, duplicate-free) tag indexes for a file.

        Tags come from the top-level folder, from the file suffix/stem, and
        from folder-specific rules. .mra and .mgl files are parsed; their
        parse error is re-raised unless `broken_mras_ignore` was set.
        """
        parent = path.parts[0].lower()
        # A leading underscore of the top-level folder is not part of the term.
        if parent[0] == '_':
            parent = parent[1:]

        result: List[int] = []
        # With a single part, parts[0] is the file name itself, not a folder.
        if len(path.parts) > 1:
            self._append(result, self._use_term(parent))

        self._add_cores_terms(parent, result)

        suffix = path.suffix.lower()
        stem = path.stem.lower()

        if suffix == '.mra':
            self._append(result, self._use_term('mra'))
            rbf, zips, broken_error = read_mra_fields(path)

            if broken_error is None:
                if rbf is not None:
                    self._append(result, self._use_arcade_term(rbf))

                if self._contains_hbmame_rom(zips):
                    self._append(result, self._use_term('hbmame'))

                if len(path.parts) > 1 and path.parts[1].lower() == '_alternatives':
                    self._append(result, self._use_term('alternatives'))

                    if rbf is not None and len(path.parts) > 2:
                        # Remember the rbf per alternatives subfolder; read back
                        # later by `_impl_tags_for_folder`.
                        alternative_subfolder = path.parts[2].lower()[1:]
                        if alternative_subfolder not in self._alternatives:
                            self._alternatives[alternative_subfolder] = set()
                        self._alternatives[alternative_subfolder].add(rbf)
            else:
                if not self._broken_mras_ignore:
                    raise broken_error

        elif suffix == '.rbf':
            # Drops the last 9 characters of the stem - presumably a date
            # suffix, judging by the variable name (TODO confirm).
            nodates = stem[0:-9]
            if not nodates:
                nodates = stem

            self._append(result, self._use_term('cores'))
            if parent == 'arcade' or nodates.startswith('arcade-'):
                self._append(result, self._use_arcade_term(nodates))
            else:
                self._append(result, self._use_term(nodates))

            if nodates in ['gba2p', 'gameboy2p']:
                self._append(result, self._use_term('handheld2p'))

            if nodates == 'genesis':
                self._append(result, self._use_term('genesis-core'))

            if nodates == 'megadrive':
                self._append(result, self._use_term('megadrive-core'))

        elif suffix == '.mgl':
            self._append(result, self._use_term('mgl'))
            self._append(result, self._use_term('cores'))
            self._append(result, self._use_term(stem))
            rbf, _, broken_error = read_mgl_fields(path)
            if broken_error is None and rbf is not None:
                self._append(result, self._use_term(Path(rbf).name.lower()))
            elif broken_error is not None and not self._broken_mras_ignore:
                raise broken_error

        if stem in ['menu', 'mister']:
            self._append(result, self._use_term('essential'))

        if stem == 'mister':
            self._append(result, self._use_term('misterfirmware'))

        if stem == 'downloader_latest':
            self._append(result, self._use_term('downloaderlatest'))
            self._append(result, self._use_term('downloader'))

        if stem == 'yc' and suffix == '.txt':
            self._append(result, self._use_term('yctxt'))

        if parent in ['games', 'docs']:
            first_level = path.parts[1].lower()
            self._append(result, self._use_term(first_level))
            if self._metadata.is_mgl_home(first_level):
                self._append(result, self._use_term('mgl'))
                self._append(result, self._use_term(self._metadata.mgl_dependency(first_level)))

            category = self._metadata.category_by_home(first_level)
            if category is not None:
                self._append(result, self._use_term(category))

            if first_level in ['gba2p', 'gameboy2p']:
                self._append(result, self._use_term('handheld2p'))

            # NOTE(review): parts[2] is read unconditionally, so a file placed
            # directly under games/ or docs/ would raise IndexError here.
            second_level = path.parts[2].lower()
            if len(path.parts) > 3:
                self._append(result, self._use_term(second_level))

            if parent == 'games':
                if second_level.endswith('.rom'):
                    self._append(result, self._use_term('bios'))
                elif second_level not in ['palettes'] and suffix != '.rbf' and suffix != '.mra':
                    self._append(result, self._use_term('extra-utilities'))
            elif parent == 'docs' and 'readme' in stem:
                self._append(result, self._use_term('readme'))

        elif parent == 'cheats':
            first_level = path.parts[1].lower()
            self._append(result, self._use_term(first_level))
            self._append(result, self._use_term('console'))

        elif parent in ['gamma', 'filters', 'filters_audio', 'shadow_masks']:
            self._append(result, self._use_term('all_filters'))

            if parent in ['gamma', 'filters', 'shadow_masks']:
                self._append(result, self._use_term('filters_video'))

        elif parent in ['wallpapers']:
            ar = read_image_aspect_ratio(path)
            if ar is None:
                pass
            elif abs(ar - 1.77) < 0.1:
                self._append(result, self._use_term('ar16:9'))
            elif abs(ar - 1.33) < 0.1:
                self._append(result, self._use_term('ar4:3'))

            self._append(result, self._use_term(stem))

        return result

    def _contains_hbmame_rom(self, zips: List[str]) -> bool:
        """Tell whether any name in ``zips`` contains 'hbmame' (case-insensitive)."""
        for z in zips:
            if 'hbmame' in z.lower():
                return True

        return False

    def get_tags_for_folder(self, path: Path) -> List[int]:
        """Return the sorted tag indexes for the folder at ``path``."""
        return sorted(self._impl_tags_for_folder(path))

    def _impl_tags_for_folder(self, path: Path) -> List[int]:
        """Collect the (unsorted, duplicate-free) tag indexes for a folder.

        Up to three path levels are inspected; a leading underscore is
        removed from each level before it is used as a term.
        """
        if len(path.parts) == 0:
            return []

        parent = path.parts[0].lower()
        if parent[0] == '_':
            parent = parent[1:]
        result = [self._use_term(parent)]

        if parent in ['console', 'computer', 'other', 'utility']:
            self._append(result, self._use_term('cores'))
        elif parent == 'cheats':
            self._append(result, self._use_term('console'))

        self._add_cores_terms(parent, result)

        if len(path.parts) == 1:
            return result

        first_level = path.parts[1].lower()
        if first_level[0] == '_':
            first_level = first_level[1:]

        if parent in ['games', 'docs']:
            if first_level in ['gba2p', 'gameboy2p']:
                self._append(result, self._use_term('handheld2p'))
            if self._metadata.is_mgl_home(first_level):
                self._append(result, self._use_term('mgl'))
                self._append(result, self._use_term(self._metadata.mgl_dependency(first_level)))
            category = self._metadata.category_by_home(first_level)
            if category is not None:
                self._append(result, self._use_term(category))

        self._append(result, self._use_term(first_level))

        if len(path.parts) == 2:
            return result

        second_level = path.parts[2].lower()
        if second_level[0] == '_':
            second_level = second_level[1:]

        if parent == 'arcade' and first_level == 'alternatives':
            # Uses the rbf names recorded by `_impl_tags_for_file`, so files
            # must have been tagged before their folders.
            if second_level in self._alternatives:
                for rbf in self._alternatives[second_level]:
                    if not rbf:
                        continue
                    self._append(result, self._use_arcade_term(rbf))

        if parent == 'games':
            if second_level in ['palettes']:
                self._append(result, self._use_term(second_level))
            else:
                self._append(result, self._use_term('extra-utilities'))

        return result

    def _use_term(self, term: str) -> int:
        """Return the index of ``term``, marking it as used."""
        return self._use_from_dict(self._clean_term(term))

    def _use_arcade_term(self, term: str) -> int:
        """Return the index of ``'arcade-' + term``, marking it as used."""
        return self._use_from_dict(self._clean_term('arcade-' + term))

    def _use_cores_term(self, term: str) -> int:
        """Return the index of ``term + '-cores'``, marking it as used."""
        return self._use_from_dict(self._clean_term(term + '-cores'))

    def _clean_term(self, term: str) -> str:
        """Normalise ``term`` into its dictionary key.

        Spaces are removed and only characters accepted by
        `filter_part_regex` are kept; that intermediate form is recorded in
        the report set. The returned key additionally has '-' and '_' removed.

        Raises:
            Exception: If ``term`` is empty.
        """
        if not term:
            raise Exception('Term is empty')
        result = ''.join(filter(lambda chr: self.filter_part_regex.match(chr), term.replace(' ', '')))
        if result not in self._report_set:
            self._report_set.add(result)
        return result.replace('-', '').replace('_', '')

    def _use_from_dict(self, term: str) -> int:
        """Return the index of the clean ``term``, allocating one if needed.

        The index is recorded as used.

        Raises:
            Exception: If ``term`` is empty or equals 'menu.rbf'.
        """
        if term == 'menu.rbf':
            raise Exception('should not happen')
        if not term:
            raise Exception('Term is empty')
        if term not in self._dict:
            self._dict[term] = self._index
            self._index += 1

        self._used.add(self._dict[term])

        return self._dict[term]

    def _add_cores_terms(self, parent: str, result: List[int]) -> None:
        """Append the '<parent>-cores' tag for core folders ('utility' maps to 'service')."""
        if parent in ['console', 'computer', 'other', 'arcade']:
            self._append(result, self._use_cores_term(parent))
        elif parent == 'utility':
            self._append(result, self._use_cores_term('service'))

    def _append(self, result: List[int], term: int) -> None:
        """Append ``term`` to ``result`` unless it is already present."""
        if term in result:
            return
        result.append(term)

    def get_dictionary(self) -> Dict[str, int]:
        """Return the term -> index mapping restricted to used indexes."""
        result: Dict[str, int] = {}
        for k, v in self._dict.items():
            if v in self._used:
                result[k] = v
        return result

    def get_report_terms(self) -> List[str]:
        """Return the sorted report terms whose index was used."""
        result: List[str] = []
        for entry in self._report_set:
            if self._dict[self._clean_term(entry)] in self._used:
                result.append(entry)
        return sorted(result)
|
|
|
|
class DatabaseBuilder:
    """Accumulates the file and folder entries that make up the database."""

    firmware = 'MiSTer'
    main_binaries = ['MiSTer', 'menu.rbf']

    def __init__(self, tags: Tags):
        """Start with no entries; ``tags`` supplies the tag indexes."""
        self._files: Dict[str, Any] = {}
        self._folders: Dict[str, Any] = {}
        self._tags = tags

    def add_file(self, file: Path) -> None:
        """Register a local file, describing it from the file on disk."""
        description = new_file_description(str(file))
        self.add_external_file(file, description, [])

    def add_external_file(self, file: Path, description: Dict[str, Any], filter_terms: List[str]) -> None:
        """Register ``file`` with the given description and extra filter terms.

        Housekeeping files are skipped. ``description`` receives a 'path'
        key in place when the file lives under games/docs.
        """
        strfile = str(file)

        skipped_names = ('.delme', '.DS_Store')
        skipped_paths = ('README.md', 'LICENSE', 'latest_linux.txt', '.gitattributes')
        if file.name in skipped_names or strfile in skipped_paths:
            return

        if strfile.startswith(('games', 'docs')):
            description['path'] = 'pext'

        # File tags first, then the extra terms: indexes follow first use.
        tags = self._tags.get_tags_for_file(file)
        tags.extend(self._tags._use_term(term) for term in filter_terms)

        entry = {**description, "tags": tags}
        self._files[strfile] = entry

        is_boot_rom = file.name.lower() in ('boot.rom', 'boot1.rom', 'boot0.rom')
        if is_boot_rom and not strfile.startswith('games/AO486/'):
            entry['overwrite'] = False

        is_main_binary = strfile in self.main_binaries
        if is_main_binary or strfile.startswith('linux/'):
            entry['path'] = 'system'

        if is_main_binary:
            entry['reboot'] = True

        if strfile == self.firmware:
            entry['backup'] = '.MiSTer.old'
            entry['tmp'] = 'MiSTer.new'

    def add_parent_folders(self, file: Path) -> None:
        """Register every not-yet-known ancestor folder of ``file``."""
        for folder in file.parents:
            strfolder = str(folder)
            if strfolder in ('.', '') or strfolder in self._folders:
                continue

            folder_entry: Dict[str, Any] = {"tags": self._tags.get_tags_for_folder(folder)}
            if strfolder.startswith(('games', 'docs')):
                folder_entry['path'] = 'pext'
            self._folders[strfolder] = folder_entry

    def build(self, db_id: str) -> Dict[str, Any]:
        """Return the database dict, stamped with the current time."""
        database: Dict[str, Any] = {"db_id": db_id}
        database["files"] = self._files
        database["folders"] = self._folders
        database["tag_dictionary"] = self._tags.get_dictionary()
        database["timestamp"] = int(time.time())
        return database
|
|
|
|
class DatabaseTransformer:
    """Enriches a built database with URLs, the linux update and the zips."""

    def __init__(self, db: Dict[str, Any], vars: BuildVars):
        """Keep references to the database (mutated in place) and the build variables."""
        self._db = db
        self._vars = vars

    def apply_urls(self) -> None:
        """Set 'base_files_url' (template filled with the HEAD SHA) and 'db_url'.

        Raises:
            ValueError: If BASE_FILES_URL is empty.
        """
        url_template = self._vars.base_files_url
        if url_template == '':
            raise ValueError('Variable "BASE_FILES_URL" missing!')

        print('BASE_FILES_URL:', url_template)
        sha = run_stdout('git rev-parse --verify HEAD')
        print('SHA:', sha)
        base_files_url = url_template % sha
        print('Combined BASE_FILES_URL % SHA:', base_files_url)
        self._db['base_files_url'] = base_files_url

        db_url = self._vars.db_url
        if db_url != '':
            self._db['db_url'] = db_url

    def apply_linux_update(self) -> None:
        """Add the 'linux' entry for the latest release, when a repository is set."""
        repository = self._vars.linux_github_repository
        if repository == '':
            return

        print('LINUX_GITHUB_REPOSITORY:', repository)
        url_linux = get_linux_latest_release_url(repository, self._vars.github_token)
        with tempfile.NamedTemporaryFile() as tmp_file:
            download_file(url_linux, tmp_file.name)
            # The version is the last 6 characters of the URL's file stem.
            version = Path(url_linux).stem[-6:]
            linux_entry = {**new_file_description(tmp_file.name), "url": url_linux, "version": version}
            self._db['linux'] = linux_entry

    def apply_zips(self) -> None:
        """Move the configured sources into zips and store them under 'zips'.

        Raises:
            ValueError: If the zips config cannot be read as JSON.
        """
        config_path = self._vars.zips_config
        if config_path == '':
            return

        config = try_read_json(config_path)
        if config is None:
            raise ValueError(f'Need "{self._vars.zips_config}" to be a valid JSON!')

        zips_builder = ZipsBuilder(self._db)
        for zip_id, zip_description in config.items():
            zips_builder.add_zip(zip_id, zip_description)

        self._db['zips'] = zips_builder.build()
|
|
|
|
class DatabasePersistence:
    """Decides whether the database changed and writes it to disk."""

    def __init__(self, db: Dict[str, Any], vars: BuildVars):
        """Keep references to the database and the build variables."""
        self._db = db
        self._vars = vars

    def needs_save(self) -> bool:
        """Tell whether the database differs from the previously published one.

        The previous database is fetched from TEST_DB_URL, falling back to
        DB_URL. When neither is set, or fetching fails with a
        ReturnCodeException, a save is considered necessary.
        """
        previous_url = self._vars.test_db_url or self._vars.db_url
        if previous_url == '':
            print('Missing "DB_URL" and "TEST_DB_URL", can not check previous db!')
            return True

        try:
            previous_db = get_url_db(previous_url)
        except ReturnCodeException as e:
            print('ReturnCodeException at get_url_db ' + previous_url)
            print(e)
            return True

        # Compare against a JSON round-trip copy of the current database.
        current_copy = json.loads(json.dumps(self._db))
        return not mut_diff_db(previous_db, current_copy)

    def save(self):
        """Write the zips (if any) and then the database JSON file.

        Raises:
            ValueError: If zips are present but BASE_FILES_URL is empty.
        """
        if 'zips' in self._db:
            if self._vars.base_files_url == '':
                raise ValueError('Variable "BASE_FILES_URL" missing!')

            save_zips(self._db['zips'], self._vars.base_files_url)

        # The default output name gets indented JSON for easier debugging.
        indent = 4 if self._vars.db_json_name == 'dbresult.json' else None
        with open(self._vars.db_json_name, 'w') as f:
            json.dump(self._db, f, indent=indent, sort_keys=True)
|
|
|
|
class ZipsBuilder:
    """Moves database entries into zip descriptions.

    Entries whose path starts with a zip source are removed from the
    database's 'files'/'folders' and stored in the zip's summary instead.
    """

    def __init__(self, db: Dict[str, Any]):
        """Operate on ``db`` in place; no zips exist yet."""
        self._db = db
        self._zips: Dict[str, Any] = {}
        self._intermediate: Dict[str, Any] = {}

    def add_zip(self, zip_id: str, zip_description: Dict[str, Any]) -> None:
        """Create the zip(s) for ``zip_description`` according to its 'mode'.

        Raises:
            ValueError: If the mode is not 'simple', 'multi' or 'subfolders'.
        """
        mode = zip_description.get('mode', 'simple')
        if mode == 'multi':
            self._multi_process(zip_id, zip_description)
            return
        if mode not in ('simple', 'subfolders'):
            raise ValueError(f'Unknown mode: {mode}')

        source = zip_description['source']
        if mode == 'simple':
            self._simple_process(zip_id, source, zip_description)
        else:
            self._subfolders_process(zip_id, source, zip_description)

    def build(self) -> Dict[str, Any]:
        """Return the zip descriptions created so far, keyed by zip id."""
        return self._zips

    def _multi_process(self, zip_id: str, zip_description: Dict[str, Any]) -> None:
        """Build one zip out of several sources, unpacked at 'path'."""
        self._intermediate[zip_id] = {'files': {}, 'folders': {}}
        sources = zip_description['sources']
        for source in sources:
            self._move_elements(zip_id, source, 'files')
            self._move_elements(zip_id, source, 'folders')

        target = zip_description['path']
        self._add_zip(
            zip_id,
            contents=sources,
            description=self._description(", ".join(sources), target),
            parent=target + '/',
            mode='multi'
        )

    def _simple_process(self, zip_id: str, source: str, zip_description: Dict[str, Any]) -> None:
        """Build one zip out of a single source folder."""
        self._intermediate[zip_id] = {'files': {}, 'folders': {}}

        for key in ('files', 'folders'):
            self._move_elements(zip_id, source, key)

        source_path = Path(source)
        # Ancestor folders stay in the database but are also listed in the zip.
        for ancestor in map(str, source_path.parents):
            if ancestor in ('.', ''):
                continue
            self._intermediate[zip_id]['folders'][ancestor] = {**self._db['folders'][ancestor], 'zip_id': zip_id}

        parent = str(source_path.parent) + '/'

        self._add_zip(
            zip_id,
            contents=[source_path.name],
            description=self._description(source_path.name, parent),
            parent=parent,
            source=zip_description['source']
        )

    @staticmethod
    def _description(unpacking_str: str, parent: str) -> str:
        """Return the human-readable description of the unpacking step."""
        if parent in ['./', '.']:
            return f'Unpacking {unpacking_str} at the root'
        return f'Unpacking {unpacking_str} at {parent}'

    def _subfolders_process(self, zip_id: str, source: str, zip_description: Dict[str, Any]) -> None:
        """Build one zip per direct subfolder of ``source`` that has enough files."""
        subfolder_len = len(Path(source).parts)
        subfolders: Set[str] = set()

        for key in ('files', 'folders'):
            self._fill_subfolders(subfolders, subfolder_len, source, key)

        # Longest names first, since matching is done by string prefix.
        for folder in sorted(subfolders, key=len, reverse=True):
            composed_source = f'{source}/{folder}'

            if self._enough_files_for_subfolder(composed_source):
                composed_zip_id = f'{zip_id}{folder.lower()}'
                self._simple_process(composed_zip_id, composed_source, {**zip_description, 'source': composed_source})

    def _move_elements(self, zip_id: str, source: str, key: str) -> None:
        """Move the ``key`` entries starting with ``source`` from the db into the zip."""
        section = self._db[key]
        moved = self._intermediate[zip_id][key]
        for element in list(section):
            if not element.startswith(source):
                continue
            entry = section.pop(element)
            entry['zip_id'] = zip_id
            moved[element] = entry

    def _fill_subfolders(self, subfolders: Set[str], subfolder_len: int, source: str, key: str) -> None:
        """Collect the names found right below ``source`` among the ``key`` entries."""
        for element in list(self._db[key]):
            if not element.startswith(source):
                continue
            parts = Path(element).parts
            if len(parts) != subfolder_len:
                subfolders.add(parts[subfolder_len])

    def _add_zip(self, zip_id: str, contents: List[str], description: str, parent: str, mode: Optional[str] = None, source: Optional[str] = None) -> None:
        """Store the final description of ``zip_id`` from its moved entries."""
        moved = self._intermediate[zip_id]
        raw_files_size = sum(file_desc['size'] for file_desc in moved['files'].values())

        result = {
            'base_files_url': self._db['base_files_url'],
            'contents': contents,
            'description': description,
            'kind': 'extract_all_contents',
            'raw_files_size': raw_files_size,
            'summary_file_content': {
                'folders': moved['folders'],
                'files': moved['files'],
            },
            'target_folder_path': parent,
        }

        if parent.startswith(('games', 'docs')):
            result['path'] = 'pext'

        optional_fields = (('mode', mode), ('source', source))
        for field_name, field_value in optional_fields:
            if field_value is not None:
                result[field_name] = field_value

        self._zips[zip_id] = result

    def _enough_files_for_subfolder(self, composed_source: str) -> bool:
        """Tell whether at least 60 db files start with ``composed_source``."""
        remaining = 60
        for f in self._db['files']:
            if f.startswith(composed_source):
                remaining -= 1
                if remaining <= 0:
                    return True

        return False
|
|
|
|
class Metadata:
    """Read-only view over the metadata properties ('home' and 'aliases')."""

    @staticmethod
    def new_props() -> Dict[str, Any]:
        """Return empty properties: no homes and no aliases."""
        return {'home': {}, 'aliases': []}

    def __init__(self, props: Dict[str, Any]):
        """Wrap ``props`` without copying it."""
        self._props = props

    def is_mgl_home(self, home: str) -> bool:
        """Tell whether ``home`` is known and has a non-empty mgl dependency."""
        homes = self._props['home']
        if home not in homes:
            return False
        return homes[home]['mgl_dependency'] != ''

    def mgl_dependency(self, home: str) -> str:
        """Return the mgl dependency of ``home``.

        Raises:
            Exception: If the dependency is empty; check `is_mgl_home` first.
        """
        dependency = self._props['home'][home]['mgl_dependency']
        if len(dependency) == 0:
            raise Exception('This method should be used after is_mgl_home is true')
        return dependency

    def category_by_home(self, home: str) -> Optional[str]:
        """Return the category of ``home``, or None when the home is unknown."""
        homes = self._props['home']
        if home in homes:
            return homes[home]['category']
        return None

    def aliases(self) -> List[List[str]]:
        """Return the alias groups stored in the properties."""
        return self._props['aliases']
|
|
|
|
# MiSTer save functions
|
|
|
|
def save_zips(zips: Dict[str, Any], base_files_url: str) -> None:
    """Write the summary and contents archives of every zip.

    Each zip description is updated in place: its 'summary_file_content' is
    removed and replaced by 'summary_file' and 'contents_file' entries that
    describe the written archives and where they will be downloadable from.
    """
    base_zips_url = base_files_url % '<ZIPS_BRANCH_BASE_URL>'
    for zip_id, zip_description in zips.items():
        summary_file_content = zip_description.pop('summary_file_content')

        summary_file_zip = save_summary_file_zip(zip_id, summary_file_content)
        summary_entry = {**new_file_description(summary_file_zip), 'url': f'{base_zips_url}{summary_file_zip}'}
        zip_description['summary_file'] = summary_entry

        target_folder = zip_description.get('target_folder_path')
        contents_file_zip = save_contents_file_zip(zip_id, summary_file_content, target_folder)
        contents_entry = {**new_file_description(contents_file_zip), 'url': f'{base_zips_url}{contents_file_zip}'}
        zip_description['contents_file'] = contents_entry
|
|
|
|
def save_summary_file_zip(zip_id: str, summary_file_content: Dict[str, Any]) -> str:
    """Write ``<zip_id>_summary.json.zip`` in the current directory.

    The archive holds a single ``<zip_id>_summary.json`` member with the
    key-sorted JSON of ``summary_file_content``. Returns the archive name.
    """
    member_name = f'{zip_id}_summary.json'
    archive_name = f'{member_name}.zip'
    payload = json.dumps(summary_file_content, sort_keys=True)
    with ZipFile(archive_name, 'w', compression=ZIP_DEFLATED, compresslevel=1) as archive:
        archive.writestr(member_name, payload)
    return archive_name
|
|
|
|
def save_contents_file_zip(zip_id: str, summary_file_content: Dict[str, Any], zip_path: Optional[str]) -> str:
    """Pack every file listed in the summary into '<zip_id>.zip'.

    Files are read from their listed path. When `zip_path` is given and a
    path starts with it, that prefix is removed from the name stored inside
    the archive. Returns the archive file name.
    """
    archive_name = f'{zip_id}.zip'
    with ZipFile(archive_name, 'w', compression=ZIP_DEFLATED, compresslevel=1) as archive:
        for source in summary_file_content['files']:
            strip_prefix = zip_path is not None and source.startswith(zip_path)
            stored_name = source[len(zip_path):] if strip_prefix else source
            archive.write(source, stored_name)
    return archive_name
|
|
|
|
def save_report_terms_in_readme(terms: List[str]) -> None:
    """Replace the ALL_TAGS_GO_HERE placeholder of README.md with the terms.

    The terms are rendered as a comma separated list of backtick-quoted
    words. A missing README.md is reported on stdout and otherwise ignored.
    """
    try:
        tag_list = '`%s`' % '`, `'.join(terms)
        print('TAG_LIST: ' + tag_list)

        with open("README.md", "rt") as readme:
            current_text = readme.read()

        updated_text = current_text.replace('ALL_TAGS_GO_HERE', tag_list)
        with open("README.md", "wt") as readme:
            readme.write(updated_text)

        print('README.md updated!')
    except FileNotFoundError as error:
        print('FileNotFoundError: README.md', flush=True)
        print(error, flush=True)
|
|
|
|
# MiSTer entity descriptions
|
|
|
|
def new_file_description(name: str) -> Dict[str, Any]:
    """Describe the file `name` by its size and its hash."""
    description: Dict[str, Any] = {}
    description["size"] = file_size(name)
    description["hash"] = file_hash(name)
    return description
|
|
|
|
# MiSTer XMLs
|
|
|
|
def read_mra_fields(mra_path: Path) -> Tuple[Optional[str], List[str], Optional[ET.ParseError]]:
    """Read the rbf name and the zip names of an mra file.

    Returns (rbf, zips, None) on success. When the XML cannot be parsed the
    problem is printed and (None, [], error) is returned instead of raising.
    """
    try:
        rbf, zips = _read_mra_fields_impl(mra_path)
    except ET.ParseError as parse_error:
        print('ERROR: Defect XML for mra file: ' + str(mra_path))
        return None, [], parse_error
    return rbf, zips, None
|
|
|
|
def _read_mra_fields_impl(mra_path: Path) -> Tuple[Optional[str], List[str]]:
    """Extract the rbf name and the referenced zip names from an mra file.

    Returns a tuple (rbf, zips):
      * rbf is the stripped, lower-cased text of the first <rbf> element that
        has text, or None when there is none. Later <rbf> elements only
        trigger a warning.
      * zips is the sorted list of distinct, lower-cased names found in the
        '|'-separated 'zip' attribute of every <rom> element.

    Raises ET.ParseError when the file is not well-formed XML.
    """
    rbf: Optional[str] = None
    zips: Set[str] = set()

    # Iterate on 'end' events: ElementTree only guarantees that elem.text is
    # populated once the element has been closed, on 'start' it is undefined.
    for _, elem in et_iterparse(str(mra_path), events=("end",)):
        elem_tag = elem.tag.lower()
        if elem_tag == 'rbf':
            if rbf is not None:
                print('WARNING! Duplicated rbf tag on file %s, first value %s, later value %s' % (str(mra_path),rbf,elem.text))
                continue
            if elem.text is None:
                continue
            rbf = elem.text.strip().lower()
        elif elem_tag == 'rom':
            attributes = {k.strip().lower(): v for k, v in elem.attrib.items()}
            zip_attribute = attributes.get('zip')
            if zip_attribute is not None:
                zips.update(z.strip().lower() for z in zip_attribute.split('|'))

    # Sorted so the result does not depend on set iteration order.
    return rbf, sorted(zips)
|
|
|
|
def read_mgl_fields(mgl_path: Path) -> Tuple[Optional[str], Optional[str], Optional[ET.ParseError]]:
    """Read the rbf and setname fields of an mgl file.

    Returns (rbf, setname, None) on success. When the XML cannot be parsed
    the problem is printed and (None, None, error) is returned instead of
    raising.
    """
    try:
        rbf, setname = _read_mgl_fields_impl(mgl_path)
    except ET.ParseError as parse_error:
        print('ERROR: Defect XML for mgl file: ' + str(mgl_path))
        return None, None, parse_error
    return rbf, setname, None
|
|
|
|
def _read_mgl_fields_impl(mgl_path: Path) -> Tuple[Optional[str], Optional[str]]:
    """Extract the rbf and setname values from an mgl file.

    Returns a tuple (rbf, setname). Each value is the stripped, lower-cased
    text of the first matching element that has text, or None when there is
    none. Later elements with the same tag only trigger a warning.

    Raises ET.ParseError when the file is not well-formed XML.
    """
    rbf: Optional[str] = None
    setname: Optional[str] = None

    # Iterate on 'end' events: ElementTree only guarantees that elem.text is
    # populated once the element has been closed, on 'start' it is undefined.
    for _, elem in et_iterparse(str(mgl_path), events=("end",)):
        elem_tag = elem.tag.lower()
        if elem_tag == 'rbf':
            if rbf is not None:
                print('WARNING! Duplicated rbf tag on file %s, first value %s, later value %s' % (str(mgl_path),rbf,elem.text))
                continue
            if elem.text is None:
                continue
            rbf = elem.text.strip().lower()
        # Match on the element tag. The previous condition compared the
        # setname variable against 'rom', so this branch could never run.
        elif elem_tag == 'setname':
            if setname is not None:
                print('WARNING! Duplicated setname tag on file %s, first value %s, later value %s' % (str(mgl_path),setname,elem.text))
                continue
            if elem.text is None:
                continue
            setname = elem.text.strip().lower()

    return rbf, setname
|
|
|
|
# Read other files
|
|
|
|
def read_image_aspect_ratio(path: Path) -> Optional[float]:
    """Return width / height of the image at `path`, or None on any failure.

    Failures (Pillow unavailable, unreadable image, zero height, ...) are
    printed and swallowed on purpose: this is a best-effort lookup.
    """
    try:
        _ensure_image_library()
        from PIL import Image
        # The context manager closes the underlying file once the size is read.
        with Image.open(str(path)) as img:
            return float(img.width) / float(img.height)
    except Exception as e:
        print('wallpaper image not opened: ' + str(path))
        print(e)
        return None
|
|
|
|
def _ensure_image_library() -> None:
    """Install Pillow through pip when `from PIL import Image` fails.

    The pip run uses check=True, so a failed installation raises
    subprocess.CalledProcessError.
    """
    try:
        from PIL import Image  # imported only to probe availability
    except ImportError:
        pip_install = [sys.executable, '-m', 'pip', 'install', 'Pillow']
        subprocess.run(pip_install, stderr=subprocess.STDOUT, check=True)
|
|
|
|
# MiSTer network utilities
|
|
|
|
def download_db(url: str) -> Dict[str, Any]:
    """Download `url` into a temporary file and parse it as a database.

    The content is read as plain JSON when the url has a .json suffix and
    through unzip_json otherwise.
    """
    with tempfile.NamedTemporaryFile() as downloaded:
        local_path = downloaded.name
        download_file(url, local_path)
        if is_json(url):
            return load_json(local_path)
        return unzip_json(local_path)
|
|
|
|
def get_url_db(url: str) -> Dict[str, Any]:
    """Load a database from a local path or, failing that, by downloading it.

    When the database has 'zips', the summary of every zip is fetched from
    its 'summary_file' url and attached as 'summary_file_content'. A
    ReturnCodeException while fetching a summary is printed and skipped.
    """
    print("Downloading db from " + url)
    try:
        # First attempt: treat the address as a file that is already local.
        if is_json(url):
            db = load_json(url)
        else:
            db = unzip_json(url)
    except Exception:
        db = download_db(url)

    if 'zips' in db:
        for zip_description in db['zips'].values():
            summary_url = zip_description['summary_file']['url']
            try:
                summary_content = get_summary_file_content(summary_url)
                zip_description['summary_file_content'] = summary_content
            except ReturnCodeException as error:
                print('ReturnCodeException at get_summary_file_content ' + summary_url)
                print(error)

    return db
|
|
|
|
def get_summary_file_content(url: str) -> Dict[str, Any]:
    """Download a zip summary and return only its 'files' and 'folders'.

    Both mappings are shallow copies of the downloaded ones.
    """
    summary = download_db(url)
    files = dict(summary['files'].items())
    folders = dict(summary['folders'].items())
    return {'files': files, 'folders': folders}
|
|
|
|
def get_linux_latest_release_url(linux_github_repository: str, github_token: str) -> str:
    """Build the raw download url of the latest linux release archive.

    The GitHub git tree at HEAD of the repository is queried with curl; an
    Authorization header is added only when `github_token` is not empty.
    Among the tree paths that start with 'release_' and end with '.7z'
    (case-insensitive) the last one in sorted order is chosen, and the url is
    pinned to the sha of the returned tree.
    """
    if github_token == '':
        auth = ''
    else:
        auth = f'-H "Authorization: Bearer {github_token}"'

    tree_output = run_stdout(f'curl --fail --location --silent -H "Accept: application/vnd.github.v3+json" {auth} https://api.github.com/repos/{linux_github_repository}/git/trees/HEAD')
    try:
        tree_json = json.loads(tree_output)
    except Exception:
        print('Could not parse output: ' + tree_output)
        raise

    release_paths = []
    for entry in tree_json['tree']:
        path = entry['path']
        if path[0:8].lower() == 'release_' and path[-3:].lower() == '.7z':
            release_paths.append(path)
    release_paths.sort()

    newest = release_paths[-1]
    return 'https://raw.githubusercontent.com/%s/%s/%s' % (linux_github_repository, tree_json['sha'], newest)
|
|
|
|
# db diff tooling
|
|
|
|
def mut_diff_db(left_db: Dict[str, Any], right_db: Dict[str, Any]) -> bool:
    """Normalize both databases in place, show their diff, report equality.

    Both arguments are mutated by reformat_db_for_comparison. The indented
    JSON of each side is written to a temporary file and compared with
    `git diff --no-index`, whose output goes to the console. Returns True
    when the key-sorted JSON of both databases is identical.
    """
    for db in (left_db, right_db):
        reformat_db_for_comparison(db)

    left_compact = json.dumps(left_db, sort_keys=True)
    right_compact = json.dumps(right_db, sort_keys=True)

    with tempfile.NamedTemporaryFile() as temp_left, tempfile.NamedTemporaryFile() as temp_right:
        with open(temp_left.name, 'w') as left_out, open(temp_right.name, 'w') as right_out:
            left_out.write(json.dumps(left_db, sort_keys=True, indent=True) + '\n')
            right_out.write(json.dumps(right_db, sort_keys=True, indent=True) + '\n')
        try:
            run(f'git diff --no-index --exit-code {temp_left.name} {temp_right.name}')
        except ReturnCodeException:
            # A non-zero exit code here only means that differences were found.
            print('RED[-] is left, GREEN[+] is right')

    return left_compact == right_compact
|
|
|
|
def reformat_db_for_comparison(db: Dict[str, Any]) -> None:
    """Normalize `db` in place so two databases can be compared.

    Location and time dependent fields are blanked, optional fields get a
    default, tag indexes in files/folders (including the ones inside zip
    summaries) are replaced by sorted tag words, and 'tag_dictionary' becomes
    the sorted list of its words.
    """
    db.update(base_files_url='', latest_zip_url='', timestamp=0, db_files=[])
    db.setdefault('db_url', '')
    db.setdefault('default_options', {})

    tag_dictionary = db.get('tag_dictionary', {})
    # Reverse lookup: tag index -> tag word.
    indexes: Dict[int, str] = {tag_dictionary[word]: word for word in sorted(tag_dictionary)}

    for section in ('files', 'folders'):
        reformat_elements(indexes, db[section].values())

    for zip_description in db.get('zips', {}).values():
        zip_description.update(base_files_url='', contents_file={}, summary_file={})

        if 'summary_file_content' not in zip_description:
            continue
        summary_content = zip_description['summary_file_content']
        reformat_elements(indexes, summary_content['files'].values())
        reformat_elements(indexes, summary_content['folders'].values())

    db['tag_dictionary'] = sorted(tag_dictionary)
|
|
|
|
def reformat_elements(indexes: Dict[int, str], collection: List[Dict[str, Any]]) -> None:
    """Replace, in place, the tag indexes of every element by sorted tag words.

    Elements without a 'tags' key are left untouched. `indexes` maps a tag
    index to its word.
    """
    for element in collection:
        if 'tags' not in element:
            continue
        element['tags'] = sorted(indexes[tag] for tag in element['tags'])
|
|
|
|
# filesystem utilities
|
|
|
|
def et_iterparse(xml: str, events: Tuple[str]) -> Iterator[Tuple[str, Any]]:
    """Return an ET.iterparse iterator over the lower-cased content of `xml`.

    The whole file is read and lower-cased up front, so tags, attribute
    names, attribute values and text all come out in lower case. Any
    exception raised while opening or reading is printed and re-raised.
    """
    try:
        with open(xml, 'r') as source:
            lowered = io.StringIO(source.read().lower())
        return ET.iterparse(lowered, events=events)
    except Exception:
        print('Exception during %s !' % xml)
        raise
|
|
|
|
def file_size(file: str) -> int:
    """Return the size in bytes of the file at path `file`."""
    size_in_bytes = os.path.getsize(file)
    return size_in_bytes
|
|
|
|
def file_hash(file: str) -> str:
    """Return the hexadecimal MD5 digest of the file at path `file`.

    The file is read in 8192 byte chunks, so it never has to fit in memory.
    """
    with open(file, "rb") as stream:
        digest = hashlib.md5()
        for chunk in iter(lambda: stream.read(8192), b''):
            digest.update(chunk)
        return digest.hexdigest()
|
|
|
|
def try_read_json(filename: str) -> Optional[Dict[str, Any]]:
    """Load `filename` as JSON, returning None (with a warning) on failure.

    Only regular errors are swallowed. KeyboardInterrupt and SystemExit
    propagate, so the build can still be interrupted while a file is read.
    """
    try:
        return load_json(filename)
    except Exception:
        print(f'WARNING! File "{filename}" is not valid JSON.')
        return None
|
|
|
|
def load_json(filename: str) -> Dict[str, Any]:
    """Parse the JSON file at `filename` and return its content.

    Raises OSError when the file cannot be opened and ValueError
    (json.JSONDecodeError / UnicodeDecodeError) when it is not valid JSON.
    """
    # JSON is UTF-8 by specification, so do not depend on the locale encoding.
    with open(filename, encoding='utf-8') as f:
        return json.load(f)
|
|
|
|
def unzip_json(path: str) -> Dict[str, Any]:
    """Parse as JSON the output of `unzip -p` on the archive at `path`.

    Raises ReturnCodeException when unzip fails and ValueError when the
    extracted content is not valid JSON.
    """
    # The command line is tokenized with shlex.split later on, so the path is
    # quoted to survive spaces and quote characters.
    return json.loads(run_stdout('unzip -p ' + shlex.quote(path)))
|
|
|
|
def set_source_dir(source_dir: str):
    """Announce `source_dir` and make it the current working directory."""
    announcement = 'Source directory: ' + source_dir
    print(announcement)
    os.chdir(source_dir)
|
|
|
|
def is_json(file: str):
    """Tell whether the path or url `file` ends with a .json suffix (any case)."""
    suffix = Path(file).suffix
    return suffix.lower() == '.json'
|
|
|
|
# network utilities
|
|
|
|
def download_file(url: str, target: str) -> None:
    """Download `url` with curl into the file `target`.

    Missing parent folders of `target` are created first. Raises
    ReturnCodeException when curl exits with a non-zero code.
    """
    Path(target).parent.mkdir(parents=True, exist_ok=True)
    # The command line is tokenized with shlex.split later on. shlex.quote
    # keeps each value a single argument even with quotes or backslashes.
    run(f'curl --show-error --fail --location -o {shlex.quote(target)} {shlex.quote(url)}')
|
|
|
|
# execution utilities
|
|
|
|
def run(command: str, cwd: Optional[str] = None) -> None:
    """Run `command` without capturing its stdout.

    stderr is requested to be merged into stdout. Failures surface as the
    exceptions raised by _run.
    """
    _run(command, cwd, stdout=None, stderr=subprocess.STDOUT)
|
|
|
|
def run_stdout(command: str, cwd: Optional[str] = None) -> str:
    """Run `command` and return its captured stdout, decoded and stripped.

    stdout is piped and subprocess.DEVNULL is passed as the stderr target.
    Failures surface as the exceptions raised by _run.
    """
    completed = _run(command, cwd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    return completed.stdout.decode().strip()
|
|
|
|
def _run(command: str, cwd: Optional[str], stderr: Optional[int], stdout: Optional[int]) -> Any:
|
|
result = subprocess.run(shlex.split(command), cwd=cwd, shell=False, stderr=subprocess.STDOUT, stdout=stdout)
|
|
if result.returncode == -2:
|
|
raise KeyboardInterrupt()
|
|
elif result.returncode != 0:
|
|
print(f'returncode {result.returncode} from: {command}')
|
|
raise ReturnCodeException(f'returncode {result.returncode} from: {command}')
|
|
return result
|
|
|
|
class ReturnCodeException(Exception):
    """Error raised when an executed command ends with a non-zero return code."""
|
|
|
|
# Run the command line interface only when executed as a script.
if __name__ == '__main__':
    main()