Hi, I have some difficulties trying to run flower simulation.
Why does my aggregate_fit function not receive any results from client though configure_fit is already success. Here is my logging error
INFO : Starting Flower ServerApp, config: num_rounds=10, no round_timeout
INFO :
INFO : [INIT]
INFO : Using initial global parameters provided by strategy
INFO : Starting evaluation of initial global parameters
INFO : Evaluation returned no results (`None`)
INFO :
INFO : [ROUND 1]
INFO : configure_fit: strategy sampled 10 clients (out of 10)
INFO : aggregate_fit: received 0 results and 10 failures
Here is my client
def is_dropout(partition_id: int, num_partitions: int) -> bool:
return random.random() < DROPOUT
class FlowerClient(Client):
def __init__(self, partition_id, net, trainloader, valloader, num_partitions):
self.partition_id = partition_id
self.net = net
self.trainloader = trainloader
self.valloader = valloader
self.num_partitions = num_partitions
self.previous_parameters = None
def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
ndarrays = get_parameters(self.net)
parameters = ndarrays_to_parameters(ndarrays)
status = Status(code=Code.OK, message="Success")
return GetParametersRes(status=status, parameters=parameters)
def fit(self, ins: FitIns) -> FitRes:
# Check dropout first
if is_dropout(self.partition_id, self.num_partitions):
print(f"[Client {self.partition_id}] dropped out in round {ins.config.get('server_round')}")
status = Status(code=Code.CLIENT_NOT_AVAILABLE, message="Client dropped out")
return FitRes(
status=status,
parameters=ins.parameters,
num_examples=0,
metrics={}
)
# Decode parameters
ndarrays_original = parameters_to_ndarrays(ins.parameters)
set_parameters(self.net, ndarrays_original)
# Decode prototypes
encoded_prototypes = ins.config.get("global_prototypes", {})
global_prototypes = encoded_prototypes.get("prototypes")
# Get training parameters
lr = ins.config.get("lr", 0.001)
epochs = ins.config.get("epochs", 1)
# train_loss, train_accuracy, prototypes = train(self.net, self.trainloader, global_prototypes, epochs=epochs, lr=lr, id=self.partition_id)
train_loss, train_accuracy, local_prototypes = 0.0, 0.0, []
train_loss, train_accuracy = float(train_loss), float(train_accuracy)
ndarrays_updated = get_parameters(self.net)
parameters_updated = ndarrays_to_parameters(ndarrays_updated)
self.previous_parameters = parameters_updated
# Calculate label counts
from collections import Counter
label_counts = Counter()
for _, label in self.trainloader.dataset:
if isinstance(label, torch.Tensor):
label = label.item()
label_counts[label] += 1
print(f"[Client {self.partition_id}] Fit completed successfully, sending results")
status = Status(code=Code.OK, message="Success")
return FitRes(
status=status,
parameters=parameters_updated,
num_examples=sum(label_counts.values()),
metrics={
"train_loss": train_loss,
"train_accuracy": train_accuracy,
"num_classes": int(self.trainloader.dataset.num_classes),
"local_prototypes": {"prototypes": local_prototypes},
"label_counts": dict(label_counts)
}
)
def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
if is_dropout(self.partition_id, self.num_partitions):
print(f"[Client {self.partition_id}] dropped out in evaluation round {ins.config.get('server_round')}")
status = Status(code=CustomCode.DROPPED_OUT, message="Client dropped out")
return EvaluateRes(
status=status,
loss=0.0,
num_examples=0,
metrics={}
)
print(f"[Client {self.partition_id}] evaluate, config: {ins.config}")
ndarrays_original = parameters_to_ndarrays(ins.parameters)
set_parameters(self.net, ndarrays_original)
val_loss, val_accuracy = test(self.net, self.valloader, self.partition_id)
val_loss, val_accuracy = float(val_loss), float(val_accuracy)
if wandb_log:
wandb_logger.log({
"round": ins.config.get("server_round"),
f"client_{self.partition_id}/val loss": val_loss,
f"client_{self.partition_id}/val acc": val_accuracy
})
status = Status(code=Code.OK, message="Success")
return EvaluateRes(
status=status,
loss=val_loss,
num_examples=len(self.valloader.dataset),
metrics={"val_accuracy": val_accuracy}
)
# Hàm tạo client
def client_fn(context: Context) -> Client:
print(f"Creating client {context.node_config['partition-id']}")
net = model.to(DEVICE)
partition_id = int(context.node_config["partition-id"])
num_partitions = context.node_config["num-partitions"]
train_subset, val_subset = load_client_datasets(partition_id, num_partitions, dataset=trainset, overlap=0.6, global_bin_pool=global_bin_pool)
trainloader = DataLoader(train_subset, batch_size=BATCHSIZE, shuffle=True)
valloader = DataLoader(val_subset, batch_size=BATCHSIZE, shuffle=False)
return FlowerClient(partition_id, net, trainloader, valloader, num_partitions)
and server
class FedCustom(Strategy):
def __init__(
self,
fraction_fit: float = 1.0,
fraction_evaluate: float = 1.0,
min_fit_clients: int = 1,
min_evaluate_clients: int = 1,
min_available_clients: int = 1,
evaluate_fn: Optional[Callable[[int, NDArrays, Dict[str, Scalar]], Optional[Tuple[float, Dict[str, Scalar]]]]] = None
):
super().__init__()
self.fraction_fit = fraction_fit
self.fraction_evaluate = fraction_evaluate
self.min_fit_clients = min_fit_clients
self.min_evaluate_clients = min_evaluate_clients
self.min_available_clients = min_available_clients
self.evaluate_fn = evaluate_fn
self.global_prototypes = [torch.zeros(264) for _ in range(2)] # create pseudo-global prototypes
def initialize_parameters(self, client_manager: ClientManager) -> Optional[Parameters]:
net = model
ndarrays = get_parameters(net)
return ndarrays_to_parameters(ndarrays)
def configure_fit(self, server_round: int, parameters: Parameters, client_manager: ClientManager) -> List[Tuple[ClientProxy, FitIns]]:
sample_size, min_num_clients = self.num_fit_clients(client_manager.num_available())
clients = client_manager.sample(num_clients=sample_size, min_num_clients=min_num_clients)
try:
encoded_prototypes = {"prototypes": self.global_prototypes}
except Exception as e:
print(f"Error encoding prototypes {e}")
num_classes, feature_dim = len(self.global_prototypes), len(self.global_prototypes[0])
encoded_prototypes = {"prototypes": [[0.0] * feature_dim for _ in range(num_classes)]}
standard_config = {"lr": LR,
"epochs": epochs_round,
"server_round": server_round,
"global_prototypes": encoded_prototypes
}
fit_configurations = []
for client in clients:
fit_configurations.append((client, FitIns(parameters, standard_config)))
return fit_configurations
def aggregate_fit(
self,
server_round: int,
results: List[Tuple[ClientProxy, FitRes]],
failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]]
) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
if not results:
return None, {}
valid_results = [(client_proxy, res) for client_proxy, res in results if res.status.code == Code.OK]
if not valid_results:
return None, {}
weights_results = [
(parameters_to_ndarrays(res.parameters), res.num_examples)
for _, res in valid_results
]
# sample size-based weighted average
parameters_aggregated = ndarrays_to_parameters(
flwr.server.strategy.aggregate.aggregate(weights_results)
)
# update global prototypes
for _, res in valid_results:
for label in res.metrics["label_counts"]:
proto = res.metrics["local_prototypes"]["prototypes"][label]
count = res.metrics["label_counts"][label]
self.global_prototypes[label] += (count * proto) / res.num_examples
# save model
if wandb_log:
if server_round % 2 == 0:
save_model_parameters(parameters_aggregated, file_path="final_model.pth")
total_examples = sum(res.num_examples for _, res in valid_results)
train_loss_agg = sum(
res.num_examples * res.metrics["train_loss"] for _, res in valid_results
) / total_examples if total_examples > 0 else 0.0
train_accuracy_agg = sum(
res.num_examples * res.metrics["train_accuracy"] for _, res in valid_results
) / total_examples if total_examples > 0 else 0.0
metrics_aggregated = {
"train_loss": train_loss_agg,
"train_accuracy": train_accuracy_agg,
"clients_participated": len(valid_results)
}
if wandb_log:
wandb_logger.log({
"round": server_round,
"server/train loss": train_loss_agg,
"server/train accuracy": train_accuracy_agg,
"clients_participated_fit": len(valid_results)
})
HISTORY["train_loss"].append(train_loss_agg)
HISTORY["train_accuracy"].append(train_accuracy_agg)
HISTORY["clients_participated_fit"].append(len(valid_results))
return parameters_aggregated, metrics_aggregated
def configure_evaluate(
self, server_round: int, parameters: Parameters, client_manager: ClientManager
) -> List[Tuple[ClientProxy, EvaluateIns]]:
if self.fraction_evaluate == 0.0:
return []
config = {"server_round": server_round}
evaluate_ins = EvaluateIns(parameters, config)
sample_size, min_num_clients = self.num_evaluation_clients(client_manager.num_available())
clients = client_manager.sample(num_clients=sample_size, min_num_clients=min_num_clients)
return [(client, evaluate_ins) for client in clients]
def aggregate_evaluate(
self,
server_round: int,
results: List[Tuple[ClientProxy, EvaluateRes]],
failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]]
) -> Tuple[Optional[float], Dict[str, Scalar]]:
if not results:
print(f"Round {server_round}: No results received")
return None, {"val_accuracy": 0.0, "clients_participated": 0}
valid_results = [
(client_proxy, res) for client_proxy, res in results
if res.status.code == Code.OK
]
if not valid_results:
print(f"Round {server_round}: No valid evaluation results")
return None, {"val_accuracy": 0.0, "clients_participated": 0}
total_examples = sum(res.num_examples for _, res in valid_results)
loss_aggregated = flwr.server.strategy.aggregate.weighted_loss_avg(
[(res.num_examples, res.loss) for _, res in valid_results]
)
val_accuracy_aggregated = sum(
res.num_examples * res.metrics["val_accuracy"] for _, res in valid_results
) / total_examples if total_examples > 0 else 0.0
metrics_aggregated = {
"val_loss": loss_aggregated,
"val_accuracy": val_accuracy_aggregated,
"clients_participated": len(valid_results)
}
if wandb_log:
wandb_logger.log({
"round": server_round,
"server/val loss": loss_aggregated,
"server/val accuracy": val_accuracy_aggregated,
"clients_participated_eval": len(valid_results)
})
HISTORY["val_loss"].append(loss_aggregated)
HISTORY["val_accuracy"].append(val_accuracy_aggregated)
HISTORY["clients_participated_eval"].append(len(valid_results))
return loss_aggregated, metrics_aggregated
def evaluate(self, server_round: int, parameters: Parameters) -> Optional[Tuple[float, Dict[str, Scalar]]]:
if server_round == 0 or self.evaluate_fn is None:
return None
parameters_ndarrays = parameters_to_ndarrays(parameters)
eval_res = self.evaluate_fn(server_round, parameters_ndarrays, {})
if eval_res is None:
return None
return eval_res
def num_fit_clients(self, num_available_clients: int) -> Tuple[int, int]:
num_clients = int(num_available_clients * self.fraction_fit)
return max(num_clients, self.min_fit_clients), self.min_available_clients
def num_evaluation_clients(self, num_available_clients: int) -> Tuple[int, int]:
num_clients = int(num_available_clients * self.fraction_evaluate)
return max(num_clients, self.min_evaluate_clients), self.min_available_clients
# Hàm đánh giá trên server
def evaluate_fn(server_round: int, parameters: NDArrays, config: Dict[str, Scalar]) -> Optional[Tuple[float, Dict[str, Scalar]]]:
net = model.to(DEVICE)
set_parameters(net, parameters)
testloader = load_server_test_datasets(data=testset)
test_loss, test_accuracy = test(net, testloader, id=-1, server_test=True)
if wandb_log:
wandb_logger.log({
"round": server_round,
"server/test loss": test_loss,
"server/test accuracy": test_accuracy
})
HISTORY["test_loss"].append(test_loss)
HISTORY["test_accuracy"].append(test_accuracy)
return test_loss, {"test_accuracy": test_accuracy}
# Server
def server_fn(context: Context) -> ServerAppComponents:
strategy = FedCustom(
fraction_fit=1.0,
fraction_evaluate=1.0,
min_fit_clients=1,
min_evaluate_clients=1,
min_available_clients=NUM_CLIENTS,
evaluate_fn=evaluate_fn
)
config = ServerConfig(num_rounds=NUM_ROUNDS)
return ServerAppComponents(strategy=strategy, config=config)
UPDATE:
I found a solution to my problem.
The encoded_prototypes I’m passing through configure_fit is Dict and Flower only accept int, str, float, byte,… Therefore, I have use pickle to convert it into byte and send it to client