Scaffold implementation

Hello,

I have tried to adapt the SCAFFOLD implementation from the niid_bench baseline on GitHub. I made a few changes, but the SCAFFOLD logic itself remained unchanged.

The error occurs at:

> torch.from_numpy(cv + cv_multiplier * aggregated_cv_update[i])
> IndexError: list index out of range

I added a few debug outputs to narrow down the problem:

Aggregated parameters are empty (None).
The dictionary with additional results is empty.
aggregated_result (None, {})
Size of server_cv: %d, Size of aggregated_cv_update: %d 105 0
Length of self.server_cv: 105
Length of aggregated_cv_update: 0

It shows that the aggregated results are empty. Here is the fit_round function from the ScaffoldServer class:

def fit_round(
        self,
        server_round: int,
        timeout: Optional[float],
    ) -> Optional[
        Tuple[Optional[Parameters], Dict[str, Scalar], FitResultsAndFailures]
    ]:
        """Perform a single round of federated averaging."""
        # Get clients and their respective instructions from strategy
        client_instructions = self.strategy.configure_fit(
            server_round=server_round,
            parameters=update_parameters_with_cv(self.parameters, self.server_cv),
            client_manager=self._client_manager,
        )

        if not client_instructions:
            log(INFO, "fit_round %s: no clients selected, cancel", server_round)
            return None
        log(
            DEBUG,
            "fit_round %s: strategy sampled %s clients (out of %s)",
            server_round,
            len(client_instructions),
            self._client_manager.num_available(),
        )

        # Collect `fit` results from all clients participating in this round
        results, failures = fit_clients(
            client_instructions=client_instructions,
            max_workers=self.max_workers,
            timeout=timeout,
        )
        log(
            DEBUG,
            "fit_round %s received %s results and %s failures",
            server_round,
            len(results),
            len(failures),
        )

        # Aggregate training results
        aggregated_result: Tuple[Optional[Parameters], Dict[str, Scalar]] = (
            self.strategy.aggregate_fit(server_round, results, failures)
        )
        # Check if the first element (aggregated parameters) is None or not
        if aggregated_result[0] is None:
            print("Aggregated parameters are empty (None).")
        else:
            print("Aggregated parameters are available.")

        # Check if the second element (the dictionary) is empty or not
        if not aggregated_result[1]:
            print("The dictionary with additional results is empty.")
        else:
            print("The dictionary with additional results contains data.")

        aggregated_result_arrays_combined = []
        if aggregated_result[0] is not None:
            aggregated_result_arrays_combined = parameters_to_ndarrays(
                aggregated_result[0]
            )
        print('aggregated_result', aggregated_result)
        aggregated_parameters = aggregated_result_arrays_combined[
            : len(aggregated_result_arrays_combined) // 2
        ]
        aggregated_cv_update = aggregated_result_arrays_combined[
            len(aggregated_result_arrays_combined) // 2 :
        ]

        # convert server cv into ndarrays
        server_cv_np = [cv.numpy() for cv in self.server_cv]
        # update server cv
        total_clients = len(self._client_manager.all())
        cv_multiplier = len(results) / total_clients

It should also be noted that I do not use the main method, but start the server via

server = ScaffoldServer(strategy=strategy)
fl.server.start_server(
    server_address="0.0.0.0:8080",
    server=server,
    config=fl.server.ServerConfig(num_rounds=1),
)

In the GitHub project this is done via the main method with start_simulation …

Hi,
The error is likely in the fit_clients function, since that is where all the Nones come from. If you share more code, I might be able to help further.
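
One quick way to confirm this is to print what ends up in the failures list right after fit_clients returns inside fit_round; any exception raised in a client's fit lands there. This is only a debugging sketch against the code you posted:

        # Debugging sketch: inspect what fit_clients collected as failures.
        # Each entry is either the raised exception itself or a
        # (ClientProxy, FitRes) pair whose status code was not OK.
        for failure in failures:
            if isinstance(failure, BaseException):
                print("client raised:", repr(failure))
            else:
                client_proxy, fit_res = failure
                print("client", client_proxy.cid, "returned status:", fit_res.status)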

Yes, of course.

This is the Server class:

class ScaffoldServer(Server):
    """Implement server for SCAFFOLD."""

    def __init__(
        self,
        strategy: ScaffoldStrategy,    # strategy : Strategy
        client_manager: Optional[ClientManager] = None,
    ):
        if client_manager is None:
            client_manager = SimpleClientManager()
        super().__init__(client_manager=client_manager, strategy=strategy)
        self.server_cv: List[torch.Tensor] = []

        #self.strategy = ScaffoldStrategy

    def _get_initial_parameters(self, server_round: int, timeout: Optional[float]) -> Parameters:
        """Get initial parameters from one of the available clients."""
        # Server-side parameter initialization
        parameters: Optional[Parameters] = self.strategy.initialize_parameters(
            client_manager=self._client_manager
        )
        if parameters is not None:
            log(INFO, "Using initial parameters provided by strategy")
            return parameters

        # Get initial parameters from one of the clients
        log(INFO, "Requesting initial parameters from one random client")
        random_client = self._client_manager.sample(1)[0]
        #log(INFO, random_client)
        print(random_client)
        log(INFO, f"Selected client: {random_client.cid}")
        ins = GetParametersIns(config={})
        get_parameters_res = random_client.get_parameters(ins=ins, timeout=timeout, group_id=server_round)
        log(INFO, "Received initial parameters from one random client")
        self.server_cv = [
            torch.from_numpy(t)
            for t in parameters_to_ndarrays(get_parameters_res.parameters)
        ]
        return get_parameters_res.parameters

    # pylint: disable=too-many-locals
    def fit_round(
        self,
        server_round: int,
        timeout: Optional[float],
    ) -> Optional[
        Tuple[Optional[Parameters], Dict[str, Scalar], FitResultsAndFailures]
    ]:
        """Perform a single round of federated averaging."""
        # Get clients and their respective instructions from strategy
        client_instructions = self.strategy.configure_fit(
            server_round=server_round,
            parameters=update_parameters_with_cv(self.parameters, self.server_cv),
            client_manager=self._client_manager,
        )

        if not client_instructions:
            log(INFO, "fit_round %s: no clients selected, cancel", server_round)
            return None
        log(
            DEBUG,
            "fit_round %s: strategy sampled %s clients (out of %s)",
            server_round,
            len(client_instructions),
            self._client_manager.num_available(),
        )

        # Collect `fit` results from all clients participating in this round
        results, failures = fit_clients(
            client_instructions=client_instructions,
            max_workers=self.max_workers,
            timeout=timeout,
        )
        log(
            DEBUG,
            "fit_round %s received %s results and %s failures",
            server_round,
            len(results),
            len(failures),
        )

        # Aggregate training results
        aggregated_result: Tuple[Optional[Parameters], Dict[str, Scalar]] = (
            self.strategy.aggregate_fit(server_round, results, failures)
        )
        # Check if the first element (aggregated parameters) is None or not
        if aggregated_result[0] is None:
            print("Aggregated parameters are empty (None).")
        else:
            print("Aggregated parameters are available.")

        # Check if the second element (the dictionary) is empty or not
        if not aggregated_result[1]:
            print("The dictionary with additional results is empty.")
        else:
            print("The dictionary with additional results contains data.")

        aggregated_result_arrays_combined = []
        if aggregated_result[0] is not None:
            aggregated_result_arrays_combined = parameters_to_ndarrays(
                aggregated_result[0]
            )
        print('aggregated_result', aggregated_result)
        aggregated_parameters = aggregated_result_arrays_combined[
            : len(aggregated_result_arrays_combined) // 2
        ]
        aggregated_cv_update = aggregated_result_arrays_combined[
            len(aggregated_result_arrays_combined) // 2 :
        ]

        # convert server cv into ndarrays
        server_cv_np = [cv.numpy() for cv in self.server_cv]
        # update server cv
        total_clients = len(self._client_manager.all())
        cv_multiplier = len(results) / total_clients

        print( "Size of server_cv: %d, Size of aggregated_cv_update: %d", len(self.server_cv), len(aggregated_cv_update))
        print("Length of self.server_cv:", len(self.server_cv))
        print("Length of aggregated_cv_update:", len(aggregated_cv_update))

        self.server_cv = [
            torch.from_numpy(cv + cv_multiplier * aggregated_cv_update[i])
            for i, cv in enumerate(server_cv_np)
        ]

        # update parameters x = x + 1* aggregated_update
        curr_params = parameters_to_ndarrays(self.parameters)
        updated_params = [
            x + aggregated_parameters[i] for i, x in enumerate(curr_params)
        ]
        parameters_updated = ndarrays_to_parameters(updated_params)

        # metrics
        metrics_aggregated = aggregated_result[1]
        return parameters_updated, metrics_aggregated, (results, failures)


def update_parameters_with_cv(
    parameters: Parameters, s_cv: List[torch.Tensor]
) -> Parameters:
    """Extend the list of parameters with the server control variate."""
    # extend the list of parameters arrays with the cv arrays
    cv_np = [cv.numpy() for cv in s_cv]
    parameters_np = parameters_to_ndarrays(parameters)
    parameters_np.extend(cv_np)
    return ndarrays_to_parameters(parameters_np)


def fit_clients(
    client_instructions: List[Tuple[ClientProxy, FitIns]],
    max_workers: Optional[int],
    timeout: Optional[float],
) -> FitResultsAndFailures:
    """Refine parameters concurrently on all selected clients."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted_fs = {
            executor.submit(fit_client, client_proxy, ins, timeout)
            for client_proxy, ins in client_instructions
        }
        finished_fs, _ = concurrent.futures.wait(
            fs=submitted_fs,
            timeout=None,  # Handled in the respective communication stack
        )

    # Gather results
    results: List[Tuple[ClientProxy, FitRes]] = []
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]] = []
    for future in finished_fs:
        _handle_finished_future_after_fit(
            future=future, results=results, failures=failures
        )
    return results, failures


def fit_client(
    client: ClientProxy, ins: FitIns, timeout: Optional[float]
) -> Tuple[ClientProxy, FitRes]:
    """Refine parameters on a single client."""
    fit_res = client.fit(ins, timeout=timeout)
    return client, fit_res


def _handle_finished_future_after_fit(
    future: concurrent.futures.Future,  # type: ignore
    results: List[Tuple[ClientProxy, FitRes]],
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> None:
    """Convert finished future into either a result or a failure."""
    # Check if there was an exception
    failure = future.exception()
    if failure is not None:
        failures.append(failure)
        return

    # Successfully received a result from a client
    result: Tuple[ClientProxy, FitRes] = future.result()
    _, res = result

    # Check result status code
    if res.status.code == Code.OK:
        results.append(result)
        return

    # Not successful, client returned a result where the status code is not OK
    failures.append(result)

As you can see, fit_clients, fit_client and _handle_finished_future_after_fit were taken over from the base Server class (flwr.server.server). This is almost exactly taken from the GitHub repository (flower/baselines/niid_bench/niid_bench/server_scaffold.py at main · adap/flower · GitHub), only without model: DictConfig.

strategy = ScaffoldStrategy(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=2,
    min_evaluate_clients=2,
    min_available_clients=2,
    evaluate_metrics_aggregation_fn=weighted_average,
)

I only selected 2 clients to test the whole thing and get it running. One of the only big differences is that I don't use the main.py entry point.

Further additional information: I get a TypeError in the Server.py file, in the fit_clients function:

submitted_fs {<Future at 0x731d749c3790 state=finished raised TypeError>, <Future at 0x731d749c1cf0 state=finished raised TypeError>}
finished_fs {<Future at 0x731d749c3790 state=finished raised TypeError>, <Future at 0x731d749c1cf0 state=finished raised TypeError>}
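
To see which TypeError is actually being raised inside those futures, one option (just a debugging sketch inside fit_clients) is to ask each finished future for its exception and print the full traceback:

import traceback

finished_fs, _ = concurrent.futures.wait(fs=submitted_fs, timeout=None)
for future in finished_fs:
    exc = future.exception()  # None if the client call succeeded
    if exc is not None:
        traceback.print_exception(type(exc), exc, exc.__traceback__)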

my fit method in the client

    def fit(self, parameters, config):
        server_cv = parameters[len(parameters) // 2 :]
        parameters = parameters[: len(parameters) // 2]
        self.set_parameters(parameters)
        self.client_cv = []
        # load client control variate
        if os.path.exists(f"{self.dir}/client_cv_{self.client_index}.pt"):
            self.client_cv = torch.load(f"{self.dir}/client_cv_{self.client_index}.pt")
        # convert the server control variate to a list of tensors
        server_cv = [torch.Tensor(cv) for cv in server_cv]
        for param in model.parameters(config={}):
            self.client_cv.append(param.clone().detach())
        train(model, self.train, self.client_cv, server_cv, learning_rate=self.learning_rate, epochs=1)
        x = parameters
        y_i = self.get_parameters()
        c_i_n = []
        server_update_x = []
        server_update_c = []
        # update client control variate: c_i^+ = c_i - c + (x - y_i) / (eta * K)
        for c_i_j, c_j, x_j, y_i_j in zip(self.client_cv, server_cv, x, y_i):
            c_i_n.append(
                c_i_j
                - c_j
                + (1.0 / (self.learning_rate * 25 * len(self.train)))
                * (x_j - y_i_j)
            )
            # y_i - x, c_i_n - c_i for the server
            server_update_x.append((y_i_j - x_j))
            server_update_c.append((c_i_n[-1] - c_i_j).cpu().numpy())
        self.client_cv = c_i_n

        print('self.client_cv',self.client_cv )
        torch.save(self.client_cv, f"{self.dir}/client_cv_{self.client_index}.pt")

        combined_updates = server_update_x + server_update_c
        return combined_updates , len(self.train), {}

Hello again,

I just can't get any further and can't fix the error. Something is wrong with the fit method. I would be very grateful if someone could suggest a possible way to solve this. Everything looks right to me, and I don't know what could be wrong. I'm using Flower 1.8.0, but I have tried other versions too.

In Server file

def fit_clients(
    client_instructions: List[Tuple[ClientProxy, FitIns]],
    max_workers: Optional[int],
    timeout: Optional[float],
) -> FitResultsAndFailures:
    """Refine parameters concurrently on all selected clients."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted_fs = {
            executor.submit(fit_client, client_proxy, ins, timeout)
            for client_proxy, ins in client_instructions
        }
        for client_proxy, ins in client_instructions:
            print('ins', parameters_to_ndarrays(ins.parameters))
        finished_fs, _ = concurrent.futures.wait(
            fs=submitted_fs,
            timeout=None,  # Handled in the respective communication stack
        )
        print('submitted_fs', submitted_fs)

    # Gather results
    results: List[Tuple[ClientProxy, FitRes]] = []
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]] = []
    for future in finished_fs:
        _handle_finished_future_after_fit(
            future=future, results=results, failures=failures
        )
    return results, failures


def fit_client(
    client: ClientProxy, ins: FitIns, timeout: Optional[float]
) -> Tuple[ClientProxy, FitRes]:
    """Refine parameters on a single client."""
    fit_res = client.fit(ins, timeout=timeout)
    return client, fit_res

My Flower Client

class FlowerClient1(fl.client.NumPyClient):

    def __init__(self, client_index):
        args = parse_args()
        self.client_index = args.client_index
        self.model = model
        train1, test2 = train_loader, test_loader
        self.train = train1[self.client_index]
        self.test = test2[self.client_index]
        self.client_cvalue = []
        for param in self.model.parameters():
            self.client_cvalue.append(torch.zeros(param.shape))
        #save_dir = ""
        #if save_dir == "":
        #    save_dir = "clients_cvs"
        self.dir = "client_cvs"
        if not os.path.exists(self.dir):
            os.makedirs(self.dir)

    def get_parameters(self, config):
        return [val.cpu().numpy() for _, val in model.state_dict().items()]

    def set_parameters(self, parameters):
        params_dict = zip(model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
        model.load_state_dict(state_dict, strict=True)

    def fit(self, parameters, config):
        model_parameter = parameters[len(parameters) // 2 :]
        server_C = parameters[: len(parameters) // 2]
        self.set_parameters(model_parameter)
        self.client_cvalue = []
        for param in self.model.parameters():
            self.client_cvalue.append(param.clone().detach())
        self.client_cvalue = [cv.cpu().numpy() for cv in self.client_cvalue]

        if os.path.exists(f"{self.dir}/client_cv_{self.client_index}.pkl"):
            with open(f"{self.dir}/client_cv_{self.client_index}.pkl", 'rb') as f:
                self.client_cvalue = pickle.load(f)
        else:
            with open(f"{self.dir}/client_cv_{self.client_index}.pkl", 'wb') as f:
                pickle.dump(self.client_cvalue, f)
            
        server_cv = [torch.Tensor(sv) for sv in server_C]
        self.client_cvalue = [torch.Tensor(cv) for cv in self.client_cvalue]
        train(model, self.train, self.client_cvalue, server_cv, epochs=1)
        
        c_i_n = []
        x = model_parameter
        y_i = self.get_parameters(config={})

        for ci, c, x_model, local_model in zip(self.client_cvalue, server_cv, x, y_i):
            c_i_n.append(
                ci - c
                + (1.0 / (self.learning_rate * 1 * len(self.train)))
                * (x_model - local_model)
            )
        
        c_delta = []
        y_delta = []

        for param_yi, param_x in zip(y_i, x):
            y_delta.append(param_yi - param_x)

        for c_new, c_old in zip(c_i_n, self.client_cvalue):
            c_delta.append(c_new - c_old)

        combined_parameters = y_delta + c_delta

        return combined_parameters, len(self.train.dataset), {}

I tried using pickle files, but it doesn't work with PyTorch files either.
The problem is that the file is never created:

… f"{self.dir}/client_cv_{self.client_index}.pt" …

This suggests once again that the fit function is not working. If you need more code, I can provide it. I would really appreciate it if someone could suggest a solution.
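
In case it helps with reproducing: to make the client-side exception visible, I am thinking of wrapping the fit logic in a try/except that prints the traceback before re-raising. Just a sketch; _do_fit is a hypothetical helper holding the current body of fit:

import traceback

    def fit(self, parameters, config):
        try:
            # _do_fit is a hypothetical helper containing the current fit logic
            return self._do_fit(parameters, config)
        except Exception:
            traceback.print_exc()  # make the client-side error visible in the client log
            raise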

Hello,

I wanted to inform you that I have found the solution to the NoneType error. The issue was caused by the model and some of the data being stored on different devices. This small oversight caused significant frustration. However, after identifying this problem, I was able to resolve several other minor errors that followed.
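
For anyone who hits the same thing: the fix boils down to keeping the model and every batch (and, in SCAFFOLD's case, the control variate tensors) on the same device. A minimal sketch of what that looks like in a standard PyTorch training loop; the optimizer and criterion names are just placeholders:

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)  # move the model once, before training starts

for images, labels in train_loader:
    # every batch must live on the same device as the model
    images, labels = images.to(device), labels.to(device)
    optimizer.zero_grad()
    loss = criterion(model(images), labels)
    loss.backward()
    optimizer.step()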

