*This question was migrated from GitHub Discussions.*
Original question:
"Dear flwr community,
I am an engineer working on a project where we want to use XGBoost on federated CSV data, i.e. traditional machine learning in the scikit-learn style. I faced some challenges but ended up with a first attempt that is almost working, except for the model serialization.
On the client side
I created a client based on fl.client.NumPyClient as:
class XGBoostClient( fl.client.NumPyClient ):
    def __init__( self, model ):
        self.model = model

    def get_parameters( self, config ):
        return get_parameters( self.model )

    def fit( self, parameters, config ):
        self.model = model_from_paramaters( parameters )
        loss = log_loss( y_train, self.model.predict_proba( X_train ) )
        self.model.fit( X_train, y_train )
        return (
            get_parameters( self.model ),
            len( X_train ),
            { 'loss': float( loss ) },
        )

    def evaluate( self, parameters, config ):
        self.model = model_from_paramaters( parameters )
        loss = log_loss( y_test, self.model.predict_proba( X_test ) )
        accuracy = float( self.model.score( X_test, y_test ) )
        return float( loss ), len( X_test ), { 'loss': float(loss), 'accuracy': accuracy }
With the following implementation for get_parameters and model_from_paramaters:
def get_parameters( model ):
    tmp_model = Path( tmp_path / 'model_tmp_prms.json' )
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )
    return [ params ]

def model_from_paramaters( parameters ):
    tmp_model = Path( tmp_path / 'model_tmp_fit.json' )
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model
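As an aside, NumPyClient expects the parameter lists exchanged in get_parameters/fit/evaluate to contain NumPy ndarrays rather than plain dicts. One way to keep that contract while still shipping the booster JSON is to wrap the JSON text in a zero-dimensional NumPy string array; the following is only a sketch (it mirrors the parameters_to_ndarrays(...)[0].item(0) unpacking used in the accepted answer at the end), not the code from the original post:
import json
import numpy as np
from pathlib import Path
from xgboost import XGBClassifier

def get_parameters( model ):
    # Save the booster as JSON, then wrap the JSON text in a 0-d string array,
    # which Flower can serialize/deserialize like any other ndarray.
    tmp_model = Path( tmp_path / 'model_tmp_prms.json' )
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )
    return [ np.array( json.dumps( params ) ) ]

def model_from_paramaters( parameters ):
    # parameters[0] is the 0-d string array; .item(0) recovers the JSON text
    tmp_model = Path( tmp_path / 'model_tmp_fit.json' )
    with open( tmp_model, 'w' ) as fw:
        fw.write( str( parameters[ 0 ].item( 0 ) ) )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model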
Those are used to create a flwr-client as:
X, y = load_data( data_path )
X_train, X_test, y_train, y_test = train_test_split( X, y, test_size = 0.2, random_state = 42 )
model = XGBClassifier(
    learning_rate = 0.1,
    min_child_weight = 1,
    gamma = 1,
    subsample = 0.8,
    colsample_bytree = 0.6,
    max_depth = 4
)
model.fit( X_train, y_train )
client = XGBoostClient( model )
fl.client.start_numpy_client(
    server_address = f"{central_ip}:{central_port}",
    client = client
)
As you see, I am using model.get_booster().save_model( tmp_model ) and model.load_model( tmp_model ) of the XGBClassifier in order to serialize the model definition (i.e. the trees and their coefficients) so it can be transmitted to and from the central Flower server.
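As a side note, if your XGBoost version supports it (1.6+ exposes Booster.save_raw(raw_format='json') and loading from an in-memory buffer), the temporary-file round trip can be avoided entirely; a hedged sketch, not part of the original code:
from xgboost import XGBClassifier

def booster_to_json_bytes( model ):
    # Same JSON document that save_model('*.json') writes, kept in memory
    return bytes( model.get_booster().save_raw( raw_format = 'json' ) )

def booster_from_json_bytes( raw ):
    model = XGBClassifier()
    model.load_model( bytearray( raw ) )  # load_model also accepts a memory buffer
    return model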
On the server side
Now, for the server I started with the same philosophy:
def get_evaluate_fn( model, X_test, y_test ):
    def evaluate( server_round, parameters ):
        model = model_from_paramaters( parameters )
        y_pred = model.predict( X_test )
        loss = log_loss( y_test, y_pred )
        accuracy = model.score( X_test, y_test )
        return loss, { 'accuracy': accuracy }
    return evaluate

def model_from_paramaters( parameters ):
    parameters = json.loads( str( parameters.tensors, 'utf-8' ) )
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model
def evaluate_for_xgb( self, server_round, parameters ):
    eval_res = self.evaluate_fn( server_round, parameters )
    if eval_res is None:
        return None
    loss, metrics = eval_res
    return loss, metrics

def get_parameters( model ):
    class ParamWrap( dict ):  # Wrapper to propagate the json-like definition of the model
        def __init__( self, params ):
            dict.__init__( self )
            self.tensors = json.dumps( params ).encode( 'utf-8' )
            self.tensor_type = ''
            self.parameters = json.dumps( params ).encode( 'utf-8' )
            self.config = {}
    tmp_model = Path( tmp_path ) / 'model_tmp_prms.json'
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )
    return ParamWrap( params )
X, y = load_data( data_path )
X_train, X_test, y_train, y_test = train_test_split( X, y, test_size = 0.2, random_state = 42 )
# [...]
model = xgb.XGBClassifier(
    learning_rate = 0.1,
    min_child_weight = 2,
    gamma = 1,
    subsample = 0.8,
    colsample_bytree = 0.6,
    max_depth = 4
)
model.fit( X_test, y_test )
parameters = get_parameters( model )
strategy = fl.server.strategy.FedAvg(
    evaluate_fn = get_evaluate_fn( model, X_test, y_test ),
    on_fit_config_fn = fit_round,
    initial_parameters = parameters
)
strategy.evaluate = types.MethodType( evaluate_for_xgb, strategy )
history = fl.server.start_server(
    server_address = '0.0.0.0:9900',
    config = fl.server.ServerConfig( num_rounds = 3 ),
    strategy = strategy
)
# [...]
On the server I monkey-patched FedAvg's evaluate method with evaluate_for_xgb in order to bypass line 162 of fedavg.py, corresponding to:
parameters_ndarrays = parameters_to_ndarrays(parameters)
That call cannot be applied to the serialized-JSON version of the XGBClassifier parameters.
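For context, parameters_to_ndarrays assumes that every entry in Parameters.tensors is a NumPy array serialized with np.save; paraphrased very roughly (the exact code differs between Flower versions), it does something like the following, which is why raw booster JSON cannot pass through it:
from io import BytesIO
import numpy as np

# Rough paraphrase of flwr.common.parameter, not the verbatim library code:
def parameters_to_ndarrays( parameters ):
    # Each tensor is expected to be an np.save() serialization of an ndarray,
    # so raw JSON bytes coming from save_model() cannot be deserialized here.
    return [ np.load( BytesIO( tensor ), allow_pickle = False ) for tensor in parameters.tensors ]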
But I am having some issues when sending the coefficients of the model. From the log, and after placing some extra logs and prints in the library, I found:
DEBUG flwr 2023-03-01 17:24:34,298 | server.py:165 | evaluate_round 1: strategy sampled 1 clients (out of 1)
--> results []
--> failures [TypeError('123 has type int, but expected one of: bytes')]
Going a bit deeper I found that it corresponds to a serialization error in serde.py. Here I tried to emulate the issue:
parameters = Parameters( tensors = ParamWrap( params ).tensors, tensor_type = ParamWrap( params ).tensor_type )
TypeError: 123 has type int, but expected one of: bytes
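One reading of that error (an interpretation, not from the original post): flwr.common.Parameters expects tensors to be a list of bytes objects, one per tensor. ParamWrap.tensors above is a single flat bytes string, so the protobuf layer iterates over it and receives ints instead of bytes objects (123 is the byte value of '{', the first character of the JSON dump). A minimal sketch of a well-formed Parameters object would be:
import json
from flwr.common import Parameters

# tensors must be a list of bytes objects, not one flat bytes string
parameters = Parameters(
    tensors = [ json.dumps( params ).encode( 'utf-8' ) ],
    tensor_type = 'json',  # free-form label describing the serialization
)
Note that this only avoids the protobuf error; FedAvg's built-in aggregation and evaluation would still try to run parameters_to_ndarrays on those tensors, which is what the accepted answer below works around.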
In other words: I am trying to send the serialized JSON obtained from the parameters of an XGBClassifier, but I get an error when trying to send the model to the contributor nodes. How should I solve this? Can I use the client_manager and client_proxy to solve this issue?"
Answer:
"The solution to my issue was to change the way how I was implementing the server. So as this:
import json
import numpy as np
import flwr as fl
from pathlib import Path
from xgboost import XGBClassifier
from flwr.common import ndarrays_to_parameters, parameters_to_ndarrays
def model_from_paramaters( parameters, tmp_path, tag = '' ):
    parameters = parameters[ 0 ]
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    parameters = json.loads( str( parameters ) )
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model
class XGBAvgStrategy( fl.server.strategy.FedAvg ):
    def aggregate_fit( self, rnd, results, failures ):
        if not results:
            return None, {}
        status = [ x[1].status for x in results ]
        parameters = [ json.loads( parameters_to_ndarrays( x[1].parameters )[ 0 ].item( 0 ) ) for x in results ]
        num_examples = [ x[1].num_examples for x in results ]
        metrics = [ x[1].metrics for x in results ]
        aggregated_params = self.aggregate( status, parameters, num_examples, metrics )
        model = model_from_paramaters( aggregated_params, tmp_path, 'server' )
        model.fit( X_train, y_train )
        return ndarrays_to_parameters( np.array( get_parameters( model, tmp_path ) ) ), {}  # parameters, metrics

    def aggregate( self, status, parameters, num_examples, metrics ):
        # [whatever you need - ideas are welcome]
        parameters = json.dumps( parameters[ 0 ] )
        return [ parameters ]
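For completeness, a sketch of how this strategy might be wired into the server start-up; the helper names (get_parameters, fit_round, tmp_path, model) are assumed to exist as in the earlier snippets, and the initial parameters are packed the same way aggregate_fit later unpacks them:
import json
import numpy as np
import flwr as fl
from flwr.common import ndarrays_to_parameters

# Assumption: get_parameters( model, tmp_path ) returns [ booster_json_dict ],
# as in the client-side helper shown in the question. Packing the JSON text in
# a 0-d string array matches the .item( 0 ) unpacking done in aggregate_fit.
initial_ndarrays = [ np.array( json.dumps( get_parameters( model, tmp_path )[ 0 ] ) ) ]

strategy = XGBAvgStrategy(
    on_fit_config_fn = fit_round,
    initial_parameters = ndarrays_to_parameters( initial_ndarrays ),
    # evaluate_fn could be added here too, as long as it unpacks the
    # ndarray-wrapped JSON the same way aggregate_fit does.
)

history = fl.server.start_server(
    server_address = '0.0.0.0:9900',
    config = fl.server.ServerConfig( num_rounds = 3 ),
    strategy = strategy,
)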