Finishing ExternalFilesReader

This commit is contained in:
José Manuel Barroso Galindo
2023-04-12 00:34:17 +02:00
committed by GitHub
parent b005ad2ac5
commit d560c98b5f

View File

@@ -14,6 +14,8 @@ import json
import hashlib
import shlex
import tempfile
import csv
from urllib.parse import urlparse
from argparse import ArgumentParser
from zipfile import ZipFile, ZIP_DEFLATED
from dataclasses import dataclass
@@ -149,7 +151,62 @@ class ExternalFilesReader:
self._strpath = strpath
def read_external_files(self) -> List[Tuple[Path, Dict[str, Any]]:
return []
if self._strpath == '':
return []
data = self._read_csv_data()
if data is None:
return []
print('Parsing ' + self._strpath + ' to extract external files.)
result = []
for row in data:
if len(row) < 4:
print('Not enough columns in this row, skipping it.', row)
continue
path, url, size, md5hash = row[0].strip(), row[1].strip(), row[2].strip(), row[3].strip()
if not is_valid_path(path):
print(f"Invalid path in this row: {path}, skipping it.", row)
continue
if not is_valid_url(url):
print(f"Invalid URL in this row: {url}, skipping it.", row)
continue
if not is_valid_size(size):
print(f"Invalid size in this row: {size}, skipping it.", row)
continue
if not is_valid_md5hash(md5hash):
print(f"Invalid MD5 hash in this row: {md5hash}, skipping it.", row)
continue
result.append((path, {"url": url, "size": int(size), "hash": md5hash}))
return result
def _read_csv_data(self) -> Optional[List[List[str]]]:
try:
with open(self._strpath, newline='') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
return [row for row in csv_reader][1:]
except Exception as e:
print('csv file not opened: ' + self._strpath)
print(e)
return None
def is_valid_url(url: str) -> bool:
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except ValueError:
return False
def is_valid_path(path: str) -> bool:
return bool(re.match(r'^[\w\-/]{3,}$', path))
def is_valid_size(size: str) -> bool:
return size.isdigit() and int(size) > 0
def is_valid_md5hash(md5hash: str) -> bool:
return bool(re.match(r'^[a-fA-F0-9]{32}$', md5hash))
initial_filter_aliases = [
# Consoles