"""Misc. regolith tools.
"""
import email.utils
import os
import pathlib
import platform
import re
import sys
import requests
import uuid
from copy import copy
from copy import deepcopy
from datetime import datetime, date
from dateutil import parser as date_parser
from dateutil.relativedelta import relativedelta
from habanero import Crossref
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from urllib.parse import urlparse
from regolith.dates import month_to_int, date_to_float, get_dates, is_current
from regolith.sorters import id_key, ene_date_key, \
doc_date_key_high
from regolith.schemas import APPOINTMENTS_TYPES, PRESENTATION_TYPES, \
PRESENTATION_STATI, OPTIONAL_KEYS_INSTITUTIONS
from requests.exceptions import HTTPError, ConnectionError
try:
from bibtexparser.bwriter import BibTexWriter
from bibtexparser.bibdatabase import BibDatabase
HAVE_BIBTEX_PARSER = True
except ImportError:
HAVE_BIBTEX_PARSER = False
# Command-line flags (presumably handed to LaTeX invocations elsewhere).
LATEX_OPTS = ["-halt-on-error", "-file-line-error"]

# Text-type aliases; they are only defined when running on Python 3+.
if sys.version_info[0] >= 3:
    string_types, unicode_type = (str, bytes), str

DEFAULT_ENCODING = sys.getdefaultencoding()

# Platform flags derived from ``platform.system()`` and ``os.name``.
ON_WINDOWS, ON_MAC, ON_LINUX = (
    platform.system() == system_name
    for system_name in ("Windows", "Darwin", "Linux")
)
ON_POSIX = os.name == "posix"
def dbdirname(db, rc):
    """Return the directory that holds the database ``db``.

    When ``db["local"]`` is absent or exactly ``False`` the directory is
    ``<rc.builddir>/_dbs/<db["name"]>``; otherwise ``db["url"]`` is returned
    unchanged.
    """
    if db.get("local", False) is not False:
        return db["url"]
    return os.path.join(os.path.join(rc.builddir, "_dbs"), db["name"])
def dbpathname(db, rc):
    """Return ``db["path"]`` joined onto the database directory of ``db``."""
    return os.path.join(dbdirname(db, rc), db["path"])
def fallback(cond, backup):
    """Build a decorator that keeps the decorated object only if ``cond``.

    The returned decorator yields the object it is applied to when ``cond``
    is truthy, and ``backup`` otherwise.
    """

    def dec(obj):
        if cond:
            return obj
        return backup

    return dec
def all_docs_from_collection(client, collname, copy=True):
    """Yield every document that ``client.all_documents`` returns for
    ``collname``, forwarding the ``copy`` flag to the client."""
    documents = client.all_documents(collname, copy=copy)
    yield from documents
# Abbreviated month names indexed by month number; index 0 holds ``None``.
SHORT_MONTH_NAMES = (
    None,
    *"Jan Feb Mar Apr May Jun Jul Aug Sept Oct Nov Dec".split(),
)
def date_to_rfc822(y, m, d=1):
    """Format the given year, month and day as an RFC 822 date string.

    ``y`` and ``d`` are passed through ``int``; ``m`` is converted with
    ``month_to_int``.
    """
    moment = datetime(int(y), month_to_int(m), int(d))
    return email.utils.format_datetime(moment)
def rfc822now():
    """Return the current UTC time as an RFC 822 formatted string.

    The timestamp is formatted from a naive datetime, so the string ends in
    the "-0000" zone marker exactly as before.
    """
    # ``datetime.utcnow()`` is deprecated; take an aware UTC "now" and drop
    # the tzinfo so the formatted output stays byte-compatible.
    from datetime import timezone

    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return email.utils.format_datetime(now)
def gets(seq, key, default=None):
    """Lazily yield ``item.get(key, default)`` for every item of ``seq``."""
    yield from (item.get(key, default) for item in seq)
def month_and_year(m=None, y=None):
    """Build a display string from a month and a year.

    Returns "present" when ``y`` is None, ``str(y)`` when only the month is
    missing, and otherwise "<short month name> <year>".
    """
    if y is None:
        return "present"
    if m is None:
        return str(y)
    return f"{SHORT_MONTH_NAMES[month_to_int(m)]} {y}"
def get_team_from_grant(grantcol):
    """Return a generator of team-member names for the FIRST grant only.

    Only the first item of ``grantcol`` is inspected; ``None`` is returned
    when ``grantcol`` is empty.
    """
    nothing = object()
    first_grant = next(iter(grantcol), nothing)
    if first_grant is nothing:
        return None
    return gets(first_grant["team"], "name")
def filter_publications(citations, authors, reverse=False, bold=True,
                        since=None, before=None, ackno=False,
                        grants=None):
    """Filter publications by the author(s)/editor(s)

    The citations are deep-copied first, so the input is never modified.

    Parameters
    ----------
    citations : list of dict
        The publication citations
    authors : set of str
        The authors to be filtered against
    reverse : bool, optional
        If True reverse the order, defaults to False
    bold : bool, optional
        If True put latex bold around the author(s) in question
    since : date, optional
        The date after which papers must have been published
    before : date, optional
        The date before which papers must have been published. It is applied
        whether or not ``since`` is given.
    ackno : bool
        Move the acknowledgement statement to note so that it is displayed in
        the publication list
    grants : string or list of strings, optional
        The grant or grants to filter over

    Returns
    -------
    list of dict
        The matching (copied) citations sorted with ``doc_date_key_high``.
    """
    if isinstance(grants, str):
        grants = [grants]
    if not isinstance(citations, list):
        citations = list(citations)
    pubs = []
    for pub in deepcopy(citations):
        contributors = set(pub.get("author", [])) | set(pub.get("editor", []))
        if not contributors & authors:
            continue
        # Entries matched only through their editors may have no "author".
        if bold and "author" in pub:
            pub["author"] = [
                "\\textbf{" + a + "}" if a in authors else a
                for a in pub["author"]
            ]
        if ackno and pub.get('ackno'):
            pub["note"] = latex_safe(f"\\newline\\newline\\noindent "
                                     f"Acknowledgement:\\newline\\noindent "
                                     f"{pub.get('ackno')}\\newline\\newline\\noindent ")
        if since or before:
            # Missing month/day default to the end of the year/month.
            bibdate = date(int(pub.get("year")),
                           month_to_int(pub.get("month", 12)),
                           int(pub.get("day", 28)))
            if since and not bibdate > since:
                continue
            if before and not bibdate < before:
                continue
        if grants and not any(
                grant in pub.get("grant", "") for grant in grants):
            continue
        pubs.append(pub)
    pubs.sort(key=doc_date_key_high, reverse=reverse)
    return pubs
def filter_projects(projects, people, reverse=False,
                    active_only=False, group=None, ptype=None):
    """Filter projects by team membership and optional attributes

    Parameters
    ----------
    projects : list of dict
        The projects to filter
    people : set of str
        Names to match; a project is kept only if at least one of its team
        members' names is in this set
    reverse : bool, optional
        If True reverse the order, defaults to False
    active_only : bool, optional
        Only active projects will be returned if True,
        defaults to False
    group : str, optional
        Only projects from this group will be returned if specified, otherwise
        projects from all groups will be returned, defaults to None
    ptype : str, optional
        The type of the project to filter for, such as ossoftware for open
        source software, defaults to None

    Returns
    -------
    list of dict
        The matching projects (not copied) sorted with ``id_key``.
    """
    # Fixme dereference team from grant collection if provided
    selected = []
    for project in projects:
        member_names = set(gets(project["team"], "name"))
        if not (member_names & people):
            continue
        if active_only and not project.get("active"):
            continue
        if group and project.get("group") != group:
            continue
        if ptype and project.get("type") != ptype:
            continue
        selected.append(project)
    selected.sort(key=id_key, reverse=reverse)
    return selected
def filter_grants(input_grants, names, pi=True, reverse=True, multi_pi=False):
    """Filter grants by those involved

    Note that every input grant is annotated in place with ``begin_year``,
    ``begin_month``, ``end_year`` and ``end_month`` before it is filtered;
    the grants that are returned are deep copies.

    Parameters
    ----------
    input_grants : list of dict
        The grants to filter
    names : set of str
        The names to be filtered against
    pi : bool, optional
        If True keep only grants where the matched person is PI and add the
        grant amount to the total, defaults to True
    reverse : bool, optional
        If True reverse the order, defaults to True
    multi_pi : bool, optional
        If True (and ``pi`` is False) record sub-award information for multi
        PI grants, defaults to False

    Returns
    -------
    tuple
        ``(grants, total_amount, subaward_amount)``
    """
    selected = []
    total_amount = 0.0
    subaward_amount = 0.0
    for grant in input_grants:
        grant_dates = get_dates(grant)
        for prefix in ("begin_", "end_"):
            grant[f"{prefix}year"] = grant_dates[f"{prefix}date"].year
            grant[f"{prefix}month"] = grant_dates[f"{prefix}date"].month
        member_names = set(gets(grant["team"], "name"))
        if not (member_names & names):
            continue
        grant = deepcopy(grant)
        # First team member whose name is one of the requested names.
        person = [member for member in grant["team"]
                  if member["name"] in names][0]
        if pi:
            if person["position"].lower() != "pi":
                continue
            total_amount += grant["amount"]
        elif multi_pi:
            grant["subaward_amount"] = person.get("subaward_amount", 0.0)
            grant["multi_pi"] = any(gets(grant["team"], "subaward_amount"))
        else:
            if person["position"].lower() == "pi":
                continue
            total_amount += grant["amount"]
            subaward_amount += person.get("subaward_amount", 0.0)
            grant["subaward_amount"] = person.get("subaward_amount", 0.0)
        grant["pi"] = [member for member in grant["team"]
                       if member["position"].lower() == "pi"][0]
        grant["me"] = person
        selected.append(grant)
    selected.sort(key=ene_date_key, reverse=reverse)
    return selected, total_amount, subaward_amount
def filter_employment_for_advisees(peoplecoll, begin_period, status,
                                   advisor, now=None):
    """Filter people to get advisees since begin_period

    The people collection is deep-copied; the returned person records are
    annotated with ``role``, ``begin_year``, ``end_year``, ``status``,
    ``position`` and ``end_date`` taken from the matching employment entry.
    A person is appended once per matching employment entry.

    Parameters
    ----------
    peoplecoll : list of dicts
        The people collection
    begin_period : date or str
        Only select advisees whose end date is on or after this date. A
        string is parsed with ``dateutil``.
    status : str
        The status of the person in the group to filter for, e.g., ms, phd,
        postdoc
    advisor : str
        The value the employment entry's ``advisor`` field must equal
    now : date, optional
        Used as the end date for entries without one, defaults to today

    Returns
    -------
    list of dict
        The advisees sorted by ``end_date``, most recent first.
    """
    people = deepcopy(peoplecoll)
    if not now:
        now = date.today()
    if isinstance(begin_period, str):
        begin_period = date_parser.parse(begin_period).date()
    advisees = []
    for person in people:
        for job in person.get("employment", []):
            matches = (job.get("advisor", "no advisor") == advisor
                       and job.get("status") == status)
            if not matches:
                continue
            job_dates = get_dates(job)
            began = job_dates.get("begin_date")
            ended = job_dates.get("end_date") or now
            if not ended >= begin_period:
                continue
            person['role'] = job.get("position")
            person['begin_year'] = began.year
            person['end_year'] = (
                ended.year if job_dates.get("end_date") else "present"
            )
            person['status'] = status
            person['position'] = job.get("position")
            person['end_date'] = ended
            advisees.append(person)
    advisees.sort(key=lambda entry: entry['end_date'], reverse=True)
    return advisees
def filter_service(p, begin_period, type):
    """Return the ``service`` entries of person ``p`` that have the given
    ``type`` and end on or after ``begin_period``.

    The end date is the entry's ``end_date``, else its ``date``, else today.
    """
    kept = []
    for entry in p.get("service", []):
        if entry.get("type") != type:
            continue
        entry_dates = get_dates(entry)
        ended = (entry_dates.get("end_date", entry_dates.get("date"))
                 or date.today())
        if ended >= begin_period:
            kept.append(entry)
    return kept
def filter_committees(person, begin_period, type):
    """Return the ``committees`` entries of ``person`` that have the given
    ``type`` and end on or after ``begin_period``.

    The end date is the entry's ``end_date``, else its ``date``, else today.
    """
    kept = []
    for entry in person.get("committees", []):
        if entry.get("type") != type:
            continue
        entry_dates = get_dates(entry)
        ended = (entry_dates.get("end_date", entry_dates.get("date"))
                 or date.today())
        if ended >= begin_period:
            kept.append(entry)
    return kept
def filter_facilities(people, begin_period, type, verbose=False):
    """Return the people that have facilities of ``type`` ending on or after
    ``begin_period``.

    Both the people and their facility entries are modified in place: each
    person's ``facilities`` list is replaced by the filtered list, and each
    kept facility gets ``month`` rewritten as a short month name.
    """
    with_facilities = []
    for person in people:
        recent = []
        for facility in copy(person.get("facilities", [])):
            if facility.get("type") != type:
                continue
            end_year = (facility.get('year') or facility.get('end_year')
                        or date.today().year)
            end_date = date(end_year,
                            facility.get("end_month", 12),
                            facility.get("end_day", 28))
            if not end_date >= begin_period:
                continue
            if facility.get('month'):
                month_source = facility['month']
            else:
                month_source = facility.get("begin_month", 0)
            facility['month'] = SHORT_MONTH_NAMES[month_to_int(month_source)]
            recent.append(facility)
        if verbose:
            print("p['facilities'] = {}".format(recent))
        person['facilities'] = recent
        if len(recent) > 0:
            with_facilities.append(person)
    return with_facilities
def filter_patents(patentscoll, people, target, since=None, before=None):
    """Return the active/pending patents on which ``target`` is an inventor.

    Kept records are modified in place: ``events`` is replaced by a
    date-sorted list (restricted to events after ``since`` when given) and,
    when ``since`` is given, ``month`` is rewritten as a short month name.

    Parameters
    ----------
    patentscoll : iterable of dict
        The patents collection
    people : iterable of dict
        The people collection used to resolve inventors and ``target``
    target : str
        Name, alias or id of the person of interest
    since : date, optional
        Only keep patents whose end date is on or after this date, and only
        their events strictly after it
    before : date, optional
        Accepted for interface compatibility; not applied by this function

    Returns
    -------
    list of dict
    """
    def event_date(event):
        return date(event["year"], event["month"], event.get("day", 28))

    patents = []
    allowed_statuses = ["active", "pending"]
    sources = ["aka", "name", "_id"]
    for i in patentscoll:
        # Exact comparison: ``in "patent"`` was a substring test that raised
        # TypeError when the type was missing.
        if i.get("status") not in allowed_statuses or i.get("type") != "patent":
            continue
        inventors = [
            fuzzy_retrieval(people, sources, inv, case_sensitive=False)
            for inv in i['inventors']
        ]
        person = fuzzy_retrieval(people, sources, target,
                                 case_sensitive=False)
        # An unknown target (None) must not match unresolved inventors.
        if person is None or person not in inventors:
            continue
        end_year = i.get('end_year') or date.today().year
        end_date = date(end_year,
                        i.get("end_month", 12),
                        i.get("end_day", 28))
        if since:
            if end_date >= since:
                month = i.get('month') or i.get("begin_month", 0)
                i['month'] = SHORT_MONTH_NAMES[month_to_int(month)]
                i["events"] = sorted(
                    (event for event in i["events"]
                     if event_date(event) > since),
                    key=event_date)
                patents.append(i)
        else:
            i["events"] = sorted(i["events"], key=event_date)
            patents.append(i)
    return patents
def filter_licenses(patentscoll, people, target, since=None, before=None):
    """Return the active/pending licenses on which ``target`` is an inventor.

    Kept records are modified in place: ``total_amount`` is set to the sum of
    the ``amount`` of all of the record's events, ``events`` is replaced by a
    date-sorted list (restricted to events after ``since`` when given) and,
    when ``since`` is given, ``month`` is rewritten as a short month name.

    Parameters
    ----------
    patentscoll : iterable of dict
        The patents collection (licenses are records of type "license")
    people : iterable of dict
        The people collection used to resolve inventors and ``target``
    target : str
        Name, alias or id of the person of interest
    since : date, optional
        Only keep licenses whose end date is on or after this date, and only
        their events strictly after it
    before : date, optional
        Accepted for interface compatibility; not applied by this function

    Returns
    -------
    list of dict
    """
    def event_date(event):
        return date(event["year"], event["month"], event.get("day", 28))

    licenses = []
    allowed_statuses = ["active", "pending"]
    sources = ["aka", "name", "_id"]
    for i in patentscoll:
        # Exact comparison: ``in "license"`` was a substring test that raised
        # TypeError when the type was missing.
        if i.get("status") not in allowed_statuses or i.get("type") != "license":
            continue
        inventors = [
            fuzzy_retrieval(people, sources, inv, case_sensitive=False)
            for inv in i['inventors']
        ]
        person = fuzzy_retrieval(people, sources, target,
                                 case_sensitive=False)
        # An unknown target (None) must not match unresolved inventors.
        if person is None or person not in inventors:
            continue
        end_year = i.get('end_year') or date.today().year
        end_date = date(end_year,
                        i.get("end_month", 12),
                        i.get("end_day", 28))
        if since:
            if end_date >= since:
                month = i.get('month') or i.get("begin_month", 0)
                i['month'] = SHORT_MONTH_NAMES[month_to_int(month)]
                # The total covers all events, not only those after since.
                i["total_amount"] = sum(
                    event.get("amount") for event in i["events"])
                i["events"] = sorted(
                    (event for event in i["events"]
                     if event_date(event) > since),
                    key=event_date)
                licenses.append(i)
        else:
            # Sum this record's own events; the old code read ``events``
            # before assigning it.
            i["total_amount"] = sum(
                event.get("amount") for event in i["events"])
            i["events"] = sorted(i["events"], key=event_date)
            licenses.append(i)
    return licenses
def filter_activities(people, begin_period, type, verbose=False):
    """Return the people that have activities of ``type`` ending on or after
    ``begin_period``.

    Both the people and their activity entries are modified in place: each
    person's ``activities`` list is replaced by the filtered list, and each
    kept activity gets ``year`` and a short-name ``month`` from its begin
    date (or its date). ``verbose`` is accepted but not used.
    """
    with_activities = []
    for person in people:
        recent = []
        for activity in copy(person.get("activities", [])):
            if activity.get("type") != type:
                continue
            activity_dates = get_dates(activity)
            if not activity_dates["end_date"] >= begin_period:
                continue
            shown = activity_dates.get('begin_date',
                                       activity_dates.get('date'))
            activity['year'] = shown.year
            activity['month'] = SHORT_MONTH_NAMES[month_to_int(shown.month)]
            recent.append(activity)
        person['activities'] = recent
        if len(recent) > 0:
            with_activities.append(person)
    return with_activities
def _presentation_date(pres):
    """Return the ``date`` of a presentation, else its ``begin_date``."""
    pres_dates = get_dates(pres)
    return pres_dates.get('date') or pres_dates.get('begin_date')


def filter_presentations(people, presentations, institutions, target,
                         types=None,
                         since=None, before=None, statuses=None):
    """Filter presentations for different types and date ranges

    The presentations are deep-copied; the returned copies are annotated
    with a joined author string, begin/end date fields, day suffixes and
    dereferenced institution/department information.

    Parameters
    ----------
    people: iterable of dicts
        The people collection
    presentations: iterable of dicts
        The presentations collection
    institutions: iterable of dicts
        The institutions collection
    target: str
        The id of the person you will build the list for
    types: list of strings. Optional, default = all
        The types to filter for. Allowed types are listed in
        PRESENTATION_TYPES; "all" disables the filter
    since: date. Optional, default is None
        The begin date to filter from
    before: date. Optional, default is None
        The end date to filter for. None does not apply this filter
    statuses: list of str. Optional. Default is accepted
        The list of statuses to filter for. Allowed statuses are listed in
        PRESENTATION_STATI; "all" disables the filter

    Returns
    -------
    list of presentation documents, most recent first
    """
    if not types:
        types = ["all"]
    if not statuses:
        statuses = ["accepted"]
    sources = ["aka", "name", "_id"]
    presclean = []
    for pres in deepcopy(presentations):
        # only list the talk if the group member is an author
        pauthors = pres["authors"]
        if isinstance(pauthors, str):
            pauthors = [pauthors]
        author_docs = [
            fuzzy_retrieval(people, sources, author, case_sensitive=False)
            for author in pauthors
        ]
        authorids = [doc["_id"] if doc is not None else doc
                     for doc in author_docs]
        if target not in authorids:
            continue
        # only list the presentation if it has status in statuses
        if not (pres["status"] in statuses or "all" in statuses):
            continue
        # only list the presentation if it has type in types
        if not (pres["type"] in types or "all" in types):
            continue
        # if specified, only list presentations in specified date ranges
        presdate = _presentation_date(pres)
        if since and not presdate > since:
            continue
        if before and not presdate < before:
            continue

        # build author list: known people by their name, others verbatim
        pres["authors"] = ", ".join(
            author if doc is None else doc["name"]
            for author, doc in zip(pauthors, author_docs)
        )
        pres["begin_month"] = presdate.month
        pres["begin_year"] = presdate.year
        pres["begin_day"] = presdate.day
        end_date = get_dates(pres).get("end_date")
        if end_date:
            pres["end_day"] = end_date.day
        pres["date"] = presdate
        # get_dates is re-read here on purpose: the fields written above are
        # part of what it sees.
        for day in ["begin_", "end_", ""]:
            try:
                pres["{}day_suffix".format(day)] = number_suffix(
                    get_dates(pres).get(f'{day}date').day
                )
            except AttributeError:
                print(f"presentation {pres.get('_id')} has no {day}date")
        if "institution" in pres:
            inst = {"institution": pres.get("institution"),
                    "department": pres.get("department")}
            dereference_institution(inst, institutions)
            pres["institution"] = {'name': inst.get("institution", ""),
                                   'city': inst.get("city"),
                                   'state': inst.get("state"),
                                   'country': inst.get("country")}
            pres["department"] = {'name': inst.get("department")}
        presclean.append(pres)
    presclean.sort(key=lambda k: k.get("date", None), reverse=True)
    return presclean
def awards_grants_honors(p, target_name, funding=True, service_types=None):
    """Make sorted awards grants and honors list.

    Entries of ``p[target_name]`` that carry a ``year`` are modified in
    place: the ``year`` key is replaced by a ``date`` of January 1st of that
    year.

    Parameters
    ----------
    p : dict
        The person entry
    target_name : str
        The key in ``p`` whose entries are listed, e.g. "honors" or "service"
    funding : bool, optional
        If True also list the entries of ``p["funding"]``, defaults to True
    service_types : list of str, optional
        When ``target_name`` is "service", only entries whose type is in this
        list are kept, defaults to ["profession"]

    Returns
    -------
    list of dict
        Items with "description", "year" and "_key", most recent first.
    """
    if not service_types:
        service_types = ["profession"]
    aghs = []
    if funding and p.get("funding"):
        for fund in p.get("funding", ()):
            aghs.append({
                "description": "{0} ({1}{2:,})".format(
                    latex_safe(fund["name"]),
                    fund.get("currency", "$").replace("$", r"\$"),
                    fund["value"],
                ),
                "year": fund["year"],
                "_key": date_to_float(fund["year"], fund.get("month", 0)),
            })
    for item in p.get(target_name, []):
        if target_name == "service" and item.get("type") not in service_types:
            continue
        entry = {"description": latex_safe(item["name"])}
        if "year" in item:
            item["date"] = date(item["year"], 1, 1)
            del item["year"]
        item_dates = get_dates(item)
        single = item_dates.get("date")
        began = item_dates.get("begin_date")
        ended = item_dates.get("end_date")
        if single:
            entry.update({
                "year": single.year,
                "_key": date_to_float(single.year, single.month),
            })
        elif began and ended:
            entry.update({
                "year": "{}-{}".format(began.year, ended.year),
                "_key": date_to_float(began.year, began.month),
            })
        elif began:
            entry.update({
                "year": "{}".format(began.year),
                "_key": date_to_float(began.year, began.month),
            })
        aghs.append(entry)
    aghs.sort(key=lambda entry: entry.get("_key", 0.0), reverse=True)
    return aghs
def awards(p, since=None, before=None, ):
    """Make sorted awards and honors

    Parameters
    ----------
    p : dict
        The person entry
    since : date. Optional, default is None
        Only honors whose (begin) year ends after this date are listed.
        None lists everything.
    before : date. Optional, default is None
        Accepted for interface compatibility; not applied by this function.

    Returns
    -------
    list of dict
        Items with "description", "year" and "_key", most recent first.
    """
    if not since:
        since = date(1500, 1, 1)
    a = []
    for x in p.get("honors", []):
        if "year" in x:
            anchor_year, label = x["year"], x["year"]
        elif "begin_year" in x and "end_year" in x:
            anchor_year = x["begin_year"]
            label = "{}-{}".format(x["begin_year"], x["end_year"])
        elif "begin_year" in x:
            anchor_year = x["begin_year"]
            label = "{}".format(x["begin_year"])
        else:
            continue
        # Compare the last day of the anchoring year. The begin/end-year
        # case used to call dict.get with three arguments and raise.
        if date(anchor_year, 12, 31) > since:
            a.append({
                "description": latex_safe(x["name"]),
                "year": label,
                "_key": date_to_float(anchor_year, x.get("month", 0)),
            })
    a.sort(key=(lambda x: x.get("_key", 0.0)), reverse=True)
    return a
# Matches an http(s) URL embedded in running text.
HTTP_RE = re.compile(
    r"https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,4}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*)"
)


def latex_safe_url(s):
    """Makes a string that is a URL latex safe (escapes ``#`` only)."""
    return s.replace("#", r"\#")


def latex_safe(s, url_check=True, wrapper="url"):
    """Make string latex safe

    Escapes ``&``, ``$``, ``#`` and ``_``. When ``url_check`` is True, every
    URL found in ``s`` is wrapped as ``\\<wrapper>{...}`` and only has its
    ``#`` characters escaped; the text around the URLs is escaped normally.

    Parameters
    ----------
    s : str
        The text to escape. Falsy values are returned unchanged.
    url_check : bool, optional
        If True look for URLs and wrap them, if False treat the whole string
        as plain text, defaults to True
    wrapper : str, optional
        The latex command used for wrapping urls, defaults to url

    Returns
    -------
    str
    """
    if not s:
        return s
    if url_check:
        # If it looks like a URL make it a latex URL
        url_search = HTTP_RE.search(s)
        if url_search:
            # Pass wrapper along so later URLs in the same string are
            # wrapped with the requested command too.
            url = r"{start}\{wrapper}{{{s}}}{end}".format(
                start=latex_safe(s[: url_search.start()], wrapper=wrapper),
                end=latex_safe(s[url_search.end():], wrapper=wrapper),
                wrapper=wrapper,
                s=latex_safe_url(s[url_search.start(): url_search.end()]),
            )
            return url
    return (
        s.replace("&", r"\&")
        .replace("$", r"\$")
        .replace("#", r"\#")
        .replace("_", r"\_")
    )
def make_bibtex_file(pubs, pid, person_dir="."):
    """Make a bibtex file given the publications

    Parameters
    ----------
    pubs : list of dict
        The publications
    pid : str
        The person id, used as the file name stem
    person_dir : str, optional
        The person's directory, defaults to the current directory

    Returns
    -------
    str or None
        The path of the ``.bib`` file written, or None when bibtexparser is
        not available.
    """
    if not HAVE_BIBTEX_PARSER:
        return None
    untouched = {"ID", "ENTRYTYPE", "author"}
    database = BibDatabase()
    writer = BibTexWriter()
    entries = []
    database.entries = entries
    for pub in pubs:
        entry = dict(pub)
        entry["ID"] = entry.pop("_id")
        entry["ENTRYTYPE"] = entry.pop("entrytype")
        if entry.get('doi') == 'tbd':
            del entry['doi']
        if entry.get("supplementary_info_urls"):
            entry["supplementary_info_urls"] = ", ".join(
                entry.get("supplementary_info_urls"))
        # Editors are joined only when given as a list; authors always.
        if isinstance(entry.get("editor"), list):
            joined_fields = ["author", "editor"]
        else:
            joined_fields = ["author"]
        for field in joined_fields:
            if field in entry:
                entry[field] = " and ".join(entry[field])
        # don't think I want the bibfile entries to be latex safe
        for key in entry.keys():
            if key not in untouched:
                entry[key] = str(entry[key])
        entries.append(entry)
    fname = os.path.join(person_dir, pid) + ".bib"
    with open(fname, "w", encoding="utf-8") as bibfile:
        bibfile.write(writer.write(database))
    return fname
def document_by_value(documents, address, value):
    """Get a specific document by one of its values

    Parameters
    ----------
    documents: generator
        Generator which yields the documents
    address: str or tuple
        The address of the data in the document. A tuple is followed as a
        path of nested keys.
    value: any
        The expected value for the document

    Returns
    -------
    dict:
        The first document which matches the request, or None when no
        document matches. Documents that do not contain the address are
        treated as non-matching.
    """
    if isinstance(address, str):
        address = (address,)
    for g_doc in documents:
        # Walking the path only reads the document, so no copy is needed.
        node = g_doc
        try:
            for add in address:
                node = node[add]
        except (KeyError, IndexError, TypeError):
            continue
        if node == value:
            return g_doc
    return None
def fuzzy_retrieval(documents, sources, value, case_sensitive=True):
    """Retrieve a document from the documents where value is compared against
    multiple potential sources

    Parameters
    ----------
    documents: generator
        The documents
    sources: iterable
        The keys whose values are candidates for the match; list values
        contribute each of their items
    value:
        The value to compare against to find the document of interest
    case_sensitive: Bool
        When true will match case (Default = True). When false only string
        candidates are considered and ``value`` must be a string to match.

    Returns
    -------
    dict:
        The first matching document, or None when nothing matches

    Examples
    --------
    >>> fuzzy_retrieval(people, ['aka', 'name'], 'pi_name', case_sensitive = False)

    This would get the person entry for which either the alias or the name was
    ``pi_name``.
    """
    for document in documents:
        candidates = []
        for source in sources:
            found = document.get(source, [])
            candidates += found if isinstance(found, list) else [found]
        if case_sensitive:
            if value in frozenset(candidates):
                return document
            continue
        lowered = [candidate.lower() for candidate in candidates
                   if isinstance(candidate, str)]
        if isinstance(value, str) and value.lower() in frozenset(lowered):
            return document
    return None
def number_suffix(number):
    """returns the suffix that adjectivises a number (st, nd, rd, th)

    Parameters
    ----------
    number: integer
        The number. If number is not an int or float, returns an empty string

    Returns
    -------
    suffix: string
        The suffix (st, nd, rd, th)
    """
    if not isinstance(number, (int, float)):
        return ""
    # 11-19 take "th" in every hundred (11th, 111th, 212th, ...), so test the
    # last two digits rather than the number itself.
    if 10 < number % 100 < 20:
        return "th"
    return {1: "st", 2: "nd", 3: "rd"}.get(number % 10, "th")
def dereference_institution(input_record, institutions, verbose=False):
    """Tool for replacing placeholders for institutions with the actual
    institution data. Note that the replacement is done inplace

    The institution is looked up by the record's ``institution`` (or
    ``organization``) value. When it is not found a warning is printed and a
    placeholder built from the record itself is used instead. The matched
    institution document may itself be updated (departments get an ``_id``
    and a missing ``departments`` mapping is filled in).

    Parameters
    ----------
    input_record : dict
        The record to dereference
    institutions : iterable of dicts
        The institutions
    verbose : bool, optional
        If True print extra warnings; a record without any institution or
        organization is then left untouched. Defaults to False

    Returns
    -------
    nothing
    """
    inst = input_record.get("institution") or input_record.get("organization")
    if verbose and not inst:
        print(f"WARNING: no institution or organization in entry: {input_record}")
        return
    db_inst = fuzzy_retrieval(institutions, ["name", "_id", "aka"], inst)
    if not db_inst:
        fallback_name = input_record.get(
            "institution", input_record.get("organization", "unknown"))
        print(f"WARNING: {fallback_name} not found in institutions")
        fallback_department = input_record.get('department', 'unknown')
        db_inst = {
            "name": fallback_name,
            "location": input_record.get(
                "location",
                f"{input_record.get('city', 'unknown')}, {input_record.get('state', 'unknown')}"),
            "city": input_record.get('city', 'unknown'),
            "country": input_record.get('country', 'unknown'),
            "state": input_record.get('state', 'unknown'),
            "departments": {
                fallback_department: {'name': fallback_department}},
        }
    if input_record.get('department') and not db_inst.get("departments"):
        if verbose:
            print(f"WARNING: no departments in {db_inst.get('_id')}. "
                  f"{input_record.get('department')} sought")
        sought = input_record.get('department', 'unknown')
        db_inst["departments"] = {sought: {'name': sought}}
    state_country = (
        db_inst.get("state") if db_inst.get("country") == "USA"
        else db_inst.get("country")
    )
    # now update the input record in place with what we have found
    input_record["location"] = db_inst.get("location",
                                           f"{db_inst['city']}, {state_country}")
    input_record["institution"] = db_inst["name"]
    input_record["organization"] = db_inst["name"]
    input_record["city"] = db_inst["city"]
    input_record["country"] = db_inst["country"]
    for optional_key in OPTIONAL_KEYS_INSTITUTIONS:
        if optional_key in ("departments", "schools"):
            continue
        if db_inst.get(optional_key):
            input_record[optional_key] = db_inst.get(optional_key)
    if "department" not in input_record:
        input_record["department"] = "unknown"
        return
    # Tag each department with its key so it can be matched by "_id".
    for dept_id, dept in db_inst.get("departments").items():
        dept.update({"_id": dept_id})
    extracted_department = fuzzy_retrieval(
        db_inst["departments"].values(), ["name", "aka", "_id"],
        input_record["department"]
    )
    if extracted_department:
        input_record["department"] = extracted_department.get("name")
    else:
        input_record["department"] = input_record.get("department", "")
    return
def merge_collections_all(a, b, target_id):
    """
    merge two collections into a single merged collection

    for keys that are in both collections, the value in b will be kept.
    Neither input collection is modified.

    Parameters
    ----------
    a  the inferior collection (will lose values of shared keys)
    b  the superior collection (will keep values of shared keys)
    target_id  str  the name of the key used in b to dereference ids in a

    Returns
    -------
    the combined collection.  Note that it returns a collection containing
    all items from a and b with the items dereferenced in b merged with the
    dereferenced items in a: first the unlinked items of a, then the merged
    items, then the unlinked items of b.

    see also merge_collections_intersect that returns a collection that is
    just referenced in both

    Examples
    --------
    >>> grants = merge_collections_all(self.gtx["proposals"], self.gtx["grants"], "proposal_id")

    This would merge all entries in the proposals collection with entries in the
    grants collection for which "_id" in proposals has the value of
    "proposal_id" in grants, returning also unchanged any other entries that are
    not linked.
    """
    intersect = merge_collections_intersect(a, b, target_id)
    # Lists keep the plain ``==`` comparison of ids.
    merged_b_ids = [merged.get("_id") for merged in intersect]
    merged_a_ids = [merged.get(target_id) for merged in intersect]
    # Build new lists instead of removing from the caller's collections.
    adis = [doc for doc in a if doc.get("_id") not in merged_a_ids]
    bdis = [doc for doc in b if doc.get("_id") not in merged_b_ids]
    return adis + intersect + bdis


def merge_collections_superior(a, b, target_id):
    """
    merge two collections, keeping everything from the superior collection

    for keys that are in both collections, the value in b will be kept.
    Neither input collection is modified.

    Parameters
    ----------
    a  the inferior collection (will lose values of shared keys)
    b  the superior collection (will keep values of shared keys)
    target_id  str  the name of the key used in b to dereference ids in a

    Returns
    -------
    the combined collection.  Note that it returns a collection containing
    the merged items followed by the items of b that are not linked to any
    item in a. Unlinked items of a are not returned.

    see also merge_collections_intersect that returns a collection that is
    just referenced in both

    Examples
    --------
    >>> grants = merge_collections_superior(self.gtx["proposals"], self.gtx["grants"], "proposal_id")

    This would merge all entries in the proposals collection with entries in the
    grants collection for which "_id" in proposals has the value of
    "proposal_id" in grants, returning also unchanged any other grants that
    are not linked.
    """
    intersect = merge_collections_intersect(a, b, target_id)
    merged_b_ids = [merged.get("_id") for merged in intersect]
    bdis = [doc for doc in b if doc.get("_id") not in merged_b_ids]
    return intersect + bdis


def merge_collections_intersect(a, b, target_id):
    """
    merge two collections such that just the intersection is returned

    for shared keys that are in both collections, the value in b will be kept

    Parameters
    ----------
    a  the inferior collection (will lose values of shared keys)
    b  the superior collection (will keep values of shared keys)
    target_id  str  the name of the key used in b to dereference ids in a

    Returns
    -------
    the combined collection.  Note that it returns a collection only containing
    merged items from a and b that are dereferenced in b, i.e., the merged
    intersection.

    see also merge_collections_all that returns all items in a, b and the
    intersection, and merge_collections_superior that returns all items in b
    and the intersection

    Examples
    --------
    >>> grants = merge_collections_intersect(self.gtx["proposals"], self.gtx["grants"], "proposal_id")

    This would merge all entries in the proposals collection with entries in the
    grants collection for which "_id" in proposals has the value of
    "proposal_id" in grants, returning just those items that have the dereference
    """
    intersect = [{**j, **i} for j in a for i in b if
                 j.get("_id") == i.get(target_id)]
    return intersect
def update_schemas(default_schema, user_schema):
    """
    Merging the user schema into the default schema recursively and return the
    merged schema. The default schema and user schema will not be modified
    during the merging.

    Parameters
    ----------
    default_schema : dict
        The default schema.
    user_schema : dict
        The user defined schema.

    Returns
    -------
    updated_schema : dict
        The merged schema. Where both schemas hold a dict under the same key
        the two are merged recursively; otherwise the user value wins.
    """
    updated_schema = deepcopy(default_schema)
    for key, user_value in user_schema.items():
        both_dicts = (
            key in updated_schema
            and isinstance(updated_schema[key], dict)
            and isinstance(user_value, dict)
        )
        if both_dicts:
            updated_schema[key] = update_schemas(updated_schema[key],
                                                 user_value)
        else:
            updated_schema[key] = user_value
    return updated_schema
def get_person(person_id, rc):
    """Get the person's document.

    The "people" collection is searched first and then "contacts", matching
    ``person_id`` case-insensitively against name, aka and _id. Prints a
    warning and returns None when neither collection has a match.
    """
    for collection in ("people", "contacts"):
        person_found = fuzzy_retrieval(
            all_docs_from_collection(rc.client, collection),
            ["name", "aka", "_id"],
            person_id,
            case_sensitive=False
        )
        if person_found:
            return person_found
    print("WARNING: {} missing from people and contacts. Check aka.".format(
        person_id))
    return None
def group(db, by):
    """
    Group the document in the database according to the value of the doc[by] in db.

    Parameters
    ----------
    db : iterable
        The database of documents.
    by : basestring
        The key to group the documents.

    Returns
    -------
    grouped: dict
        A dictionary mapping the feature value of group to the list of docs.
        All docs in the same list have the same value of doc[by]. Documents
        whose value for ``by`` is missing or falsy are reported with a
        printed message and left out.

    Examples
    --------
    Here, we use a tuple of dict as an example of the database.

    >>> db = ({"k": "v0"}, {"k": "v1"}, {"k": "v0"})
    >>> group(db, "k")

    This will return

    >>> {"v0": [{"k": "v0"}, {"k": "v0"}], "v1": [{"k": "v1"}]}
    """
    grouped = {}
    for doc in db:
        key = doc.get(by)
        if key:
            grouped.setdefault(key, []).append(doc)
        else:
            print("There is no field {} in {}".format(by, id_key(doc)))
    return grouped
def get_pi_id(rc):
    """Get the database id of the group PI.

    The "groups" collection is scanned for entries whose "name" equals
    ``rc.groupname`` (compared with ``casefold``). The "pi_name" of the first
    such entry is then looked up in the "people" collection.

    Parameters
    ----------
    rc : runcontrol object
        The runcontrol object. It must contain the 'groups' and 'people'
        collections in the needed databases.

    Returns
    -------
    The database '_id' of the group PI.

    Raises
    ------
    IndexError
        If no group matches ``rc.groupname``.
    """
    groups = list(all_docs_from_collection(rc.client, "groups"))
    people = all_docs_from_collection(rc.client, "people")
    pi_names = []
    for grp in groups:
        if grp.get("name").casefold() == rc.groupname.casefold():
            pi_names.append(grp.get("pi_name"))
    pi_doc = fuzzy_retrieval(people, ["_id", "aka", "name"], pi_names[0])
    return pi_doc.get("_id")
def group_member_ids(ppl_coll, grpname):
    """Get the ids of all people who have ever been assigned to a group.

    Parameters
    ----------
    ppl_coll : collection (list of dicts)
        The people collection that should contain the group members.
    grpname : str
        The id of the group in groups.yml.

    Returns
    -------
    set
        The set of ids of the people in the group.

    Notes
    -----
    - Groups that are being tracked are listed in the groups.yml collection
      with a name and an id.
    - People are in a group during an educational or employment period.
    - To assign a person to a tracked group during one such period, add
      a "group" key to that education/employment item with a value
      that is the group id.
    - This function searches the "education" and "employment" items of every
      person for a "group" equal to ``grpname`` and collects the "_id" of
      each person that has at least one such item.
    """
    member_ids = set()
    for person in ppl_coll:
        positions = [*person.get("education", {}),
                     *person.get("employment", {})]
        for position in positions:
            if position.get("group", None) == grpname:
                member_ids.add(person["_id"])
    return member_ids
def group_member_employment_start_end(person, grpname):
    """Get the start and end dates of a person's employment in a group.

    Parameters
    ----------
    person : dict
        The person whose dates we want.
    grpname : str
        The code for the group we want the dates of employment from.

    Returns
    -------
    list of dicts
        One entry per matching employment item, holding the person "_id",
        "begin_date", "end_date", "permanent" and "status".

    Raises
    ------
    RuntimeError
        If a matching employment item has no end date and is not marked
        permanent.
    """
    periods = []
    for position in person.get("employment", {}):
        if position.get("group", None) != grpname:
            continue
        dates = get_dates(position)
        is_permanent = position.get("permanent")
        if not dates.get('end_date') and not is_permanent:
            raise RuntimeError(
                "WARNING: {} has no end date in employment for {} starting {}".
                format(person["_id"], grpname, dates.get("begin_date")))
        periods.append({
            "_id": person["_id"],
            "begin_date": dates.get("begin_date"),
            "end_date": dates.get("end_date"),
            "permanent": is_permanent,
            "status": position.get("status"),
        })
    return periods
def compound_dict(doc, li):
    """Collect every string found in the values of a (nested) dictionary.

    Parameters
    ----------
    doc : dict
        The document being traversed. String values are collected; list and
        dict values are traversed recursively; anything else is ignored.
    li : list
        The accumulator that receives the strings. It is extended in place.

    Returns
    -------
    list of strings
        ``li`` itself, now also holding the strings found in ``doc``.
    """
    for value in doc.values():
        if isinstance(value, str):
            li.append(value)
        elif isinstance(value, list):
            compound_list(value, li)
        elif isinstance(value, dict):
            compound_dict(value, li)
    return li
def compound_list(doc, li):
    """Collect every string found in the items of a (nested) list.

    Parameters
    ----------
    doc : list
        The list being traversed. String items are collected; dict and list
        items are traversed recursively; anything else is ignored.
    li : list
        The accumulator that receives the strings. It is extended in place.

    Returns
    -------
    list of strings
        ``li`` itself, now also holding the strings found in ``doc``.
    """
    for entry in doc:
        if isinstance(entry, dict):
            compound_dict(entry, li)
        elif isinstance(entry, str):
            li.append(entry)
        elif isinstance(entry, list):
            compound_list(entry, li)
    return li
def fragment_retrieval(coll, fields, fragment, case_sensitive=False):
    """Retrieve all documents where the fragment appears in one of the fields.

    For every document the values of the requested fields are gathered: lists
    and dicts are flattened to the strings they contain, other values are
    taken as they are. Only string values are searched, so numbers and other
    non-string values never match and never raise, in either mode.

    Parameters
    ----------
    coll : generator
        The collection containing the documents.
    fields : iterable
        The fields of each document to check for the fragment.
    fragment : str
        The substring to look for. A non-string fragment matches nothing when
        ``case_sensitive`` is False.
    case_sensitive : bool
        When True the case must match (default False).

    Returns
    -------
    list
        A list of documents (that are dicts), each included at most once.

    Examples
    --------
    >>> fragment_retrieval(people, ['aka', 'name'], 'pi_name', case_sensitive = False)

    This would get all people for which either the alias or the name included
    the substring ``pi_name``.
    """
    ret_list = []
    for doc in coll:
        candidates = []
        for k in fields:
            value = doc.get(k, None)
            if value is None:
                continue
            if isinstance(value, list):
                candidates.extend(compound_list(value, []))
            elif isinstance(value, dict):
                candidates.extend(compound_dict(value, []))
            else:
                candidates.append(value)
        # A substring test only makes sense on strings; dropping other values
        # here keeps case-sensitive searches from raising TypeError on e.g.
        # integer fields, matching what the case-insensitive path always did.
        strings = [c for c in candidates if isinstance(c, str)]
        if case_sensitive:
            needle = fragment
        elif isinstance(fragment, str):
            needle = fragment.lower()
            strings = [s.lower() for s in strings]
        else:
            # A non-string fragment cannot be lower-cased: nothing matches.
            continue
        if any(needle in s for s in strings):
            ret_list.append(doc)
    return ret_list
def get_id_from_name(coll, name):
    """Return the "_id" of the person matching a name, alias or id.

    Parameters
    ----------
    coll : iterable of dicts
        The collection to search.
    name : str
        The value passed to ``fuzzy_retrieval`` for the "name", "aka" and
        "_id" fields (with ``case_sensitive=False``).

    Returns
    -------
    The "_id" of the retrieved document, or None when nothing is retrieved.
    """
    match = fuzzy_retrieval(coll, ["name", "aka", "_id"], name,
                            case_sensitive=False)
    return match["_id"] if match else None
def is_fully_appointed(person, begin_date, end_date):
    """Check that a person's appointments add up to a full load on every day.

    For each day of the interval the "loading" of all appointments current on
    that day is summed. Any day whose total differs from 1.0 makes the person
    not fully appointed, and each run of such days is reported with a printed
    warning.

    Parameters
    ----------
    person : dict
        The person whose appointments need to be checked. "appointments" is a
        dict mapping an appointment id to the appointment.
    begin_date : datetime.date or str
        The start date of the interval of time to check appointments for.
    end_date : datetime.date or str
        The end date of the interval of time to check appointments for.

    Returns
    -------
    bool
        True if the person is fully appointed and False if not (including
        when the person has no appointments at all).

    Raises
    ------
    ValueError
        If the begin date is after the end date.

    Examples
    --------
    >>> appts = {"A": {"begin_year": 2017, "begin_month": 6, "begin_day": 1, "end_year": 2017, "end_month": 6,\
     "end_day": 15, "grant": "grant1", "loading": 1.0, "type": "pd"}, "B": {"begin_year": 2017, "begin_month": 6, \
     "begin_day": 20, "end_year": 2017, "end_month": 6, "end_day": 30, "grant": "grant2", "loading": 1.0, \
     "type": "pd"}}
    >>> aejaz = {"name": "Adiba Ejaz", "_id": "aejaz", "appointments": appts}
    >>> is_fully_appointed(aejaz, "2017-06-01", "2017-06-30")

    In this case, we have an invalid loading from 2017-06-16 to 2017-06-19
    hence it would return False and print
    "WARNING: appointment gap for aejaz from 2017-06-16 to 2017-06-19".
    """
    appts = person.get('appointments')
    if not appts:
        print("No appointments defined for this person")
        return False
    # Parse before comparing: comparing the raw arguments breaks on mixed
    # str/date input and orders non-ISO date strings lexicographically.
    if isinstance(begin_date, str):
        begin_date = date_parser.parse(begin_date).date()
    if isinstance(end_date, str):
        end_date = date_parser.parse(end_date).date()
    if begin_date > end_date:
        raise ValueError("invalid begin and end dates")
    status = True
    timespan = end_date - begin_date
    good_period, start_gap = True, None
    for x in range(timespan.days + 1):
        day = begin_date + relativedelta(days=x)
        day_loading = 0.0
        for appt in appts.values():
            if is_current(appt, now=day):
                day_loading += appt.get("loading")
        # Tolerate float rounding: fractional loadings that nominally sum to
        # 1.0 can land a few ulps away from it.
        if abs(day_loading - 1.0) > 1e-9:
            status = False
            if good_period:
                start_gap = day
                good_period = False
        elif not good_period:
            # The badly loaded run ended yesterday; report it.
            print("WARNING: appointment gap for {} from {} to {}".format(
                person.get('_id'),
                str(start_gap), str(day - relativedelta(days=1))))
            good_period = True
    # A badly loaded run that reaches the last day is still unreported.
    if not good_period:
        if day != start_gap:
            print("WARNING: appointment gap for {} from {} to {}".format(
                person.get('_id'),
                str(start_gap), str(day)))
        else:
            print("WARNING: appointment gap for {} on {}".format(
                person.get('_id'), str(day)))
    return status
def key_value_pair_filter(collection, arguments):
    """Filter a collection by successive field/substring pairs.

    ``arguments`` is read as a flat sequence of pairs
    ``[field, fragment, field, fragment, ...]``. Each pair is applied in turn
    through ``fragment_retrieval`` to the result of the previous one, so a
    document must satisfy every pair to be kept.

    Parameters
    ----------
    collection : generator
        The collection containing the documents.
    arguments : list
        The name of the fields to look for and their accompanying substring.

    Returns
    -------
    The documents that satisfy all the search criteria. With an empty
    ``arguments`` the collection is returned unchanged.

    Raises
    ------
    RuntimeError
        If ``arguments`` has an odd number of items.

    Examples
    --------
    >>> key_value_pair_filter(people, ['name', 'ab', 'position', 'professor'])

    This would get all people for which their name contains the string 'ab'
    and whose position contains 'professor' and return them.
    """
    if len(arguments) % 2:
        raise RuntimeError("Error: Number of keys and values do not match")
    matches = collection
    for field, fragment in zip(arguments[0::2], arguments[1::2]):
        matches = fragment_retrieval(matches, [field], fragment)
    return matches
def collection_str(collection, keys=None):
    """Format selected fields of every document as one line of text each.

    Each document produces one line terminated by a newline. The "_id" value
    is written bare; every other key is written as ``key: value``. Every item
    is followed by a single space.

    Parameters
    ----------
    collection : generator
        The collection containing the documents.
    keys : list, optional
        The names of the fields to include. Defaults to None, in which case
        only the id is included. If '_id' is not among the keys it is put
        first. The list passed in is not modified.

    Returns
    -------
    str
        The formatted lines, concatenated.
    """
    # Work on a private copy so the caller's list is never altered.
    fields = list(keys) if keys else ['_id']
    if '_id' not in fields:
        fields.insert(0, '_id')
    lines = []
    for doc in collection:
        parts = []
        for key in fields:
            if key == '_id':
                parts.append(doc.get(key) + ' ')
            else:
                parts.append('{}: {} '.format(key, doc.get(key)))
        lines.append(''.join(parts) + '\n')
    return ''.join(lines)
def search_collection(collection, arguments, keys=None):
    """Filter a collection and format the surviving documents as text.

    This chains ``key_value_pair_filter`` (to select documents) and
    ``collection_str`` (to render them).

    Parameters
    ----------
    collection : generator
        The collection containing the documents.
    arguments : list
        The name of the fields to look for and their accompanying substring.
    keys : list, optional
        The name of the fields to return from the search. Defaults to None in
        which case only the id is returned.

    Returns
    -------
    str
        The text produced by ``collection_str`` for the matching documents.

    Examples
    --------
    >>> search_collection(people, ['name', 'ab', 'position', 'professor'], ['_id', 'name'])

    This would get all people for which their name contains the string 'ab'
    and whose position is professor. It would return the name and id of the
    valid entries.
    """
    return collection_str(key_value_pair_filter(collection, arguments), keys)
def collect_appts(ppl_coll, filter_key=None, filter_value=None, begin_date=None,
                  end_date=None):
    """Collect the appointments of all people that satisfy the given conditions.

    An appointment is collected when it passes every key/value filter and,
    if an interval is given, is current on at least one day of the interval.
    Each collected appointment dict is updated in place with the keys
    'person' (the person's id) and '_id' (the appointment's key).

    Parameters
    ----------
    ppl_coll : collection (list of dicts)
        The people collection containing persons with appointments.
    filter_key : string, list, optional
        The key(s) we want to filter appointments by.
    filter_value : string, int, float, list, optional
        The value required for each key, in the same order as ``filter_key``.
    begin_date : string, datetime, optional
        The start date for the interval in which we want to collect
        appointments.
    end_date : string, datetime, optional
        The end date for the interval in which we want to collect
        appointments.

    Returns
    -------
    list
        All appointments in the people collection that satisfy the provided
        conditions (if any).

    Raises
    ------
    RuntimeError
        If only one of the two dates is given, or the numbers of filter keys
        and filter values differ.
    ValueError
        If the begin date is after the end date, or an appointment has a type
        that is not in APPOINTMENTS_TYPES.

    Examples
    --------
    >>> collect_appts(people,filter_key=['grant', 'status'], filter_value=['mrsec14', 'finalized'], \
    begin_date= '2020-09-01', end_date='2020-12-31')

    This would return all appointments on the grant 'mrsec14' with status
    'finalized' that are valid on/during any dates from 2020-09-01 to
    2020-12-31.

    >>> collect_appts(people, filter_key=['grant', 'grant'], filter_value=['mrsec14', 'dmref19'])

    Every key/value pair must match, so two different values for the same key
    can never both be satisfied by one appointment.
    """
    if bool(begin_date) ^ bool(end_date):
        raise RuntimeError(
            "please enter both begin date and end date or neither")
    # NOTE: an omitted filter (None) becomes [None]; the resulting test
    # ``appt.get(None) == None`` holds for ordinary appointments, so the
    # defaults act as "no filter".
    if not isinstance(filter_key, list):
        filter_key = [filter_key]
    if not isinstance(filter_value, list):
        filter_value = [filter_value]
    if len(filter_key) != len(filter_value):
        raise RuntimeError(
            "number of filter keys and filter values do not match")
    if isinstance(begin_date, str):
        begin_date = date_parser.parse(begin_date).date()
    if isinstance(end_date, str):
        end_date = date_parser.parse(end_date).date()
    span_days = 0
    if begin_date:
        span_days = (end_date - begin_date).days
        if span_days < 0:
            raise ValueError("begin date is after end date")
    filters = list(zip(filter_key, filter_value))
    appts = []
    for person in ppl_coll:
        person_appts = person.get('appointments')
        if not person_appts:
            continue
        for appt_id, appt in person_appts.items():
            if appt.get('type') not in APPOINTMENTS_TYPES:
                raise ValueError(
                    "invalid type {} for appointment {} of {}".format(
                        appt.get('type'), appt_id, person.get('_id')))
            if not all(appt.get(key) == value for key, value in filters):
                continue
            # The interval is checked whenever dates were given, including a
            # one-day interval whose timedelta is falsy.
            if begin_date and not any(
                    is_current(appt, now=begin_date + relativedelta(days=offset))
                    for offset in range(span_days + 1)):
                continue
            appt.update({'person': person.get('_id'), '_id': appt_id})
            appts.append(appt)
    return appts
def grant_burn(grant, appts, begin_date=None, end_date=None):
    """Compute the remaining budget of a grant for each day of its budget periods.

    The budget periods are processed in the order they are listed. When a
    period starts, its months (minus write-offs) are added to the running
    student, postdoc and senior-staff totals at 30.5 days per month. Then, for
    every day of the period, the loading of each appointment on this grant
    that is current on that day is subtracted from the total matching its
    type ('gra', 'pd' or 'ss').

    Parameters
    ----------
    grant : dict
        The grant object whose burn needs to be retrieved.
    appts : collection (list of dicts), dict
        The appointments made on assorted grants. A dict is first converted
        with ``collect_appts``.
    begin_date : datetime, string, optional
        The start date of the interval to report, either a date object or a
        string. When omitted every day of the budget is reported.
    end_date : datetime, string, optional
        The end date of the interval to report, either a date object or a
        string. When omitted every day of the budget is reported.

    Returns
    -------
    dict
        Maps each reported date to a dict with the keys 'student_days',
        'postdoc_days' and 'ss_days', holding the totals remaining at the end
        of that date, rounded to 2 decimals.

    Raises
    ------
    ValueError
        If the grant has no budget.
    RuntimeError
        If only one of ``begin_date`` and ``end_date`` is given.

    Examples
    --------
    >>> grant_burn(mygrant, myappts, begin_date="2020-09-01", end_date="2020-09-03")

    returns

    >>> {datetime.date(2020, 9, 1): {'student_days': 5.0, 'postdoc_days': 12.0, 'ss_days': 20.0}, \
    datetime.date(2020, 9, 2): {'student_days': 4.0, 'postdoc_days': 11.5, 'ss_days': 15.0}, \
    datetime.date(2020, 9, 3): {'student_days': 3.0, 'postdoc_days': 11.0, 'ss_days': 10.0}}
    """
    budget = grant.get('budget')
    if not budget:
        raise ValueError("{} has no specified budget".format(grant.get('_id')))
    if bool(begin_date) ^ bool(end_date):
        raise RuntimeError(
            "please enter both begin date and end date or neither")
    if isinstance(begin_date, str):
        begin_date = date_parser.parse(begin_date).date()
    if isinstance(end_date, str):
        end_date = date_parser.parse(end_date).date()
    if isinstance(appts, dict):
        appts = collect_appts([{"appointments": appts}])
    remaining = {"student_days": 0.0, "postdoc_days": 0.0, "ss_days": 0.0}
    grant_amounts = {}
    first_dates = get_dates(budget[0])
    # Overall budget extent; tracked here but not read again in this function.
    budget_begin, budget_end = first_dates['begin_date'], first_dates['end_date']
    for period in budget:
        period_dates = get_dates(period)
        period_begin = period_dates['begin_date']
        period_end = period_dates['end_date']
        if period_begin < budget_begin:
            budget_begin = period_begin
        if period_end > budget_end:
            budget_end = period_end
        # Credit this period's allocation before stepping through its days.
        remaining["student_days"] += (period.get('student_months', 0)
                                      - period.get('student_writeoff', 0)) * 30.5
        remaining["postdoc_days"] += (period.get('postdoc_months', 0)
                                      - period.get('postdoc_writeoff', 0)) * 30.5
        remaining["ss_days"] += (period.get('ss_months', 0)
                                 - period.get('ss_writeoff', 0)) * 30.5
        period_days = (period_end - period_begin).days
        for offset in range(period_days + 1):
            day = period_begin + relativedelta(days=offset)
            for appt in appts:
                on_grant = (appt.get('grant') == grant.get('_id')
                            or appt.get('grant') == grant.get('alias'))
                if not (on_grant and is_current(appt, now=day)):
                    continue
                kind = appt.get('type')
                if kind == 'gra':
                    remaining["student_days"] -= appt.get('loading')
                elif kind == 'pd':
                    remaining["postdoc_days"] -= appt.get('loading')
                elif kind == 'ss':
                    remaining["ss_days"] -= appt.get('loading')
            if (not begin_date) or (begin_date <= day <= end_date):
                grant_amounts[day] = {name: round(value, 2)
                                      for name, value in remaining.items()}
    return grant_amounts
def validate_meeting(meeting, date):
    """Validate a meeting that has already taken place.

    The meeting date is parsed from the meeting "_id" with its first three
    characters removed. Meetings dated on or after ``date`` are not checked.
    For earlier meetings, a journal club (if present) must not have a doi of
    'tbd', and a presentation (if present) must not have a link or a title of
    'tbd' (case-insensitive). Returns nothing if the meeting is valid.

    Parameters
    ----------
    meeting : dict
        The meeting object that needs to be validated.
    date : datetime.date
        The date we want to use to see if a meeting has happened or not.

    Raises
    ------
    ValueError
        If one of the checked entries is still 'tbd'.
    """
    meeting_date = date_parser.parse(meeting.get('_id')[3:]).date()
    if not meeting_date < date:
        return
    journal_club = meeting.get('journal_club')
    if journal_club and journal_club.get('doi').lower() == 'tbd':
        raise ValueError(
            f'{meeting.get("_id")} does not have a journal club doi')
    presentation = meeting.get('presentation')
    if not presentation:
        # No presentation recorded: nothing to check, same as for a meeting
        # without a journal club.
        return
    if presentation.get('link').lower() == 'tbd':
        raise ValueError(
            f'{meeting.get("_id")} does not have a presentation link')
    if presentation.get('title').lower() == 'tbd':
        raise ValueError(
            f'{meeting.get("_id")} does not have a presentation title')
def _task_details(task):
    """Return the '(days|importance|duration|tags|assigner|uuid)' suffix of a task line."""
    return (f"({task.get('days_to_due')}|{task.get('importance')}|"
            f"{str(task.get('duration'))}|{','.join(task.get('tags', []))}|"
            f"{task.get('assigned_by')}|{task.get('uuid', [])[:6]})")


def _print_task_notes(task):
    """Print each note of a task on its own line."""
    for note in task.get('notes') or []:
        print(f" - {note}")


def print_task(task_list, stati, index=True):
    """Print tasks in a nice format.

    The output is written bottom-up: first the tasks grouped by status, then
    the "Tasks" banner, then the tasks that have a deadline (latest due date
    first), and finally the "Deadlines" banner.

    Parameters
    ----------
    task_list : list
        A list of tasks (dicts) that will be printed. Each task gets a
        "preamble" entry written into it whenever ``stati`` is not empty.
    stati : list
        The statuses to print, in this order. A status heading is printed
        only when at least one task has that status.
    index : bool, optional
        When True (default) each task line starts with the task's
        "running_index" in parentheses.
    """
    for status in stati:
        if any(task.get('status') == status for task in task_list):
            print(f"{status}:")
        for task in task_list:
            task["preamble"] = (
                f"({task.get('running_index', 0)}) " if index else "")
            if task.get('status') != status:
                continue
            print(f"{task.get('preamble')}{task.get('description').strip()} "
                  f"{_task_details(task)}")
            _print_task_notes(task)
    print("-" * 30)
    print("Tasks (decreasing priority going up)")
    print("-" * 30)
    deadline_list = sorted(
        (task for task in task_list
         if task.get('deadline') and task.get("status") in stati),
        key=lambda x: x.get("due_date"), reverse=True)
    for task in deadline_list:
        print(f"{task.get('due_date')}({task.get('days_to_due')} days): "
              f"({task.get('running_index', 0)}) "
              f"{task.get('description').strip()} {_task_details(task)}")
        _print_task_notes(task)
    print(f"{'-' * 30}\nDeadlines:\n{'-' * 30}")
    return
def remove_duplicate_docs(coll, key):
    """Remove documents that repeat an earlier document's value for a key.

    The doc found first is kept and subsequent docs with the same value
    under ``key`` are dropped.

    Parameters
    ----------
    coll : iterable of dicts
        The list of documents.
    key : string
        The key that will be used to compare.

    Returns
    -------
    list
        The docs with duplicates (as described above) removed.

    Raises
    ------
    RuntimeError
        If a document has no value, or a falsy value, under ``key``.
    """
    seen_values, unique_docs = [], []
    for doc in coll:
        value = doc.get(key)
        if not value:
            raise RuntimeError(f"ERROR: Target key, {key} not found in {doc}")
        if value in seen_values:
            continue
        unique_docs.append(doc)
        seen_values.append(value)
    return unique_docs
def validate_doc(collection_name, doc, rc):
    """Validate a document against the schemas and build an error report.

    Parameters
    ----------
    collection_name : str
        The name of the collection the document belongs to.
    doc : dict
        The document to validate.
    rc : runcontrol object
        Must provide ``rc.schemas``.

    Returns
    -------
    tuple
        The first element of the ``validate`` result, and an error message.
        The message is empty unless that first element is False, in which
        case it lists the reported errors followed by the document's value
        for each reported field.
    """
    from regolith.schemas import validate
    from pprint import pformat
    outcome = validate(collection_name, doc, rc.schemas)
    is_valid = outcome[0]
    if is_valid is not False:
        return is_valid, ""
    errors = outcome[1]
    pieces = [f"ERROR in {doc['_id']}:\n{pformat(errors)}\n"]
    pieces.extend(f"{pformat(doc.get(field))}\n" for field in errors)
    pieces.append("-" * 15)
    pieces.append("\n")
    return is_valid, "".join(pieces)
def add_to_google_calendar(event):
    """Add a newly created event to the user's primary google calendar.

    Stored credentials are read from
    ``~/.config/regolith/tokens/google_calendar_api/token.json`` (the
    directory is created if needed). Expired credentials that carry a refresh
    token are refreshed and written back to that file.

    Parameters
    ----------
    event : dict
        The event details to be added to google calendar, see
        https://developers.google.com/calendar/api/v3/reference/events

    Returns
    -------
    int
        1 after the event has been inserted; 0 when no usable credentials are
        available, in which case a message is printed and nothing is added.
    """
    scopes = ['https://www.googleapis.com/auth/calendar.events']
    tokendir = os.path.expanduser("~/.config/regolith/tokens/google_calendar_api")
    os.makedirs(tokendir, exist_ok=True)
    # token.json stores the user's access and refresh tokens; it is written
    # when the authorization flow completes for the first time.
    tokenfile = os.path.join(tokendir, 'token.json')
    creds = None
    if os.path.exists(tokenfile):
        creds = Credentials.from_authorized_user_file(tokenfile, scopes)
    if not (creds and creds.valid):
        refreshable = creds and creds.expired and creds.refresh_token
        if not refreshable:
            # No usable credentials: the caller has to run the consent flow.
            print('The google calendar feature needs authentication information to run. '
                  'This needs to be done just once for each new device. '
                  'Please grant permission to regolith to access your calendar. '
                  'If this process takes more than 1 minute you will have to rerun '
                  'the helper to complete the addition of the presentation.')
            return 0
        creds.refresh(Request())
        with open(tokenfile, 'w') as token:
            token.write(creds.to_json())
    service = build('calendar', 'v3', credentials=creds)
    created = service.events().insert(calendarId='primary', body=event).execute()
    print('Event created: %s' % (created.get('htmlLink')))
    return 1
def google_cal_auth_flow():
    """Run the first-time google calendar authentication and store the token.

    The client secrets are read from ``credentials.json`` located next to
    this module. The consent flow is run through a local server and the
    resulting credentials are saved, for the next run, to
    ``~/.config/regolith/tokens/google_calendar_api/token.json``.
    Nothing is returned.
    """
    scopes = ['https://www.googleapis.com/auth/calendar.events']
    tokendir = os.path.expanduser("~/.config/regolith/tokens/google_calendar_api")
    os.makedirs(tokendir, exist_ok=True)
    tokenfile = os.path.join(tokendir, 'token.json')
    curr = pathlib.Path(__file__).parent.resolve()
    print(curr)
    secrets_file = os.path.join(curr, 'credentials.json')
    flow = InstalledAppFlow.from_client_secrets_file(secrets_file, scopes)
    creds = flow.run_local_server(port=0)
    # Save the credentials for the next run
    with open(tokenfile, 'w') as token:
        token.write(creds.to_json())
def get_target_repo_info(target_repo_id, repos):
    """Check that the repo information is defined and valid.

    On success the repo document is updated in place: its params "name" is
    stripped and has spaces replaced by underscores, and a "built_url" entry
    (url followed by api_route) is added.

    Parameters
    ----------
    target_repo_id : string
        The id of the doc with the target repo information.
    repos : list
        The list of repos. A repo must have an _id, a url and a params dict
        containing a name. The optional api_route is appended to the url.

    Returns
    -------
    The target repo document, or False (after printing a message) if it is
    not present, not unique, or not properly formulated.
    """
    setup_message = ("INFO: If you would like regolith to automatically create a repository in GitHub/GitLab, "
                     "please add your repository information in regolithrc.json and "
                     "your private authentication token in "
                     "user.json respectively. See regolith documentation for details.")
    matches = [repo for repo in repos if repo.get("_id", "") == target_repo_id]
    if not matches:
        print(setup_message)
        return False
    if len(matches) > 1:
        print(f"more than one repo found in regolithrc.json with the name {target_repo_id}")
        return False
    target_repo = matches[0]
    message_params_not_defined = (f"WARNING: The request parameters may not be defined. "
                                  f"Info we have: {target_repo}. "
                                  f"If you would like regolith to automatically create a repository in GitHub/GitLab, "
                                  f"please add repository information in regolithrc.json. See regolith documentation "
                                  f"for details.")
    message_url_not_defined = ("WARNING: The request url may not be valid. "
                               "If you would like regolith to automatically create a repository in GitHub/GitLab, "
                               "please add repository information in regolithrc.json. See regolith documentation "
                               "for details.")
    params = target_repo.get("params")
    if not params or not params.get("name"):
        print(message_params_not_defined)
        return False
    if not target_repo.get("url"):
        print(message_url_not_defined)
        return False
    # A missing api_route contributes nothing, rather than the text "None".
    built_url = f"{target_repo.get('url')}{target_repo.get('api_route') or ''}"
    url = urlparse(built_url)
    if not (url.scheme and url.netloc and url.path):
        print(message_url_not_defined)
        return False
    params.update({"name": params.get("name").strip().replace(" ", "_")})
    target_repo['built_url'] = built_url
    return target_repo
def get_target_token(target_token_id, tokens):
    """Look up an API authentication token by its id.

    Parameters
    ----------
    target_token_id : string
        The id of the personal access token document.
    tokens : list of dicts
        The token documents, each with an "_id" and a "token".

    Returns
    -------
    The value stored under "token" in the single matching document. None is
    returned when no document matches or more than one matches (a message is
    printed in both cases), and also when the stored token is empty or
    missing.
    """
    message_token_not_defined = ("WARNING: Cannot find an authentication token. It may not be correctly defined. If you would like regolith to "
                                 "automatically create a repository in GitHub/GitLab, please add your private "
                                 "authentication token in user.json. See regolith documentation for details.")
    matches = [entry for entry in tokens if entry.get("_id") == target_token_id]
    if not matches:
        print(message_token_not_defined)
        return None
    if len(matches) > 1:
        print(f"more than one token found in regolithrc.json with the name {target_token_id}")
        return None
    return matches[0].get("token", "") or None
def create_repo(destination_id, token_info_id, rc):
    """Create a repo at the target destination.

    Nothing is attempted (and None is returned) when the repo information or
    the token cannot be obtained. Otherwise a POST request is sent to the
    repo's "built_url" with the repo params and the token in the
    'PRIVATE-TOKEN' header.

    Parameters
    ----------
    destination_id : string
        The id of the target repo information document.
    token_info_id : string
        The id for the token info document (e.g. 'priv_token').
    rc : run control object
        The run control object that should contain rc.repos and rc.tokens.

    Returns
    -------
    A success message with clone instructions if the repo was created, or
    None if the repo info or the token is not available.

    Raises
    ------
    HTTPError
        If the API call returns an error status.
    SystemExit
        If the request fails for any other request-related reason.
    """
    repo_info = get_target_repo_info(destination_id, rc.repos)
    token = get_target_token(token_info_id, rc.tokens)
    if not (repo_info and token):
        return
    params = repo_info['params']
    try:
        response = requests.post(repo_info.get('built_url'), params=params,
                                 headers={'PRIVATE-TOKEN': token})
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        raise HTTPError(f"WARNING: Unsuccessful attempt at making a GitHub/GitLab etc., repository "
                        f"due to an issue with the API call (status code: {response.status_code}). "
                        f"If you would like regolith to automatically create a repository in GitHub/GitLab, "
                        f"please add repository information in regolithrc.json. See regolith documentation "
                        f"for details.")
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
    host = repo_info.get('url').replace('https://', '')
    namespace = repo_info.get('namespace_name', '<group/org name>')
    clone_text = f"{host}:{namespace}/{params.get('name')}.git"
    return (f"repo {params.get('name', 'unknown')} "
            f"has been created at {repo_info.get('url')}.\nClone this "
            f"to your local using (HTTPS):\ngit clone https://{clone_text}\n"
            f"or (SSH):\ngit clone git@{clone_text}")
def get_uuid():
    """Return a freshly generated ``uuid.uuid4`` value as a string."""
    new_id = uuid.uuid4()
    return str(new_id)