# -*- coding: utf-8 -*-

"""Interfaces to Roundup XMLRPC server

see documentation at http://roundup.sourceforge.net/docs/xmlrpc.html
"""

import os
import random
import re
import ssl
import sys
import xmlrpc.client
from contextlib import contextmanager
from urllib.parse import quote
from urllib.request import urlopen

from aslint.baseutils import force_list
from aslint.config import ASCFG
from aslint.i18n import _
from aslint.logger import logger

from .histor import InitHistor
from .utils import get_productname, get_username, get_value

# boolean values
YES = 1
NO = 0


def _get_url(user="readonly", password="", debug=False):
    """Return the url of the xmlrpc server.

    Arguments:
        user (str): Login inserted in the url (required).
        password (str): Password inserted in the url.
        debug (bool): If *True*, "-test" is appended to the configured
            ``rex.url`` value.

    Returns:
        str: Url of the XMLRPC endpoint, with ``user:password@`` inserted
        after the ``https://`` scheme.

    Raises:
        AssertionError: If *user* is empty.
    """
    # explicit raise (not 'assert') so that the check survives 'python -O'
    if not user:
        raise AssertionError("user is required")
    url = ASCFG.get("rex.url") + ("-test" if debug else "") + "/xmlrpc"
    # Passing base64-encoded credentials as an 'auth' query parameter has been
    # tried and did not work, so credentials are embedded in the url itself.
    # They are percent-encoded because characters such as '@', '/' or ':'
    # would otherwise corrupt the url; xmlrpc.client unquotes them before
    # building the 'Authorization' header.
    userpass = quote(user, safe="") + ":" + quote(password, safe="") + "@"
    return url.replace("https://", "https://" + userpass, 1)


def _get_connection(user=None, password="", write=False, debug=False):
    """Return a connection to the Roundup server.

    If *user* is not provided, it is read from the ``REX_USERNAME``
    environment variable ("readonly" by default).
    If *write* is *True*, the configuration options ``rex.write_user`` and
    ``rex.write_password`` take precedence when they are defined.
    If no password is known at this point, the ``REX_PASSWORD`` environment
    variable is used.

    Arguments:
        user (str, optional): Login.
        password (str, optional): Password.
        write (bool): Use the credentials configured for write access.
        debug (bool): Passed to *_get_url* to build the url.

    Returns:
        *ServerProxy*: Connection object.

    Raises:
        AssertionError: If no password can be found.
    """
    if not user:
        user = os.environ.get("REX_USERNAME", "readonly")
    if write:
        user = ASCFG.get("rex.write_user") or user
        password = ASCFG.get("rex.write_password") or password
    password = password or os.environ.get("REX_PASSWORD")
    # explicit raise (not 'assert') so that the check survives 'python -O'
    if not password:
        raise AssertionError(
            _(
                "Valid REX credentials are required. "
                "Please define REX_USERNAME and REX_PASSWORD environment variables."
            )
        )
    url = _get_url(user, password, debug)
    if (sys.version_info.major, sys.version_info.minor) < (3, 5):
        return xmlrpc.client.ServerProxy(url, allow_none=True)
    # do not check certificate - only used on private network
    ctxt = ssl.create_default_context()
    ctxt.check_hostname = False
    ctxt.verify_mode = ssl.CERT_NONE
    return xmlrpc.client.ServerProxy(url, allow_none=True, context=ctxt)


# This block allows to use the REX API with Python 3.4 on some old servers
if sys.version_info < (3, 5):

    @contextmanager
    def get_connection(user=None, password="", write=False, debug=False):
        """Add a context manager to *ServerProxy* object for Python < 3.5."""
        yield _get_connection(user, password, write, debug)

else:

    def get_connection(user=None, password="", write=False, debug=False):
        """Simple wrapper to *_get_connection*."""
        return _get_connection(user, password, write, debug)


def get_expected_issues(branch, product=None, _unittest=False):
    """Return the issues expected in a branch and, eventually, for some
    products.

    Only issues with the status 'valide_EDA' are considered.

    Arguments:
        branch (str): Keep only issues expected in this branch ("default"
            selects the 'dev' fields, any other value the 'expl' fields).
        product (str or list[str]): Keep only issues related to one or more
            products.
        _unittest (bool): If *True*, stop as soon as one issue is kept.

    Returns:
        dict({issue_number, issue}): Dict of filtered issues.
    """
    with get_connection() as rex:
        product = force_list(product or [])
        try:
            status_id = rex.lookup("status", "valide_EDA")
        except xmlrpc.client.ProtocolError as err:
            print("A protocol error occurred")
            print("URL: %s" % err.url)
            print("HTTP/HTTPS headers: %s" % err.headers)
            print("Error code: %d" % err.errcode)
            print("Error message: %s" % err.errmsg)
            raise
        res = rex.filter("issue", None, {"status": status_id})
        lbr = "dev" if branch == "default" else "expl"
        loth = "dev" if lbr == "expl" else "expl"
        issues = {}
        for issue_id in res:
            if issues and _unittest:
                break
            dv = rex.display("issue" + str(issue_id))
            dv["id"] = int(issue_id)
            dv["assignedto"] = get_username(rex, dv["assignedto"])
            dv["produit"] = get_productname(rex, dv["produit"])
            if product and dv["produit"] not in product:
                continue
            # normalize the four fields to 1/0 flags
            for field in ("corrVdev", "verCorrVdev", "corrVexpl", "verCorrVexpl"):
                dv[field] = 1 if dv[field] else 0
            # skip issues having both fields set for this branch type and
            # also flagged for the other one
            if dv["corrV%s" % lbr] and dv["verCorrV%s" % lbr] and dv["corrV%s" % loth]:
                continue
            issues[dv["id"]] = dv
        return issues


def check_status(issue_id, issue, status, rex, verbose=False, force=False, logfunc=logger.error):
    """Check the status of an issue.

    Arguments:
        issue_id (str): Identifier of the issue.
        issue (dict): Issue content to check.
        status (str): Expected status.
        rex (*ServerProxy*): Connection used to query the server.
        verbose (bool): If *True*, prints the status even it is ok.
        force (bool): If *True*, a wrong status is not reported with
            *logfunc*. The returned value is not changed.
        logfunc (func): Function use to report the error.

    Returns:
        bool: *True* if the status is the expected one, *False* otherwise.
    """
    issue_status = get_value(rex, "status", issue["status"], "name")
    ok = issue_status == status
    if verbose or not ok:
        # the developer name is only used in messages: best effort
        try:
            dvp = get_value(rex, "user", issue["assignedto"], "username")
        except Exception:
            dvp = "?"
    if not ok and not force:
        logfunc(
            _("the status of issue #{0}, solved by {1}, is {2}, not {3}").format(
                issue_id, dvp, issue_status, status
            )
        )
    elif verbose:
        logger.info(
            _("the status of issue #{0}, solved by {1}, is {2}").format(issue_id, dvp, issue_status)
        )
    return ok


def get_documents(issue_id, rex):
    """Return the list of documents to modify resolving an issue.

    The *impactDoc* field of the issue is split on blanks and on the
    characters ``/ ; , :``.

    Arguments:
        issue_id (str): Identifier of the issue.
        rex (*ServerProxy*): Connection used to query the server.

    Returns:
        list[str]: List of the ids of documents (*keyword* field in *Docaster*).
    """
    field = get_value(rex, "issue", issue_id, "impactDoc") or ""
    return re.sub(r"[/;,:]+", " ", field).split()


def mark_as_closed(issue_list, tagv, expl):
    """Mark a list of issues as closed.

    Issues that do not have the status 'valide_EDA' are skipped.

    Arguments:
        issue_list (list): List of issues numbers.
        tagv (str): Tag inserted in issues.
        expl (bool): *True* if it concerns a stable version (exploitation).
    """
    with get_connection(write=True) as server:
        typv, other = ("expl", "dev") if expl else ("dev", "expl")
        d_champ = {
            "a_corrige": "corrV" + typv,
            "v_correct": "verCorrV" + typv,
            "a_tester": "corrV" + other,
            "v_tester": "verCorrV" + other,
        }
        req_status = "valide_EDA"
        for numf in issue_list:
            issue_id = "issue" + str(numf)
            issue = server.display(issue_id)
            changes = []
            # 4.2. check issue values
            if not check_status(issue_id, issue, req_status, server):
                continue
            prod_name = get_value(server, "produit", issue["produit"], "name")
            # NOTE(review): the 'else' branch is assumed to be paired with the
            # test on the product name - confirm against the repository.
            if prod_name == "code_aster":
                type_name = get_value(server, "type", issue["type"], "name")
                if not issue[d_champ["a_corrige"]] and type_name != "aide utilisation":
                    changes.append((d_champ["a_corrige"], YES))
                    logger.warning(_("{0} should not be solved in V{1}").format(issue_id, typv))
            else:
                changes.append((d_champ["a_corrige"], NO))
                changes.append((d_champ["a_tester"], NO))
            # 4.3. fill issue fields
            changes.append((d_champ["v_correct"], tagv))
            # 4.4. close issue ?
            if not issue[d_champ["a_tester"]] or issue[d_champ["v_tester"]] not in ("", None):
                new_status = server.lookup("status", "attente_doc")
                if not issue["impactDoc"]:
                    new_status = server.lookup("status", "ferme")
                # 4.5 close issue
                changes.append(("status", new_status))
                logger.info(_("closing {0}").format(issue_id))
            else:
                logger.info(_("{0} must be solved in V{1}").format(issue_id, other))
            args = ["{0}={1}".format(x, y) for x, y in changes]
            logger.debug("{0}: setting {1}".format(issue_id, args))
            server.set(issue_id, *args)


def validate_issue(issue_list, types=None):
    """Change the status of issues from 'resolu' to 'valide_EDA'.

    Issues with another status are reported as errors and left unchanged.

    Arguments:
        issue_list (list): List of issues numbers.
        types (list[str], optional): If provided, only the issues whose type
            name is in this list are changed.
    """
    with get_connection(write=True) as server:
        req_status = "resolu"
        new_status = server.lookup("status", "valide_EDA")
        change = "status={0}".format(new_status)
        for numf in issue_list:
            issue_id = "issue" + str(numf)
            issue = server.display(issue_id)
            if types is not None:
                type_name = get_value(server, "type", issue["type"], "name")
                if type_name not in types:
                    logger.info(_("{0}: skipped (type: {1})").format(issue_id, type_name))
                    continue
            # 4.2. check issue values
            status = get_value(server, "status", issue["status"], "name")
            if status == req_status:
                server.set(issue_id, change)
                logger.info(_("{0}: status changed").format(issue_id))
            else:
                logger.error(
                    _("{2}: expecting status '{0}', " "not '{1}': unchanged").format(
                        req_status, status, issue_id
                    )
                )


def build_histor(issue_list, format, o_msg="last", req_status=None):
    """Build an Histor instance from a list of issues.

    Arguments:
        issue_list (list): List of issues numbers.
        format (str): Format of histor.
        o_msg (str): "all" to show all messages, otherwise just the last one.
        req_status (str, optional): If provided, issues that do not have
            this status are skipped.

    Returns:
        str: Representation (*repr*) of the Histor object.
    """
    with get_connection() as server:
        histor = InitHistor(server, format=format, url=ASCFG.get("rex.url"))
        for numf in issue_list:
            issue_id = "issue" + str(numf)
            if req_status:
                issue = server.display(issue_id)
                if not check_status(issue_id, issue, req_status, server):
                    continue
            raw_ids = server.display(issue_id, "messages")["messages"]
            # most recent message (greatest id) first
            msg_ids = sorted((int(i) for i in raw_ids), reverse=True)
            if o_msg != "all":
                # only keep the last one (slicing supports an issue without
                # message, 'max()' would fail on an empty list)
                msg_ids = msg_ids[:1]
            text = []
            for i, msgid in enumerate(msg_ids):
                txt = server.display("msg" + str(msgid), "content")["content"]
                if len(msg_ids) > 1:
                    text.append(_("message {}:").format(len(msg_ids) - i))
                text.append(txt)
            # add this issue to the histor object
            histor.AddIssue(numf, text)
        return repr(histor)


def get_solved_issues_eda():
    """Retrieve the list of solved issues (status 'resolu').

    Issues of the products handled separately are grouped by product, the
    other ones are grouped by developer.

    Returns:
        OrderedDict: Issues numbers grouped by developer first (in a random
        order, issues sorted by activity), then by product (sorted by name,
        issues sorted by assignee id).
    """
    from collections import OrderedDict

    with get_connection() as rex:
        # get all solved issues
        solv = rex.lookup("status", "resolu")
        solved_issues = rex.filter("issue", None, {"status": solv})
        # ids of products other than code_aster
        separated = {
            "Edyos",
            "Groove",
            "Europlexus",
            "homard",
            "Eficas",
            "SALOME",
            "salome_meca",
            "Miss3D",
            "Rex",
            "devtools",
            "asterstudy",
            "asterxx",
        }
        prod_ids = rex.list("produit", "id")
        logger.debug("products ids: {}".format(repr(prod_ids)))
        dprod = {}
        for idprod in prod_ids:
            prod = get_value(rex, "produit", idprod, "name")
            if prod in separated:
                dprod[idprod] = prod
        logger.debug("products ids: {}".format(repr(list(dprod.keys()))))
        # group issues by assignedto and products other than code_aster
        dict_issue = {}
        for num in solved_issues:
            issue = rex.display("issue" + str(num), "assignedto", "produit", "activity")
            idresp = issue["assignedto"]
            idprod = issue["produit"]
            last = issue["activity"]
            if idprod in dprod:
                key = 1, dprod[idprod]
            else:
                key = 0, get_username(rex, idresp)
            dict_issue.setdefault(key, []).append((int(num), idresp, idprod, last))
            logger.debug(
                "key: {0} issue{1} resp:{2}, prod:{3}, {4}".format(key, num, idresp, idprod, last)
            )
        # shuffle the list of developers
        lresp = [resp for i, resp in dict_issue if i == 0]
        random.shuffle(lresp)
        logger.debug("list of assignees: {0}".format(repr(lresp)))
        d_issue = OrderedDict()
        # add developer first, issues sorted by activity
        for resp in lresp:
            values = sorted(dict_issue[0, resp], key=lambda t: t[3])
            d_issue[resp] = [val[0] for val in values]
        # add issues by product, sorted by developer
        lprod = sorted(prod for i, prod in dict_issue if i == 1)
        logger.debug("list of products: {0}".format(repr(lprod)))
        for prod in lprod:
            values = sorted(dict_issue[1, prod], key=lambda t: t[1])
            d_issue[prod] = [val[0] for val in values]
        return d_issue


def extract_csv(entity):
    """Extract the content of an entity as csv data.

    Arguments:
        entity (str): Kind of objects to export, "issue" or "user".

    Returns:
        str: Downloaded content decoded as utf-8.

    Raises:
        ValueError: If *entity* is not supported.
    """
    if entity == "issue":
        columns = [
            "id",
            "creation",
            "activity",
            "assignedto",
            "type",
            "status",
            "creator",
            "title",
            "produit",
            "projet",
            "verCorrVdev",
            "corrVdev",
            "verCorrVexpl",
            "corrVexpl",
            "nbJours",
            "fauxVdev",
            "fauxVexpl",
            "intervenant",
            "etatIntervenant",
            "chiffrageCharge",
            "chiffrageDelai",
            "realiseCharge",
            "realiseDelai",
            "validReal",
            "validation",
            "versCible",
            "version",
            "impactDoc",
            "fichetude",
        ]
    elif entity == "user":
        columns = ["id", "username", "realname", "organisation", "address", "roles", "loginaster"]
    else:
        raise ValueError("unsupported entity: {0}".format(entity))
    url = (
        "{rex_url}/{entity}?@action=export_csv_names" "{columns}" "&:pagesize=50&:startwith=0"
    ).format(
        rex_url=ASCFG.get("rex.url"),
        entity=entity,
        columns="&:columns=" + ",".join(columns) if columns else "",
    )
    logger.info(_("exporting data as csv..."))
    # the context manager closes the HTTP response once it has been read
    with urlopen(url) as req:
        content = req.read()
    logger.debug(_("{:10} bytes downloaded").format(len(content)))
    return str(content, "utf-8")