"""Module containing the Worktime calculator class."""
import datetime
import os
from typing import Dict, Union
import holidays
import pandas as pd
from holidays.holiday_base import HolidayBase
from pandas.tseries.offsets import CustomBusinessDay # type: ignore
from .base_classes import DbBaseClass
from .update_work_db import get_abs_path
# from .helpfer_functions import debug_printer
[docs]class WorktimeCalculator(DbBaseClass):
default_config_path = get_abs_path("default_config.ini")
def __init__(self, user_config_path=".user_config.ini"):
"""
Class to calculate the worktime.
Parameters
----------
user_config_path : str, optional
Path to the user specific config, which will overwrite default settings.
by default ".user_config.ini"
"""
super().__init__(user_config_path)
self.special_holidays = {}
self.load_config()
self.db = self.load_db()
self.split_date_overlap_session()
self.contract_worktime_df = self.generate_contract_worktime_df()
self.holidays = self.init_holidays()
[docs] def load_db(self) -> pd.DataFrame:
"""
Load Database remote or locally.
Tries to load the database directly from the server if possible, else
it loads the local database or throws an exception that isn't possible either.
Returns
-------
db: pandas.DataFrame
Database with the actually worked time
"""
if self.get_remote_db() and os.path.isfile(self.db_path_online):
return pd.read_csv(
self.db_path_online, parse_dates=["start", "end"], sep="\t"
) # type:ignore
elif os.path.isfile(self.db_path_offline):
return pd.read_csv(
self.db_path_offline, parse_dates=["start", "end"], sep="\t"
) # type:ignore
else:
raise Exception("There was no proper database file provided")
[docs] def load_config(self):
"""Load the config files and sets all necessary properties."""
config = super().load_config()
self.country = config.get("location", "country", fallback="")
self.province = config.get("location", "province", fallback="")
if "special_holidays" in config.sections():
self.special_holidays = config._sections["special_holidays"] # type:ignore
[docs] def generate_contract_worktime_df(self) -> pd.DataFrame:
"""
Generate a Dataframe with 'worktime' column.
Generate a template like DataFrame containing the mean daily work time for each day
counting as workday based on the contract details. This Template will be used later on
to generate the manual_df and holiday_df.
Returns
-------
contract_worktime_df: pandas.DataFrame
Dataframe containing all workdays and their mean daily working time,
since the 1st contract started until now
"""
contract_worktime_df = pd.DataFrame()
contract_info_df = pd.read_csv(
self.contract_info_path, parse_dates=["start", "end"], sep="\t" # type:ignore
)
# debug_printer(contract_info_df)
for _, row in contract_info_df.iterrows():
if pd.isna(row["end"]):
end_date = self.db["end"].max()
else:
end_date = row["end"]
business_days = CustomBusinessDay(weekmask=row["weekmask"])
start_values = pd.Series(
pd.date_range(row["start"], end_date, normalize=True, freq=business_days)
)
# calculate the daily worktime depending on the interval and weekmask
daily_worktime = self.get_daily_worktime(
row["frequenzy"], row["worktime"], row["weekmask"]
)
worktime_df = pd.DataFrame({"start": start_values})
worktime_df["worktime"] = pd.to_timedelta(daily_worktime, unit="h")
contract_worktime_df = contract_worktime_df.append(worktime_df)
# summing up worktime from different jobs at the same day
contract_worktime_df = contract_worktime_df.groupby("start")["worktime"].apply(
lambda x: x.sum()
)
return contract_worktime_df.reset_index().sort_values("start") # type:ignore
[docs] def get_daily_worktime(
self, frequenzy: str, worktime: Union[int, float], weekmask: str
) -> float:
"""
Calculate the mean daily work time based on the contract details.
Parameters
----------
frequenzy: str
Timeperiod in which the the user is supposed to to work worktime*1h
Supported values are: monthly, weekly
worktime: int, float
Time in hours that the user is supposed to work in frequenzy
weekmask: str
Listing of the abbreviated weekdays the user is supposed to work
i.e.: "Mon Tue Wed Thu Fri Sat"
Returns
-------
daily_worktime: float
"""
# calculate the daily worktime depending on the interval and weekmask
work_days_per_week = len(weekmask.split(" "))
if frequenzy == "weekly":
daily_worktime = worktime / work_days_per_week
else:
daily_worktime = (worktime * 12) / (365 - 52 * (7 - work_days_per_week))
return daily_worktime
[docs] def get_holiday_df(self) -> pd.DataFrame:
"""
Generate a Dataframe containing all needed holidays.
They begin at the start of the first contract until now,
as well as work time based on the contract parameters for those days.
Returns
-------
holiday_df: pandas.DataFrame
Dataframe containing all holidays from the start of the
first contract until now, as well as work time based on
the contract parameters for those days.
"""
is_holiday = self.contract_worktime_df["start"].apply(lambda x: x in self.holidays)
holiday_df = self.contract_worktime_df[is_holiday].copy()
holiday_df["end"] = holiday_df["start"]
holiday_df["occupation"] = "holiday"
holiday_df.sort_values(["start"], inplace=True)
return holiday_df[["start", "end", "occupation", "worktime"]].reset_index(drop=True)
[docs] def init_holidays(self) -> Union[HolidayBase, Dict]:
"""
Initialize the holidays with the custom holidays from the config.
Returns the class instance from holiday, which matches the country and province
given in the config and updates it with the special holidays, also given in the config
Returns
-------
Union[HolidayBase, Dict]
HolidayBase object for the given country and province or an empty dict.
"""
# fallback in case country wasn't provided
if self.country in dir(holidays):
# get holidays class depending on the country
country_class = getattr(holidays, self.country)
# init holiday class depending province
if self.province in country_class.subdivisions: # type: ignore
custom_holidays: HolidayBase = country_class(subdiv=self.province) # type: ignore
else:
custom_holidays: HolidayBase = country_class() # type: ignore
# update holidays with special_holidays, given in the config
special_holiday_update_dict = {}
for year in pd.unique(self.contract_worktime_df["start"].dt.year):
for key, val in self.special_holidays.items():
day = int(key.split("-")[0])
month = int(key.split("-")[1])
date = datetime.datetime(year, month, day)
special_holiday_update_dict[date] = val
# add new holidays to custom_holidays
custom_holidays.update(special_holiday_update_dict)
return custom_holidays
else:
return {}
[docs] def get_manual_df_with_workime(self) -> pd.DataFrame:
"""
Read the manual_db file.
That file contains information like sick time | vacation,
specified by manual_db in the config.
Then expands the given time periods to date_ranges on a daily base, drops the none
workdays and adds the daily work time based on the contract parameters for that day.
Returns
-------
pandas.DataFrame
Dataframe containing the to a daily base expanded entry's of manual_df.
"""
manual_df = pd.DataFrame()
manual_db = pd.read_csv(
self.manual_db_path, parse_dates=["start", "end"], sep="\t"
) # type:ignore
# expand start and end date to a range of dates
for _, row in manual_db.iterrows():
new_df = pd.DataFrame()
new_df["start"] = pd.date_range(row["start"], row["end"], normalize=True)
new_df["end"] = pd.date_range(row["start"], row["end"], normalize=True)
new_df["occupation"] = row["occupation"]
manual_df = manual_df.append(new_df)
# drop days which are holidays
work_day = manual_df["start"].apply(lambda x: x not in self.holidays)
manual_df = pd.merge(manual_df[work_day], self.contract_worktime_df, on="start")
return manual_df.sort_values(["start"]).reset_index(drop=True)
[docs] def split_date_overlap_session(self) -> None:
"""
Split sessions which contain midnight to to sessions.
The first lasting until midnight and the second starting at midnight.
.. code-block:: text
df before:
start end
1.1.1970 21:00:00 2.1.1970 02:00:00
df after:
start end
1.1.1970 21:00:00 2.1.1970 00:00:00
2.1.1970 00:00:00 2.1.1970 02:00:00
"""
# midnight of the start and end date, are at different dates
overlap_sessions = (
self.db["start"].dt.normalize() != self.db["end"].dt.normalize() # type:ignore
)
# copy overlapping part from the df
df_to_append = self.db[overlap_sessions].copy()
# set start date of the df to append to midnight
df_to_append.loc[:, "start"] = df_to_append["end"].dt.normalize()
# set the end date of the overlapping df to midnight
self.db.loc[overlap_sessions, "end"] = self.db[overlap_sessions]["end"].dt.normalize()
self.db = self.db.append(df_to_append)
self.db = self.db[(self.db["end"] - self.db["start"]) > pd.to_timedelta(1, unit="m")]
self.db = self.db.sort_values("start").reset_index(drop=True)
[docs] def get_total_df(self) -> pd.DataFrame:
"""
Calculate total Dataframe.
Returns
-------
pd.DataFrame
Dataframe with 'worktime' and time columns
See Also
--------
add_time_columns
"""
self.db["worktime"] = self.db["end"] - self.db["start"]
result_df = pd.concat(
[self.db, self.get_holiday_df(), self.get_manual_df_with_workime()], # type:ignore
sort=False,
)
result_df = self.add_time_columns(result_df)
result_df = result_df.sort_values(["start", "end"]).reset_index(drop=True)
return result_df[
["start", "end", "worktime", "year", "month", "week", "day", "occupation"]
]
[docs] @classmethod
def add_time_columns(
cls, df: pd.DataFrame, date_time_column_name: str = "start"
) -> pd.DataFrame:
"""
Add Year, Month, Week and Day columns to an existing Dataframe.
Parameters
----------
df: pd.DataFrame
Dataframe the columns should be added to.
date_time_column: str
Name of the column containing the used date
Returns
-------
pd.DataFrame
Dataframe with added Year, Month, Week and Day columns.
"""
result_df = df.copy()
date_time_column = result_df[date_time_column_name].dt
result_df["year"] = date_time_column.year # type:ignore
result_df["month"] = date_time_column.month # type:ignore
result_df["week"] = date_time_column.week # type:ignore
result_df["day"] = date_time_column.day # type:ignore
return result_df
[docs] def get_plot_df(self, rule="D", date_time_column="start") -> pd.DataFrame:
"""
Return a Dataframe prepared for plotting.
Dataframe with a DateTimeIndex, columns named by occupation and
containing the worked time of that occupation for the given samplingrate
Parameters
----------
rule : str
Resampling rule see pandas.DataFrame.resample
Returns
-------
pandas.DataFrame
Dataframe with a DateDimeIndex, columns named by occupation and
containing the worktime of that occupation for the samplingrate
"""
total_df = self.get_total_df().sort_values(date_time_column).reset_index(drop=True)
plot_df = pd.DataFrame(total_df.resample(rule, on=date_time_column).worktime.sum())
plot_df.columns = ["total"]
for occupation in total_df["occupation"].unique():
occupation_series = total_df[total_df["occupation"] == occupation]
occupation_series = occupation_series.resample(
rule, on=date_time_column
).worktime.sum()
occupation_series = occupation_series.rename(occupation)
plot_df = plot_df.join(occupation_series)
return plot_df.fillna(pd.Timedelta(seconds=0)) # type:ignore