API Reference

NHS Practice Pipeline Package

A data processing pipeline for NHS practice-level crosstabs, built with the oops-its-a-pipeline framework.

SummarisationStage

Bases: PipelineStage

Pipeline stage for creating descriptive statistics and summary tables.

This stage processes the combined appointment data to generate comprehensive statistical summaries and NHS performance metrics suitable for analysis and reporting purposes.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing analysis parameters and specifications.

Attributes:

config : NHSPracticeAnalysisConfig
    The configuration object passed during initialisation.

Methods:

run(context)
    Execute the summarisation stage and generate statistical summaries.

Notes

Generated summaries include:

- Monthly appointment trends by status (attended, DNA, cancelled)
- Healthcare professional type distribution and workload analysis
- Appointment mode analysis (face-to-face, telephone, online)
- Regional performance comparisons and geographical analysis
- Booking time analysis and access pattern evaluation
- Key NHS performance indicators and completion rates

All metrics follow NHS performance monitoring standards and include appropriate statistical measures for different analytical purposes.
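The DNA and completion rates among these metrics reduce to filtered sums over `count_of_appointments`. A minimal sketch on toy data (column names are taken from the stage; the figures are illustrative, not real NHS data):

```python
import pandas as pd

# Toy appointment counts using the columns the stage expects
# (appt_status, count_of_appointments); the values are illustrative.
df = pd.DataFrame(
    {
        "appt_status": ["Attended", "DNA", "Attended", "Cancelled"],
        "count_of_appointments": [80, 10, 5, 5],
    }
)

total = df["count_of_appointments"].sum()
dna = df.loc[df["appt_status"] == "DNA", "count_of_appointments"].sum()
attended = df.loc[df["appt_status"] == "Attended", "count_of_appointments"].sum()

# Guard against an empty dataset, as the stage does
dna_rate = dna / total if total > 0 else 0              # 10 / 100 = 0.1
completion_rate = attended / total if total > 0 else 0  # 85 / 100 = 0.85
```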

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> stage = SummarisationStage(config)
>>> context = {"combined_data": dataframe}
>>> results = stage.run(context)
Source code in practice_level_gp_appointments/analytics.py
class SummarisationStage(PipelineStage):
    """
    Pipeline stage for creating descriptive statistics and summary tables.

    This stage processes the combined appointment data to generate
    comprehensive statistical summaries and NHS performance metrics suitable
    for analysis and reporting purposes.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing analysis parameters and specifications.

    Attributes
    ----------
    config : NHSPracticeAnalysisConfig
        The configuration object passed during initialisation.

    Methods
    -------
    run(context)
        Execute the summarisation stage and generate statistical summaries.

    Notes
    -----
    Generated summaries include:
    - Monthly appointment trends by status (attended, DNA, cancelled)
    - Healthcare professional type distribution and workload analysis
    - Appointment mode analysis (face-to-face, telephone, online)
    - Regional performance comparisons and geographical analysis
    - Booking time analysis and access pattern evaluation
    - Key NHS performance indicators and completion rates

    All metrics follow NHS performance monitoring standards and include
    appropriate statistical measures for different analytical purposes.

    Examples
    --------
    >>> config = NHSPracticeAnalysisConfig()
    >>> stage = SummarisationStage(config)
    >>> context = {"combined_data": dataframe}
    >>> results = stage.run(context)
    """

    def __init__(self, config: NHSPracticeAnalysisConfig):
        """
        Initialize the summarisation stage.

        Parameters
        ----------
        config : NHSPracticeAnalysisConfig
            Configuration object containing analysis parameters.
        """
        super().__init__(
            inputs="combined_data",
            outputs="summary_statistics",
            name="summarisation",
        )
        self.config = config

    def run(self, context):
        """
        Create descriptive statistics and summary tables.

        This method processes the combined appointment data to generate
        multiple summary tables and key performance indicators for NHS
        practice level analysis.

        Parameters
        ----------
        context : dict
            Pipeline execution context containing combined appointment data.

        Returns
        -------
        dict
            Updated pipeline context containing summary statistics and metrics.

        Notes
        -----
        The method generates seven main summary categories:
        1. Monthly trends by appointment status
        2. Healthcare professional type analysis
        3. Appointment mode temporal analysis
        4. Regional performance summaries
        5. Booking time access analysis
        6. Overall descriptive statistics
        7. Key NHS performance metrics (DNA rates, completion rates)
        """
        combined_data = self._get_input_values(context)[0]
        logger.info("Creating summary statistics...")

        df = combined_data
        summaries = {}

        monthly_summary = (
            df.groupby(["data_month", "appt_status"])["count_of_appointments"]
            .sum()
            .reset_index()
        )
        summaries["monthly_by_status"] = monthly_summary

        hcp_summary = (
            df.groupby("hcp_type")["count_of_appointments"]
            .agg(["sum", "mean", "count"])
            .reset_index()
        )
        hcp_summary.columns = [
            "hcp_type",
            "total_appointments",
            "mean_appointments",
            "number_of_records",
        ]
        summaries["hcp_type_summary"] = hcp_summary

        mode_summary = (
            df.groupby(["appt_mode", "data_month"])["count_of_appointments"]
            .sum()
            .reset_index()
        )
        summaries["mode_by_month"] = mode_summary

        if "region_name" in df.columns:
            regional_summary = (
                df.groupby("region_name")["count_of_appointments"]
                .agg(["sum", "mean"])
                .reset_index()
            )
            regional_summary.columns = [
                "region_name",
                "total_appointments",
                "mean_appointments",
            ]
            summaries["regional_summary"] = regional_summary

        booking_time_summary = (
            df.groupby("time_between_book_and_appt")["count_of_appointments"]
            .sum()
            .reset_index()
        )
        summaries["booking_time_summary"] = booking_time_summary

        numeric_cols = ["count_of_appointments"]
        desc_stats = df[numeric_cols].describe()
        summaries["descriptive_stats"] = desc_stats

        total_appointments = df["count_of_appointments"].sum()

        if "appt_status" in df.columns:
            dna_appointments = df[df["appt_status"] == "DNA"][
                "count_of_appointments"
            ].sum()
            attended_appointments = df[df["appt_status"] == "Attended"][
                "count_of_appointments"
            ].sum()
            dna_rate = (
                dna_appointments / total_appointments
                if total_appointments > 0
                else 0
            )
            completion_rate = (
                attended_appointments / total_appointments
                if total_appointments > 0
                else 0
            )

            metrics_summary = pd.DataFrame(
                {
                    "metric": [
                        "total_appointments",
                        "dna_rate",
                        "completion_rate",
                        "dna_count",
                        "attended_count",
                    ],
                    "value": [
                        total_appointments,
                        dna_rate,
                        completion_rate,
                        dna_appointments,
                        attended_appointments,
                    ],
                }
            )
            summaries["key_metrics"] = metrics_summary

        logger.info(f"Created {len(summaries)} summary tables")
        self._store_outputs(context, summaries)
        return context

__init__(config)

Initialize the summarisation stage.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing analysis parameters.

Source code in practice_level_gp_appointments/analytics.py
def __init__(self, config: NHSPracticeAnalysisConfig):
    """
    Initialize the summarisation stage.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing analysis parameters.
    """
    super().__init__(
        inputs="combined_data",
        outputs="summary_statistics",
        name="summarisation",
    )
    self.config = config

run(context)

Create descriptive statistics and summary tables.

This method processes the combined appointment data to generate multiple summary tables and key performance indicators for NHS practice level analysis.

Parameters:

context : dict (required)
    Pipeline execution context containing combined appointment data.

Returns:

dict
    Updated pipeline context containing summary statistics and metrics.

Notes

The method generates seven main summary categories:

1. Monthly trends by appointment status
2. Healthcare professional type analysis
3. Appointment mode temporal analysis
4. Regional performance summaries
5. Booking time access analysis
6. Overall descriptive statistics
7. Key NHS performance metrics (DNA rates, completion rates)

Source code in practice_level_gp_appointments/analytics.py
def run(self, context):
    """
    Create descriptive statistics and summary tables.

    This method processes the combined appointment data to generate
    multiple summary tables and key performance indicators for NHS
    practice level analysis.

    Parameters
    ----------
    context : dict
        Pipeline execution context containing combined appointment data.

    Returns
    -------
    dict
        Updated pipeline context containing summary statistics and metrics.

    Notes
    -----
    The method generates seven main summary categories:
    1. Monthly trends by appointment status
    2. Healthcare professional type analysis
    3. Appointment mode temporal analysis
    4. Regional performance summaries
    5. Booking time access analysis
    6. Overall descriptive statistics
    7. Key NHS performance metrics (DNA rates, completion rates)
    """
    combined_data = self._get_input_values(context)[0]
    logger.info("Creating summary statistics...")

    df = combined_data
    summaries = {}

    monthly_summary = (
        df.groupby(["data_month", "appt_status"])["count_of_appointments"]
        .sum()
        .reset_index()
    )
    summaries["monthly_by_status"] = monthly_summary

    hcp_summary = (
        df.groupby("hcp_type")["count_of_appointments"]
        .agg(["sum", "mean", "count"])
        .reset_index()
    )
    hcp_summary.columns = [
        "hcp_type",
        "total_appointments",
        "mean_appointments",
        "number_of_records",
    ]
    summaries["hcp_type_summary"] = hcp_summary

    mode_summary = (
        df.groupby(["appt_mode", "data_month"])["count_of_appointments"]
        .sum()
        .reset_index()
    )
    summaries["mode_by_month"] = mode_summary

    if "region_name" in df.columns:
        regional_summary = (
            df.groupby("region_name")["count_of_appointments"]
            .agg(["sum", "mean"])
            .reset_index()
        )
        regional_summary.columns = [
            "region_name",
            "total_appointments",
            "mean_appointments",
        ]
        summaries["regional_summary"] = regional_summary

    booking_time_summary = (
        df.groupby("time_between_book_and_appt")["count_of_appointments"]
        .sum()
        .reset_index()
    )
    summaries["booking_time_summary"] = booking_time_summary

    numeric_cols = ["count_of_appointments"]
    desc_stats = df[numeric_cols].describe()
    summaries["descriptive_stats"] = desc_stats

    total_appointments = df["count_of_appointments"].sum()

    if "appt_status" in df.columns:
        dna_appointments = df[df["appt_status"] == "DNA"][
            "count_of_appointments"
        ].sum()
        attended_appointments = df[df["appt_status"] == "Attended"][
            "count_of_appointments"
        ].sum()
        dna_rate = (
            dna_appointments / total_appointments
            if total_appointments > 0
            else 0
        )
        completion_rate = (
            attended_appointments / total_appointments
            if total_appointments > 0
            else 0
        )

        metrics_summary = pd.DataFrame(
            {
                "metric": [
                    "total_appointments",
                    "dna_rate",
                    "completion_rate",
                    "dna_count",
                    "attended_count",
                ],
                "value": [
                    total_appointments,
                    dna_rate,
                    completion_rate,
                    dna_appointments,
                    attended_appointments,
                ],
            }
        )
        summaries["key_metrics"] = metrics_summary

    logger.info(f"Created {len(summaries)} summary tables")
    self._store_outputs(context, summaries)
    return context

NHSPracticeAnalysisConfig

Bases: PipelineConfig

Simple configuration for NHS Practice Level Crosstabs pipeline.

Source code in practice_level_gp_appointments/config.py
class NHSPracticeAnalysisConfig(PipelineConfig):
    """Simple configuration for NHS Practice Level Crosstabs pipeline."""

    # Core directories
    data_dir: Path
    compressed_data_dir: Path
    lookup_data_dir: Path

    # Date-specific directories
    date_id: str
    raw_data_dir: Path
    processed_data_dir: Path
    output_dir: Path
    figures_dir: Path

    # Input file
    input_zip_file: Path

    # File patterns and processing options
    csv_file_pattern: str = "*.csv"
    sample_size: Optional[int] = None
    lookup_file: str = "Mapping.csv"

    # Output settings
    figure_format: str = "png"
    figure_dpi: int = 300
    figure_bbox_inches: str = "tight"

    @classmethod
    def create(
        cls, zip_file_stem: str = "jul_25"
    ) -> "NHSPracticeAnalysisConfig":
        """
        Create configuration with date-specific paths.

        Parameters
        ----------
        zip_file_stem : str, default="jul_25"
            Date identifier for input data (e.g., "jul_25", "jun_25").

        Returns
        -------
        NHSPracticeAnalysisConfig
            Configured instance with date-specific paths.

        Raises
        ------
        FileNotFoundError
            If the specified zip file does not exist.
        """
        # Core directories
        data_dir = Path("data")
        compressed_data_dir = data_dir / "compressed"
        lookup_data_dir = data_dir / "lookup"

        # Date-specific directories
        raw_data_dir = data_dir / "raw" / zip_file_stem
        processed_data_dir = data_dir / "processed"
        output_dir = processed_data_dir / zip_file_stem
        figures_dir = Path("figures") / zip_file_stem

        # Input file
        input_zip_file = compressed_data_dir / f"{zip_file_stem}.zip"

        # Validate zip file exists
        if not input_zip_file.exists():
            msg = f"Zip file not found: {input_zip_file}"
            raise FileNotFoundError(msg)

        return cls(
            data_dir=data_dir,
            compressed_data_dir=compressed_data_dir,
            lookup_data_dir=lookup_data_dir,
            date_id=zip_file_stem,
            raw_data_dir=raw_data_dir,
            processed_data_dir=processed_data_dir,
            output_dir=output_dir,
            figures_dir=figures_dir,
            input_zip_file=input_zip_file,
        )

create(zip_file_stem='jul_25') classmethod

Create configuration with date-specific paths.

Parameters:

zip_file_stem : str, default="jul_25"
    Date identifier for input data (e.g., "jul_25", "jun_25").

Returns:

NHSPracticeAnalysisConfig
    Configured instance with date-specific paths.

Raises:

FileNotFoundError
    If the specified zip file does not exist.
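The date-keyed path layout that create() derives can be reproduced with plain pathlib; a sketch (the stem "jun_25" is an arbitrary example):

```python
from pathlib import Path

zip_file_stem = "jun_25"  # example date identifier

data_dir = Path("data")
# Derived exactly as in create(): one compressed zip in, date-keyed dirs out
input_zip_file = data_dir / "compressed" / f"{zip_file_stem}.zip"
raw_data_dir = data_dir / "raw" / zip_file_stem
output_dir = data_dir / "processed" / zip_file_stem
figures_dir = Path("figures") / zip_file_stem
```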

Source code in practice_level_gp_appointments/config.py
@classmethod
def create(
    cls, zip_file_stem: str = "jul_25"
) -> "NHSPracticeAnalysisConfig":
    """
    Create configuration with date-specific paths.

    Parameters
    ----------
    zip_file_stem : str, default="jul_25"
        Date identifier for input data (e.g., "jul_25", "jun_25").

    Returns
    -------
    NHSPracticeAnalysisConfig
        Configured instance with date-specific paths.

    Raises
    ------
    FileNotFoundError
        If the specified zip file does not exist.
    """
    # Core directories
    data_dir = Path("data")
    compressed_data_dir = data_dir / "compressed"
    lookup_data_dir = data_dir / "lookup"

    # Date-specific directories
    raw_data_dir = data_dir / "raw" / zip_file_stem
    processed_data_dir = data_dir / "processed"
    output_dir = processed_data_dir / zip_file_stem
    figures_dir = Path("figures") / zip_file_stem

    # Input file
    input_zip_file = compressed_data_dir / f"{zip_file_stem}.zip"

    # Validate zip file exists
    if not input_zip_file.exists():
        msg = f"Zip file not found: {input_zip_file}"
        raise FileNotFoundError(msg)

    return cls(
        data_dir=data_dir,
        compressed_data_dir=compressed_data_dir,
        lookup_data_dir=lookup_data_dir,
        date_id=zip_file_stem,
        raw_data_dir=raw_data_dir,
        processed_data_dir=processed_data_dir,
        output_dir=output_dir,
        figures_dir=figures_dir,
        input_zip_file=input_zip_file,
    )

DataJoiningStage

Bases: PipelineStage

Pipeline stage for joining monthly data and combining with mapping data.

This stage combines monthly crosstab datasets into a unified dataframe and merges with geographical mapping information to enable regional analysis and reporting.

Methods:

run(context)
    Execute the data joining stage and store results in pipeline context.

Notes

The joining process includes:

- Concatenation of monthly crosstab data with data_month identifier
- Left join with mapping data using gp_code as the key
- Addition of geographical information (ICB, region details)
- Validation of join results and data quality checks

The resulting dataset contains all original crosstab fields plus:

- data_month: Identifier for the source month
- icb_code, icb_name: Integrated Care Board information
- region_code, region_name: NHS regional information

Examples:

>>> stage = DataJoiningStage()
>>> context = {"raw_data": loaded_datasets}
>>> updated_context = stage.run(context)
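The concatenate-then-left-join behaviour can be sketched on toy frames (values are illustrative; column names match the stage):

```python
import pandas as pd

# Two toy monthly frames plus a mapping frame (illustrative values)
monthly = {
    "2025_06": pd.DataFrame(
        {"gp_code": ["A1", "B2"], "count_of_appointments": [10, 20]}
    ),
    "2025_07": pd.DataFrame(
        {"gp_code": ["A1"], "count_of_appointments": [15]}
    ),
}
mapping = pd.DataFrame(
    {"gp_code": ["A1", "B2"], "region_name": ["North", "South"]}
)

# Tag each frame with its source month, then stack them
frames = []
for month, df in monthly.items():
    df = df.copy()
    df["data_month"] = month
    frames.append(df)
combined = pd.concat(frames, ignore_index=True)

# Left join keeps every appointment row even if a gp_code has no mapping
joined = combined.merge(mapping, on="gp_code", how="left")
```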
Source code in practice_level_gp_appointments/data_processing.py
class DataJoiningStage(PipelineStage):
    """
    Pipeline stage for joining monthly data and combining with mapping data.

    This stage combines monthly crosstab datasets into a unified dataframe
    and merges with geographical mapping information to enable regional
    analysis and reporting.

    Methods
    -------
    run(context)
        Execute the data joining stage and store results in pipeline context.

    Notes
    -----
    The joining process includes:
    - Concatenation of monthly crosstab data with data_month identifier
    - Left join with mapping data using gp_code as the key
    - Addition of geographical information (ICB, region details)
    - Validation of join results and data quality checks

    The resulting dataset contains all original crosstab fields plus:
    - data_month: Identifier for the source month
    - icb_code, icb_name: Integrated Care Board information
    - region_code, region_name: NHS regional information

    Examples
    --------
    >>> stage = DataJoiningStage()
    >>> context = {"raw_data": loaded_datasets}
    >>> updated_context = stage.run(context)
    """

    def __init__(self):
        """
        Initialize the data joining stage.

        The stage is configured to consume raw_data from the loading stage
        and produce combined_data for downstream analysis stages.
        """
        super().__init__(
            inputs="raw_data", outputs="combined_data", name="data_joining"
        )

    def run(self, context):
        """
        Join monthly data and combine with mapping data.

        This method performs the core data joining operations to create
        a unified dataset suitable for comprehensive analysis.

        Parameters
        ----------
        context : dict
            Pipeline execution context containing raw_data from loading stage.

        Returns
        -------
        dict
            Updated pipeline context containing the joined dataset.

        Raises
        ------
        ValueError
            If no monthly data is found in the input datasets.

        Notes
        -----
        Processing steps:
        1. Extract monthly datasets and add data_month identifier
        2. Concatenate all monthly data into single dataframe
        3. Merge with mapping data on gp_code field
        4. Validate join results and log summary statistics
        """
        raw_data = self._get_input_values(context)[0]
        logger.info("Joining monthly NHS practice data...")

        monthly_dfs = []
        for month, df in raw_data.items():
            if month != "mapping":
                df_copy = df.copy()
                df_copy["data_month"] = month
                monthly_dfs.append(df_copy)

        if not monthly_dfs:
            raise ValueError("No monthly data found to join")

        combined_df = pd.concat(monthly_dfs, ignore_index=True)
        logger.info(f"Combined data shape: {combined_df.shape}")

        if "mapping" in raw_data:
            mapping_cols = [
                "gp_code",
                "icb_code",
                "icb_name",
                "region_code",
                "region_name",
            ]
            joined_df = combined_df.merge(
                raw_data["mapping"][mapping_cols],
                on="gp_code",
                how="left",
                suffixes=("", "_mapping"),
            )
            logger.info(f"Joined data shape: {joined_df.shape}")
        else:
            joined_df = combined_df
            logger.warning("No mapping data available for joining")

        logger.info("Data joining complete")
        self._store_outputs(context, joined_df)
        return context

__init__()

Initialize the data joining stage.

The stage is configured to consume raw_data from the loading stage and produce combined_data for downstream analysis stages.

Source code in practice_level_gp_appointments/data_processing.py
def __init__(self):
    """
    Initialize the data joining stage.

    The stage is configured to consume raw_data from the loading stage
    and produce combined_data for downstream analysis stages.
    """
    super().__init__(
        inputs="raw_data", outputs="combined_data", name="data_joining"
    )

run(context)

Join monthly data and combine with mapping data.

This method performs the core data joining operations to create a unified dataset suitable for comprehensive analysis.

Parameters:

context : dict (required)
    Pipeline execution context containing raw_data from loading stage.

Returns:

dict
    Updated pipeline context containing the joined dataset.

Raises:

ValueError
    If no monthly data is found in the input datasets.

Notes

Processing steps:

1. Extract monthly datasets and add data_month identifier
2. Concatenate all monthly data into single dataframe
3. Merge with mapping data on gp_code field
4. Validate join results and log summary statistics

Source code in practice_level_gp_appointments/data_processing.py
def run(self, context):
    """
    Join monthly data and combine with mapping data.

    This method performs the core data joining operations to create
    a unified dataset suitable for comprehensive analysis.

    Parameters
    ----------
    context : dict
        Pipeline execution context containing raw_data from loading stage.

    Returns
    -------
    dict
        Updated pipeline context containing the joined dataset.

    Raises
    ------
    ValueError
        If no monthly data is found in the input datasets.

    Notes
    -----
    Processing steps:
    1. Extract monthly datasets and add data_month identifier
    2. Concatenate all monthly data into single dataframe
    3. Merge with mapping data on gp_code field
    4. Validate join results and log summary statistics
    """
    raw_data = self._get_input_values(context)[0]
    logger.info("Joining monthly NHS practice data...")

    monthly_dfs = []
    for month, df in raw_data.items():
        if month != "mapping":
            df_copy = df.copy()
            df_copy["data_month"] = month
            monthly_dfs.append(df_copy)

    if not monthly_dfs:
        raise ValueError("No monthly data found to join")

    combined_df = pd.concat(monthly_dfs, ignore_index=True)
    logger.info(f"Combined data shape: {combined_df.shape}")

    if "mapping" in raw_data:
        mapping_cols = [
            "gp_code",
            "icb_code",
            "icb_name",
            "region_code",
            "region_name",
        ]
        joined_df = combined_df.merge(
            raw_data["mapping"][mapping_cols],
            on="gp_code",
            how="left",
            suffixes=("", "_mapping"),
        )
        logger.info(f"Joined data shape: {joined_df.shape}")
    else:
        joined_df = combined_df
        logger.warning("No mapping data available for joining")

    logger.info("Data joining complete")
    self._store_outputs(context, joined_df)
    return context

DataLoadingStage

Bases: PipelineStage

Pipeline stage for loading extracted CSV files.

This stage loads monthly crosstab CSV files and mapping data that have been extracted by the DataExtractionStage, using NHS_HERBOT for standardised processing and column normalisation.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing data directory paths and processing
    parameters, including sample size limits.

Methods:

run(context)
    Execute the data loading stage and store results in pipeline context.

Notes

The stage loads:

- Monthly practice level crosstab files from the raw data directory
- Practice mapping/lookup data for geographical information

All loaded data is processed through NHS_HERBOT for column normalisation.

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> stage = DataLoadingStage(config)
>>> context = {"extracted_files": file_paths}
>>> updated_context = stage.run(context)
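File discovery in this stage is a glob over the raw data directory, keyed by file stem. A self-contained sketch of the same pattern using a temporary directory (the file names are placeholders):

```python
import glob
import tempfile
from pathlib import Path

# Stand-in for config.raw_data_dir, populated with two dummy monthly files
raw_dir = Path(tempfile.mkdtemp())
for name in ("jun_25.csv", "jul_25.csv"):
    (raw_dir / name).write_text("gp_code,count_of_appointments\n")

# Same shape as _discover_csv_files: dataset name is the file stem
csv_files = {
    Path(p).stem: Path(p) for p in glob.glob(str(raw_dir / "*.csv"))
}
```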
Source code in practice_level_gp_appointments/data_processing.py
class DataLoadingStage(PipelineStage):
    """
    Pipeline stage for loading extracted CSV files.

    This stage loads monthly crosstab CSV files and mapping data that have
    been extracted by the DataExtractionStage, using NHS_HERBOT for
    standardised processing and column normalisation.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data directory paths and processing
        parameters including sample size limits.

    Methods
    -------
    run(context)
        Execute the data loading stage and store results in pipeline context.

    Notes
    -----
    The stage loads:
    - Monthly practice level crosstab files from raw data directory
    - Practice mapping/lookup data for geographical information
    - All data is processed through NHS_HERBOT for column normalisation

    Examples
    --------
    >>> config = NHSPracticeAnalysisConfig()
    >>> stage = DataLoadingStage(config)
    >>> context = {"extracted_files": file_paths}
    >>> updated_context = stage.run(context)
    """

    def __init__(self, config: NHSPracticeAnalysisConfig):
        """
        Initialize the data loading stage.

        Parameters
        ----------
        config : NHSPracticeAnalysisConfig
            Configuration object containing data paths and parameters.
        """
        super().__init__(
            inputs="extracted_files", outputs="raw_data", name="data_loading"
        )
        self.config = config

    def _discover_csv_files(self):
        """
        Dynamically discover CSV files in the raw data directory.

        Returns
        -------
        dict
            Dictionary mapping dataset names to file paths.
        """
        raw_dir = self.config.raw_data_dir
        csv_files = {}

        pattern = str(raw_dir / self.config.csv_file_pattern)
        matching_files = glob.glob(pattern)

        for file_path in matching_files:
            path_obj = Path(file_path)
            dataset_name = path_obj.stem
            csv_files[dataset_name] = path_obj
            logger.info(f"Discovered dataset: {dataset_name} -> {path_obj}")

        return csv_files

    def run(self, context):
        """
        Load NHS practice level crosstab data from extracted CSV files.

        Parameters
        ----------
        context : dict
            Pipeline execution context containing extracted file paths.

        Returns
        -------
        dict
            Updated pipeline context containing loaded datasets.
        """
        logger.info("Loading NHS practice level crosstab data...")

        # Discover CSV files to load
        csv_files = self._discover_csv_files()

        if not csv_files:
            logger.warning("No CSV files found matching pattern")
            logger.info(f"Searched in: {self.config.raw_data_dir}")
            logger.info(f"Pattern: {self.config.csv_file_pattern}")

        mapping_file = self.config.lookup_data_dir / "Mapping.csv"
        loaded_data = {}

        # Load discovered CSV files
        for month, file_path in csv_files.items():
            if file_path.exists():
                logger.info(f"Loading {month} data from {file_path}")
                try:
                    raw_crosstab_df = nhs_herbot.load_csv_data(
                        dataset_name=month,
                        filepath_or_buffer=file_path,
                    )
                    norm_crosstab_df = nhs_herbot.normalise_column_names(
                        raw_crosstab_df
                    )
                    loaded_data[month] = norm_crosstab_df
                    logger.info(
                        f"Loaded {len(norm_crosstab_df)} rows for {month}"
                    )
                except Exception as e:
                    logger.error(f"Failed to load {month} data: {e}")
                    continue
            else:
                logger.warning(f"File not found: {file_path}")

        # Load mapping data
        if mapping_file.exists():
            logger.info(f"Loading mapping data from {mapping_file}")
            try:
                raw_mapping_df = nhs_herbot.load_csv_data(
                    dataset_name="Mapping",
                    filepath_or_buffer=mapping_file,
                )
                norm_mapping_df = nhs_herbot.normalise_column_names(
                    raw_mapping_df
                )
                loaded_data["mapping"] = norm_mapping_df
                logger.info(f"Loaded {len(norm_mapping_df)} mapping records")
            except Exception as e:
                logger.error(f"Failed to load mapping data: {e}")
        else:
            logger.warning(f"Mapping file not found: {mapping_file}")

        logger.info(f"Data loading complete: {list(loaded_data.keys())}")
        self._store_outputs(context, loaded_data)
        return context

__init__(config)

Initialize the data loading stage.

Parameters:

config : NHSPracticeAnalysisConfig (required)
    Configuration object containing data paths and parameters.

Source code in practice_level_gp_appointments/data_processing.py
def __init__(self, config: NHSPracticeAnalysisConfig):
    """
    Initialize the data loading stage.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data paths and parameters.
    """
    super().__init__(
        inputs="extracted_files", outputs="raw_data", name="data_loading"
    )
    self.config = config

run(context)

Load NHS practice level crosstab data from extracted CSV files.

Parameters:

context : dict, required
    Pipeline execution context containing extracted file paths.

Returns:

dict
    Updated pipeline context containing loaded datasets.

Source code in practice_level_gp_appointments/data_processing.py
def run(self, context):
    """
    Load NHS practice level crosstab data from extracted CSV files.

    Parameters
    ----------
    context : dict
        Pipeline execution context containing extracted file paths.

    Returns
    -------
    dict
        Updated pipeline context containing loaded datasets.
    """
    logger.info("Loading NHS practice level crosstab data...")

    # Discover CSV files to load
    csv_files = self._discover_csv_files()

    if not csv_files:
        logger.warning("No CSV files found matching pattern")
        logger.info(f"Searched in: {self.config.raw_data_dir}")
        logger.info(f"Pattern: {self.config.csv_file_pattern}")

    mapping_file = self.config.lookup_data_dir / "Mapping.csv"
    loaded_data = {}

    # Load discovered CSV files
    for month, file_path in csv_files.items():
        if file_path.exists():
            logger.info(f"Loading {month} data from {file_path}")
            try:
                raw_crosstab_df = nhs_herbot.load_csv_data(
                    dataset_name=month,
                    filepath_or_buffer=file_path,
                )
                norm_crosstab_df = nhs_herbot.normalise_column_names(
                    raw_crosstab_df
                )
                loaded_data[month] = norm_crosstab_df
                logger.info(
                    f"Loaded {len(norm_crosstab_df)} rows for {month}"
                )
            except Exception as e:
                logger.error(f"Failed to load {month} data: {e}")
                continue
        else:
            logger.warning(f"File not found: {file_path}")

    # Load mapping data
    if mapping_file.exists():
        logger.info(f"Loading mapping data from {mapping_file}")
        try:
            raw_mapping_df = nhs_herbot.load_csv_data(
                dataset_name="Mapping",
                filepath_or_buffer=mapping_file,
            )
            norm_mapping_df = nhs_herbot.normalise_column_names(
                raw_mapping_df
            )
            loaded_data["mapping"] = norm_mapping_df
            logger.info(f"Loaded {len(norm_mapping_df)} mapping records")
        except Exception as e:
            logger.error(f"Failed to load mapping data: {e}")
    else:
        logger.warning(f"Mapping file not found: {mapping_file}")

    logger.info(f"Data loading complete: {list(loaded_data.keys())}")
    self._store_outputs(context, loaded_data)
    return context
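The loading loop delegates to `nhs_herbot.load_csv_data` and `nhs_herbot.normalise_column_names`. As a rough, pandas-only sketch of the same load-then-normalise pattern (the lower-case/underscore normalisation rule and the inline sample CSV are assumptions, not the `nhs_herbot` implementation):

```python
import io

import pandas as pd


def normalise_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Lower-case column names and swap spaces for underscores
    (an assumed approximation of nhs_herbot.normalise_column_names)."""
    return df.rename(columns=lambda c: c.strip().lower().replace(" ", "_"))


# Hypothetical crosstab extract standing in for one monthly CSV file.
sample_csv = io.StringIO(
    "GP Code,Appt Status,Count of Appointments\n"
    "A81001,Attended,120\n"
    "A81001,DNA,7\n"
)

loaded_data = {}
raw_df = pd.read_csv(sample_csv)  # stands in for nhs_herbot.load_csv_data
loaded_data["march_2024"] = normalise_column_names(raw_df)
```

Normalising immediately after loading means every later stage can rely on a single column-name convention (e.g. `count_of_appointments`) regardless of how each monthly file capitalises its headers.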

OutputStage

Bases: PipelineStage

Stage for saving outputs (tables, figures, reports).

Source code in practice_level_gp_appointments/output.py
class OutputStage(PipelineStage):
    """Stage for saving outputs (tables, figures, reports)."""

    def __init__(self, config: NHSPracticeAnalysisConfig):
        super().__init__(
            inputs=("combined_data", "summary_statistics", "figures"),
            outputs="output_files",
            name="output",
        )
        self.config = config

    def run(self, context):
        """Save outputs (tables, figures, reports)."""
        combined_data, summary_stats, figures = self._get_input_values(context)
        logger.info("Saving outputs...")

        # Create output directories
        processed_dir = self.config.processed_data_dir
        figures_dir = self.config.figures_dir
        processed_dir.mkdir(exist_ok=True)
        figures_dir.mkdir(exist_ok=True)

        outputs = {}

        # Save combined data
        combined_data_path = processed_dir / "combined_data.csv"
        combined_data.to_csv(combined_data_path, index=False)
        outputs["combined_data"] = str(combined_data_path)
        logger.info(f"Saved combined data: {combined_data_path}")

        # Save summary tables
        for summary_name, summary_df in summary_stats.items():
            if summary_name != "overall":
                # Use simple naming: monthly_summary.csv, hcp_summary.csv, etc.
                filename = f"{summary_name}.csv"
                summary_path = processed_dir / filename
                summary_df.to_csv(summary_path, index=False)
                outputs[f"summary_{summary_name}"] = str(summary_path)
                logger.info(f"Saved summary: {summary_path}")

        # Save figures
        for fig_name, fig in figures.items():
            output_path = (
                figures_dir / f"{fig_name}.{self.config.figure_format}"
            )
            fig.savefig(
                output_path,
                dpi=self.config.figure_dpi,
                bbox_inches=self.config.figure_bbox_inches,
            )
            outputs[f"figure_{fig_name}"] = str(output_path)
            logger.info(f"Saved figure: {output_path}")

        # Create a summary report
        report_path = processed_dir / "pipeline_report.txt"
        with open(report_path, "w", encoding="utf-8") as f:
            f.write("NHS Practice Level Crosstabs Pipeline Report\n")
            f.write("=" * 50 + "\n\n")

            f.write("Data Summary:\n")
            f.write(f"- Total records processed: {len(combined_data):,}\n")
            f.write(f"- Data months: {combined_data['data_month'].unique()}\n")
            f.write(
                f"- Number of unique GP practices: "
                f"{combined_data['gp_code'].nunique()}\n"
            )
            f.write(
                f"- Total appointments: "
                f"{combined_data['count_of_appointments'].sum():,}\n\n"
            )

            f.write("Summary Tables Created:\n")
            for output_name in outputs:
                if output_name.startswith("summary_"):
                    f.write(f"- {outputs[output_name]}\n")

            f.write("\nVisualizations Created:\n")
            for output_name in outputs:
                if output_name.startswith("figure_"):
                    f.write(f"- {outputs[output_name]}\n")

        outputs["report"] = str(report_path)
        logger.info(f"Saved report: {report_path}")

        logger.info(f"Output stage complete: Saved {len(outputs)} outputs")
        self._store_outputs(context, outputs)
        return context

run(context)

Save outputs (tables, figures, reports).

Source code in practice_level_gp_appointments/output.py
def run(self, context):
    """Save outputs (tables, figures, reports)."""
    combined_data, summary_stats, figures = self._get_input_values(context)
    logger.info("Saving outputs...")

    # Create output directories
    processed_dir = self.config.processed_data_dir
    figures_dir = self.config.figures_dir
    processed_dir.mkdir(exist_ok=True)
    figures_dir.mkdir(exist_ok=True)

    outputs = {}

    # Save combined data
    combined_data_path = processed_dir / "combined_data.csv"
    combined_data.to_csv(combined_data_path, index=False)
    outputs["combined_data"] = str(combined_data_path)
    logger.info(f"Saved combined data: {combined_data_path}")

    # Save summary tables
    for summary_name, summary_df in summary_stats.items():
        if summary_name != "overall":
            # Use simple naming: monthly_summary.csv, hcp_summary.csv, etc.
            filename = f"{summary_name}.csv"
            summary_path = processed_dir / filename
            summary_df.to_csv(summary_path, index=False)
            outputs[f"summary_{summary_name}"] = str(summary_path)
            logger.info(f"Saved summary: {summary_path}")

    # Save figures
    for fig_name, fig in figures.items():
        output_path = (
            figures_dir / f"{fig_name}.{self.config.figure_format}"
        )
        fig.savefig(
            output_path,
            dpi=self.config.figure_dpi,
            bbox_inches=self.config.figure_bbox_inches,
        )
        outputs[f"figure_{fig_name}"] = str(output_path)
        logger.info(f"Saved figure: {output_path}")

    # Create a summary report
    report_path = processed_dir / "pipeline_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("NHS Practice Level Crosstabs Pipeline Report\n")
        f.write("=" * 50 + "\n\n")

        f.write("Data Summary:\n")
        f.write(f"- Total records processed: {len(combined_data):,}\n")
        f.write(f"- Data months: {combined_data['data_month'].unique()}\n")
        f.write(
            f"- Number of unique GP practices: "
            f"{combined_data['gp_code'].nunique()}\n"
        )
        f.write(
            f"- Total appointments: "
            f"{combined_data['count_of_appointments'].sum():,}\n\n"
        )

        f.write("Summary Tables Created:\n")
        for output_name in outputs:
            if output_name.startswith("summary_"):
                f.write(f"- {outputs[output_name]}\n")

        f.write("\nVisualizations Created:\n")
        for output_name in outputs:
            if output_name.startswith("figure_"):
                f.write(f"- {outputs[output_name]}\n")

    outputs["report"] = str(report_path)
    logger.info(f"Saved report: {report_path}")

    logger.info(f"Output stage complete: Saved {len(outputs)} outputs")
    self._store_outputs(context, outputs)
    return context
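Every file the stage writes is registered in an `outputs` dict whose keys carry a `summary_` or `figure_` prefix; the report section then groups outputs purely by filtering on those prefixes. A minimal sketch of that convention with invented paths, writing the report to an in-memory buffer:

```python
import io

# Hypothetical outputs registry in the style the stage builds up.
outputs = {
    "combined_data": "data/processed/combined_data.csv",
    "summary_monthly_by_status": "data/processed/monthly_by_status.csv",
    "summary_hcp_type_summary": "data/processed/hcp_type_summary.csv",
    "figure_monthly_trends": "figures/monthly_trends.png",
}

report = io.StringIO()
report.write("Summary Tables Created:\n")
for name, path in outputs.items():
    if name.startswith("summary_"):  # key prefix selects the output kind
        report.write(f"- {path}\n")

report.write("\nVisualizations Created:\n")
for name, path in outputs.items():
    if name.startswith("figure_"):
        report.write(f"- {path}\n")

text = report.getvalue()
```

Keying outputs by kind-prefix keeps the registry flat while still letting downstream consumers (here, the report) partition it without extra bookkeeping.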

NHSPracticeAnalysisPipeline

Bases: Pipeline

NHS Practice Level Crosstabs Analysis Pipeline.

This pipeline processes NHS practice level appointment data through six sequential stages to produce comprehensive analysis outputs including summary statistics, visualizations, and reports.

Parameters:

config : NHSPracticeAnalysisConfig, required
    Configuration object containing data paths, processing parameters, and output specifications.

Attributes:

config : NHSPracticeAnalysisConfig
    The configuration object passed during initialisation.

Methods:

run_analysis()
    Execute the complete pipeline and return exit code.

Notes

The pipeline stages are:

1. Data Extraction Stage - Extract raw data files for loading
2. Data Loading Stage - Load CSV files from the raw data directory
3. Data Joining Stage - Combine monthly data with mapping information
4. Summarisation Stage - Generate statistical summaries and metrics
5. Graphing Stage - Create visualizations and charts
6. Output Stage - Save processed data, figures, and reports

Examples:

>>> config = NHSPracticeAnalysisConfig()
>>> pipeline = NHSPracticeAnalysisPipeline(config)
>>> exit_code = pipeline.run_analysis()
>>> print(f"Pipeline completed with exit code: {exit_code}")
Source code in practice_level_gp_appointments/pipeline.py
class NHSPracticeAnalysisPipeline(Pipeline):
    """
    NHS Practice Level Crosstabs Analysis Pipeline.

    This pipeline processes NHS practice level appointment data through six
    sequential stages to produce comprehensive analysis outputs including
    summary statistics, visualizations, and reports.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing data paths, processing parameters,
        and output specifications.

    Attributes
    ----------
    config : NHSPracticeAnalysisConfig
        The configuration object passed during initialisation.

    Methods
    -------
    run_analysis()
        Execute the complete pipeline and return exit code.

    Notes
    -----
    The pipeline stages are:
    1. Data Extraction Stage - Extract raw data files for loading
    2. Data Loading Stage - Load CSV files from the raw data directory
    3. Data Joining Stage - Combine monthly data with mapping information
    4. Summarisation Stage - Generate statistical summaries and metrics
    5. Graphing Stage - Create visualizations and charts
    6. Output Stage - Save processed data, figures, and reports

    Examples
    --------
    >>> config = NHSPracticeAnalysisConfig()
    >>> pipeline = NHSPracticeAnalysisPipeline(config)
    >>> exit_code = pipeline.run_analysis()
    >>> print(f"Pipeline completed with exit code: {exit_code}")
    """

    def __init__(self, config: NHSPracticeAnalysisConfig):
        """
        Initialize the NHS Practice Analysis Pipeline.

        Parameters
        ----------
        config : NHSPracticeAnalysisConfig
            Configuration object containing all pipeline parameters.
        """
        stages = [
            DataExtractionStage(config),
            DataLoadingStage(config),
            DataJoiningStage(),
            SummarisationStage(config),
            GraphingStage(config),
            OutputStage(config),
        ]
        super().__init__(config, stages)

    def run_analysis(self):
        """
        Run the complete NHS practice level analysis pipeline.

        This method orchestrates the execution of all pipeline stages,
        handles validation, error management, and logging.

        Returns
        -------
        int
            Exit code: 0 for success, 1 for failure.

        Raises
        ------
        Exception
            Any unhandled exception during pipeline execution will be
            caught and logged, returning exit code 1.

        Notes
        -----
        The method performs the following operations:
        - Validates pipeline configuration and stages
        - Generates unique run identifier with timestamp
        - Executes all pipeline stages in sequence
        - Handles errors and provides appropriate logging
        """
        logger.log("START", "NHS Practice Level Crosstabs Analysis Pipeline")

        try:
            self.validate()
            logger.success("Pipeline validation successful")
        except Exception as e:
            logger.error(f"Pipeline validation failed: {e}")
            return 1

        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            run_id = f"nhs_practice_analysis_{timestamp}"
            self.run(run_id)
            return 0
        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            return 1

__init__(config)

Initialize the NHS Practice Analysis Pipeline.

Parameters:

config : NHSPracticeAnalysisConfig, required
    Configuration object containing all pipeline parameters.
Source code in practice_level_gp_appointments/pipeline.py
def __init__(self, config: NHSPracticeAnalysisConfig):
    """
    Initialize the NHS Practice Analysis Pipeline.

    Parameters
    ----------
    config : NHSPracticeAnalysisConfig
        Configuration object containing all pipeline parameters.
    """
    stages = [
        DataExtractionStage(config),
        DataLoadingStage(config),
        DataJoiningStage(),
        SummarisationStage(config),
        GraphingStage(config),
        OutputStage(config),
    ]
    super().__init__(config, stages)

run_analysis()

Run the complete NHS practice level analysis pipeline.

This method orchestrates the execution of all pipeline stages, handles validation, error management, and logging.

Returns:

int
    Exit code: 0 for success, 1 for failure.

Raises:

Exception
    Any unhandled exception during pipeline execution will be caught and logged, returning exit code 1.

Notes

The method performs the following operations:

- Validates pipeline configuration and stages
- Generates a unique run identifier with timestamp
- Executes all pipeline stages in sequence
- Handles errors and provides appropriate logging

Source code in practice_level_gp_appointments/pipeline.py
def run_analysis(self):
    """
    Run the complete NHS practice level analysis pipeline.

    This method orchestrates the execution of all pipeline stages,
    handles validation, error management, and logging.

    Returns
    -------
    int
        Exit code: 0 for success, 1 for failure.

    Raises
    ------
    Exception
        Any unhandled exception during pipeline execution will be
        caught and logged, returning exit code 1.

    Notes
    -----
    The method performs the following operations:
    - Validates pipeline configuration and stages
    - Generates unique run identifier with timestamp
    - Executes all pipeline stages in sequence
    - Handles errors and provides appropriate logging
    """
    logger.log("START", "NHS Practice Level Crosstabs Analysis Pipeline")

    try:
        self.validate()
        logger.success("Pipeline validation successful")
    except Exception as e:
        logger.error(f"Pipeline validation failed: {e}")
        return 1

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        run_id = f"nhs_practice_analysis_{timestamp}"
        self.run(run_id)
        return 0
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        return 1
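The run identifier built by `run_analysis` is just a fixed prefix plus a second-resolution timestamp. A small helper reproducing that convention in isolation (the function name is ours, not part of the package):

```python
from datetime import datetime


def make_run_id(prefix: str = "nhs_practice_analysis") -> str:
    """Build a run identifier in the pipeline's %Y%m%d_%H%M%S format,
    e.g. nhs_practice_analysis_20240301_142500."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{prefix}_{timestamp}"


run_id = make_run_id()
```

Because the timestamp is second-resolution, two runs launched within the same second would share an identifier; callers needing guaranteed uniqueness might append a UUID suffix.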

GraphingStage

Bases: PipelineStage

Stage for creating visualizations and graphs.

Source code in practice_level_gp_appointments/visualization.py
class GraphingStage(PipelineStage):
    """Stage for creating visualizations and graphs."""

    def __init__(self, config: NHSPracticeAnalysisConfig):
        super().__init__(
            inputs="summary_statistics", outputs="figures", name="graphing"
        )
        self.config = config

    def run(self, context):
        """Create visualizations and graphs."""
        summary_stats = self._get_input_values(context)[0]
        logger.info("Creating visualizations...")

        # Set up matplotlib style for NHS branding
        plt.style.use("default")
        sns.set_palette("husl")

        figures = {}

        # Figure 1: Monthly appointments by status
        if "monthly_by_status" in summary_stats:
            fig1, ax1 = plt.subplots(figsize=(12, 6))
            monthly_pivot = (
                summary_stats["monthly_by_status"]
                .pivot(
                    index="data_month",
                    columns="appt_status",
                    values="count_of_appointments",
                )
                .fillna(0)
            )
            monthly_pivot.plot(kind="bar", ax=ax1, stacked=True)
            ax1.set_title(
                "Monthly Appointments by Status",
                fontsize=14,
                fontweight="bold",
            )
            ax1.set_xlabel("Month")
            ax1.set_ylabel("Number of Appointments")
            ax1.legend(
                title="Appointment Status",
                bbox_to_anchor=(1.05, 1),
                loc="upper left",
            )
            plt.xticks(rotation=45)
            plt.tight_layout()
            figures["monthly_appointments_by_status"] = fig1

        # Figure 2: HCP Type Distribution
        if "hcp_type_summary" in summary_stats:
            fig2, ax2 = plt.subplots(figsize=(10, 6))
            hcp_data = summary_stats["hcp_type_summary"].head(10)
            sns.barplot(
                data=hcp_data, x="total_appointments", y="hcp_type", ax=ax2
            )
            ax2.set_title(
                "Total Appointments by Healthcare Professional Type",
                fontsize=14,
                fontweight="bold",
            )
            ax2.set_xlabel("Total Appointments")
            ax2.set_ylabel("HCP Type")
            plt.tight_layout()
            figures["hcp_type_distribution"] = fig2

        # Figure 3: Appointment Mode Trends
        if "mode_by_month" in summary_stats:
            fig3, ax3 = plt.subplots(figsize=(12, 6))
            mode_pivot = (
                summary_stats["mode_by_month"]
                .pivot(
                    index="data_month",
                    columns="appt_mode",
                    values="count_of_appointments",
                )
                .fillna(0)
            )
            mode_pivot.plot(kind="line", ax=ax3, marker="o")
            ax3.set_title(
                "Appointment Mode Trends Across Months",
                fontsize=14,
                fontweight="bold",
            )
            ax3.set_xlabel("Month")
            ax3.set_ylabel("Number of Appointments")
            ax3.legend(
                title="Appointment Mode",
                bbox_to_anchor=(1.05, 1),
                loc="upper left",
            )
            plt.xticks(rotation=45)
            plt.tight_layout()
            figures["appointment_mode_trends"] = fig3

        # Figure 4: Regional Distribution (if available)
        if "regional_summary" in summary_stats:
            fig4, ax4 = plt.subplots(figsize=(12, 8))
            regional_data = summary_stats["regional_summary"]
            sns.barplot(
                data=regional_data,
                x="total_appointments",
                y="region_name",
                ax=ax4,
            )
            ax4.set_title(
                "Total Appointments by Region", fontsize=14, fontweight="bold"
            )
            ax4.set_xlabel("Total Appointments")
            ax4.set_ylabel("Region")
            plt.tight_layout()
            figures["regional_distribution"] = fig4

        # Figure 5: Booking Time Analysis
        if "booking_time_summary" in summary_stats:
            fig5, ax5 = plt.subplots(figsize=(10, 6))
            booking_data = summary_stats["booking_time_summary"]
            sns.barplot(
                data=booking_data,
                x="time_between_book_and_appt",
                y="count_of_appointments",
                ax=ax5,
            )
            ax5.set_title(
                "Appointments by Time Between Booking and Appointment",
                fontsize=14,
                fontweight="bold",
            )
            ax5.set_xlabel("Time Between Booking and Appointment")
            ax5.set_ylabel("Number of Appointments")
            plt.xticks(rotation=45)
            plt.tight_layout()
            figures["booking_time_analysis"] = fig5

        logger.info(f"Created {len(figures)} visualizations")
        self._store_outputs(context, figures)
        return context

run(context)

Create visualizations and graphs.

Source code in practice_level_gp_appointments/visualization.py
def run(self, context):
    """Create visualizations and graphs."""
    summary_stats = self._get_input_values(context)[0]
    logger.info("Creating visualizations...")

    # Set up matplotlib style for NHS branding
    plt.style.use("default")
    sns.set_palette("husl")

    figures = {}

    # Figure 1: Monthly appointments by status
    if "monthly_by_status" in summary_stats:
        fig1, ax1 = plt.subplots(figsize=(12, 6))
        monthly_pivot = (
            summary_stats["monthly_by_status"]
            .pivot(
                index="data_month",
                columns="appt_status",
                values="count_of_appointments",
            )
            .fillna(0)
        )
        monthly_pivot.plot(kind="bar", ax=ax1, stacked=True)
        ax1.set_title(
            "Monthly Appointments by Status",
            fontsize=14,
            fontweight="bold",
        )
        ax1.set_xlabel("Month")
        ax1.set_ylabel("Number of Appointments")
        ax1.legend(
            title="Appointment Status",
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
        )
        plt.xticks(rotation=45)
        plt.tight_layout()
        figures["monthly_appointments_by_status"] = fig1

    # Figure 2: HCP Type Distribution
    if "hcp_type_summary" in summary_stats:
        fig2, ax2 = plt.subplots(figsize=(10, 6))
        hcp_data = summary_stats["hcp_type_summary"].head(10)
        sns.barplot(
            data=hcp_data, x="total_appointments", y="hcp_type", ax=ax2
        )
        ax2.set_title(
            "Total Appointments by Healthcare Professional Type",
            fontsize=14,
            fontweight="bold",
        )
        ax2.set_xlabel("Total Appointments")
        ax2.set_ylabel("HCP Type")
        plt.tight_layout()
        figures["hcp_type_distribution"] = fig2

    # Figure 3: Appointment Mode Trends
    if "mode_by_month" in summary_stats:
        fig3, ax3 = plt.subplots(figsize=(12, 6))
        mode_pivot = (
            summary_stats["mode_by_month"]
            .pivot(
                index="data_month",
                columns="appt_mode",
                values="count_of_appointments",
            )
            .fillna(0)
        )
        mode_pivot.plot(kind="line", ax=ax3, marker="o")
        ax3.set_title(
            "Appointment Mode Trends Across Months",
            fontsize=14,
            fontweight="bold",
        )
        ax3.set_xlabel("Month")
        ax3.set_ylabel("Number of Appointments")
        ax3.legend(
            title="Appointment Mode",
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
        )
        plt.xticks(rotation=45)
        plt.tight_layout()
        figures["appointment_mode_trends"] = fig3

    # Figure 4: Regional Distribution (if available)
    if "regional_summary" in summary_stats:
        fig4, ax4 = plt.subplots(figsize=(12, 8))
        regional_data = summary_stats["regional_summary"]
        sns.barplot(
            data=regional_data,
            x="total_appointments",
            y="region_name",
            ax=ax4,
        )
        ax4.set_title(
            "Total Appointments by Region", fontsize=14, fontweight="bold"
        )
        ax4.set_xlabel("Total Appointments")
        ax4.set_ylabel("Region")
        plt.tight_layout()
        figures["regional_distribution"] = fig4

    # Figure 5: Booking Time Analysis
    if "booking_time_summary" in summary_stats:
        fig5, ax5 = plt.subplots(figsize=(10, 6))
        booking_data = summary_stats["booking_time_summary"]
        sns.barplot(
            data=booking_data,
            x="time_between_book_and_appt",
            y="count_of_appointments",
            ax=ax5,
        )
        ax5.set_title(
            "Appointments by Time Between Booking and Appointment",
            fontsize=14,
            fontweight="bold",
        )
        ax5.set_xlabel("Time Between Booking and Appointment")
        ax5.set_ylabel("Number of Appointments")
        plt.xticks(rotation=45)
        plt.tight_layout()
        figures["booking_time_analysis"] = fig5

    logger.info(f"Created {len(figures)} visualizations")
    self._store_outputs(context, figures)
    return context
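Each figure above follows the same pattern: guard on the summary key, reshape if needed, plot, then stash the `Figure` under a descriptive name. The stacked-bar figure in particular pivots the long monthly summary into one column per appointment status; a standalone sketch of that reshape-and-plot step with invented sample data (using the non-interactive Agg backend so it runs headless):

```python
import matplotlib

matplotlib.use("Agg")  # headless backend; no display needed
import matplotlib.pyplot as plt
import pandas as pd

# Invented long-format summary like summary_stats["monthly_by_status"].
summary = pd.DataFrame({
    "data_month": ["2024-01", "2024-01", "2024-02", "2024-02"],
    "appt_status": ["Attended", "DNA", "Attended", "DNA"],
    "count_of_appointments": [1200, 80, 1150, 95],
})

# Pivot long -> wide (one column per status) so DataFrame.plot can stack bars.
pivot = (
    summary.pivot(index="data_month", columns="appt_status",
                  values="count_of_appointments")
    .fillna(0)
)

fig, ax = plt.subplots(figsize=(8, 4))
pivot.plot(kind="bar", ax=ax, stacked=True)
ax.set_title("Monthly Appointments by Status")
```

The `.fillna(0)` matters: a status absent in one month would otherwise produce NaN cells in the pivot and gaps in the stacked bars.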

Module Documentation

Data Processing - Data loading, cleaning, and transformation (DataLoadingStage, DataJoiningStage)
Analytics - Statistical analysis and summaries (SummarisationStage)
Visualization - Chart generation and plotting (visualization functions)
Output - Data export and report generation (OutputStage)
Pipeline - Pipeline orchestration and workflow (NHSPracticeAnalysisPipeline)