Hello,
I’m using Flower Deploy with Docker instances, and I’ve implemented a custom client manager.
For each client registration, I schedule a recurring task (using apscheduler) to collect the client’s properties every X seconds.
Currently, I collect basic system metrics such as RAM and CPU usage.
Here’s the relevant code snippet:
from dataclasses import dataclass, field


@dataclass
class ClientProps:
    """Per-client record kept by the client manager.

    The ``@dataclass`` decorator is required: the fields below rely on
    ``field(default_factory=...)`` and instances are created with
    ``ClientProps(cid=...)``, neither of which works on a plain class.
    """

    # Identifier of the client this record belongs to (set from ``client.cid``).
    cid: str
    # Properties the client returned for props_type "system".
    system: dict = field(default_factory=dict)
    # Not written by the code visible here; presumably dataset-related
    # properties of the client -- TODO confirm against the rest of the project.
    dataset: dict = field(default_factory=dict)
    # Rolling window of metric snapshots; only the 10 most recent are kept.
    metrics: deque = field(default_factory=lambda: deque(maxlen=10))
class MyClientManager(SimpleClientManager):
    """Client manager that records per-client properties.

    On every successful registration the client's "system" properties are
    fetched once. When ``ctx.server_cfg.collect_metrics`` is enabled, a
    recurring background job additionally polls the client's "metrics"
    properties and stores them in a bounded history.
    """

    def __init__(self, ctx: ServerContext, criterion: Optional[Criterion] = None) -> None:
        """Create the manager and, if metrics collection is on, start the scheduler.

        Args:
            ctx: Server context; ``ctx.server_cfg.collect_metrics`` and
                ``ctx.server_cfg.collect_metrics_interval`` are read from it.
            criterion: Optional criterion, stored as-is on the instance.
        """
        super().__init__()
        self.ctx = ctx
        self.clients_info: dict[str, ClientProps] = {}
        self.criterion = criterion
        # Stays None when metrics collection is disabled.
        self.scheduler: Optional[BackgroundScheduler] = None
        if self.ctx.server_cfg.collect_metrics:
            self.scheduler = BackgroundScheduler()
            self.scheduler.start()

    def register(self, client: ClientProxy) -> bool:
        """Register ``client`` and, if accepted, start tracking its properties.

        Returns:
            The result of the parent ``register`` call.
        """
        debug(f"Registering client {client.cid}")
        success = super().register(client)
        # Only set up tracking for registrations the parent accepted; a
        # rejected registration must not trigger a remote call or reset the
        # schedule of an already running job.
        if success:
            self.setup_client_info(client)
        return success

    def unregister(self, client: ClientProxy) -> None:
        """Unregister ``client`` and cancel its metrics job, if one exists.

        The collected ``clients_info`` entry is kept so that its history
        survives a later re-registration of the same ``cid``.
        """
        super().unregister(client)
        if self.scheduler is None:
            return
        job_id = f"metrics-{client.cid}"
        # remove_job raises for unknown ids, so check first.
        if self.scheduler.get_job(job_id) is not None:
            self.scheduler.remove_job(job_id)

    def _append_new_metrics(self, client: ClientProxy):
        """Fetch the client's "metrics" properties and append them to its history.

        Runs as the scheduled job for ``client``. The logged duration covers
        the whole remote call.
        NOTE(review): that duration presumably includes any time the client
        spends finishing other work before it answers -- confirm.
        """
        tik = time.perf_counter()
        client_props = self.clients_info[client.cid]
        new_metrics = self.remote_client_props(client, "metrics")
        client_props.metrics.append(new_metrics)
        tok = time.perf_counter()
        info(f"Collected new metrics from client {client.cid} in {tok - tik:.4f} seconds")

    def setup_client_info(self, client: ClientProxy):
        """Create or refresh the record for ``client`` and schedule metrics polling.

        The "system" properties are fetched on every call. The polling job
        is only added when metrics collection is enabled and the scheduler
        was started.
        """
        # Initialize the ClientProps for this client if it doesn't exist
        if client.cid not in self.clients_info:
            self.clients_info[client.cid] = ClientProps(cid=client.cid)
        self.clients_info[client.cid].system = self.remote_client_props(client, "system")

        if self.ctx.server_cfg.collect_metrics and self.scheduler is not None:
            interval = self.ctx.server_cfg.collect_metrics_interval
            # A per-client random offset spreads the polls of different
            # clients apart. Clamp to 1 second so a small configured interval
            # can never yield a zero or negative period.
            seconds = max(1, interval + random.randint(-5, 5))
            self.scheduler.add_job(
                self._append_new_metrics,
                args=(client,),
                trigger="interval",
                seconds=seconds,
                id=f"metrics-{client.cid}",
                replace_existing=True,
            )

    @staticmethod
    def remote_client_props(client, props_type):
        """Request the properties of kind ``props_type`` from ``client``.

        Args:
            client: Proxy of the client to query.
            props_type: Value sent under the "props_type" config key
                ("system" and "metrics" are used in this class).

        Returns:
            The returned properties, copied into a plain ``dict``.
        """
        properties = client.get_properties(GetPropertiesIns({"props_type": props_type}), None, None).properties
        props_dict = dict(properties)
        return props_dict
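By default, the collection interval is 60 seconds; each client's job additionally gets a random offset of between -5 and +5 seconds, chosen once when the job is scheduled.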
However, I’ve noticed that sometimes the collection process completes quickly (in about 5 seconds), while other times it takes much longer (up to 30 seconds).
Based on my debugging, it seems that the client’s fit function occupies the gRPC channel and blocks the get_properties call until fit has finished.
Question:
Is there a way to enable concurrent calls over the gRPC channel so that get_properties can run while fit is still in progress?