"""Module containing the baseclass for data interactions."""
import datetime
import os
from configparser import ConfigParser
import pandas as pd
import pysftp
from .helpfer_functions import get_abs_path, hash_file
class DbBaseClass:
    """
    Baseclass for data interactions.

    Bundles loading of the config files, reading of the tab separated db files,
    down-/upload of the db file via SFTP and merging of the local with the remote db.

    Notes
    -----
    ``self.db`` is read by ``clean_db`` and ``merge_dbs`` but never assigned in this
    class (presumably subclasses set it -- TODO confirm against the subclasses).
    Most other attributes are only available after ``load_config`` was called.
    """

    default_config_path = get_abs_path("default_config.ini")

    def __init__(self, user_config_path=".user_config.ini"):
        """
        Store the resolved path of the user config.

        Parameters
        ----------
        user_config_path : str, optional
            Path to the user specific config, which will overwrite default settings.
            by default ".user_config.ini"
        """
        self.user_config_path = get_abs_path(user_config_path)

    def get_pandas_now(self) -> pd.Timestamp:  # type: ignore
        """
        Return datetime.now as pd.Timestamp.

        Returns
        -------
        pd.Timestamp:
            current time as timestamp: pd.Timestamp
        """
        return pd.to_datetime(datetime.datetime.now())

    def get_datetime_now(self) -> datetime.datetime:
        """
        Helpermethod for mocking of datetime.datetime.now() in unittests.

        Returns
        -------
        datetime.datetime:
            datetime.now()
        """
        return datetime.datetime.now()

    def load_config(self) -> ConfigParser:
        """
        Load the config files and set all necessary properties.

        The default config is read first and the user config second, so values
        from the user config take precedence.

        Sets ``data_folder_path``, ``db_path_offline``, ``db_path_online``,
        ``manual_db_path``, ``contract_info_path``, ``local_files``, ``db_path``,
        ``login_dict`` and ``occupation``.

        Returns
        -------
        ConfigParser
            The loaded config, so subclasses can obtain more information if needed.

        Raises
        ------
        configparser.NoSectionError, configparser.NoOptionError
            If a required option (``login`` or ``occupation`` section) is missing.
        ValueError
            If ``["login"]["port"]`` is set but isn't an integer.
        """
        config = ConfigParser()
        config.read([self.default_config_path, self.user_config_path])

        self.data_folder_path = get_abs_path(
            config.get("paths", "data_folder", fallback="../data")
        )
        self.db_path_offline: str = os.path.join(self.data_folder_path, "local_db.tsv")
        self.db_path_online: str = os.path.join(self.data_folder_path, "remote_db.tsv")
        self.manual_db_path: str = os.path.join(self.data_folder_path, "manual_db.tsv")
        self.contract_info_path: str = os.path.join(
            self.data_folder_path, "contract_info.tsv"
        )
        # one row per local file (index: file label), with its path in column "path"
        self.local_files = pd.DataFrame(
            {
                "local_db": {"path": self.db_path_offline},
                "manual_db": {"path": self.manual_db_path},
                "contract_info": {"path": self.contract_info_path},
            }
        ).T

        self.db_path = config.get("login", "db_path")
        self.login_dict = {
            "host": config.get("login", "host"),
            "username": config.get("login", "username"),
            "password": config.get("login", "password"),
            # getint keeps the type consistent (int) whether the value comes
            # from the config file or from the fallback
            "port": config.getint("login", "port", fallback=22),
        }

        # strip the entries so that "a, b" is read as ["a", "b"]
        occupations = [
            occupation.strip()
            for occupation in config.get("occupation", "occupations").split(",")
        ]
        last_occupation = config.get("occupation", "last_occupation")
        if last_occupation in occupations:
            self.occupation = last_occupation
        elif not hasattr(self, "occupation"):
            # prevents an AttributeError in load_db when last_occupation is unknown;
            # a previously set occupation is kept untouched
            self.occupation = occupations[0]

        # preventing some errors with different versions of pysftp
        try:
            cnopts = pysftp.CnOpts()
            # NOTE(review): disables host key checking, i.e. the identity of the
            # server isn't verified; kept as is for backward compatibility
            cnopts.hostkeys = None
            self.login_dict["cnopts"] = cnopts
        except Exception:
            print("pysftp.CnOpts() doesn't exist")

        # returning config so subclasses can use it to obtain more information if needed
        return config

    def load_db(self, db_path: str) -> pd.DataFrame:
        """
        Read in the db file if it exists or create a new db.

        Parameters
        ----------
        db_path : str
            Path to the local, tab separated db file.

        Returns
        -------
        pd.Dataframe
            Loaded database, or a new single row database
            (start and end set to now, occupation to ``self.occupation``)
            if ``db_path`` isn't an existing file.
        """
        if os.path.isfile(db_path):
            return pd.read_csv(db_path, parse_dates=["start", "end"], sep="\t")  # type: ignore
        return pd.DataFrame(
            [
                {
                    "start": self.get_pandas_now(),
                    "end": self.get_pandas_now(),
                    "occupation": self.occupation,
                }
            ]
        )

    def clean_db(self) -> None:
        """Keep only rows of ``self.db`` where the session lasted longer than 1min."""
        work_time = self.db["end"] - self.db["start"]  # pylint: disable=E0203
        real_work_period = work_time > pd.to_timedelta(1, unit="m")  # 1 minute
        self.db = self.db[real_work_period]

    def calc_file_hashes(self) -> pd.DataFrame:
        """
        Calculate hashvalues for files.

        Returns
        -------
        pd.DataFrame
            Copy of ``self.local_files`` with the additional column "hashes",
            which holds the result of ``hash_file`` for each path.
        """
        local_files = self.local_files.copy()
        local_files["hashes"] = local_files["path"].apply(hash_file)
        return local_files

    def get_remote_db(self) -> bool:
        """
        Download the db_file to db_path_online from the SFTP server.

        This uses the values specified at ["login"]["db_path"] in the config file.

        Returns
        -------
        bool
            Whether database retrieval succeeded or not.
        """
        # best effort: any failure is reported via the return value
        try:
            with pysftp.Connection(**self.login_dict) as sftp:
                sftp.get(self.db_path, localpath=self.db_path_online, preserve_mtime=True)
            return True
        except Exception:
            print("Failed to get remote_db")
            return False

    def push_remote_db(self) -> bool:
        """
        Push the db_file from db_path_offline to the SFTP server.

        This uses the values specified at ["login"]["db_path"] in the config file.

        Returns
        -------
        bool
            Whether database upload succeeded or not.
        """
        # best effort: any failure is reported via the return value
        try:
            with pysftp.Connection(**self.login_dict) as sftp:
                sftp.put(self.db_path_offline, remotepath=self.db_path, preserve_mtime=True)
            return True
        except Exception:
            print("Failed to push remote_db")
            return False

    def merge_dbs(self) -> pd.DataFrame:
        """
        Merge local db with remote db.

        The overlap (same start) is replaced with the max value of end.

        Returns
        -------
        pd.Dataframe
            Local db merged with remote db, with stripped overlap and without
            duplicated rows, sorted by start and with a fresh index.
        """
        remote_db = self.load_db(self.db_path_online)
        if self.db.equals(remote_db):
            new_db = self.db
        else:
            new_db = pd.merge(
                self.db,
                remote_db,
                on=["occupation", "start", "end"],  # type: ignore
                how="outer",
            )
            # get conflicting start values (same start value different end value),
            # each of them only once, even if it occurs more than twice
            start_fix = new_db["start"][new_db["start"].duplicated()].drop_duplicates()
            drop_labels = []
            for start_val in start_fix.values:
                dup_index = new_db.index[new_db["start"].isin([start_val])]
                # the first occurrence survives and gets the latest end
                new_db.at[dup_index[0], "end"] = new_db["end"].loc[dup_index].max()
                drop_labels.extend(dup_index[1:])
            new_db = new_db.drop(index=drop_labels)
        # drop_duplicates returns a new frame, so the result needs to be assigned
        new_db = new_db.drop_duplicates()
        return new_db.sort_values(["start"]).reset_index(drop=True)