mirror of
https://github.com/MiSTer-devel/ArcadeDatabase_MiSTer.git
synced 2026-04-19 03:04:02 +00:00
303 lines
9.6 KiB
Python
Executable File
303 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Copyright (c) 2021 José Manuel Barroso Galindo <theypsilon@gmail.com>
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
import os
|
|
import hashlib
|
|
import json
|
|
import xml.etree.cElementTree as ET
|
|
|
|
def main():
|
|
|
|
print('START!')
|
|
|
|
mad_finder = MadFinder('mad')
|
|
mad_reader = MadReader()
|
|
|
|
for mad in mad_finder.find_all_mads():
|
|
print(str(mad))
|
|
mad_reader.read_mad(mad)
|
|
|
|
data = mad_reader.data()
|
|
repeated = mad_reader.repeated()
|
|
errors = mad_reader.errors()
|
|
|
|
create_orphan_branch('db')
|
|
json_filename = 'mad_db.json'
|
|
zip_filename = json_filename + '.zip'
|
|
save_data_to_compressed_json(data, json_filename, zip_filename)
|
|
|
|
md5_filename = zip_filename + '.md5'
|
|
with open(md5_filename, 'w') as md5_file:
|
|
md5_file.write(hash(zip_filename))
|
|
|
|
run_succesfully('git add %s' % md5_filename)
|
|
|
|
if len(repeated) > 0:
|
|
with open('repeated.txt', 'w') as repeated_file:
|
|
for repeats_key in sorted(repeated):
|
|
repeated_file.write('%s: ' % repeats_key)
|
|
repeated_file.write(', '.join(repeated[repeats_key]))
|
|
repeated_file.write('\n')
|
|
|
|
run_succesfully('git add repeated.txt')
|
|
|
|
if len(errors) > 0:
|
|
with open('errors.txt', 'w') as errors_file:
|
|
for key in sorted(errors):
|
|
errors_file.write('%s: ' % key)
|
|
errors_file.write(', '.join(errors[key]))
|
|
errors_file.write('\n')
|
|
|
|
run_succesfully('git add errors.txt')
|
|
|
|
force_push_file(zip_filename, 'db')
|
|
|
|
print('Done.')
|
|
|
|
def translate_mad_rotation(rot):
|
|
if rot == 'horizontal':
|
|
return 0
|
|
elif rot == 'vertical (cw)':
|
|
return 90
|
|
elif rot == 'horizontal (180)':
|
|
return 180
|
|
elif rot == 'vertical (ccw)':
|
|
return 270
|
|
else:
|
|
return None
|
|
|
|
class MadFinder:
|
|
def __init__(self, dir):
|
|
self._dir = dir
|
|
|
|
def find_all_mads(self):
|
|
return sorted(self._scan(self._dir), key=lambda mad: mad.name.lower())
|
|
|
|
def _scan(self, directory):
|
|
for entry in os.scandir(directory):
|
|
if entry.is_dir(follow_symlinks=False):
|
|
yield from self._scan(entry.path)
|
|
elif entry.name.lower().endswith(".mad"):
|
|
yield Path(entry.path)
|
|
|
|
def read_mad_fields(mad_path, tags):
|
|
fields = { i : '' for i in tags }
|
|
|
|
try:
|
|
context = ET.iterparse(str(mad_path), events=("start",))
|
|
for event, elem in context:
|
|
elem_tag = elem.tag.lower()
|
|
if elem_tag in tags:
|
|
tags.remove(elem_tag)
|
|
elem_value = elem.text
|
|
if isinstance(elem_value, str):
|
|
fields[elem_tag] = elem_value
|
|
if len(tags) == 0:
|
|
break
|
|
except Exception as e:
|
|
print("Line %s || %s (%s)" % (lineno(), e, mad_path))
|
|
|
|
return fields
|
|
|
|
class MadReader:
|
|
def __init__(self):
|
|
self._data = dict()
|
|
self._repeated = dict()
|
|
self._errors = dict()
|
|
|
|
def read_mad(self, mad):
|
|
self._mad = mad
|
|
self._entry_fields = read_mad_fields(mad, [
|
|
'setname',
|
|
'name',
|
|
'alternative',
|
|
'rotation',
|
|
'flip',
|
|
'resolution',
|
|
'cocktail',
|
|
'region',
|
|
'year',
|
|
'category',
|
|
'manufacturer',
|
|
'homebrew',
|
|
'bootleg',
|
|
'enhancements',
|
|
'translations',
|
|
'hacks',
|
|
'best_of',
|
|
'platform',
|
|
'series',
|
|
'num_buttons',
|
|
'players',
|
|
'num_monitors',
|
|
'move_inputs',
|
|
'special_controls',
|
|
])
|
|
|
|
self._entry_data = {'file': mad.stem + '.mra'}
|
|
self.set_str_if_not_empty('name')
|
|
self.set_bool_if_not_empty('alternative')
|
|
self.set_bool_if_not_empty('flip')
|
|
self.set_str_if_not_empty('resolution')
|
|
self.set_str_if_not_empty('cocktail')
|
|
self.set_str_if_not_empty('region')
|
|
self.set_int_if_not_empty('year')
|
|
self.set_str_list_if_not_empty('category')
|
|
self.set_str_list_if_not_empty('manufacturer')
|
|
self.set_bool_if_not_empty('homebrew')
|
|
self.set_bool_if_not_empty('bootleg')
|
|
self.set_bool_if_not_empty('enhancements')
|
|
self.set_bool_if_not_empty('translations')
|
|
self.set_bool_if_not_empty('hacks')
|
|
self.set_str_list_if_not_empty('best_of')
|
|
self.set_str_list_if_not_empty('platform')
|
|
self.set_str_list_if_not_empty('series')
|
|
self.set_int_if_not_empty('num_buttons')
|
|
self.set_str_if_not_empty('players')
|
|
self.set_int_if_not_empty('num_monitors')
|
|
self.set_str_list_if_not_empty('move_inputs')
|
|
self.set_str_list_if_not_empty('special_controls')
|
|
|
|
if self._entry_fields['rotation'] != '':
|
|
rot = translate_mad_rotation(self._entry_fields['rotation'].strip().lower())
|
|
if rot is not None:
|
|
self._entry_data['rotation'] = rot
|
|
|
|
if self._entry_fields['setname'] in self._repeated:
|
|
self._repeated[self._entry_fields['setname']].append(str(mad))
|
|
print('REPEATED! %s' % mad)
|
|
return
|
|
|
|
self._repeated[self._entry_fields['setname']] = [str(mad)]
|
|
self._data[self._entry_fields['setname']] = self._entry_data
|
|
|
|
def get_field(self, key):
|
|
field = self._entry_fields[key].strip('"\' ')
|
|
if field != '':
|
|
return field
|
|
|
|
return None
|
|
|
|
def set_str_list_if_not_empty(self, key):
|
|
field = self.get_field(key)
|
|
if field is not None:
|
|
self._entry_data[key] = [s.strip('"\' ') for s in field.split(',')]
|
|
|
|
def set_int_list_if_not_empty(self, key):
|
|
field = self.get_field(key)
|
|
if field is not None:
|
|
try:
|
|
self._entry_data[key] = [int(s.strip('"\' ')) for s in field.split(',')]
|
|
except:
|
|
self.add_error('field %s could not be parsed as int list' % key)
|
|
|
|
def set_str_if_not_empty(self, key):
|
|
field = self.get_field(key)
|
|
if field is not None:
|
|
self._entry_data[key] = field
|
|
|
|
def set_bool_if_not_empty(self, key):
|
|
field = self.get_field(key)
|
|
if field is not None:
|
|
self._entry_data[key] = field.lower() == "yes" or field.lower() == "true" or field.lower() == "y" or field.lower() == "t"
|
|
|
|
def set_int_if_not_empty(self, key):
|
|
field = self.get_field(key)
|
|
if field is not None:
|
|
try:
|
|
self._entry_data[key] = int(field)
|
|
except:
|
|
self.add_error('field %s could not be parsed as int' % key)
|
|
|
|
def add_error(self, message):
|
|
print('ERROR! %s' % message)
|
|
if self._mad not in self._errors:
|
|
self._errors[self._mad] = []
|
|
|
|
self._errors[self._mad].append(message)
|
|
|
|
def data(self):
|
|
return self._data
|
|
|
|
def repeated(self):
|
|
return {key: self._repeated[key] for key in self._repeated if len(self._repeated[key]) > 1}
|
|
|
|
def errors(self):
|
|
return self._errors
|
|
|
|
def create_orphan_branch(branch):
|
|
run_succesfully('git checkout -qf --orphan %s' % branch)
|
|
run_succesfully('git rm -rf .')
|
|
|
|
def force_push_file(file_name, branch):
|
|
run_succesfully('git add %s' % file_name)
|
|
run_succesfully('git commit -m "BOT: Releasing new MAD database." > /dev/null 2>&1 || true')
|
|
run_succesfully('git fetch origin %s > /dev/null 2>&1 || true' % branch)
|
|
if not run_conditional('git diff --exit-code %s origin/%s' % (branch, branch)):
|
|
print("There are changes to push.")
|
|
print()
|
|
|
|
run_succesfully('git push --force origin %s' % branch)
|
|
print()
|
|
print("New %s ready to be used." % file_name)
|
|
else:
|
|
print("Nothing to be updated.")
|
|
|
|
def save_data_to_compressed_json(db, json_name, zip_name):
|
|
|
|
with open(json_name, 'w') as f:
|
|
json.dump(db, f, sort_keys=True)
|
|
|
|
run_succesfully('touch -a -m -t 202108231405 %s' % json_name)
|
|
run_succesfully('zip -rq -D -X -9 -A --compression-method deflate %s %s' % (zip_name, json_name))
|
|
|
|
def hash(file):
|
|
with open(file, "rb") as f:
|
|
file_hash = hashlib.md5()
|
|
chunk = f.read(8192)
|
|
while chunk:
|
|
file_hash.update(chunk)
|
|
chunk = f.read(8192)
|
|
return file_hash.hexdigest()
|
|
|
|
def run_conditional(command):
|
|
result = subprocess.run(command, shell=True, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE)
|
|
|
|
stdout = result.stdout.decode()
|
|
if stdout.strip():
|
|
print(stdout)
|
|
|
|
return result.returncode == 0
|
|
|
|
def run_succesfully(command):
|
|
result = subprocess.run(command, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
|
|
|
stdout = result.stdout.decode()
|
|
stderr = result.stderr.decode()
|
|
if stdout.strip():
|
|
print(stdout)
|
|
|
|
if stderr.strip():
|
|
print(stderr)
|
|
|
|
if result.returncode != 0:
|
|
raise Exception("subprocess.run Return Code was '%d'" % result.returncode)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|