Scaffold implementation

Hello,

I have tried to adopt the SCAFFOLD implementation from the niid_bench baseline (GitHub). I made a few changes, but the SCAFFOLD logic itself is unchanged.

The error occurs at

> torch.from_numpy(cv + cv_multiplier * aggregated_cv_update[i])
> IndexError: list index out of range

I have added a few print outputs to show me where the problem is:

Aggregated parameters are empty (None).
The dictionary with additional results is empty.
aggregated_result (None, {})
Size of server_cv: %d, Size of aggregated_cv_update: %d 105 0
Length of self.server_cv: 105
Length of aggregated_cv_update: 0
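
The split in fit_round halves this empty list, so aggregated_cv_update ends up empty as well and the very first aggregated_cv_update[i] raises. A minimal sketch of that failure mode:

combined = []  # parameters_to_ndarrays is skipped because aggregate_fit returned None
aggregated_cv_update = combined[len(combined) // 2 :]  # -> []
aggregated_cv_update[0]  # IndexError: list index out of range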

It shows that the aggregated results are empty. Here is the fit_round function from the ScaffoldServer class:

def fit_round(
        self,
        server_round: int,
        timeout: Optional[float],
    ) -> Optional[
        Tuple[Optional[Parameters], Dict[str, Scalar], FitResultsAndFailures]
    ]:
        """Perform a single round of federated averaging."""
        # Get clients and their respective instructions from the strategy
        client_instructions = self.strategy.configure_fit(
            server_round=server_round,
            parameters=update_parameters_with_cv(self.parameters, self.server_cv),
            client_manager=self._client_manager,
        )

        if not client_instructions:
            log(INFO, "fit_round %s: no clients selected, cancel", server_round)
            return None
        log(
            DEBUG,
            "fit_round %s: strategy sampled %s clients (out of %s)",
            server_round,
            len(client_instructions),
            self._client_manager.num_available(),
        )

        # Collect `fit` results from all clients participating in this round
        results, failures = fit_clients(
            client_instructions=client_instructions,
            max_workers=self.max_workers,
            timeout=timeout,
        )
        log(
            DEBUG,
            "fit_round %s received %s results and %s failures",
            server_round,
            len(results),
            len(failures),
        )

        # Aggregate training results
        aggregated_result: Tuple[Optional[Parameters], Dict[str, Scalar]] = (
            self.strategy.aggregate_fit(server_round, results, failures)
        )
        # Check if the first element (aggregated parameters) is None or not
        if aggregated_result[0] is None:
            print("Aggregated parameters are empty (None).")
        else:
            print("Aggregated parameters are available.")

        # Check if the second element (the dictionary) is empty or not
        if not aggregated_result[1]:
            print("The dictionary with additional results is empty.")
        else:
            print("The dictionary with additional results contains data.")

        aggregated_result_arrays_combined = []
        if aggregated_result[0] is not None:
            aggregated_result_arrays_combined = parameters_to_ndarrays(
                aggregated_result[0]
            )
        print('aggregated_result', aggregated_result)
        aggregated_parameters = aggregated_result_arrays_combined[
            : len(aggregated_result_arrays_combined) // 2
        ]
        aggregated_cv_update = aggregated_result_arrays_combined[
            len(aggregated_result_arrays_combined) // 2 :
        ]

        # convert server cv into ndarrays
        server_cv_np = [cv.numpy() for cv in self.server_cv]
        # update server cv
        total_clients = len(self._client_manager.all())
        cv_multiplier = len(results) / total_clients

It should also be noted that I do not use the main method but start the server via

server = ScaffoldServer(strategy=strategy)
fl.server.start_server(
    server_address="0.0.0.0:8080",
    server=server,
    config=fl.server.ServerConfig(num_rounds=1),
)

In the GitHub project this is done via the main method with start_simulation …
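
For comparison, a rough sketch of how such a main could wire the same server into start_simulation (client_fn and num_clients are placeholders for whatever the setup provides, so this is only an assumption of the call shape):

import flwr as fl

server = ScaffoldServer(strategy=strategy)
fl.simulation.start_simulation(
    client_fn=client_fn,  # placeholder: factory that builds one client per cid
    num_clients=2,
    server=server,
    config=fl.server.ServerConfig(num_rounds=1),
)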

Hi,
The error is likely in the fit_clients function, since that is where you get all the Nones. If you share more code, I might be able to help further.
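
As a quick check you could log the exceptions that end up in failures instead of only counting them; a minimal sketch, assuming a fit_round like the one you posted:

from logging import ERROR
from flwr.common.logger import log

results, failures = fit_clients(
    client_instructions=client_instructions,
    max_workers=self.max_workers,
    timeout=timeout,
)
for failure in failures:
    if isinstance(failure, BaseException):
        # exceptions raised inside fit_client end up here via future.exception()
        log(ERROR, "fit_round %s: client fit raised %r", server_round, failure)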

Yes of course,

This is the Server class

class ScaffoldServer(Server):
    """Implement server for SCAFFOLD."""

    def __init__(
        self,
        strategy: ScaffoldStrategy,    # strategy : Strategy
        client_manager: Optional[ClientManager] = None,
    ):
        if client_manager is None:
            client_manager = SimpleClientManager()
        super().__init__(client_manager=client_manager, strategy=strategy)
        self.server_cv: List[torch.Tensor] = []

        #self.strategy = ScaffoldStrategy

    def _get_initial_parameters(self, server_round: int, timeout: Optional[float]) -> Parameters:
        """Get initial parameters from one of the available clients."""
        # Server-side parameter initialization
        parameters: Optional[Parameters] = self.strategy.initialize_parameters(
            client_manager=self._client_manager
        )
        if parameters is not None:
            log(INFO, "Using initial parameters provided by strategy")
            return parameters

        # Get initial parameters from one of the clients
        log(INFO, "Requesting initial parameters from one random client")
        random_client = self._client_manager.sample(1)[0]
        #log(INFO, random_client)
        print(random_client)
        log(INFO, f"Selected client: {random_client.cid}")
        ins = GetParametersIns(config={})
        get_parameters_res = random_client.get_parameters(ins=ins, timeout=timeout, group_id=server_round)
        log(INFO, "Received initial parameters from one random client")
        self.server_cv = [
            torch.from_numpy(t)
            for t in parameters_to_ndarrays(get_parameters_res.parameters)
        ]
        return get_parameters_res.parameters

    # pylint: disable=too-many-locals
    def fit_round(
        self,
        server_round: int,
        timeout: Optional[float],
    ) -> Optional[
        Tuple[Optional[Parameters], Dict[str, Scalar], FitResultsAndFailures]
    ]:
        """Perform a single round of federated averaging."""
        # Get clients and their respective instructions from the strategy
        client_instructions = self.strategy.configure_fit(
            server_round=server_round,
            parameters=update_parameters_with_cv(self.parameters, self.server_cv),
            client_manager=self._client_manager,
        )

        if not client_instructions:
            log(INFO, "fit_round %s: no clients selected, cancel", server_round)
            return None
        log(
            DEBUG,
            "fit_round %s: strategy sampled %s clients (out of %s)",
            server_round,
            len(client_instructions),
            self._client_manager.num_available(),
        )

        # Collect `fit` results from all clients participating in this round
        results, failures = fit_clients(
            client_instructions=client_instructions,
            max_workers=self.max_workers,
            timeout=timeout,
        )
        log(
            DEBUG,
            "fit_round %s received %s results and %s failures",
            server_round,
            len(results),
            len(failures),
        )

        # Aggregate training results
        aggregated_result: Tuple[Optional[Parameters], Dict[str, Scalar]] = (
            self.strategy.aggregate_fit(server_round, results, failures)
        )
        # Check if the first element (aggregated parameters) is None or not
        if aggregated_result[0] is None:
            print("Aggregated parameters are empty (None).")
        else:
            print("Aggregated parameters are available.")

        # Check if the second element (the dictionary) is empty or not
        if not aggregated_result[1]:
            print("The dictionary with additional results is empty.")
        else:
            print("The dictionary with additional results contains data.")

        aggregated_result_arrays_combined = []
        if aggregated_result[0] is not None:
            aggregated_result_arrays_combined = parameters_to_ndarrays(
                aggregated_result[0]
            )
        print('aggregated_result', aggregated_result)
        aggregated_parameters = aggregated_result_arrays_combined[
            : len(aggregated_result_arrays_combined) // 2
        ]
        aggregated_cv_update = aggregated_result_arrays_combined[
            len(aggregated_result_arrays_combined) // 2 :
        ]

        # convert server cv into ndarrays
        server_cv_np = [cv.numpy() for cv in self.server_cv]
        # update server cv
        total_clients = len(self._client_manager.all())
        cv_multiplier = len(results) / total_clients

        print( "Size of server_cv: %d, Size of aggregated_cv_update: %d", len(self.server_cv), len(aggregated_cv_update))
        print("Length of self.server_cv:", len(self.server_cv))
        print("Length of aggregated_cv_update:", len(aggregated_cv_update))

        self.server_cv = [
            torch.from_numpy(cv + cv_multiplier * aggregated_cv_update[i])
            for i, cv in enumerate(server_cv_np)
        ]

        # update parameters x = x + 1* aggregated_update
        curr_params = parameters_to_ndarrays(self.parameters)
        updated_params = [
            x + aggregated_parameters[i] for i, x in enumerate(curr_params)
        ]
        parameters_updated = ndarrays_to_parameters(updated_params)

        # metrics
        metrics_aggregated = aggregated_result[1]
        return parameters_updated, metrics_aggregated, (results, failures)


def update_parameters_with_cv(
    parameters: Parameters, s_cv: List[torch.Tensor]
) -> Parameters:
    """Extend the list of parameters with the server control variate."""
    # extend the list of parameters arrays with the cv arrays
    cv_np = [cv.numpy() for cv in s_cv]
    parameters_np = parameters_to_ndarrays(parameters)
    parameters_np.extend(cv_np)
    return ndarrays_to_parameters(parameters_np)


def fit_clients(
    client_instructions: List[Tuple[ClientProxy, FitIns]],
    max_workers: Optional[int],
    timeout: Optional[float],
) -> FitResultsAndFailures:
    """Refine parameters concurrently on all selected clients."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted_fs = {
            executor.submit(fit_client, client_proxy, ins, timeout)
            for client_proxy, ins in client_instructions
        }
        finished_fs, _ = concurrent.futures.wait(
            fs=submitted_fs,
            timeout=None,  # Handled in the respective communication stack
        )

    # Gather results
    results: List[Tuple[ClientProxy, FitRes]] = []
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]] = []
    for future in finished_fs:
        _handle_finished_future_after_fit(
            future=future, results=results, failures=failures
        )
    return results, failures


def fit_client(
    client: ClientProxy, ins: FitIns, timeout: Optional[float]
) -> Tuple[ClientProxy, FitRes]:
    """Refine parameters on a single client."""
    fit_res = client.fit(ins, timeout=timeout)
    return client, fit_res


def _handle_finished_future_after_fit(
    future: concurrent.futures.Future,  # type: ignore
    results: List[Tuple[ClientProxy, FitRes]],
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> None:
    """Convert finished future into either a result or a failure."""
    # Check if there was an exception
    failure = future.exception()
    if failure is not None:
        failures.append(failure)
        return

    # Successfully received a result from a client
    result: Tuple[ClientProxy, FitRes] = future.result()
    _, res = result

    # Check result status code
    if res.status.code == Code.OK:
        results.append(result)
        return

    # Not successful, client returned a result where the status code is not OK
    failures.append(result)

As far as I can see, fit_clients, fit_client and _handle_finished_future_after_fit were taken over from the base Server class (flwr.server.server). This is almost taken verbatim from the GitHub repository (flower/baselines/niid_bench/niid_bench/server_scaffold.py at main · adap/flower · GitHub), only without the model: DictConfig argument.

strategy = ScaffoldStrategy(
    fraction_fit=1.0,  
    fraction_evaluate=1.0,  
    min_fit_clients=2,  
    min_evaluate_clients=2,  
    min_available_clients=2,  
    evaluate_metrics_aggregation_fn=weighted_average,
)

I only selected 2 clients to test the whole thing and get it running. One of the only big differences is that I do not use the main.py script.

Further additional information: I get a TypeError in the Server.py file, in the fit_clients function:

submitted_fs {<Future at 0x731d749c3790 state=finished raised TypeError>, <Future at 0x731d749c1cf0 state=finished raised TypeError>}
finished_fs {<Future at 0x731d749c3790 state=finished raised TypeError>, <Future at 0x731d749c1cf0 state=finished raised TypeError>}
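
To see the actual TypeError message I could read it off the finished futures, e.g. with a small debugging sketch that uses the same future.exception() call as _handle_finished_future_after_fit:

for future in finished_fs:
    exc = future.exception()
    if exc is not None:
        print(type(exc).__name__, exc)  # the TypeError raised inside fit_client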

My fit method in the client:

    def fit(self, parameters, config):
        server_cv = parameters[len(parameters) // 2 :]
        parameters = parameters[: len(parameters) // 2]
        self.set_parameters(parameters)
        self.client_cv = []
        # load client control variate
        if os.path.exists(f"{self.dir}/client_cv_{self.client_index}.pt"):
            self.client_cv = torch.load(f"{self.dir}/client_cv_{self.client_index}.pt")
        # convert the server control variate to a list of tensors
        server_cv = [torch.Tensor(cv) for cv in server_cv]
        for param in model.parameters(config={}):
            self.client_cv.append(param.clone().detach())
        train(model, self.train, self.client_cv, server_cv, learning_rate=self.learning_rate, epochs=1)
        x = parameters
        y_i = self.get_parameters()
        c_i_n = []
        server_update_x = []
        server_update_c = []
        # update client control variate c_i_1 = c_i - c + 1/eta*K (x - y_i)
        for c_i_j, c_j, x_j, y_i_j in zip(self.client_cv, server_cv, x, y_i):
            c_i_n.append(
                c_i_j
                - c_j
                + (1.0 / (self.learning_rate * 25 * len(self.train)))
                * (x_j - y_i_j)
            )
            # y_i - x, c_i_n - c_i for the server
            server_update_x.append((y_i_j - x_j))
            server_update_c.append((c_i_n[-1] - c_i_j).cpu().numpy())
        self.client_cv = c_i_n

        print('self.client_cv',self.client_cv )
        torch.save(self.client_cv, f"{self.dir}/client_cv_{self.client_index}.pt")

        combined_updates = server_update_x + server_update_c
        return combined_updates , len(self.train), {}