Extending flwr to use scikit-learn XGBClassifier - Error when serializing/sending model coefficients

*This question was migrated from GitHub Discussions.*

Original question:
"Dear flwr community,

I am an engineer working on a project where we want to use XGBoost on federated CSV data, i.e., traditional machine learning with scikit-learn. I faced some challenges but ended up with a somewhat working first attempt that would be complete if not for the model serialization.

On the client side

I created a client based on the NumPyClient as:

import json
from pathlib import Path

import flwr as fl
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

class XGBoostClient( fl.client.NumPyClient ):
    def __init__( self, model ):
        self.model = model

    def get_parameters( self, config ):
        return get_parameters( self.model )

    def fit( self, parameters, config ):
        # rebuild the global model, score it on the local train split, then refit locally
        self.model = model_from_paramaters( parameters )
        loss = log_loss( y_train, self.model.predict_proba( X_train ) )
        self.model.fit( X_train, y_train )
        return (
            get_parameters( self.model ),
            len( X_train ),
            { 'loss': float( loss ) },
        )

    def evaluate( self, parameters, config ):
        # rebuild the global model and score it on the local test split
        self.model = model_from_paramaters( parameters )
        loss = log_loss( y_test, self.model.predict_proba( X_test ) )
        accuracy = float( self.model.score( X_test, y_test ) )
        return float( loss ), len( X_test ), { 'loss': float( loss ), 'accuracy': accuracy }

The helpers get_parameters and model_from_paramaters are implemented as follows:

def get_parameters( model ):
    # dump the booster to a JSON file and read it back as a plain dict
    tmp_model = Path( tmp_path ) / 'model_tmp_prms.json'
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )
    return [ params ]

def model_from_paramaters( parameters ):
    # write the received JSON back to disk and load it into a fresh classifier
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model

These are used to create a flwr client as:

X, y = load_data( data_path )
X_train, X_test, y_train, y_test = train_test_split( X, y, test_size = 0.2, random_state = 42 )

model = XGBClassifier(
     learning_rate    = 0.1,
     min_child_weight = 1,
     gamma            = 1,
     subsample        = 0.8,
     colsample_bytree = 0.6,
     max_depth        = 4
)
model.fit( X_train, y_train )
client = XGBoostClient( model )
fl.client.start_numpy_client(
    server_address = f"{central_ip}:{central_port}", 
    client = client
)

As you can see, I am using model.get_booster().save_model( tmp_model ) and model.load_model( tmp_model ) of the XGBClassifier to serialize the model definition (i.e., the coefficients) so that it can be transmitted to and from the central flwr server.
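For what it is worth, the save/load round trip itself works in isolation; here is a minimal standalone sketch outside flwr (assuming a fitted clf and a writable tmp_path; roundtrip is just a throwaway name):

import json
from pathlib import Path
from xgboost import XGBClassifier

def roundtrip( clf, tmp_path ):
    tmp_model = Path( tmp_path ) / 'roundtrip.json'
    clf.get_booster().save_model( str( tmp_model ) )  # booster -> JSON file
    params = json.loads( tmp_model.read_text() )      # JSON file -> dict
    tmp_model.write_text( json.dumps( params ) )      # dict -> JSON file
    restored = XGBClassifier()
    restored.load_model( str( tmp_model ) )           # JSON file -> booster
    return restored

So the problem is not the JSON itself but getting it through flwr's transport.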

On the server side

Now, for the server I started with the same philosophy:

import json
import types
from pathlib import Path

import flwr as fl
import xgboost as xgb
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

def get_evaluate_fn( model, X_test, y_test ):
    def evaluate( server_round, parameters ):
        model = model_from_paramaters( parameters )
        loss = log_loss( y_test, model.predict_proba( X_test ) )
        accuracy = model.score( X_test, y_test )
        return loss, { 'accuracy': accuracy }
    return evaluate

def model_from_paramaters( parameters ):
    # here `parameters` is a flwr Parameters object, so the JSON is pulled
    # out of its tensors field before writing it back to disk
    parameters = json.loads( str( parameters.tensors, 'utf-8' ) )
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model

def evaluate_for_xgb( self, server_round, parameters ):
    # bound onto the FedAvg strategy below via types.MethodType, replacing
    # the stock evaluate so that parameters_to_ndarrays is never called
    eval_res = self.evaluate_fn( server_round, parameters )
    if eval_res is None:
        return None
    loss, metrics = eval_res
    return loss, metrics

def get_parameters( model ):
    class ParamWrap( dict ): # Wrapper to propagate the json-like definition of the model
        def __init__( self, params ):
            dict.__init__( self )
            self.tensors = json.dumps( params ).encode( 'utf-8' ) 
            self.tensor_type = ''
            self.parameters = json.dumps( params ).encode( 'utf-8' ) 
            self.config = {}

    tmp_model = Path( tmp_path ) / 'model_tmp_prms.json'
    model.get_booster().save_model( tmp_model )
    with open( tmp_model, 'r' ) as fr:
        params = json.load( fr )

    return ParamWrap( params )

X, y = load_data( data_path )
X_train, X_test, y_train, y_test = train_test_split( X, y, test_size = 0.2, random_state = 42 )

#[...]

model = xgb.XGBClassifier(
    learning_rate    = 0.1,
    min_child_weight = 2,
    gamma            = 1,
    subsample        = 0.8,
    colsample_bytree = 0.6,
    max_depth        = 4
)
model.fit( X_train, y_train )
parameters = get_parameters( model )

strategy = fl.server.strategy.FedAvg(
    evaluate_fn        = get_evaluate_fn( model, X_test, y_test ),
    on_fit_config_fn   = fit_round,
    initial_parameters = parameters
)
strategy.evaluate = types.MethodType( evaluate_for_xgb, strategy )

history = fl.server.start_server(
    server_address = '0.0.0.0:9900',
    config         = fl.server.ServerConfig( num_rounds = 3 ),
    strategy       = strategy 
)

# [...]
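fit_round is elided above; it is just the usual on_fit_config_fn callback. A minimal sketch in the style of Flower's scikit-learn example, which simply forwards the round number to the clients (the config key name is my choice, not prescribed by flwr):

def fit_round( server_round ):
    # hypothetical helper: pass the current round number to the clients
    return { 'server_round': server_round }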

In the server I replaced FedAvg's evaluate method with evaluate_for_xgb in order to bypass line 162 of fedavg.py, corresponding to:

parameters_ndarrays = parameters_to_ndarrays(parameters)

which cannot be applied to the serialized JSON version of the XGBClassifier.

But I am having issues when sending the coefficients of the model. From the log, after placing some extra logging and prints in the library, I found:

DEBUG flwr 2023-03-01 17:24:34,298 | server.py:165 | evaluate_round 1: strategy sampled 1 clients (out of 1)
--> results []
--> failures [TypeError('123 has type int, but expected one of: bytes')]

Going a bit deeper, I found that it corresponds to a serialization error in serde.py. Here I tried to reproduce the issue:

parameters = Parameters( tensors = ParamWrap( params ).tensors, tensor_type = ParamWrap( params ).tensor_type )
TypeError: 123 has type int, but expected one of: bytes
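Staring at this a bit more, I suspect the root cause: flwr.common.Parameters expects tensors to be a *list* of bytes objects, while ParamWrap sets it to a single bytes object. The protobuf layer then iterates that bytes object byte by byte and receives plain ints; 123 is exactly the byte value of '{', the first character of the JSON. A minimal sketch of what the serializer seems to expect (params here is a stand-in for the real booster JSON):

import json
from flwr.common import Parameters

params = { 'learner': {} }  # stand-in for the booster JSON dict
parameters = Parameters(
    tensors     = [ json.dumps( params ).encode( 'utf-8' ) ],  # note: a list with one bytes blob
    tensor_type = '',
)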

In other words: I am trying to send the serialized JSON obtained from the parameters of an XGBClassifier, but I get an error when sending the model to the contributor nodes. How should I solve this? Can I use the client_manager and client_proxy to solve this issue?"

Answer:
"The solution to my issue was to change the way how I was implementing the server. So as this:

import json
from pathlib import Path

import numpy as np
import flwr as fl
from xgboost import XGBClassifier
from flwr.common import ndarrays_to_parameters, parameters_to_ndarrays

def model_from_paramaters( parameters, tmp_path, tag = '' ):
    # `parameters` is a list whose single entry is the aggregated booster JSON
    parameters = parameters[ 0 ]
    tmp_model = Path( tmp_path ) / 'model_tmp_fit.json'
    parameters = json.loads( str( parameters ) )
    with open( tmp_model, 'w' ) as fw:
        json.dump( parameters, fw )
    model = XGBClassifier()
    model.load_model( tmp_model )
    return model

class XGBAvgStrategy( fl.server.strategy.FedAvg ):
    def aggregate_fit( self, rnd, results, failures ):
        if not results:
            return None, {}
        status       = [ x[1].status for x in results ]
        # each client ships its booster JSON as a string inside a one-element ndarray
        parameters   = [ json.loads( parameters_to_ndarrays( x[1].parameters )[ 0 ].item( 0 ) ) for x in results ]
        num_examples = [ x[1].num_examples for x in results ]
        metrics      = [ x[1].metrics for x in results ]
        aggregated_params = self.aggregate( status, parameters, num_examples, metrics )
        model = model_from_paramaters( aggregated_params, tmp_path, 'server' )
        model.fit( X_train, y_train )
        # pack the new global model the same way: JSON string -> ndarray -> Parameters
        return ndarrays_to_parameters( np.array( get_parameters( model, tmp_path ) ) ), {}  # parameters, metrics

    def aggregate( self, status, parameters, num_examples, metrics ):
        # [whatever you need - ideas are welcome]
        parameters = json.dumps( parameters[ 0 ] )
        return [ parameters ]
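
For reference, the get_parameters( model, tmp_path ) used in aggregate_fit is not shown above; a minimal sketch consistent with how it is consumed (the single list entry is the booster JSON as a string, so np.array( ... ) yields a one-element unicode ndarray and .item( 0 ) gives the string back):

def get_parameters( model, tmp_path ):
    tmp_model = Path( tmp_path ) / 'model_tmp_prms.json'
    model.get_booster().save_model( str( tmp_model ) )
    with open( tmp_model, 'r' ) as fr:
        return [ fr.read() ]  # one JSON string per model

This way serde.py only ever sees plain ndarrays and serializes them without complaints."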