#!/usr/bin/env python3 # Copyright (c) 2021 José Manuel Barroso Galindo # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . import subprocess from pathlib import Path import os import hashlib import json import xml.etree.cElementTree as ET def main(): print('START!') mad_finder = MadFinder('mad') mad_reader = MadReader() for mad in mad_finder.find_all_mads(): print(str(mad)) mad_reader.read_mad(mad) data = mad_reader.data() repeated = mad_reader.repeated() errors = mad_reader.errors() create_orphan_branch('db') json_filename = 'mad_db.json' zip_filename = json_filename + '.zip' save_data_to_compressed_json(data, json_filename, zip_filename) md5_filename = zip_filename + '.md5' with open(md5_filename, 'w') as md5_file: md5_file.write(hash(zip_filename)) run_succesfully('git add %s' % md5_filename) if len(repeated) > 0: with open('repeated.txt', 'w') as repeated_file: for repeats_key in sorted(repeated): repeated_file.write('%s: ' % repeats_key) repeated_file.write(', '.join(repeated[repeats_key])) repeated_file.write('\n') run_succesfully('git add repeated.txt') if len(errors) > 0: with open('errors.txt', 'w') as errors_file: for key in sorted(errors): errors_file.write('%s: ' % key) errors_file.write(', '.join(errors[key])) errors_file.write('\n') run_succesfully('git add errors.txt') force_push_file(zip_filename, 'db') print('Done.') def translate_mad_rotation(rot): if rot == 'horizontal': return 0 elif rot == 'vertical (cw)': return 90 elif rot == 'horizontal (180)': return 180 elif rot == 'vertical (ccw)': return 270 else: return None class MadFinder: def __init__(self, dir): self._dir = dir def find_all_mads(self): return sorted(self._scan(self._dir), key=lambda mad: mad.name.lower()) def _scan(self, directory): for entry in os.scandir(directory): if entry.is_dir(follow_symlinks=False): yield from self._scan(entry.path) elif entry.name.lower().endswith(".mad"): yield Path(entry.path) def read_mad_fields(mad_path, tags): fields = { i : '' for i in tags } try: context = ET.iterparse(str(mad_path), events=("start",)) for event, elem in context: elem_tag = elem.tag.lower() if elem_tag in tags: tags.remove(elem_tag) elem_value = elem.text if isinstance(elem_value, str): fields[elem_tag] = elem_value if len(tags) == 0: break except Exception as e: print("Line %s || %s (%s)" % (lineno(), e, mad_path)) return fields class MadReader: def __init__(self): self._data = dict() self._repeated = dict() self._errors = dict() def read_mad(self, mad): self._mad = mad self._entry_fields = read_mad_fields(mad, [ 'setname', 'name', 'alternative', 'rotation', 'flip', 'resolution', 'cocktail', 'region', 'year', 'category', 'manufacturer', 'homebrew', 'bootleg', 'enhancements', 'translations', 'hacks', 'best_of', 'platform', 'series', 'num_buttons', 'players', 'num_monitors', 'move_inputs', 'special_controls', ]) self._entry_data = {'file': mad.stem + '.mra'} self.set_str_if_not_empty('name') self.set_bool_if_not_empty('alternative') self.set_bool_if_not_empty('flip') self.set_str_if_not_empty('resolution') self.set_str_if_not_empty('cocktail') self.set_str_if_not_empty('region') self.set_int_if_not_empty('year') self.set_str_list_if_not_empty('category') self.set_str_list_if_not_empty('manufacturer') self.set_bool_if_not_empty('homebrew') self.set_bool_if_not_empty('bootleg') self.set_bool_if_not_empty('enhancements') self.set_bool_if_not_empty('translations') self.set_bool_if_not_empty('hacks') self.set_str_list_if_not_empty('best_of') self.set_str_list_if_not_empty('platform') self.set_str_list_if_not_empty('series') self.set_int_if_not_empty('num_buttons') self.set_str_if_not_empty('players') self.set_int_if_not_empty('num_monitors') self.set_str_list_if_not_empty('move_inputs') self.set_str_list_if_not_empty('special_controls') if self._entry_fields['rotation'] != '': rot = translate_mad_rotation(self._entry_fields['rotation'].strip().lower()) if rot is not None: self._entry_data['rotation'] = rot if self._entry_fields['setname'] in self._repeated: self._repeated[self._entry_fields['setname']].append(str(mad)) print('REPEATED! %s' % mad) return self._repeated[self._entry_fields['setname']] = [str(mad)] self._data[self._entry_fields['setname']] = self._entry_data def get_field(self, key): field = self._entry_fields[key].strip('"\' ') if field != '': return field return None def set_str_list_if_not_empty(self, key): field = self.get_field(key) if field is not None: self._entry_data[key] = [s.strip('"\' ') for s in field.split(',')] def set_int_list_if_not_empty(self, key): field = self.get_field(key) if field is not None: try: self._entry_data[key] = [int(s.strip('"\' ')) for s in field.split(',')] except: self.add_error('field %s could not be parsed as int list' % key) def set_str_if_not_empty(self, key): field = self.get_field(key) if field is not None: self._entry_data[key] = field def set_bool_if_not_empty(self, key): field = self.get_field(key) if field is not None: self._entry_data[key] = field.lower() == "yes" or field.lower() == "true" or field.lower() == "y" or field.lower() == "t" def set_int_if_not_empty(self, key): field = self.get_field(key) if field is not None: try: self._entry_data[key] = int(field) except: self.add_error('field %s could not be parsed as int' % key) def add_error(self, message): print('ERROR! %s' % message) if self._mad not in self._errors: self._errors[self._mad] = [] self._errors[self._mad].append(message) def data(self): return self._data def repeated(self): return {key: self._repeated[key] for key in self._repeated if len(self._repeated[key]) > 1} def errors(self): return self._errors def create_orphan_branch(branch): run_succesfully('git checkout -qf --orphan %s' % branch) run_succesfully('git rm -rf .') def force_push_file(file_name, branch): run_succesfully('git add %s' % file_name) run_succesfully('git commit -m "BOT: Releasing new MAD database." > /dev/null 2>&1 || true') run_succesfully('git fetch origin %s > /dev/null 2>&1 || true' % branch) if not run_conditional('git diff --exit-code %s origin/%s' % (branch, branch)): print("There are changes to push.") print() run_succesfully('git push --force origin %s' % branch) print() print("New %s ready to be used." % file_name) else: print("Nothing to be updated.") def save_data_to_compressed_json(db, json_name, zip_name): with open(json_name, 'w') as f: json.dump(db, f, sort_keys=True) run_succesfully('touch -a -m -t 202108231405 %s' % json_name) run_succesfully('zip -rq -D -X -9 -A --compression-method deflate %s %s' % (zip_name, json_name)) def hash(file): with open(file, "rb") as f: file_hash = hashlib.md5() chunk = f.read(8192) while chunk: file_hash.update(chunk) chunk = f.read(8192) return file_hash.hexdigest() def run_conditional(command): result = subprocess.run(command, shell=True, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE) stdout = result.stdout.decode() if stdout.strip(): print(stdout) return result.returncode == 0 def run_succesfully(command): result = subprocess.run(command, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) stdout = result.stdout.decode() stderr = result.stderr.decode() if stdout.strip(): print(stdout) if stderr.strip(): print(stderr) if result.returncode != 0: raise Exception("subprocess.run Return Code was '%d'" % result.returncode) if __name__ == '__main__': main()