diff --git a/lib/api_roundup/histor.py b/lib/api_roundup/histor.py index 0d6adc62fb3b73c32120fb5948b0fb91ea69bbf9..ca6126a7424f98693cbdbb5b7316de400ca2f53a 100644 --- a/lib/api_roundup/histor.py +++ b/lib/api_roundup/histor.py @@ -1,6 +1,6 @@ # coding=utf-8 # -------------------------------------------------------------------- -# Copyright (C) 1991 - 2019 - EDF R&D - www.code-aster.org +# Copyright (C) 1991 - 2024 - EDF R&D - www.code-aster.org # This file is part of code_aster. # # code_aster is free software: you can redistribute it and/or modify @@ -24,7 +24,6 @@ Define classes representing an histor file from issues objects. import os import re -import datetime from xml.sax import saxutils from aslint.i18n import _ @@ -36,7 +35,7 @@ from .utils import format_date, get_value, get_realname NumberTypes = (float, int) EnumTypes = (list, tuple) -french_date_fmt = '%d/%m/%Y' +french_date_fmt = "%d/%m/%Y" class HistorError(Exception): @@ -44,18 +43,19 @@ class HistorError(Exception): class HISTOR(object): - """Class representing a "standard" histor i.e. as a text file. - """ + """Class representing a "standard" histor i.e. as a text file.""" + CR = os.linesep - format = 'text' - header = '' - footer = '' - separ = '=' * 80 + format = "text" + header = "" + footer = "" + separ = "=" * 80 seprep = CR * 2 - prefix = ' | ' + prefix = " | " mask_summary = "{id:>6s} {Sassignedto:<24s} {Stitle}" + CR - mask = (CR + -""" + mask = ( + CR + + """ {separ} RESTITUTION FICHE {id} DU {creation} {separ} @@ -71,11 +71,12 @@ class HISTOR(object): - RESTITUTION_VERSION_DEVELOPPEMENT : {corrVdev} - IMPACT_DOCUMENTAIRE : {impactDoc} - VALIDATION : {validation} -{fix_in}{nbJours}""") +{fix_in}{nbJours}""" + ) mask_nbjours = "- NB_JOURS_TRAV : {nbJours}" + CR - mask_tma = '- TMA : {tma}' + CR + mask_tma = "- TMA : {tma}" + CR mask_restit = "- DEJA RESTITUE DANS : {versCorr}" + CR - mask_user_begin = separ + CR + '--- AUTEUR {realname:<20s}' + CR * 2 + mask_user_begin = separ + CR + "--- AUTEUR {realname:<20s}" + CR * 2 mask_user_end = separ + CR def __init__(self, server, **kargs): @@ -89,12 +90,12 @@ class HISTOR(object): and escape all HTML characters.""" newd = {} for k, v in list(dico.items()): - if k == 'CR' or type(v) in NumberTypes: + if k == "CR" or type(v) in NumberTypes: newd[k] = v elif type(v) is str: newd[k] = convert(v) elif v is None: - newd[k] = '' + newd[k] = "" return newd def DictIssue(self, issueid, l_rep=None): @@ -102,79 +103,96 @@ class HISTOR(object): l_rep : list of the content of messages to include.""" # 0. store issue infos issue = "issue" + str(issueid) - dico = self._serv.display(issue, - 'id', 'creation', 'assignedto', 'type', 'produit', 'version', - 'title', 'messages', 'fauxVexpl', 'corrVdev', 'impactDoc', - 'validation', 'nbJours', 'corrVexpl', 'corrVdev', 'intervenant', - 'verCorrVexpl', 'verCorrVdev', 'fauxVdev') - - dico['separ'] = self.separ - if dico['creation'] is not None: - dico['creation'] = format_date(dico['creation']) - if dico['assignedto'] is not None: - userid = dico['assignedto'] - dico['assignedto'] = get_realname(self._serv, userid) - if dico['type'] is not None: - dico['type'] = get_value(self._serv, 'type', dico['type'], 'name') - if dico['produit'] is not None: - dico['produit'] = get_value(self._serv, 'produit', dico['produit'], 'name') - if dico['version'] is not None: - dico['version'] = get_value(self._serv, 'version', dico['version'], 'name') + dico = self._serv.display( + issue, + "id", + "creation", + "assignedto", + "type", + "produit", + "version", + "title", + "messages", + "fauxVexpl", + "corrVdev", + "impactDoc", + "validation", + "nbJours", + "corrVexpl", + "corrVdev", + "intervenant", + "verCorrVexpl", + "verCorrVdev", + "fauxVdev", + ) + + dico["separ"] = self.separ + if dico["creation"] is not None: + dico["creation"] = format_date(dico["creation"]) + if dico["assignedto"] is not None: + userid = dico["assignedto"] + dico["assignedto"] = get_realname(self._serv, userid) + if dico["type"] is not None: + dico["type"] = get_value(self._serv, "type", dico["type"], "name") + if dico["produit"] is not None: + dico["produit"] = get_value(self._serv, "produit", dico["produit"], "name") + if dico["version"] is not None: + dico["version"] = get_value(self._serv, "version", dico["version"], "name") # 1a. list of messages or last message if l_rep is not None and len(l_rep) > 0: if not type(l_rep) in EnumTypes: l_rep = [l_rep] l_rep = [indent(convert(s), self.prefix) for s in l_rep] - dico['messages'] = self.seprep.join(l_rep) + dico["messages"] = self.seprep.join(l_rep) else: - d_msg = dico['messages'] + d_msg = dico["messages"] msg_id = max(d_msg) - msgid = 'msg' + msg_id + msgid = "msg" + msg_id msg = self._serv.display(msgid) - dico['messages'] = msg['summary'] + '...' + dico["messages"] = msg["summary"] + "..." # 1b. text fields - for k in ('title', 'validation'): + for k in ("title", "validation"): dico[k] = convert(dico[k]) # 2. champs OUI/NON - for key in ('corrVdev', 'corrVexpl'): + for key in ("corrVdev", "corrVexpl"): if dico[key] not in (None, 0): - dico[key] = 'OUI' + dico[key] = "OUI" else: - dico[key] = 'NON' - for key in ('fauxVdev', 'fauxVexpl'): + dico[key] = "NON" + for key in ("fauxVdev", "fauxVexpl"): if dico[key] not in (None, 0): - dico[key] = 'OUI DEPUIS : {0}'.format(dico[key]) + dico[key] = "OUI DEPUIS : {0}".format(dico[key]) else: - dico[key] = 'NON' + dico[key] = "NON" # 3. "pour le compte de" field - if dico['intervenant'] is not None: - tma_name = get_value(self._serv, 'intervenant', dico['intervenant'], 'name') - dico['tma'] = self.mask_tma.format(tma=tma_name) + if dico["intervenant"] is not None: + tma_name = get_value(self._serv, "intervenant", dico["intervenant"], "name") + dico["tma"] = self.mask_tma.format(tma=tma_name) else: - dico['tma'] = None + dico["tma"] = None # 4. hide 'nbJours' field if there is an 'intervenant' - if dico['intervenant'] is None: - dico['nbJours'] = self.mask_nbjours.format(nbJours=dico['nbJours']) + if dico["intervenant"] is None: + dico["nbJours"] = self.mask_nbjours.format(nbJours=dico["nbJours"]) else: - dico['nbJours'] = '' + dico["nbJours"] = "" # 5. fixed in... val = "" - if dico['verCorrVexpl'] is not None: - val += dico['verCorrVexpl'] - if dico['verCorrVdev'] is not None: + if dico["verCorrVexpl"] is not None: + val += dico["verCorrVexpl"] + if dico["verCorrVdev"] is not None: if val: val += ", " - val += dico['verCorrVdev'] + val += dico["verCorrVdev"] if val: - dico['fix_in'] = self.mask_restit.format(versCorr=val) + dico["fix_in"] = self.mask_restit.format(versCorr=val) else: - dico['fix_in'] = "" + dico["fix_in"] = "" # 6. keep only strings dico = self.clean_dict(dico) @@ -184,12 +202,12 @@ class HISTOR(object): def summary_line(self, issue_dict): """Create a summary line for the issue.""" infos = dict(issue_dict) - infos['Stitle'] = infos['title'] - if len(infos['Stitle']) > 80: - infos['Stitle'] = infos['Stitle'][:80 - 3] + "..." - infos['Sassignedto'] = infos['assignedto'] or "?" - if len(infos['Sassignedto']) > 24: - infos['Sassignedto'] = infos['Sassignedto'][:24] + infos["Stitle"] = infos["title"] + if len(infos["Stitle"]) > 80: + infos["Stitle"] = infos["Stitle"][: 80 - 3] + "..." + infos["Sassignedto"] = infos["assignedto"] or "?" + if len(infos["Sassignedto"]) > 24: + infos["Sassignedto"] = infos["Sassignedto"][:24] return self.mask_summary.format(**infos) def AddIssue(self, issue, l_rep=None): @@ -201,8 +219,8 @@ class HISTOR(object): def DictUser(self, user): """Represent an user as a dict object.""" # required fields for formatting - dico = self._serv.display(user, 'loginaster', 'realname') - dico['CR'] = self.CR + dico = self._serv.display(user, "loginaster", "realname") + dico["CR"] = self.CR return dico def BeginUser(self, user): @@ -221,24 +239,22 @@ class HISTOR(object): """Representation of the histor object.""" txt = [self.header] if self.l_summ: - sep_line = self.summary_line({ - 'id': '-' * 6, 'assignedto': '-' * 24, 'title': '-' * 80}) + sep_line = self.summary_line({"id": "-" * 6, "assignedto": "-" * 24, "title": "-" * 80}) txt.append(sep_line) - txt.append(self.summary_line( - dict(id="FICHE ", assignedto="AUTEUR", title="TITRE"))) + txt.append(self.summary_line(dict(id="FICHE ", assignedto="AUTEUR", title="TITRE"))) txt.append(sep_line) txt.extend(self.l_summ) txt.append(sep_line) txt.extend(self.l_bloc) txt.append(self.footer) - return ''.join([convert(s) for s in txt]) + return "".join([convert(s) for s in txt]) class HISTOR_HTML(HISTOR): - """Class representing a histor in HTML format. - """ - CR = '<br />' - format = 'html' + """Class representing a histor in HTML format.""" + + CR = "<br />" + format = "html" header = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> <html> <head> @@ -268,9 +284,11 @@ class HISTOR_HTML(HISTOR): footer = """</body> </html> """ - separ = '<hr />' + separ = "<hr />" prefix = " " - mask = separ + """ + mask = ( + separ + + """ <!-- @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ --> <div class="fiche"> @@ -297,6 +315,7 @@ VALIDATION </div> {fix_in}{CR}{nbJours}{CR}</div> """ + ) mask_user_begin = """ <!-- @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ --> {CR} @@ -305,7 +324,7 @@ VALIDATION </p> """ mask_nbjours = "NB_JOURS_TRAV : {nbJours}" - mask_tma = 'TMA : {tma}' + mask_tma = "TMA : {tma}" mask_restit = "DEJA RESTITUE DANS : {versCorr}" mask_user_end = separ + CR @@ -313,22 +332,21 @@ VALIDATION """Initialize histor object.""" super(HISTOR_HTML, self).__init__(server, **kargs) # specific arguments - self.server_url = kargs.get('url', None) - self.fmt_url_issue = '{0}/issue{{issue}}'.format( - saxutils.escape(self.server_url)) + self.server_url = kargs.get("url", None) + self.fmt_url_issue = "{0}/issue{{issue}}".format(saxutils.escape(self.server_url)) def clean_dict(self, dico): """Keep only elements of dict of type string and number, and escape all HTML characters.""" newd = {} for k, v in list(dico.items()): - if k == 'CR' or type(v) in NumberTypes: + if k == "CR" or type(v) in NumberTypes: newd[k] = v elif type(v) is str: newd[k] = saxutils.escape(convert(v)) elif v is None: - newd[k] = '' - if k == 'messages': + newd[k] = "" + if k == "messages": newd[k] = self.format_messages(newd[k]) return newd @@ -337,22 +355,25 @@ VALIDATION dico = HISTOR.DictIssue(self, issue, l_rep) # specific fields - dico['url_issue'] = self.fmt_url_issue.format(issue=issue) + dico["url_issue"] = self.fmt_url_issue.format(issue=issue) # multi-lines fields - for k in ('messages',): + for k in ("messages",): dico[k] = dico[k].replace(os.linesep, self.CR) - dico['CR'] = self.CR + dico["CR"] = self.CR return dico def format_messages(self, orig): """Format magic fields (example 'issue1234' as hyperlink)""" - magic = re.compile("(?P<class>issue) *(?P<id>[0-9]{2}[0-9]+)", - re.M | re.I) - text = magic.sub(r'<a href="{0}/\g<class>\g<id>">\g<class>\g<id></a>' - .format(saxutils.escape(self.server_url)), orig) + magic = re.compile("(?P<class>issue) *(?P<id>[0-9]{2}[0-9]+)", re.M | re.I) + text = magic.sub( + r'<a href="{0}/\g<class>\g<id>">\g<class>\g<id></a>'.format( + saxutils.escape(self.server_url) + ), + orig, + ) return text def AddIssue(self, issue, l_rep=None): @@ -362,20 +383,22 @@ VALIDATION class HISTOR_FSQ(HISTOR): - """Class representing issues for quality purpose. - """ + """Class representing issues for quality purpose.""" + CR = os.linesep - format = 'fsq' + format = "fsq" separ = CR prefix = " " - mask = (separ + -"""Fiche {id} : {title} + mask = ( + separ + + """Fiche {id} : {title} {messages} Risque de résultats faux depuis la version : {fauxVexpl} {fauxVdev}, corrigé en {corrVexpl} {corrVdev} -""") +""" + ) mask_user_begin = "" mask_user_end = "" @@ -384,8 +407,8 @@ Risque de résultats faux depuis la version : {fauxVexpl} {fauxVdev}, corrigé e dico = HISTOR.DictIssue(self, issue, l_rep) # clean response - for k in ('messages',): - dico[k] = re.sub(' +', ' ', dico[k]) + for k in ("messages",): + dico[k] = re.sub(" +", " ", dico[k]) dico[k] = dico[k].strip() return dico @@ -396,15 +419,15 @@ Risque de résultats faux depuis la version : {fauxVexpl} {fauxVdev}, corrigé e self.l_bloc.append(self.mask.format(**issue_dict)) -def InitHistor(server, format='text', **kargs): +def InitHistor(server, format="text", **kargs): """Return a histor object according to `format`. server: ServerProxy object to connect the XMLRPC server """ - if format == 'text': + if format == "text": return HISTOR(server, **kargs) - elif format == 'html': + elif format == "html": return HISTOR_HTML(server, **kargs) - elif format == 'fsq': + elif format == "fsq": return HISTOR_FSQ(server, **kargs) else: - raise HistorError(_('Unknown histor format : {0}').format(format)) + raise HistorError(_("Unknown histor format : {0}").format(format)) diff --git a/lib/api_roundup/rex.py b/lib/api_roundup/rex.py index 7c2be1fc67ae479bcb6e45385ac87428999dd047..fde974db69f4fb76e9018836fd08abfe163c9a9a 100644 --- a/lib/api_roundup/rex.py +++ b/lib/api_roundup/rex.py @@ -5,6 +5,7 @@ see documentation at http://roundup.sourceforge.net/docs/xmlrpc.html """ +import os import random import re import ssl @@ -26,7 +27,7 @@ YES = 1 NO = 0 -def _get_url(user="readonly", password="readonly", debug=False): +def _get_url(user="readonly", password="", debug=False): """Return the url of the xmlrpc server""" # url = "http://{userpass}{rex_url}/xmlrpc" url = "{rex_url}/xmlrpc{query}" @@ -53,10 +54,15 @@ def _get_connection(user=None, password="", write=False, debug=False): If write is True user/password are read from the configuration ([aster] section in ~/.gitconfig, option rex-write-user/rex-write-password).""" if not user: - user = password = "readonly" + user = os.environ.get("REX_USERNAME", "readonly") if write: user = ASCFG.get("rex.write_user") or user password = ASCFG.get("rex.write_password") or password + password = password or os.environ.get("REX_PASSWORD") + assert password, _( + "Valid REX credentials are required. " + "Please define REX_USERNAME and REX_PASSWORD environment variables." + ) url = _get_url(user, password, debug) if (sys.version_info.major, sys.version_info.minor) < (3, 5): server = xmlrpc.client.ServerProxy(url, allow_none=True) @@ -284,9 +290,7 @@ def build_histor(issue_list, format, o_msg="last", req_status=None): text = [] if o_msg != "all": # only keep the last one - msg_ids = [ - max(msg_ids), - ] + msg_ids = [max(msg_ids)] for i, msgid in enumerate(msg_ids): msg_name = "msg" + str(msgid) txt = server.display(msg_name, "content")["content"] diff --git a/lib/api_roundup/stats.py b/lib/api_roundup/stats.py index e8618ba910d68bc2f3bb772ed8c6cfecfa4b2f71..1352925d6879644f0a4abfcaa707ee6aac3aa9a1 100644 --- a/lib/api_roundup/stats.py +++ b/lib/api_roundup/stats.py @@ -8,7 +8,6 @@ Helper functions and objects to make a statistics report import os import os.path as osp -import tempfile import pickle import re from collections import UserDict @@ -16,22 +15,40 @@ from datetime import datetime import time from glob import glob -from aslint.logger import logger, setlevel +from aslint.logger import logger from aslint.i18n import _ from aslint.config import ASCFG from aslint.utils import convert from .rex import extract_csv from .utils import read_issue_changelog -ASTER_TEAM = '|'.join([ - 'abbas', 'bereux', 'courtois', 'delmas', 'kudawoo', 'fournier', - 'desoza', 'lefebvre', 'pellet', 'sellenet', 'drouet', - 'assire', 'durand', - 'lebouvier', 'rezette', - 'macocco', 'chansard', 'ladier', - 'cheignon', 'lecorvec', 'bourcet']) - -CACHE_USERS = osp.join(ASCFG.get('devtools_cache'), 'rex_users.pick') +ASTER_TEAM = "|".join( + [ + "abbas", + "bereux", + "courtois", + "delmas", + "kudawoo", + "fournier", + "desoza", + "lefebvre", + "pellet", + "sellenet", + "drouet", + "assire", + "durand", + "lebouvier", + "rezette", + "macocco", + "chansard", + "ladier", + "cheignon", + "lecorvec", + "bourcet", + ] +) + +CACHE_USERS = osp.join(ASCFG.get("devtools_cache"), "rex_users.pick") class RexEntity(UserDict): @@ -43,23 +60,27 @@ class RexEntity(UserDict): for key, value in list(args.items()): if not value: args[key] = "''" - return os.linesep.join([ - "Issue {id}, {type} ({status}) created at {creation} by {creator} " + return os.linesep.join( + [ + "Issue {id}, {type} ({status}) created at {creation} by {creator} " "for {produit} {version} (target {versCible})", - "Title: {title}", - "Solved by {assignedto} (last changes {activity}) for " + "Title: {title}", + "Solved by {assignedto} (last changes {activity}) for " "project {projet} in {verCorrVexpl} and {verCorrVdev}, " "cost {nbJours} days", - ]).format(**args) + ] + ).format(**args) def summary_reminder(self): """Return a representation of the issue content""" - return os.linesep.join([ - "Fiche {id}, {type} créée le {creation} " + return os.linesep.join( + [ + "Fiche {id}, {type} créée le {creation} " "par {creator} (version cible {versCible})", - " Titre: {title}", - " " + ASCFG.get('rex_url') + "/issue{id}", - ]).format(**self) + " Titre: {title}", + " " + ASCFG.get("rex_url") + "/issue{id}", + ] + ).format(**self) class EntityList(UserDict): @@ -78,56 +99,51 @@ class EntityList(UserDict): def filter_equal(self, key, value): """Filter the list, keep issues where 'key' is equal to 'value'""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if issue[key] == value])) + return EntityList(dict([(id, issue) for id, issue in self.items() if issue[key] == value])) def filter_not_equal(self, key, value): """Filter the list, keep issues where 'key' is not equal to 'value'""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if issue[key] != value])) + return EntityList(dict([(id, issue) for id, issue in self.items() if issue[key] != value])) def filter_less(self, key, value): """Filter the list, keep issues where 'key' is less than 'value'""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if issue[key] < value])) + return EntityList(dict([(id, issue) for id, issue in self.items() if issue[key] < value])) def filter_greater(self, key, value): """Filter the list, keep issues where 'key' is greater than 'value'""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if issue[key] > value])) + return EntityList(dict([(id, issue) for id, issue in self.items() if issue[key] > value])) def filter_regexp(self, key, regexp): """Filter the list, keep issues where 'key' value match a regular expression""" exp = re.compile(regexp) - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if exp.search(issue[key])])) + return EntityList( + dict([(id, issue) for id, issue in self.items() if exp.search(issue[key])]) + ) def filter_type(self, typ): """Filter the list of issues by type""" - if typ in ('all', '*'): + if typ in ("all", "*"): by_type = self.copy() - elif typ == 'anomalie': - by_type = self.filter_equal('type', 'anomalie') | \ - self.filter_equal('type', 'express') - elif typ == 'anomalie-vdev': - by_type = (self.filter_equal('type', 'anomalie') | \ - self.filter_equal('type', 'express')) & \ - self.not_empty('verCorrVdev') & \ - self.empty('verCorrVexpl') + elif typ == "anomalie": + by_type = self.filter_equal("type", "anomalie") | self.filter_equal("type", "express") + elif typ == "anomalie-vdev": + by_type = ( + (self.filter_equal("type", "anomalie") | self.filter_equal("type", "express")) + & self.not_empty("verCorrVdev") + & self.empty("verCorrVexpl") + ) else: - by_type = self.filter_equal('type', typ) + by_type = self.filter_equal("type", typ) return by_type def empty(self, key): """Filter the list, keep issues where 'key' value is empty""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if not issue[key]])) + return EntityList(dict([(id, issue) for id, issue in self.items() if not issue[key]])) def not_empty(self, key): """Filter the list, keep issues where 'key' value is empty""" - return EntityList(dict([(id, issue) for id, issue in self.items() \ - if issue[key]])) + return EntityList(dict([(id, issue) for id, issue in self.items() if issue[key]])) def deepcopy(self): """Create a new EntityList with the copy of each issue""" @@ -137,28 +153,29 @@ class EntityList(UserDict): """Add a field containing the elapsed time between creation and closure (or last activity if closure is not known)""" for issue in self.values(): - creat = issue['creation'] - end = closure_date.get(int(issue['id']), issue['activity']) - issue['_closure'] = end + creat = issue["creation"] + end = closure_date.get(int(issue["id"]), issue["activity"]) + issue["_closure"] = end # offset between eda and changelog date - issue['_delay'] = max(0, (str2date(end) - str2date(creat)).days - 7) + issue["_delay"] = max(0, (str2date(end) - str2date(creat)).days - 7) def add_delay_reminder(self): """Add a field containing the elapsed time since the last activity on a issue""" for issue in self.values(): - start = issue['activity'] + start = issue["activity"] end = time.strftime("%Y-%m-%d") - issue['_delay'] = (str2date(end) - str2date(start)).days + issue["_delay"] = (str2date(end) - str2date(start)).days def getValues(self, key): """Return unique values of a field""" return list(set([issue[key] for issue in self.values()])) -def csv_as_dict(content, header=1, delimiter=',', encoding=None): +def csv_as_dict(content, header=1, delimiter=",", encoding=None): """Read a CSV file and return a list of dicts""" import csv + keys = [] rows = [] lines = content.splitlines() @@ -170,19 +187,22 @@ def csv_as_dict(content, header=1, delimiter=',', encoding=None): keys = values continue if reader.line_num <= header: - logger.info(_("Ignore line #{0}: {1}") - .format(reader.line_num, delimiter.join(values))) + logger.info(_("Ignore line #{0}: {1}").format(reader.line_num, delimiter.join(values))) continue try: values = [v.strip() for v in values] drow = dict(list(zip(keys, values))) rows.append(drow) except: - logger.error(_("Error line #{0}:\n keys: {1}\n values: {2}") - .format(reader.line_num, keys, values)) + logger.error( + _("Error line #{0}:\n keys: {1}\n values: {2}").format( + reader.line_num, keys, values + ) + ) logger.info(_("{0:4} lines read").format(len(rows))) return keys, rows + def download(filename, entity): """Extract the data from REX and write a pickled file containing the content as a list of dicts.""" @@ -191,62 +211,69 @@ def download(filename, entity): rows = [i for i in rows if i] issues = EntityList() for iss in rows: - issues[iss['id']] = RexEntity(iss) + issues[iss["id"]] = RexEntity(iss) write_dict(filename, issues) + # downloading all users infos need admin access def get_users(force=False): """Return the list of all users""" if not force and osp.isfile(CACHE_USERS): - with open(CACHE_USERS, 'rb') as pick: + with open(CACHE_USERS, "rb") as pick: users = pickle.load(pick) logger.info(_("{1:6} users read from {0}").format(CACHE_USERS, len(users))) else: from . import get_connection + with get_connection() as rex: - usernames = rex.list('user') + usernames = rex.list("user") users = EntityList() for dvp in sorted(usernames): logger.debug(_("reading user {0}...").format(dvp)) users[dvp] = download_user(dvp) - with open(CACHE_USERS, 'wb') as pick: + with open(CACHE_USERS, "wb") as pick: pickle.dump(users, pick) return users + def download_user(username): """Return the users""" from . import get_connection + with get_connection() as server: - userid = server.filter('user', None, { 'username': username }) - match_users = [RexEntity(server.display('user' + id)) for id in userid] + userid = server.filter("user", None, {"username": username}) + match_users = [RexEntity(server.display("user" + id)) for id in userid] - match_users = [user for user in match_users if user['username'] == username] + match_users = [user for user in match_users if user["username"] == username] if not match_users: - raise ValueError('user not found: {0}'.format(username)) + raise ValueError("user not found: {0}".format(username)) assert len(match_users) == 1, match_users return match_users[0] + def write_dict(filename, issues): """Write the REX data into a pickle file""" _clean_dict(issues) - with open(filename, 'wb') as pick: + with open(filename, "wb") as pick: pickle.dump(issues, pick) logger.info(_("data written into {0}").format(filename)) + def read_dict(filename): """Read a pickled file containing the REX data""" - with open(filename, 'rb') as pick: + with open(filename, "rb") as pick: issues = pickle.load(pick) logger.info(_("{1:6} issues read from {0}").format(filename, len(issues))) return issues + def _clean_dict(issues): """Clean the data""" re_date = re.compile("([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.") for line in list(issues.values()): for key, value in list(line.items()): # remove working fields - if key.startswith('_'): + if key.startswith("_"): del line[key] continue if value.strip() in ("None", ""): @@ -256,19 +283,23 @@ def _clean_dict(issues): if mat: line[key] = mat.group(1) + def str2date(date): """Convert a date from format "Y-m-d" to a datetime object""" - return datetime(*[int(i) for i in date.split('-')]) + return datetime(*[int(i) for i in date.split("-")]) + def _diff_days(date1, date2): """Return the number of days between date1 and date2""" return (str2date(date2) - str2date(date1)).days + def tofloat(string): """Convert the argument to float""" if type(string) is float: return string - return float(string.replace(',', '.').strip() or 0) + return float(string.replace(",", ".").strip() or 0) + def centile(values, k): """Return the k-th centile of the values (k in %)""" @@ -279,16 +310,18 @@ def centile(values, k): values = [0] return values[int(k * len(values) / 100)] -def sum_days(issues, sumfield='nbJours'): + +def sum_days(issues, sumfield="nbJours"): """Return the sum of days of work""" return sum([tofloat(i[sumfield]) for i in issues.values()]) + def extract_closure(pattern): """Extract closure date of issues from changelog files""" re_date = re.compile(r"([0-9]{4}\-[0-9]{2}\-[0-9]{2})") closure = {} for fname in glob(pattern): - with open(fname, 'rb') as fobj: + with open(fname, "rb") as fobj: txt = convert(fobj.read()) mat = re_date.search(txt) if not mat: @@ -298,82 +331,109 @@ def extract_closure(pattern): closure.update(dict.fromkeys(issues, date)) return closure -def summary(issues, title, sumfield='nbJours', with_team=True): + +def summary(issues, title, sumfield="nbJours", with_team=True): """Print a summary of issues""" nb = max(1, len(issues)) - delay = [i['_delay'] for i in issues.values()] - mean_delay = 1. * sum(delay) / nb + delay = [i["_delay"] for i in issues.values()] + mean_delay = 1.0 * sum(delay) / nb nbday = sum_days(issues, sumfield) mean_nbday = nbday / nb - by_dvp = issues.filter_regexp('creator', ASTER_TEAM) + by_dvp = issues.filter_regexp("creator", ASTER_TEAM) print(title) - print(("{0:5} fiches / {1:6.1f} j, {2:6.1f} par fiche, delai moyen {3:6.1f} " - "(80% < {4} j/ 90% < {5} j)").format( - len(issues), nbday, mean_nbday, mean_delay, - centile(delay, 80), centile(delay, 90))) + print( + ( + "{0:5} fiches / {1:6.1f} j, {2:6.1f} par fiche, delai moyen {3:6.1f} " + "(80% < {4} j/ 90% < {5} j)" + ).format(len(issues), nbday, mean_nbday, mean_delay, centile(delay, 80), centile(delay, 90)) + ) if with_team: print("dont {0:5} fiches émises par les développeurs").format(len(by_dvp)) -def developer_stats(issues, title, sumfield='nbJours'): + +def developer_stats(issues, title, sumfield="nbJours"): """Statistics by developer""" - list_dvp = sorted(issues.getValues('assignedto')) + list_dvp = sorted(issues.getValues("assignedto")) if title: print(title) for dvp in list_dvp: - solved = issues.filter_equal('assignedto', dvp) + solved = issues.filter_equal("assignedto", dvp) if len(solved) == 0: continue nb = max(1, len(solved)) - delay = [i['_delay'] for i in solved.values()] - mean_delay = 1. * sum(delay) / nb + delay = [i["_delay"] for i in solved.values()] + mean_delay = 1.0 * sum(delay) / nb nbday = sum_days(solved, sumfield) mean_nbday = nbday / nb - dvp = dvp or '-' - print(("{0:5} fiches pour {1:6.1f} j à {6:10}, {2:6.1f} par fiche, delai moyen {3:6.1f} " - "(50% < {4} j/ 80% < {5} j)").format( - len(solved), nbday, mean_nbday, mean_delay, - centile(delay, 50), centile(delay, 80), dvp)) - -def project_stats(issues, title, sumfield='nbJours'): + dvp = dvp or "-" + print( + ( + "{0:5} fiches pour {1:6.1f} j à {6:10}, {2:6.1f} par fiche, delai moyen {3:6.1f} " + "(50% < {4} j/ 80% < {5} j)" + ).format( + len(solved), + nbday, + mean_nbday, + mean_delay, + centile(delay, 50), + centile(delay, 80), + dvp, + ) + ) + + +def project_stats(issues, title, sumfield="nbJours"): """Statistics by project""" - list_project = sorted(issues.getValues('projet')) + list_project = sorted(issues.getValues("projet")) if title: print(title) total = sum_days(issues, sumfield) print("Nb jours total : {0:6.1f}".format(total)) for proj in list_project: - solved = issues.filter_equal('projet', proj) + solved = issues.filter_equal("projet", proj) _project_summary(proj, solved, total) - psm = issues.empty('projet') | \ - issues.filter_equal('projet', 'AOM') | \ - issues.filter_regexp('projet', '^ASTER') | \ - issues.filter_equal('projet', 'Bonnes idées') | \ - issues.filter_equal('projet', 'CODE') | \ - issues.filter_equal('projet', 'VALIDATION') - _project_summary('projet PSM', psm, total) - p3m = issues.filter_equal('projet', 'ISTA') | \ - issues.filter_equal('projet', 'Méthodes Numériques') | \ - issues.filter_equal('projet', '5% AMA') - _project_summary('projet 3M', p3m, total) + psm = ( + issues.empty("projet") + | issues.filter_equal("projet", "AOM") + | issues.filter_regexp("projet", "^ASTER") + | issues.filter_equal("projet", "Bonnes idées") + | issues.filter_equal("projet", "CODE") + | issues.filter_equal("projet", "VALIDATION") + ) + _project_summary("projet PSM", psm, total) + p3m = ( + issues.filter_equal("projet", "ISTA") + | issues.filter_equal("projet", "Méthodes Numériques") + | issues.filter_equal("projet", "5% AMA") + ) + _project_summary("projet 3M", p3m, total) + def extern_stats(issues): """Statistics by external partner""" - list_ext = sorted(issues.getValues('intervenant')) + list_ext = sorted(issues.getValues("intervenant")) for tma in list_ext: if not tma: continue - solved = issues.filter_equal('intervenant', tma) + solved = issues.filter_equal("intervenant", tma) nbday = sum_days(solved) - nbreal = sum_days(solved, 'realiseCharge') - print('Fiches {0} (vérif nbJours = {1:6.1f} / realiseCharge = {2:6.1f})' \ - .format(tma, nbday, nbreal)) - summary(solved, "Type = all", sumfield='realiseCharge', with_team=False) - developer_stats(solved, "", sumfield='realiseCharge') + nbreal = sum_days(solved, "realiseCharge") + print( + "Fiches {0} (vérif nbJours = {1:6.1f} / realiseCharge = {2:6.1f})".format( + tma, nbday, nbreal + ) + ) + summary(solved, "Type = all", sumfield="realiseCharge", with_team=False) + developer_stats(solved, "", sumfield="realiseCharge") + -def _project_summary(project, issues, total, sumfield='nbJours'): +def _project_summary(project, issues, total, sumfield="nbJours"): """Print a summary line for a project""" nbday = sum_days(issues, sumfield) - project = project or '-' + project = project or "-" ratio = nbday / total * 100 - print("{0:5} fiches pour {1:25}, soit {2:6.1f} j, {3:5.1f} %".format( - len(issues), project, nbday, ratio)) + print( + "{0:5} fiches pour {1:25}, soit {2:6.1f} j, {3:5.1f} %".format( + len(issues), project, nbday, ratio + ) + ) diff --git a/lib/api_roundup/utils.py b/lib/api_roundup/utils.py index 01ef5fd30fe9f3465b3e4283027d0927e639d9b4..1dce294ac5c0e4cdf161a71a56949b1a09d0bbf8 100644 --- a/lib/api_roundup/utils.py +++ b/lib/api_roundup/utils.py @@ -1,6 +1,6 @@ # coding=utf-8 # -------------------------------------------------------------------- -# Copyright (C) 1991 - 2019 - EDF R&D - www.code-aster.org +# Copyright (C) 1991 - 2024 - EDF R&D - www.code-aster.org # This file is part of code_aster. # # code_aster is free software: you can redistribute it and/or modify @@ -24,7 +24,6 @@ Convenient functions to manipulate values returned by the Roundup XMLRPC server. import re from datetime import datetime -from aslint.logger import logger from aslint.i18n import _ from aslint.utils import convert @@ -43,6 +42,7 @@ def get_value(server, klass, idx, field): """ return server.display(klass + str(idx), field)[field] + def get_username(server, idx): """Return the username. @@ -53,7 +53,8 @@ def get_username(server, idx): Returns: str: Username with the given id. """ - return get_value(server, 'user', idx, "username") + return get_value(server, "user", idx, "username") + def get_realname(server, idx): """Return the realname of a user. @@ -65,7 +66,8 @@ def get_realname(server, idx): Returns: str: Realname of the user with the given id. """ - return get_value(server, 'user', idx, "realname") + return get_value(server, "user", idx, "realname") + def get_productname(server, idx): """Return the name of a product. @@ -77,25 +79,28 @@ def get_productname(server, idx): Returns: str: Name of the product. """ - return get_value(server, 'produit', idx, "name") + return get_value(server, "produit", idx, "name") + def format_date(date): """Return the date in a common format""" return datetime.strptime(date, "<Date %Y-%m-%d.%H:%M:%S.%f>").strftime("%d/%m/%Y") + def read_issue_list(fname): """Read the issues numbers from 'fname' One number at each beginning of lines.""" - with open(fname, 'r') as fobj: + with open(fname, "r") as fobj: content = fobj.read() - expr = re.compile('^ *([0-9]{4}[0-9]?)', re.MULTILINE) + expr = re.compile("^ *([0-9]{4}[0-9]?)", re.MULTILINE) issues = [int(i) for i in expr.findall(content)] return issues + def read_issue_changelog(fname): """Read the issues numbers from the changelog 'fname'""" - with open(fname, 'rb') as fobj: + with open(fname, "rb") as fobj: hist_content = convert(fobj.read()) - expr = re.compile('RESTITUTION FICHE +([0-9]+)', re.MULTILINE) + expr = re.compile("RESTITUTION FICHE +([0-9]+)", re.MULTILINE) issues = [int(i) for i in expr.findall(hist_content)] return issues diff --git a/share/test/test_api/test_rex.py b/share/test/test_api/test_rex.py index 85ccb894fc6467d1e4331401e6d4f633462dccf4..8a36ccd86cc1b5203129d9bb2eb3f335326c35d2 100644 --- a/share/test/test_api/test_rex.py +++ b/share/test/test_api/test_rex.py @@ -8,45 +8,47 @@ import base64 import unittest from urllib.parse import urlencode -from api_roundup import (build_histor, get_connection, get_documents, - get_expected_issues) +from api_roundup import build_histor, get_connection, get_documents, get_expected_issues from api_roundup.rex import _get_url from hamcrest import * def test_get_url(): """check url function""" - url0 = "https://readonly:readonly@aster.retd.edf.fr/rex" - # query = "?" + urlencode({'auth': base64.encodebytes(b"readonly:readonly")}) + url0 = "https://readonly:@aster.retd.edf.fr/rex" + # query = "?" + urlencode({'auth': base64.encodebytes(b"readonly:")}) query = "" assert_that(_get_url(), equal_to(url0 + "/xmlrpc" + query)) assert_that(_get_url(debug=True), equal_to(url0 + "-test/xmlrpc" + query)) # query = "?" + urlencode({'auth': base64.encodebytes(b"admin:pass")}) # assert_that(_get_url('admin', 'pass'), equal_to(url0 + "/xmlrpc" + query)) + def test_connection_schema(): """check the connection and the schema of the tracker""" with get_connection() as serv: schema = serv.schema() - assert_that('status', is_in(list(schema.keys()))) - assert_that('issue', is_in(list(schema.keys()))) - assert_that('produit', is_in(list(schema.keys()))) - assert_that('projet', is_in(list(schema.keys()))) + assert_that("status", is_in(list(schema.keys()))) + assert_that("issue", is_in(list(schema.keys()))) + assert_that("produit", is_in(list(schema.keys()))) + assert_that("projet", is_in(list(schema.keys()))) + def test_basic_request(): """check a basic request""" with get_connection() as serv: - status = serv.list('status') + status = serv.list("status") assert_that(len(status), greater_than(3)) - assert_that(status, has_item('emis')) + assert_that(status, has_item("emis")) + def test_build_histor(): """check the build of histor text""" issue_list = [10000, 20000] - histor = build_histor(issue_list, 'text') - assert_that(histor, contains_string('RESTITUTION FICHE 20000')) - assert_that(histor, - contains_string('dans le catalogue phenomene_modelisation')) + histor = build_histor(issue_list, "text") + assert_that(histor, contains_string("RESTITUTION FICHE 20000")) + assert_that(histor, contains_string("dans le catalogue phenomene_modelisation")) + def test_expected_issues(): """check list of expected issues""" @@ -55,17 +57,16 @@ def test_expected_issues(): if issues: ids = list(issues.keys()) issue_test = issues[ids[0]] - assert_that(issue_test['status'], equal_to('6')) + assert_that(issue_test["status"], equal_to("6")) - issues = get_expected_issues("default", - product=("code_aster", "salome_meca"), - _unittest=True) + issues = get_expected_issues("default", product=("code_aster", "salome_meca"), _unittest=True) n_ca = len(issues) assert_that(n_all, greater_than_or_equal_to(n_ca)) if issues: ids = list(issues.keys()) - prods = [issues[i]['produit'] for i in ids] - assert_that(prods, only_contains('code_aster', 'salome_meca')) + prods = [issues[i]["produit"] for i in ids] + assert_that(prods, only_contains("code_aster", "salome_meca")) + def test_expected_documents(): """check for expected documents""" @@ -79,5 +80,6 @@ def test_expected_documents(): if __name__ == "__main__": import sys from testutils import get_test_suite + RET = unittest.TextTestRunner(verbosity=2).run(get_test_suite(__name__)) sys.exit(not RET.wasSuccessful())