Source code for cvfe.data.preprocessor

__all__ = [
    "DataframePreprocessor",
    "CanadaDataframePreprocessor",
    "FileTransformCompose",
    "FileTransform",
    "CopyFile",
    "MakeContentCopyProtectedMachineReadable",
]

import logging
import shutil
from typing import Any, Callable, Optional

import numpy as np
import pandas as pd
import pikepdf
from dateutil import parser
from dateutil.relativedelta import *

from cvfe.configs import CANADA_COUNTRY_CODE_TO_NAME
from cvfe.data import functional
from cvfe.data.constant import *
from cvfe.data.pdf import CanadaXFA

# config logger
logger = logging.getLogger(__name__)


[docs] class DataframePreprocessor: """A wrapper around builtin Pandas functions to make it easier for our data values A class that contains methods for dealing with dataframes regarding transformation of data such as filling missing values, dropping columns, or aggregating multiple columns into a single more meaningful one. This class needs to be extended for file specific preprocessing where tags are unique and need to be done entirely manually. In this case, :func:`file_specific_basic_transform` needs to be implemented. """
[docs] def __init__(self, dataframe: pd.DataFrame = None) -> None: """ Args: dataframe (:class:`pandas.DataFrame`, optional): Main dataframe to be preprocessed. Defaults to None. """ self.dataframe = dataframe
[docs] def column_dropper( self, string: str, exclude: Optional[str] = None, regex: bool = False, inplace: bool = True, ) -> Optional[pd.DataFrame]: """See :func:`cvfe.data.functional.column_dropper` for more information""" return functional.column_dropper( dataframe=self.dataframe, string=string, exclude=exclude, regex=regex, inplace=inplace, )
[docs] def fillna_datetime( self, col_base_name: str, type: DocTypes, one_sided: str | bool, date: Optional[str] = None, inplace: bool = False, ) -> Optional[pd.DataFrame]: """See :func:`cvfe.data.functional.fillna_datetime` for more details""" if date is None: date = T0 return functional.fillna_datetime( dataframe=self.dataframe, col_base_name=col_base_name, one_sided=one_sided, date=date, inplace=inplace, type=type, )
[docs] def aggregate_datetime( self, col_base_name: str, new_col_name: str, type: DocTypes, if_nan: Optional[str | Callable] = None, one_sided: Optional[str] = None, reference_date: Optional[str] = None, current_date: Optional[str] = None, ) -> pd.DataFrame: """See :func:`cvfe.data.functional.aggregate_datetime` for more details""" return functional.aggregate_datetime( dataframe=self.dataframe, col_base_name=col_base_name, new_col_name=new_col_name, one_sided=one_sided, if_nan=if_nan, type=type, reference_date=reference_date, current_date=current_date, )
[docs] def file_specific_basic_transform(self, type: DocTypes, path: str) -> pd.DataFrame: """ Takes a specific file then does data type fixing, missing value filling, discretization, etc. Note: Since each files has its own unique tags and requirements, it is expected that all these transformation being hardcoded for each file, hence this method exists to just improve readability without any generalization to other problems or even files. Args: type (DocTypes): The input document type (see :class:`DocTypes <cvfe.data.constant.DocTypes>`) path (str): Path to the input document """ raise NotImplementedError
[docs] def change_dtype( self, col_name: str, dtype: Callable, if_nan: str | Callable = "skip", **kwargs, ): """See :func:`cvfe.data.functional.change_dtype` for more details""" return functional.change_dtype( dataframe=self.dataframe, col_name=col_name, dtype=dtype, if_nan=if_nan, **kwargs, )
[docs] def config_csv_to_dict(self, path: str) -> dict: """ Take a config CSV and return a dictionary of key and values Args: path (str): string path to config file """ config_df = pd.read_csv(path) return dict( zip(config_df[config_df.columns[0]], config_df[config_df.columns[1]]) )
[docs] class CanadaDataframePreprocessor(DataframePreprocessor):
[docs] def __init__(self, dataframe: Optional[pd.DataFrame] = None) -> None: super().__init__(dataframe) self.base_date = ( None # the time forms were filled, considered "today" for forms ) # get country code to name dict self.config_path = CANADA_COUNTRY_CODE_TO_NAME self.CANADA_COUNTRY_CODE_TO_NAME = self.config_csv_to_dict(self.config_path)
[docs] def convert_country_code_to_name(self, string: str) -> str: """ Converts the (custom and non-standard) code of a country to its name given the XFA docs LOV section. Args: string (str): input code string """ country = [c for c in self.CANADA_COUNTRY_CODE_TO_NAME.keys() if string in c] if country: return self.CANADA_COUNTRY_CODE_TO_NAME[country] else: logger.debug( ( f'"{string}" country code could not be found' f'in the config file="{self.config_path}".' ) ) return CanadaFillna.COUNTRY_CODE_5257E
[docs] def file_specific_basic_transform(self, type: DocTypes, path: str) -> pd.DataFrame: canada_xfa = CanadaXFA() # Canada PDF to XML if type == DocTypes.CANADA_5257E: # XFA to XML xml = canada_xfa.extract_raw_content(path) xml = canada_xfa.clean_xml_for_csv(xml=xml, type=DocTypes.CANADA_5257E) # XML to flattened dict data_dict = canada_xfa.xml_to_flattened_dict(xml=xml) data_dict = canada_xfa.flatten_dict(data_dict) # clean flattened dict data_dict = functional.dict_summarizer( data_dict, cutoff_term=CanadaCutoffTerms.CA5257E, KEY_ABBREVIATION_DICT=CANADA_5257E_KEY_ABBREVIATION, VALUE_ABBREVIATION_DICT=CANADA_5257E_VALUE_ABBREVIATION, ) # convert each data dict to a dataframe dataframe = pd.DataFrame.from_dict(data=[data_dict], orient="columns") self.dataframe = dataframe # drop pepeg columns # warning: setting `errors='ignore` ignores errors if columns do not exist! dataframe.drop( CANADA_5257E_DROP_COLUMNS, axis=1, inplace=True, errors="ignore" ) # Adult binary state: adult=True or child=False dataframe["P1.AdultFlag"] = dataframe["P1.AdultFlag"].apply( lambda x: True if x == "adult" else False ) # service language: 1=En, 2=Fr -> need to be changed to categorical dataframe = self.change_dtype( col_name="P1.PD.ServiceIn.ServiceIn", dtype=np.int8, if_nan="skip" ) # AliasNameIndicator: 1=True, 0=False dataframe[ "P1.PD.AliasName.AliasNameIndicator.AliasNameIndicator" ] = dataframe[ "P1.PD.AliasName.AliasNameIndicator.AliasNameIndicator" ].apply( lambda x: True if x == "Y" else False ) # VisaType: String -> categorical dataframe = self.change_dtype( col_name="P1.PD.VisaType.VisaType", dtype=str, if_nan="fill", value=CanadaFillna.VISA_TYPE_5257E, ) # Birth City: String -> categorical dataframe = self.change_dtype( col_name="P1.PD.PlaceBirthCity", dtype=str, if_nan="fill", value=CanadaFillna.PLACE_BIRTH_CITY_5257E, ) # Birth country: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.PlaceBirthCountry", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # citizen of: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.Citizenship.Citizenship", dtype=str, if_nan="fill", value=CanadaFillna.CITIZENSHIP_5257E, ) # current country of residency: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.CurrCOR.Row2.Country", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # current country of residency status: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.CurrCOR.Row2.Status", dtype=np.int8, if_nan="fill", value=np.int8(CanadaFillna.RESIDENCY_STATUS_5257E), ) # current country of residency other description: bool -> categorical dataframe = self.change_dtype( col_name="P1.PD.CurrCOR.Row2.Other", dtype=bool, if_nan="fill", value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E, ) # validation date of information, i.e. current date: datetime dataframe = self.change_dtype( col_name="P3.Sign.C1CertificateIssueDate", dtype=parser.parse, if_nan="skip", ) # keep it so we can access for other file if that was None if not dataframe["P3.Sign.C1CertificateIssueDate"].isna().all(): self.base_date = dataframe["P3.Sign.C1CertificateIssueDate"] # date of birth in year: string -> datetime dataframe = self.change_dtype( col_name="P1.PD.DOBYear", dtype=parser.parse, if_nan="skip" ) # current country of residency period: None -> Datetime (=age period) dataframe = self.change_dtype( col_name="P1.PD.CurrCOR.Row2.FromDate", dtype=parser.parse, if_nan="fill", value=dataframe["P1.PD.DOBYear"], ) dataframe = self.change_dtype( col_name="P1.PD.CurrCOR.Row2.ToDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # has previous country of residency: bool -> categorical dataframe["P1.PD.PCRIndicator"] = dataframe["P1.PD.PCRIndicator"].apply( lambda x: True if x == "Y" else False ) # clean previous country of residency features country_tag_list = [ c for c in dataframe.columns.values if "P1.PD.PrevCOR." in c ] PREV_COUNTRY_MAX_FEATURES = 4 for i in range(len(country_tag_list) // PREV_COUNTRY_MAX_FEATURES): # in XLA extracted file, this section start from `Row2` (ie. i+2) i += 2 # previous country of residency 02: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.PrevCOR.Row" + str(i) + ".Country", dtype=str, if_nan="fill", value=CanadaFillna.PREVIOUS_COUNTRY_5257E, ) # previous country of residency status 02: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.PrevCOR.Row" + str(i) + ".Status", dtype=np.int8, if_nan="fill", value=np.int8(CanadaFillna.RESIDENCY_STATUS_5257E), ) # previous country of residency 02 period (P1.PD.PrevCOR.Row2): string -> datetime -> int days dataframe = self.change_dtype( col_name="P1.PD.PrevCOR.Row" + str(i) + ".FromDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P1.PD.PrevCOR.Row" + str(i) + ".ToDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # apply from country of residency (cwa=country where apply): Y=True, N=False dataframe["P1.PD.SameAsCORIndicator"] = dataframe[ "P1.PD.SameAsCORIndicator" ].apply(lambda x: True if x == "Y" else False) # country where applying: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.CWA.Row2.Country", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_WHERE_APPLYING_5257E, ) # country where applying status: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.CWA.Row2.Status", dtype=np.int8, if_nan="fill", value=np.int8(CanadaFillna.RESIDENCY_STATUS_5257E), ) # country where applying other: string -> categorical dataframe = self.change_dtype( col_name="P1.PD.CWA.Row2.Other", dtype=bool, if_nan="fill", value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E, ) # country where applying period: datetime -> int days dataframe = self.change_dtype( col_name="P1.PD.CWA.Row2.FromDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P1.PD.CWA.Row2.ToDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # marriage period: datetime -> int days dataframe = self.change_dtype( col_name="P1.MS.SecA.DateOfMarr", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # previous marriage: Y=True, N=False dataframe["P2.MS.SecA.PrevMarrIndicator"] = dataframe[ "P2.MS.SecA.PrevMarrIndicator" ].apply(lambda x: True if x == "Y" else False) # previous marriage type of relationship dataframe = self.change_dtype( col_name="P2.MS.SecA.TypeOfRelationship", dtype=str, if_nan="fill", value=CanadaFillna.MARRIAGE_TYPE_5257E, ) # previous spouse age period: string -> datetime -> int days dataframe = self.change_dtype( col_name="P2.MS.SecA.PrevSpouseDOB.DOBYear", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # previous marriage period: string -> datetime -> int days dataframe = self.change_dtype( col_name="P2.MS.SecA.FromDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P2.MS.SecA.ToDate.ToDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # passport country of issue: string -> categorical dataframe = self.change_dtype( col_name="P2.MS.SecA.Psprt.CountryofIssue.CountryofIssue", dtype=str, if_nan="fill", value=CanadaFillna.PASSPORT_COUNTRY_5257E, ) # expiry remaining period: datetime -> int days # if None, fill with 1 year ago, ie. period=1year temp_date = dataframe["P3.Sign.C1CertificateIssueDate"].apply( lambda x: x + relativedelta(years=-1) ) dataframe = self.change_dtype( col_name="P2.MS.SecA.Psprt.ExpiryDate", dtype=parser.parse, if_nan="fill", value=temp_date, ) # native lang: string -> categorical dataframe = self.change_dtype( col_name="P2.MS.SecA.Langs.languages.nativeLang.nativeLang", dtype=str, if_nan="fill", value=CanadaFillna.NATIVE_LANG_5257E, ) # communication lang: Eng, Fr, both, none -> categorical dataframe = self.change_dtype( col_name="P2.MS.SecA.Langs.languages.ableToCommunicate.ableToCommunicate", dtype=str, if_nan="fill", value=CanadaFillna.LANGUAGES_ABLE_TO_COMMUNICATE_5257E, ) # language official test: bool -> binary dataframe["P2.MS.SecA.Langs.LangTest"] = dataframe[ "P2.MS.SecA.Langs.LangTest" ].apply(lambda x: True if x == "Y" else False) # have national ID: bool -> binary dataframe["P2.natID.q1.natIDIndicator"] = dataframe[ "P2.natID.q1.natIDIndicator" ].apply(lambda x: True if x == "Y" else False) # national ID country of issue: string -> categorical dataframe = self.change_dtype( col_name="P2.natID.natIDdocs.CountryofIssue.CountryofIssue", dtype=str, if_nan="fill", value=CanadaFillna.ID_COUNTRY_5257E, ) # United States doc: bool -> binary dataframe["P2.USCard.q1.usCardIndicator"] = dataframe[ "P2.USCard.q1.usCardIndicator" ].apply(lambda x: True if x == "Y" else False) # US Canada phone number: bool -> binary dataframe["P2.CI.cntct.PhnNums.Phn.CanadaUS"] = dataframe[ "P2.CI.cntct.PhnNums.Phn.CanadaUS" ].apply(lambda x: True if x == "1" else False) # US Canada alt phone number: bool -> binary dataframe["P2.CI.cntct.PhnNums.AltPhn.CanadaUS"] = dataframe[ "P2.CI.cntct.PhnNums.AltPhn.CanadaUS" ].apply(lambda x: True if x == "1" else False) # purpose of visit: string, 8 states -> categorical dataframe = self.change_dtype( col_name="P3.DOV.PrpsRow1.PrpsOfVisit.PrpsOfVisit", dtype=np.int8, if_nan="fill", value=np.int8(CanadaFillna.PURPOSE_OF_VISIT_5257E), ) # 7 is other in the form # purpose of visit description: string -> binary dataframe = self.change_dtype( col_name="P3.DOV.PrpsRow1.Other.Other", dtype=bool, if_nan="fill", value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E, ) # how long going to stay: None -> datetime (0 days) dataframe = self.change_dtype( col_name="P3.DOV.PrpsRow1.HLS.FromDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P3.DOV.PrpsRow1.HLS.ToDate", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # fund to integer dataframe = self.change_dtype( col_name="P3.DOV.PrpsRow1.Funds.Funds", dtype=np.int32, if_nan="skip" ) # relation to applicant of purpose of visit 01: string -> categorical dataframe = self.change_dtype( col_name="P3.DOV.cntcts_Row1.RelationshipToMe.RelationshipToMe", dtype=str, if_nan="fill", value=CanadaFillna.CONTACT_TYPE_5257E, ) # relation to applicant of purpose of visit 02: string -> categorical dataframe = self.change_dtype( col_name="P3.cntcts_Row2.Relationship.RelationshipToMe", dtype=str, if_nan="fill", value=CanadaFillna.CONTACT_TYPE_5257E, ) # higher education: bool -> binary dataframe["P3.Edu.EduIndicator"] = dataframe["P3.Edu.EduIndicator"].apply( lambda x: True if x == "Y" else False ) # higher education period: string -> datetime -> int days dataframe = self.change_dtype( col_name="P3.Edu.Edu_Row1.FromYear", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P3.Edu.Edu_Row1.ToYear", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # higher education country: string -> categorical dataframe = self.change_dtype( col_name="P3.Edu.Edu_Row1.Country.Country", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # field of study: string -> categorical dataframe["P3.Edu.Edu_Row1.FieldOfStudy"] = dataframe[ "P3.Edu.Edu_Row1.FieldOfStudy" ].astype("string") # clean occupation features occupation_tag_list = [ c for c in dataframe.columns.values if "P3.Occ.OccRow" in c ] PREV_OCCUPATION_MAX_FEATURES = 9 for i in range(len(occupation_tag_list) // PREV_OCCUPATION_MAX_FEATURES): i += 1 # in the form, it starts from Row1 (ie. i+1) # occupation period 01: none -> string year -> int days dataframe = self.change_dtype( col_name="P3.Occ.OccRow" + str(i) + ".FromYear", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) dataframe = self.change_dtype( col_name="P3.Occ.OccRow" + str(i) + ".ToYear", dtype=parser.parse, if_nan="fill", value=dataframe["P3.Sign.C1CertificateIssueDate"], ) # occupation type 01: string -> categorical dataframe = self.change_dtype( col_name="P3.Occ.OccRow" + str(i) + ".Occ.Occ", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # occupation country: string -> categorical dataframe = self.change_dtype( col_name="P3.Occ.OccRow" + str(i) + ".Country.Country", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # medical details: string -> binary dataframe = self.change_dtype( col_name="P3.BGI.Details.MedicalDetails", dtype=bool, if_nan="fill", value=CanadaFillna.INDICATOR_FIELD_5257E, ) # other than medical: string -> binary dataframe = self.change_dtype( col_name="P3.BGI.otherThanMedic", dtype=bool, if_nan="fill", value=CanadaFillna.INDICATOR_FIELD_5257E, ) # without authentication stay, work, etc: bool -> binary dataframe["P3.noAuthStay"] = dataframe["P3.noAuthStay"].apply( lambda x: True if x == "Y" else False ) # deported or refused entry: bool -> binary dataframe["P3.refuseDeport"] = dataframe["P3.refuseDeport"].apply( lambda x: True if x == "Y" else False ) # previously applied: bool -> binary dataframe["P3.BGI2.PrevApply"] = dataframe["P3.BGI2.PrevApply"].apply( lambda x: True if x == "Y" else False ) # criminal record: bool -> binary dataframe["P3.PWrapper.criminalRec"] = dataframe[ "P3.PWrapper.criminalRec" ].apply(lambda x: True if x == "Y" else False) # military record: bool -> binary dataframe["P3.PWrapper.Military.Choice"] = dataframe[ "P3.PWrapper.Military.Choice" ].apply(lambda x: True if x == "Y" else False) # political, violent movement record: bool -> binary dataframe["P3.PWrapper.politicViol"] = dataframe[ "P3.PWrapper.politicViol" ].apply(lambda x: True if x == "Y" else False) # witness of ill treatment: bool -> binary dataframe["P3.PWrapper.witnessIllTreat"] = dataframe[ "P3.PWrapper.witnessIllTreat" ].apply(lambda x: True if x == "Y" else False) return dataframe if type == DocTypes.CANADA_5645E: # XFA to XML xml = canada_xfa.extract_raw_content(path) xml = canada_xfa.clean_xml_for_csv(xml=xml, type=DocTypes.CANADA_5645E) # XML to flattened dict data_dict = canada_xfa.xml_to_flattened_dict(xml=xml) data_dict = canada_xfa.flatten_dict(data_dict) # clean flattened dict data_dict = functional.dict_summarizer( data_dict, cutoff_term=CanadaCutoffTerms.CA5645E, KEY_ABBREVIATION_DICT=CANADA_5645E_KEY_ABBREVIATION, VALUE_ABBREVIATION_DICT=None, ) # convert each data dict to a dataframe dataframe = pd.DataFrame.from_dict(data=[data_dict], orient="columns") self.dataframe = dataframe # drop pepeg columns # warning: setting `errors='ignore` ignores errors if columns do not exist! dataframe.drop( CANADA_5645E_DROP_COLUMNS, axis=1, inplace=True, errors="ignore" ) # transform multiple pleb columns into a single chad one and fixing column dtypes # type of application: (already onehot) string -> int cols = [col for col in dataframe.columns.values if "p1.Subform1" in col] for c in cols: dataframe = self.change_dtype( col_name=c, dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.VISA_APPLICATION_TYPE_5645E), ) # drop all Accompany=No and only rely on Accompany=Yes using binary state self.column_dropper(string="No", inplace=True) # applicant marriage status: string to integer dataframe = self.change_dtype( col_name="p1.SecA.App.ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # validation date of information, i.e. current date: datetime dataframe = self.change_dtype( col_name="p1.SecC.SecCdate", dtype=parser.parse, if_nan="fill", value=self.base_date, ) # spouse date of birth: string -> datetime dataframe = self.change_dtype( col_name="p1.SecA.Sps.SpsDOB", dtype=parser.parse, if_nan="fill", value=dataframe["p1.SecC.SecCdate"], ) # spouse country of birth: string -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Sps.SpsCOB", dtype=str, if_nan="skip" ) # spouse occupation type (issue #2): string -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Sps.SpsOcc", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # spouse accompanying: coming=True or not_coming=False dataframe["p1.SecA.Sps.SpsAccomp"] = dataframe[ "p1.SecA.Sps.SpsAccomp" ].apply(lambda x: True if x == "1" else False) # mother date of birth: string -> datetime dataframe = self.change_dtype( col_name="p1.SecA.Mo.MoDOB", dtype=parser.parse, if_nan="fill", value=dataframe["p1.SecC.SecCdate"], ) # mother occupation type (issue #2): string -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Mo.MoOcc", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # mother marriage status: int -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Mo.ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # mother accompanying: coming=True or not_coming=False dataframe["p1.SecA.Mo.MoAccomp"] = dataframe["p1.SecA.Mo.MoAccomp"].apply( lambda x: True if x == "1" else False ) # father date of birth: string -> datetime dataframe = self.change_dtype( col_name="p1.SecA.Fa.FaDOB", dtype=parser.parse, if_nan="fill", value=dataframe["p1.SecC.SecCdate"], ) # mother occupation type (issue #2): string -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Fa.FaOcc", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # father marriage status: int -> categorical dataframe = self.change_dtype( col_name="p1.SecA.Fa.ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # father accompanying: coming=True or not_coming=False dataframe["p1.SecA.Fa.FaAccomp"] = dataframe["p1.SecA.Fa.FaAccomp"].apply( lambda x: True if x == "1" else False ) # children's status children_tag_list = [ c for c in dataframe.columns.values if "p1.SecB.Chd" in c ] CHILDREN_MAX_FEATURES = 7 for i in range(len(children_tag_list) // CHILDREN_MAX_FEATURES): # child's marriage status 01: string to integer dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # child's relationship 01: string -> categorical dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdRel", dtype=str, if_nan="fill", value=CanadaFillna.CHILD_RELATION_5645E, ) # child's date of birth 01: string -> datetime dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdDOB", dtype=parser.parse, if_nan="skip", ) # child's country of birth 01: string -> categorical dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdCOB", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # child's occupation type 01 (issue #2): string -> categorical dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdOcc", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # child's marriage status: int -> categorical dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # child's accompanying 01: coming=True or not_coming=False dataframe["p1.SecB.Chd.[" + str(i) + "].ChdAccomp"] = dataframe[ "p1.SecB.Chd.[" + str(i) + "].ChdAccomp" ].apply(lambda x: True if x == "1" else False) # check if the child does not exist and fill it properly (ghost case monkaS) if ( ( dataframe["p1.SecB.Chd.[" + str(i) + "].ChdMStatus"] == CanadaFillna.CHILD_MARRIAGE_STATUS_5645E ).all() and ( dataframe["p1.SecB.Chd.[" + str(i) + "].ChdRel"] == "OTHER" ).all() and (dataframe["p1.SecB.Chd.[" + str(i) + "].ChdDOB"].isna()).all() and ( dataframe["p1.SecB.Chd.[" + str(i) + "].ChdAccomp"] == False ).all() ): # ghost child's date of birth: None -> datetime (current date) -> 0 days dataframe = self.change_dtype( col_name="p1.SecB.Chd.[" + str(i) + "].ChdDOB", dtype=parser.parse, if_nan="fill", value=dataframe["p1.SecC.SecCdate"], ) # siblings' status siblings_tag_list = [ c for c in dataframe.columns.values if "p1.SecC.Chd" in c ] SIBLINGS_MAX_FEATURES = 8 for i in range(len(siblings_tag_list) // SIBLINGS_MAX_FEATURES): # sibling's marriage status 01: string to integer dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdMStatus", dtype=np.int16, if_nan="fill", value=np.int16(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E), ) # sibling's relationship 01: string -> categorical dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdRel", dtype=str, if_nan="fill", value=CanadaFillna.CHILD_RELATION_5645E, ) # sibling's date of birth 01: string -> datetime dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdDOB", dtype=parser.parse, if_nan="skip", ) # sibling's country of birth 01: string -> categorical dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdCOB", dtype=str, if_nan="fill", value=CanadaFillna.COUNTRY_5257E, ) # sibling's occupation type 01 (issue #2): string -> categorical dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdOcc", dtype=str, if_nan="fill", value=CanadaFillna.OCCUPATION_5257E, ) # sibling's accompanying: coming=True or not_coming=False dataframe["p1.SecC.Chd.[" + str(i) + "].ChdAccomp"] = dataframe[ "p1.SecC.Chd.[" + str(i) + "].ChdAccomp" ].apply(lambda x: True if x == "1" else False) # check if the sibling does not exist and fill it properly (ghost case monkaS) if ( ( dataframe["p1.SecC.Chd.[" + str(i) + "].ChdMStatus"] == CanadaFillna.CHILD_MARRIAGE_STATUS_5645E ).all() and ( dataframe["p1.SecC.Chd.[" + str(i) + "].ChdRel"] == "OTHER" ).all() and (dataframe["p1.SecC.Chd.[" + str(i) + "].ChdOcc"].isna()).all() and ( dataframe["p1.SecC.Chd.[" + str(i) + "].ChdAccomp"] == False ).all() ): # ghost sibling's date of birth: None -> datetime (current date) -> 0 days dataframe = self.change_dtype( col_name="p1.SecC.Chd.[" + str(i) + "].ChdDOB", dtype=parser.parse, if_nan="fill", value=dataframe["p1.SecC.SecCdate"], ) return dataframe if type == DocTypes.CANADA_LABEL: dataframe = pd.read_csv(path, sep=" ", names=["VisaResult"]) functional.change_dtype( dataframe=dataframe, col_name="VisaResult", dtype=np.int8, if_nan="fill", value=np.int8(CanadaFillna.VISA_RESULT), ) return dataframe
[docs] class FileTransform: """A base class for applying transforms as a composable object over files. Any behavior over the files itself (not the content of files) must extend this class. """
[docs] def __init__(self) -> None: pass
[docs] def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any: """ Args: src (str): source file to be processed dst (str): the pass that the processed file to be saved """ pass
[docs] class CopyFile(FileTransform): """Only copies a file, a wrapper around `shutil`'s copying methods Default is set to 'cf', i.e. `shutil.copyfile`. For more info see shutil_ documentation. Reference: 1. https://stackoverflow.com/a/30359308/18971263 """
[docs] def __init__(self, mode: str) -> None: super().__init__() self.COPY_MODES = ["c", "cf", "c2"] self.mode = mode if mode is not None else "cf" self.__check_mode(mode=mode)
[docs] def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any: if self.mode == "c": shutil.copy(src=src, dst=dst) elif self.mode == "cf": shutil.copyfile(src=src, dst=dst) elif self.mode == "c2": shutil.copy2(src=src, dst=dst)
def __check_mode(self, mode: str): """Checks copying mode to be available in shutil_ Args: mode: copying mode in `shutil`, one of `'c'`, `'cf'`, `'c2'` .. _shutil: https://docs.python.org/3/library/shutil.html """ if not mode in self.COPY_MODES: raise ValueError( (f"Mode {mode} does not exist,", f'choose one of "{self.COPY_MODES}".') )
[docs] class MakeContentCopyProtectedMachineReadable(FileTransform): """Reads a 'content-copy' protected PDF and removes this restriction Removing the protection is done by saving a "printed" version of via pikepdf_ References: 1. https://www.reddit.com/r/Python/comments/t32z2o/simple_code_to_unlock_all_readonly_pdfs_in/ 2. https://pikepdf.readthedocs.io/en/latest/ .. _pikepdf: https://pikepdf.readthedocs.io/en/latest/ """
[docs] def __init__(self) -> None: super().__init__()
[docs] def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any: """ Args: src (str): source file to be processed dst (str): destination to save the processed file Returns: Any: None """ pdf = pikepdf.open(src, allow_overwriting_input=True) pdf.save(dst)
[docs] class FileTransformCompose: """Composes several transforms operating on files together The transforms should be tied to files with keyword and this will be only applying functions on files that match the keyword using a dictionary Transformation dictionary over files in the following structure:: { FileTransform: 'filter_str', ..., } Note: Transforms will be applied in order of the keys in the dictionary """
[docs] def __init__(self, transforms: dict[FileTransform, str]) -> None: """ Args: transforms (dict[FileTransform, str]): a dictionary of transforms, where the key is the instance of FileTransform and the value is the keyword that the transform will be applied to Raises: ValueError: if the keyword is not a string """ if transforms is not None: for k in transforms.keys(): if not issubclass(k.__class__, FileTransform): raise TypeError(f"Keys must be {FileTransform} instance.") self.transforms = transforms
[docs] def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any: """Applies transforms in order Args: src (str): source file path to be processed dst (str): destination to save the processed file """ for transform, file_filter in self.transforms.items(): if file_filter in src: transform(src, dst)