Data Processing Module

This module handles data extraction, loading, and joining operations for NHS practice level appointment data according to NHS data standards and best practices.

The module implements three main pipeline stages:

1. DataExtractionStage - Extracts compressed data files to directories
2. DataLoadingStage - Loads crosstab data and mapping files from CSV files
3. DataJoiningStage - Combines monthly datasets and merges with mappings
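The three-stage pattern can be sketched with stubs. The real PipelineStage base class is project code not shown here; the stub below only illustrates how stages pass a shared context dict along the chain, using the stage and output names documented below.

```python
# Minimal stub of the three-stage pipeline pattern (illustrative only;
# the real PipelineStage base class lives in the project).
class Stage:
    def __init__(self, name: str, outputs: str):
        self.name = name
        self.outputs = outputs

    def run(self, context: dict) -> dict:
        # A real stage would do work here; the stub just records itself.
        context[self.outputs] = f"{self.name} result"
        return context

context = {}
for stage in (
    Stage("data_extraction", "extracted_files"),
    Stage("data_loading", "raw_data"),
    Stage("data_joining", "combined_data"),
):
    context = stage.run(context)

print(sorted(context))  # ['combined_data', 'extracted_files', 'raw_data']
```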

Classes:

DataExtractionStage
    Pipeline stage for extracting compressed data archives (zip files).
DataLoadingStage
    Pipeline stage for loading NHS practice level crosstab data from CSV files.
DataJoiningStage
    Pipeline stage for joining monthly data and combining with mapping data.

Notes

This module uses NHS_HERBOT for standardised data loading and column normalisation to ensure consistency with NHS data processing standards. All CSV files are loaded with normalised column names using snake_case convention.
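As an illustration of the snake_case convention mentioned above, here is a plain-pandas sketch of column normalisation. This is a stand-in, not NHS_HERBOT's actual implementation, which may handle more cases.

```python
import re

import pandas as pd


def to_snake(name: str) -> str:
    """Illustrative snake_case conversion (not NHS_HERBOT's real code)."""
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())          # spaces/punct -> _
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)         # split camelCase
    return name.strip("_").lower()


def normalise_columns(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(columns={c: to_snake(c) for c in df.columns})


df = pd.DataFrame(columns=["GP Code", "ICB Name", "Appointment Count"])
print(list(normalise_columns(df).columns))
# ['gp_code', 'icb_name', 'appointment_count']
```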

The separation of extraction and loading stages allows for better error handling and enables the pipeline to skip extraction if files are already available.

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> extraction_stage = DataExtractionStage(config)
>>> loading_stage = DataLoadingStage(config)
>>> joining_stage = DataJoiningStage()

DataExtractionStage

Bases: PipelineStage

Pipeline stage for extracting compressed data files.

This stage extracts compressed archives (zip files) containing NHS practice level crosstab data and mapping files to the appropriate directories for subsequent processing stages.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing data directory paths and extraction parameters.

Methods:

run(context)
    Execute the data extraction stage and store extracted file paths.

Notes

The extraction process:

- Searches for compressed files matching the configured pattern
- Extracts CSV files to appropriate directories (raw data vs lookup)
- Mapping files are placed in the lookup directory
- Practice crosstab files are placed in the raw directory
- Skips extraction if files already exist
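The routing rule described in the notes can be expressed as a small pure function. This is an illustrative sketch, not the module's code; the file names are invented.

```python
from pathlib import Path
from typing import Optional


def route_member(member: str, raw_dir: Path, lookup_dir: Path) -> Optional[Path]:
    """Mirror the routing rule: mapping CSVs go to lookup, other CSVs to raw."""
    if not member.endswith(".csv"):
        return None  # non-CSV archive members are skipped
    dest = lookup_dir if "mapping" in member.lower() else raw_dir
    return dest / member


raw, lookup = Path("data/raw"), Path("data/lookup")
print(route_member("Mapping.csv", raw, lookup))         # -> data/lookup/Mapping.csv
print(route_member("Practice_Jan24.csv", raw, lookup))  # -> data/raw/Practice_Jan24.csv
print(route_member("readme.txt", raw, lookup))          # -> None
```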

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> stage = DataExtractionStage(config)
>>> context = {}
>>> updated_context = stage.run(context)
Source code in practice_level_gp_appointments/data_processing.py
class DataExtractionStage(PipelineStage):
    """
    Pipeline stage for extracting compressed data files.

    This stage extracts compressed archives (zip files) containing NHS practice
    level crosstab data and mapping files to the appropriate directories for
    subsequent processing stages.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data directory paths and extraction
        parameters.

    Methods
    -------
    run(context)
        Execute the data extraction stage and store extracted file paths.

    Notes
    -----
    The extraction process:
    - Searches for compressed files matching the configured pattern
    - Extracts CSV files to appropriate directories (raw data vs lookup)
    - Mapping files are placed in lookup directory
    - Practice crosstab files are placed in raw directory
    - Skips extraction if files already exist

    Examples
    --------
    >>> config = NHSPracticeAnalysisConfig()
    >>> stage = DataExtractionStage(config)
    >>> context = {}
    >>> updated_context = stage.run(context)
    """

    def __init__(self, config: NHSPracticeAnalysisConfig):
        """
        Initialize the data extraction stage.

        Parameters
        ----------
        config : NHSPracticeAnalysisConfig
            Configuration object containing extraction parameters.
        """
        super().__init__(outputs="extracted_files", name="data_extraction")
        self.config = config

    def run(self, context):
        """
        Extract compressed data files to appropriate directories.

        Parameters
        ----------
        context : dict
            Pipeline execution context for storing stage outputs.

        Returns
        -------
        dict
            Updated pipeline context containing extracted file paths.
        """
        logger.info("Extracting compressed data files...")

        # Use specific input zip file from config
        zip_path = self.config.input_zip_file

        if not zip_path.exists():
            logger.error(f"Input zip file not found: {zip_path}")
            self._store_outputs(context, [])
            return context

        # Create date-specific directories
        raw_dir = self.config.raw_data_dir
        raw_dir.mkdir(parents=True, exist_ok=True)

        extracted_files = []
        logger.info(f"Processing compressed file: {zip_path}")

        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            for member in zip_ref.namelist():
                if member.endswith(".csv"):
                    # Determine destination directory based on file type
                    if "mapping" in member.lower():
                        dest_dir = self.config.lookup_data_dir
                        dest_dir.mkdir(parents=True, exist_ok=True)
                        extracted_path = dest_dir / member
                    else:
                        extracted_path = raw_dir / member

                    if not extracted_path.exists():
                        logger.info(f"Extracting {member}")
                        if "mapping" in member.lower():
                            zip_ref.extract(member, dest_dir)
                        else:
                            zip_ref.extract(member, raw_dir)
                        extracted_files.append(extracted_path)
                    else:
                        logger.info(f"File exists: {extracted_path}")
                        extracted_files.append(extracted_path)

        logger.info(f"Extraction complete: {len(extracted_files)} files")
        self._store_outputs(context, extracted_files)
        return context

__init__(config)

Initialize the data extraction stage.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing extraction parameters.
Source code in practice_level_gp_appointments/data_processing.py
def __init__(self, config: NHSPracticeAnalysisConfig):
    """
    Initialize the data extraction stage.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing extraction parameters.
    """
    super().__init__(outputs="extracted_files", name="data_extraction")
    self.config = config

run(context)

Extract compressed data files to appropriate directories.

Parameters:

context : dict (required)
    Pipeline execution context for storing stage outputs.

Returns:

dict
    Updated pipeline context containing extracted file paths.

Source code in practice_level_gp_appointments/data_processing.py
def run(self, context):
    """
    Extract compressed data files to appropriate directories.

    Parameters
    ----------
    context : dict
        Pipeline execution context for storing stage outputs.

    Returns
    -------
    dict
        Updated pipeline context containing extracted file paths.
    """
    logger.info("Extracting compressed data files...")

    # Use specific input zip file from config
    zip_path = self.config.input_zip_file

    if not zip_path.exists():
        logger.error(f"Input zip file not found: {zip_path}")
        self._store_outputs(context, [])
        return context

    # Create date-specific directories
    raw_dir = self.config.raw_data_dir
    raw_dir.mkdir(parents=True, exist_ok=True)

    extracted_files = []
    logger.info(f"Processing compressed file: {zip_path}")

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        for member in zip_ref.namelist():
            if member.endswith(".csv"):
                # Determine destination directory based on file type
                if "mapping" in member.lower():
                    dest_dir = self.config.lookup_data_dir
                    dest_dir.mkdir(parents=True, exist_ok=True)
                    extracted_path = dest_dir / member
                else:
                    extracted_path = raw_dir / member

                if not extracted_path.exists():
                    logger.info(f"Extracting {member}")
                    if "mapping" in member.lower():
                        zip_ref.extract(member, dest_dir)
                    else:
                        zip_ref.extract(member, raw_dir)
                    extracted_files.append(extracted_path)
                else:
                    logger.info(f"File exists: {extracted_path}")
                    extracted_files.append(extracted_path)

    logger.info(f"Extraction complete: {len(extracted_files)} files")
    self._store_outputs(context, extracted_files)
    return context
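The extraction logic above can be exercised end to end with a throwaway archive. The file names below are invented for illustration; the routing mirrors the `"mapping" in member.lower()` check in the source.

```python
import tempfile
import zipfile
from pathlib import Path

# Build a small archive, then extract it the way the stage does.
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    zip_path = tmp / "appointments.zip"
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("Mapping.csv", "GP_CODE,ICB_CODE\nA1,Q1\n")
        zf.writestr("Practice_Jan24.csv", "GP_CODE,COUNT\nA1,10\n")

    raw_dir, lookup_dir = tmp / "raw", tmp / "lookup"
    extracted = []
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            if member.endswith(".csv"):
                # Mapping files go to the lookup dir, crosstabs to raw.
                dest = lookup_dir if "mapping" in member.lower() else raw_dir
                dest.mkdir(parents=True, exist_ok=True)
                extracted.append(Path(zf.extract(member, dest)))

    print(sorted(p.name for p in extracted))
    # ['Mapping.csv', 'Practice_Jan24.csv']
```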

DataLoadingStage

Bases: PipelineStage

Pipeline stage for loading extracted CSV files.

This stage loads monthly crosstab CSV files and mapping data that have been extracted by the DataExtractionStage, using NHS_HERBOT for standardised processing and column normalisation.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing data directory paths and processing parameters, including sample size limits.

Methods:

run(context)
    Execute the data loading stage and store results in pipeline context.

Notes

The stage loads:

- Monthly practice level crosstab files from the raw data directory
- Practice mapping/lookup data for geographical information
- All data is processed through NHS_HERBOT for column normalisation

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> stage = DataLoadingStage(config)
>>> context = {"extracted_files": file_paths}
>>> updated_context = stage.run(context)
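The file discovery this stage relies on (see `_discover_csv_files` in the source) reduces to a glob over the raw directory keyed by file stem. A sketch with an assumed `csv_file_pattern` and invented file names:

```python
import glob
import tempfile
from pathlib import Path

# Illustrative stand-in for _discover_csv_files: map file stems to paths.
with tempfile.TemporaryDirectory() as tmp:
    raw_dir = Path(tmp)
    for name in ("Practice_Level_Jan24.csv", "Practice_Level_Feb24.csv", "notes.txt"):
        (raw_dir / name).touch()

    pattern = str(raw_dir / "Practice_Level_*.csv")  # assumed csv_file_pattern
    csv_files = {Path(p).stem: Path(p) for p in sorted(glob.glob(pattern))}

    print(sorted(csv_files))
    # ['Practice_Level_Feb24', 'Practice_Level_Jan24']
```

Keying datasets by stem means each discovered month becomes an entry in the loaded-data dict, and non-matching files (like `notes.txt`) are ignored.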
Source code in practice_level_gp_appointments/data_processing.py
class DataLoadingStage(PipelineStage):
    """
    Pipeline stage for loading extracted CSV files.

    This stage loads monthly crosstab CSV files and mapping data that have
    been extracted by the DataExtractionStage, using NHS_HERBOT for
    standardised processing and column normalisation.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data directory paths and processing
        parameters including sample size limits.

    Methods
    -------
    run(context)
        Execute the data loading stage and store results in pipeline context.

    Notes
    -----
    The stage loads:
    - Monthly practice level crosstab files from raw data directory
    - Practice mapping/lookup data for geographical information
    - All data is processed through NHS_HERBOT for column normalisation

    Examples
    --------
    >>> config = NHSPracticeAnalysisConfig()
    >>> stage = DataLoadingStage(config)
    >>> context = {"extracted_files": file_paths}
    >>> updated_context = stage.run(context)
    """

    def __init__(self, config: NHSPracticeAnalysisConfig):
        """
        Initialize the data loading stage.

        Parameters
        ----------
        config : NHSPracticeAnalysisConfig
            Configuration object containing data paths and parameters.
        """
        super().__init__(
            inputs="extracted_files", outputs="raw_data", name="data_loading"
        )
        self.config = config

    def _discover_csv_files(self):
        """
        Dynamically discover CSV files in the raw data directory.

        Returns
        -------
        dict
            Dictionary mapping dataset names to file paths.
        """
        raw_dir = self.config.raw_data_dir
        csv_files = {}

        pattern = str(raw_dir / self.config.csv_file_pattern)
        matching_files = glob.glob(pattern)

        for file_path in matching_files:
            path_obj = Path(file_path)
            dataset_name = path_obj.stem
            csv_files[dataset_name] = path_obj
            logger.info(f"Discovered dataset: {dataset_name} -> {path_obj}")

        return csv_files

    def run(self, context):
        """
        Load NHS practice level crosstab data from extracted CSV files.

        Parameters
        ----------
        context : dict
            Pipeline execution context containing extracted file paths.

        Returns
        -------
        dict
            Updated pipeline context containing loaded datasets.
        """
        logger.info("Loading NHS practice level crosstab data...")

        # Discover CSV files to load
        csv_files = self._discover_csv_files()

        if not csv_files:
            logger.warning("No CSV files found matching pattern")
            logger.info(f"Searched in: {self.config.raw_data_dir}")
            logger.info(f"Pattern: {self.config.csv_file_pattern}")

        mapping_file = self.config.lookup_data_dir / "Mapping.csv"
        loaded_data = {}

        # Load discovered CSV files
        for month, file_path in csv_files.items():
            if file_path.exists():
                logger.info(f"Loading {month} data from {file_path}")
                try:
                    raw_crosstab_df = nhs_herbot.load_csv_data(
                        dataset_name=month,
                        filepath_or_buffer=file_path,
                    )
                    norm_crosstab_df = nhs_herbot.normalise_column_names(
                        raw_crosstab_df
                    )
                    loaded_data[month] = norm_crosstab_df
                    logger.info(
                        f"Loaded {len(norm_crosstab_df)} rows for {month}"
                    )
                except Exception as e:
                    logger.error(f"Failed to load {month} data: {e}")
                    continue
            else:
                logger.warning(f"File not found: {file_path}")

        # Load mapping data
        if mapping_file.exists():
            logger.info(f"Loading mapping data from {mapping_file}")
            try:
                raw_mapping_df = nhs_herbot.load_csv_data(
                    dataset_name="Mapping",
                    filepath_or_buffer=mapping_file,
                )
                norm_mapping_df = nhs_herbot.normalise_column_names(
                    raw_mapping_df
                )
                loaded_data["mapping"] = norm_mapping_df
                logger.info(f"Loaded {len(norm_mapping_df)} mapping records")
            except Exception as e:
                logger.error(f"Failed to load mapping data: {e}")
        else:
            logger.warning(f"Mapping file not found: {mapping_file}")

        logger.info(f"Data loading complete: {list(loaded_data.keys())}")
        self._store_outputs(context, loaded_data)
        return context

__init__(config)

Initialize the data loading stage.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing data paths and parameters.
Source code in practice_level_gp_appointments/data_processing.py
def __init__(self, config: NHSPracticeAnalysisConfig):
    """
    Initialize the data loading stage.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data paths and parameters.
    """
    super().__init__(
        inputs="extracted_files", outputs="raw_data", name="data_loading"
    )
    self.config = config

run(context)

Load NHS practice level crosstab data from extracted CSV files.

Parameters:

context : dict (required)
    Pipeline execution context containing extracted file paths.

Returns:

dict
    Updated pipeline context containing loaded datasets.

Source code in practice_level_gp_appointments/data_processing.py
def run(self, context):
    """
    Load NHS practice level crosstab data from extracted CSV files.

    Parameters
    ----------
    context : dict
        Pipeline execution context containing extracted file paths.

    Returns
    -------
    dict
        Updated pipeline context containing loaded datasets.
    """
    logger.info("Loading NHS practice level crosstab data...")

    # Discover CSV files to load
    csv_files = self._discover_csv_files()

    if not csv_files:
        logger.warning("No CSV files found matching pattern")
        logger.info(f"Searched in: {self.config.raw_data_dir}")
        logger.info(f"Pattern: {self.config.csv_file_pattern}")

    mapping_file = self.config.lookup_data_dir / "Mapping.csv"
    loaded_data = {}

    # Load discovered CSV files
    for month, file_path in csv_files.items():
        if file_path.exists():
            logger.info(f"Loading {month} data from {file_path}")
            try:
                raw_crosstab_df = nhs_herbot.load_csv_data(
                    dataset_name=month,
                    filepath_or_buffer=file_path,
                )
                norm_crosstab_df = nhs_herbot.normalise_column_names(
                    raw_crosstab_df
                )
                loaded_data[month] = norm_crosstab_df
                logger.info(
                    f"Loaded {len(norm_crosstab_df)} rows for {month}"
                )
            except Exception as e:
                logger.error(f"Failed to load {month} data: {e}")
                continue
        else:
            logger.warning(f"File not found: {file_path}")

    # Load mapping data
    if mapping_file.exists():
        logger.info(f"Loading mapping data from {mapping_file}")
        try:
            raw_mapping_df = nhs_herbot.load_csv_data(
                dataset_name="Mapping",
                filepath_or_buffer=mapping_file,
            )
            norm_mapping_df = nhs_herbot.normalise_column_names(
                raw_mapping_df
            )
            loaded_data["mapping"] = norm_mapping_df
            logger.info(f"Loaded {len(norm_mapping_df)} mapping records")
        except Exception as e:
            logger.error(f"Failed to load mapping data: {e}")
    else:
        logger.warning(f"Mapping file not found: {mapping_file}")

    logger.info(f"Data loading complete: {list(loaded_data.keys())}")
    self._store_outputs(context, loaded_data)
    return context
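The load-then-normalise step can be approximated with plain pandas. This is a stand-in for `nhs_herbot.load_csv_data` and `nhs_herbot.normalise_column_names`, whose real signatures and behaviour may differ; the CSV content is invented.

```python
import io

import pandas as pd


def load_and_normalise(buffer) -> pd.DataFrame:
    """Plain-pandas stand-in for the NHS_HERBOT load + normalise pair."""
    df = pd.read_csv(buffer)
    # Simplified snake_case normalisation of the header row.
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    return df


csv = io.StringIO("GP CODE,APPT COUNT\nA81001,120\nA81002,95\n")
df = load_and_normalise(csv)
print(list(df.columns), len(df))  # ['gp_code', 'appt_count'] 2
```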

DataJoiningStage

Bases: PipelineStage

Pipeline stage for joining monthly data and combining with mapping data.

This stage combines monthly crosstab datasets into a unified dataframe and merges with geographical mapping information to enable regional analysis and reporting.

Methods:

run(context)
    Execute the data joining stage and store results in pipeline context.

Notes

The joining process includes:

- Concatenation of monthly crosstab data with a data_month identifier
- Left join with mapping data using gp_code as the key
- Addition of geographical information (ICB, region details)
- Validation of join results and data quality checks

The resulting dataset contains all original crosstab fields plus:

- data_month: Identifier for the source month
- icb_code, icb_name: Integrated Care Board information
- region_code, region_name: NHS regional information

Examples:

>>> stage = DataJoiningStage()
>>> context = {"raw_data": loaded_datasets}
>>> updated_context = stage.run(context)
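The concatenate-then-merge flow can be reproduced with toy data. Column names follow the docstring; all values below are invented.

```python
import pandas as pd

# Two toy monthly crosstabs and a geography mapping.
jan = pd.DataFrame({"gp_code": ["A1", "B2"], "appointments": [10, 20]})
feb = pd.DataFrame({"gp_code": ["A1"], "appointments": [12]})
mapping = pd.DataFrame({
    "gp_code": ["A1", "B2"],
    "icb_code": ["Q1", "Q2"],
    "icb_name": ["ICB One", "ICB Two"],
    "region_code": ["R1", "R2"],
    "region_name": ["North", "South"],
})

# Tag each month, then concatenate into one frame.
monthly = []
for month, df in (("jan", jan), ("feb", feb)):
    df = df.copy()
    df["data_month"] = month
    monthly.append(df)
combined = pd.concat(monthly, ignore_index=True)

# Left join keeps every practice row even if geography is missing.
joined = combined.merge(mapping, on="gp_code", how="left")
print(joined.shape)  # (3, 7)
```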
Source code in practice_level_gp_appointments/data_processing.py
class DataJoiningStage(PipelineStage):
    """
    Pipeline stage for joining monthly data and combining with mapping data.

    This stage combines monthly crosstab datasets into a unified dataframe
    and merges with geographical mapping information to enable regional
    analysis and reporting.

    Methods
    -------
    run(context)
        Execute the data joining stage and store results in pipeline context.

    Notes
    -----
    The joining process includes:
    - Concatenation of monthly crosstab data with data_month identifier
    - Left join with mapping data using gp_code as the key
    - Addition of geographical information (ICB, region details)
    - Validation of join results and data quality checks

    The resulting dataset contains all original crosstab fields plus:
    - data_month: Identifier for the source month
    - icb_code, icb_name: Integrated Care Board information
    - region_code, region_name: NHS regional information

    Examples
    --------
    >>> stage = DataJoiningStage()
    >>> context = {"raw_data": loaded_datasets}
    >>> updated_context = stage.run(context)
    """

    def __init__(self):
        """
        Initialize the data joining stage.

        The stage is configured to consume raw_data from the loading stage
        and produce combined_data for downstream analysis stages.
        """
        super().__init__(
            inputs="raw_data", outputs="combined_data", name="data_joining"
        )

    def run(self, context):
        """
        Join monthly data and combine with mapping data.

        This method performs the core data joining operations to create
        a unified dataset suitable for comprehensive analysis.

        Parameters
        ----------
        context : dict
            Pipeline execution context containing raw_data from loading stage.

        Returns
        -------
        dict
            Updated pipeline context containing the joined dataset.

        Raises
        ------
        ValueError
            If no monthly data is found in the input datasets.

        Notes
        -----
        Processing steps:
        1. Extract monthly datasets and add data_month identifier
        2. Concatenate all monthly data into single dataframe
        3. Merge with mapping data on gp_code field
        4. Validate join results and log summary statistics
        """
        raw_data = self._get_input_values(context)[0]
        logger.info("Joining monthly NHS practice data...")

        monthly_dfs = []
        for month, df in raw_data.items():
            if month != "mapping":
                df_copy = df.copy()
                df_copy["data_month"] = month
                monthly_dfs.append(df_copy)

        if not monthly_dfs:
            raise ValueError("No monthly data found to join")

        combined_df = pd.concat(monthly_dfs, ignore_index=True)
        logger.info(f"Combined data shape: {combined_df.shape}")

        if "mapping" in raw_data:
            mapping_cols = [
                "gp_code",
                "icb_code",
                "icb_name",
                "region_code",
                "region_name",
            ]
            joined_df = combined_df.merge(
                raw_data["mapping"][mapping_cols],
                on="gp_code",
                how="left",
                suffixes=("", "_mapping"),
            )
            logger.info(f"Joined data shape: {joined_df.shape}")
        else:
            joined_df = combined_df
            logger.warning("No mapping data available for joining")

        logger.info("Data joining complete")
        self._store_outputs(context, joined_df)
        return context

__init__()

Initialize the data joining stage.

The stage is configured to consume raw_data from the loading stage and produce combined_data for downstream analysis stages.

Source code in practice_level_gp_appointments/data_processing.py
def __init__(self):
    """
    Initialize the data joining stage.

    The stage is configured to consume raw_data from the loading stage
    and produce combined_data for downstream analysis stages.
    """
    super().__init__(
        inputs="raw_data", outputs="combined_data", name="data_joining"
    )

run(context)

Join monthly data and combine with mapping data.

This method performs the core data joining operations to create a unified dataset suitable for comprehensive analysis.

Parameters:

context : dict (required)
    Pipeline execution context containing raw_data from the loading stage.

Returns:

dict
    Updated pipeline context containing the joined dataset.

Raises:

ValueError
    If no monthly data is found in the input datasets.

Notes

Processing steps:

1. Extract monthly datasets and add data_month identifier
2. Concatenate all monthly data into a single dataframe
3. Merge with mapping data on the gp_code field
4. Validate join results and log summary statistics
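Step 4's validation is not shown in the source; one common approach uses pandas' merge `indicator` to flag practices with no mapping row. This is an assumed check, not necessarily what the module does.

```python
import pandas as pd

# Toy data: one practice ("Z9") is missing from the mapping.
combined = pd.DataFrame({"gp_code": ["A1", "Z9"]})
mapping = pd.DataFrame({"gp_code": ["A1"], "icb_code": ["Q1"]})

# indicator=True adds a _merge column marking left_only / both rows.
checked = combined.merge(mapping, on="gp_code", how="left", indicator=True)
unmatched = checked.loc[checked["_merge"] == "left_only", "gp_code"].tolist()
print(unmatched)  # ['Z9']
```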

Source code in practice_level_gp_appointments/data_processing.py
def run(self, context):
    """
    Join monthly data and combine with mapping data.

    This method performs the core data joining operations to create
    a unified dataset suitable for comprehensive analysis.

    Parameters
    ----------
    context : dict
        Pipeline execution context containing raw_data from loading stage.

    Returns
    -------
    dict
        Updated pipeline context containing the joined dataset.

    Raises
    ------
    ValueError
        If no monthly data is found in the input datasets.

    Notes
    -----
    Processing steps:
    1. Extract monthly datasets and add data_month identifier
    2. Concatenate all monthly data into single dataframe
    3. Merge with mapping data on gp_code field
    4. Validate join results and log summary statistics
    """
    raw_data = self._get_input_values(context)[0]
    logger.info("Joining monthly NHS practice data...")

    monthly_dfs = []
    for month, df in raw_data.items():
        if month != "mapping":
            df_copy = df.copy()
            df_copy["data_month"] = month
            monthly_dfs.append(df_copy)

    if not monthly_dfs:
        raise ValueError("No monthly data found to join")

    combined_df = pd.concat(monthly_dfs, ignore_index=True)
    logger.info(f"Combined data shape: {combined_df.shape}")

    if "mapping" in raw_data:
        mapping_cols = [
            "gp_code",
            "icb_code",
            "icb_name",
            "region_code",
            "region_name",
        ]
        joined_df = combined_df.merge(
            raw_data["mapping"][mapping_cols],
            on="gp_code",
            how="left",
            suffixes=("", "_mapping"),
        )
        logger.info(f"Joined data shape: {joined_df.shape}")
    else:
        joined_df = combined_df
        logger.warning("No mapping data available for joining")

    logger.info("Data joining complete")
    self._store_outputs(context, joined_df)
    return context