Whamcloud - gitweb
land b_colibri_devel on HEAD:
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
index 7f5eb86..e942879 100644 (file)
 #include <lustre_sec.h>
 #include "mgs_internal.h"
 
+static int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
+                                       struct fs_db *fsdb);
+static int mgs_get_srpc_conf_log(struct fs_db *fsdb, const char *tgt,
+                                 enum lustre_sec_part from,
+                                 enum lustre_sec_part to,
+                                 struct sptlrpc_conf_log *log);
+
 /********************** Class functions ********************/
 
 /* Caller must list_del and OBD_FREE each dentry from the list */
@@ -260,6 +267,26 @@ out_pop:
         RETURN(rc);
 }
 
+static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
+{
+        struct mgs_tgt_srpc_conf *tgtconf;
+
+        /* free target-specific rules */
+        while (fsdb->fsdb_srpc_tgt) {
+                tgtconf = fsdb->fsdb_srpc_tgt;
+                fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
+
+                LASSERT(tgtconf->mtsc_tgt);
+
+                sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
+                OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
+                OBD_FREE_PTR(tgtconf);
+        }
+
+        /* free general rules */
+        sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
+}
+
 static struct fs_db *mgs_find_fsdb(struct obd_device *obd, char *fsname)
 {
         struct mgs_obd *mgs = &obd->u.mgs;
@@ -309,6 +336,7 @@ static struct fs_db *mgs_new_fsdb(struct obd_device *obd, char *fsname)
         if (rc) 
                 GOTO(err, rc);
 
+        fsdb->fsdb_srpc_fl_udesc = 1;
         sema_init(&fsdb->fsdb_sem, 1);
         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
         lproc_mgs_add_live(obd, fsdb);
@@ -340,6 +368,7 @@ static void mgs_free_fsdb(struct obd_device *obd, struct fs_db *fsdb)
         name_destroy(&fsdb->fsdb_mdtlov); 
         name_destroy(&fsdb->fsdb_mdtlmv); 
         name_destroy(&fsdb->fsdb_mdc); 
+        mgs_free_fsdb_srpc(fsdb);
         OBD_FREE_PTR(fsdb);
 }
 
@@ -393,6 +422,14 @@ static int mgs_find_or_make_fsdb(struct obd_device *obd, char *name,
                 return rc;
         }
 
+        /* populate srpc rules from params llog */
+        rc = mgs_get_fsdb_srpc_from_llog(obd, fsdb);
+        if (rc) {
+                CERROR("Can't get db from params log %d\n", rc);
+                mgs_free_fsdb(obd, fsdb);
+                return rc;
+        }
+
         *dbh = fsdb;
 
         return 0;
@@ -712,17 +749,18 @@ static inline int record_setup(struct obd_device *obd, struct llog_handle *llh,
         return record_base(obd,llh,devname,0,LCFG_SETUP,s1,s2,s3,s4);
 }
 
-static inline int record_sec_flavor(struct obd_device *obd,
-                                    struct llog_handle *llh, char *devname,
-                                    struct sec_flavor_config *conf)
+static inline int record_sptlrpc_conf(struct obd_device *obd,
+                                      struct llog_handle *llh,
+                                      char *devname,
+                                      struct sptlrpc_conf_log *srpc_log)
 {
         struct lustre_cfg_bufs bufs;
         struct lustre_cfg *lcfg;
         int rc;
 
         lustre_cfg_bufs_reset(&bufs, devname);
-        lustre_cfg_bufs_set(&bufs, 1, conf, sizeof(*conf));
-        lcfg = lustre_cfg_new(LCFG_SEC_FLAVOR, &bufs);
+        lustre_cfg_bufs_set(&bufs, 1, srpc_log, sizeof(*srpc_log));
+        lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &bufs);
 
         rc = record_lcfg(obd, llh, lcfg);
 
@@ -952,7 +990,9 @@ int mgs_write_log_direct_all(struct obd_device *obd, struct fs_db *fsdb,
         /* Could use fsdb index maps instead of directory listing */
         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
                 list_del(&dirent->lld_list);
-                if (strncmp(fsname, dirent->lld_name, len) == 0) {
+                /* don't write to sptlrpc rule log */
+                if (strncmp(fsname, dirent->lld_name, len) == 0 &&
+                    strstr(dirent->lld_name, "-sptlrpc") == NULL) {
                         CDEBUG(D_MGS, "Changing log %s\n", dirent->lld_name);
                         /* Erase any old settings of this same parameter */
                         mgs_modify(obd, fsdb, mti, dirent->lld_name, devname, 
@@ -976,12 +1016,10 @@ struct temp_comp
         struct mgs_target_info   *comp_mti;
         struct fs_db             *comp_fsdb;
         struct obd_device        *comp_obd;
-        struct sec_flavor_config  comp_sec;
 };
 
 static int mgs_write_log_mdc_to_mdt(struct obd_device *, struct fs_db *,
-                                    struct mgs_target_info *,
-                                    struct sec_flavor_config *, char *);
+                                    struct mgs_target_info *, char *);
 
 static int mgs_steal_llog_handler(struct llog_handle *llh,
                                   struct llog_rec_hdr *rec,
@@ -993,7 +1031,6 @@ static int mgs_steal_llog_handler(struct llog_handle *llh,
         int cfg_len = rec->lrh_len;
         char *cfg_buf = (char*) (rec + 1);
         struct lustre_cfg *lcfg;
-        struct sec_flavor_config *sec_conf;
         int rc = 0;
         struct llog_handle *mdt_llh = NULL;
         static int got_an_osc_or_mdc = 0;
@@ -1009,7 +1046,6 @@ static int mgs_steal_llog_handler(struct llog_handle *llh,
         tmti = ((struct temp_comp*)data)->comp_tmti;
         fsdb = ((struct temp_comp*)data)->comp_fsdb;
         obd = ((struct temp_comp*)data)->comp_obd;
-        sec_conf = &((struct temp_comp*)data)->comp_sec;
 
         if (rec->lrh_type != OBD_CFG_REC) {
                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
@@ -1087,10 +1123,9 @@ static int mgs_steal_llog_handler(struct llog_handle *llh,
                 RETURN(rc);
         }
 
-        if (lcfg->lcfg_command == LCFG_SEC_FLAVOR) {
-                memcpy(sec_conf, lustre_cfg_buf(lcfg, 1), sizeof(*sec_conf));
+        /* ignore client side sptlrpc_conf_log */
+        if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
                 RETURN(rc);
-        }
 
         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
                 int index;
@@ -1102,8 +1137,7 @@ static int mgs_steal_llog_handler(struct llog_handle *llh,
                        strlen(mti->mti_fsname));
                 tmti->mti_stripe_index = index;
 
-                mgs_write_log_mdc_to_mdt(obd, fsdb, tmti, sec_conf,
-                                         mti->mti_svname);
+                mgs_write_log_mdc_to_mdt(obd, fsdb, tmti, mti->mti_svname);
                 memset(tmti, 0, sizeof(*tmti));
                 RETURN(rc);
         }
@@ -1283,37 +1317,12 @@ static int mgs_write_log_failnids(struct obd_device *obd,
         return rc;
 }
 
-static
-void extract_sec_flavor(char *params, char *key, char **ptr)
-{
-        char *val = NULL, *tail;
-        int   len;
-
-        *ptr = NULL;
-
-        if (class_find_param(params, key, &val))
-                return;
-
-        tail = strchr(val, ' ');
-        if (tail == NULL)
-                len = strlen(val);
-        else
-                len = tail - val;
-
-        OBD_ALLOC(*ptr, len + 1);
-        if (*ptr == NULL)
-                return;
-
-        memcpy(*ptr, val, len);
-        (*ptr)[len] = '\0';
-}
-
 static int mgs_write_log_mdc_to_lmv(struct obd_device *obd, struct fs_db *fsdb,
                                     struct mgs_target_info *mti,
-                                    struct sec_flavor_config *sec_conf,
                                     char *logname, char *lmvname)
 {
         struct llog_handle *llh = NULL;
+        struct sptlrpc_conf_log *srpc_log;
         char *mdcname, *nodeuuid, *mdcuuid, *lmvuuid;
         char index[5];
         int i, rc;
@@ -1327,6 +1336,16 @@ static int mgs_write_log_mdc_to_lmv(struct obd_device *obd, struct fs_db *fsdb,
         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
                mti->mti_svname, logname, lmvname);
 
+        srpc_log = sptlrpc_conf_log_alloc();
+        if (IS_ERR(srpc_log))
+                RETURN(PTR_ERR(srpc_log));
+        srpc_log->scl_part = LUSTRE_SP_CLI;
+
+        rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
+                                   LUSTRE_SP_CLI, LUSTRE_SP_MDT, srpc_log);
+        if (rc)
+                goto out_srpc;
+
         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
         name_create(&mdcname, mti->mti_svname, "-mdc");
         name_create(&mdcuuid, mdcname, "_UUID");
@@ -1345,7 +1364,7 @@ static int mgs_write_log_mdc_to_lmv(struct obd_device *obd, struct fs_db *fsdb,
 
         rc = record_attach(obd, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
         rc = record_setup(obd, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
-        rc = record_sec_flavor(obd, llh, mdcname, sec_conf);
+        rc = record_sptlrpc_conf(obd, llh, mdcname, srpc_log);
         rc = mgs_write_log_failnids(obd, mti, llh, mdcname);
         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
         rc = record_mdc_add(obd, llh, lmvname, mdcuuid, mti->mti_uuid,
@@ -1358,16 +1377,17 @@ static int mgs_write_log_mdc_to_lmv(struct obd_device *obd, struct fs_db *fsdb,
         name_destroy(&mdcuuid);
         name_destroy(&mdcname);
         name_destroy(&nodeuuid);
+out_srpc:
+        sptlrpc_conf_log_free(srpc_log);
         RETURN(rc);
 }
 
 /* add new mdc to already existent MDS */
 static int mgs_write_log_mdc_to_mdt(struct obd_device *obd, struct fs_db *fsdb,
-                                    struct mgs_target_info *mti,
-                                    struct sec_flavor_config *sec_conf,
-                                    char *logname)
+                                    struct mgs_target_info *mti, char *logname)
 {
         struct llog_handle *llh = NULL;
+        struct sptlrpc_conf_log *srpc_log;
         char *nodeuuid, *mdcname, *mdcuuid, *mdtuuid;
         int idx = mti->mti_stripe_index;
         char index[9];
@@ -1381,6 +1401,16 @@ static int mgs_write_log_mdc_to_mdt(struct obd_device *obd, struct fs_db *fsdb,
 
         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
 
+        srpc_log = sptlrpc_conf_log_alloc();
+        if (IS_ERR(srpc_log))
+                RETURN(PTR_ERR(srpc_log));
+        srpc_log->scl_part = LUSTRE_SP_MDT;
+
+        rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
+                                   LUSTRE_SP_MDT, LUSTRE_SP_MDT, srpc_log);
+        if (rc)
+                goto out_srpc;
+
         name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
         snprintf(index, sizeof(index), "-mdc%04x", idx);
         name_create(&mdcname, logname, index);
@@ -1396,7 +1426,7 @@ static int mgs_write_log_mdc_to_mdt(struct obd_device *obd, struct fs_db *fsdb,
         }
         rc = record_attach(obd, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
         rc = record_setup(obd, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
-        rc = record_sec_flavor(obd, llh, mdcname, sec_conf);
+        rc = record_sptlrpc_conf(obd, llh, mdcname, srpc_log);
         rc = mgs_write_log_failnids(obd, mti, llh, mdcname);
         snprintf(index, sizeof(index), "%d", idx);
 
@@ -1409,6 +1439,8 @@ static int mgs_write_log_mdc_to_mdt(struct obd_device *obd, struct fs_db *fsdb,
         name_destroy(&mdcname);
         name_destroy(&nodeuuid);
         name_destroy(&mdtuuid);
+out_srpc:
+        sptlrpc_conf_log_free(srpc_log);
         RETURN(rc);
 }
 
@@ -1419,13 +1451,24 @@ static int mgs_write_log_mdt0(struct obd_device *obd, struct fs_db *fsdb,
         struct llog_handle *llh = NULL;
         char *uuid, *lovname;
         char mdt_index[5];
+        struct sptlrpc_conf_log *srpc_log;
         char *ptr = mti->mti_params;
         int rc = 0, failout = 0;
         ENTRY;
 
+        srpc_log = sptlrpc_conf_log_alloc();
+        if (IS_ERR(srpc_log))
+                RETURN(PTR_ERR(srpc_log));
+        srpc_log->scl_part = LUSTRE_SP_MDT;
+
+        rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
+                                   LUSTRE_SP_ANY, LUSTRE_SP_MDT, srpc_log);
+        if (rc)
+                GOTO(out_srpc, rc);
+
         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
         if (uuid == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out_srpc, rc = -ENOMEM);
 
         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0) 
                 failout = (strncmp(ptr, "failout", 7) == 0);
@@ -1448,11 +1491,14 @@ static int mgs_write_log_mdt0(struct obd_device *obd, struct fs_db *fsdb,
         rc = record_mount_opt(obd, llh, log, lovname, NULL);
         rc = record_setup(obd, llh, log, uuid, mdt_index, lovname, 
                         failout ? "n" : "f");
+        rc = record_sptlrpc_conf(obd, llh, log, srpc_log);
         rc = record_marker(obd, llh, fsdb, CM_END, log, "add mdt");
         rc = record_end_log(obd, &llh);
 out:
         name_destroy(&lovname);
         OBD_FREE(uuid, sizeof(struct obd_uuid));
+out_srpc:
+        sptlrpc_conf_log_free(srpc_log);
         RETURN(rc);
 }
 
@@ -1460,10 +1506,9 @@ out:
 static int mgs_write_log_mdt(struct obd_device *obd, struct fs_db *fsdb,
                               struct mgs_target_info *mti)
 {
-        char *cliname, *sec;
         struct llog_handle *llh = NULL;
+        char *cliname;
         struct temp_comp comp = { 0 };
-        struct sec_flavor_config sec_conf_mdt, sec_conf_cli;
         char mdt_index[9];
         int rc, i = 0;
         ENTRY;
@@ -1492,19 +1537,6 @@ static int mgs_write_log_mdt(struct obd_device *obd, struct fs_db *fsdb,
                          "%s_UUID", mti->mti_svname);
         }
 
-        /* security flavor */
-        extract_sec_flavor(mti->mti_params, PARAM_SEC_RPC_MDT, &sec);
-        rc = sptlrpc_parse_flavor(LUSTRE_MDT, LUSTRE_MDT, sec, &sec_conf_mdt);
-        name_destroy(&sec);
-        if (rc)
-                RETURN(rc);
-
-        extract_sec_flavor(mti->mti_params, PARAM_SEC_RPC_CLI, &sec);
-        rc = sptlrpc_parse_flavor(LUSTRE_CLI, LUSTRE_MDT, sec, &sec_conf_cli);
-        name_destroy(&sec);
-        if (rc)
-                RETURN(rc);
-
         /* add mdt */
         rc = mgs_write_log_mdt0(obd, fsdb, mti);
         
@@ -1559,8 +1591,8 @@ static int mgs_write_log_mdt(struct obd_device *obd, struct fs_db *fsdb,
                 rc = mgs_steal_llog_for_mdt_from_client(obd, cliname, 
                                                         &comp);
 
-                rc = mgs_write_log_mdc_to_lmv(obd, fsdb, mti, &sec_conf_cli,
-                                              cliname, fsdb->fsdb_clilmv);
+                rc = mgs_write_log_mdc_to_lmv(obd, fsdb, mti, cliname,
+                                              fsdb->fsdb_clilmv);
                 /* add mountopts */
                 rc = record_start_log(obd, &llh, cliname);
                 if (rc) 
@@ -1586,8 +1618,7 @@ out:
                         sprintf(mdt_index,"-MDT%04x",i);
                         
                         name_create(&mdtname, mti->mti_fsname, mdt_index);
-                        rc = mgs_write_log_mdc_to_mdt(obd, fsdb, mti,
-                                                      &sec_conf_mdt, mdtname);
+                        rc = mgs_write_log_mdc_to_mdt(obd, fsdb, mti, mdtname);
                         name_destroy(&mdtname);
                 }
         }
@@ -1599,10 +1630,10 @@ out:
 static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
                                     struct mgs_target_info *mti,
                                     char *logname, char *suffix, char *lovname,
-                                    struct sec_flavor_config *sec_conf,
-                                    int flags)
+                                    enum lustre_sec_part sec_part, int flags)
 {
         struct llog_handle *llh = NULL;
+        struct sptlrpc_conf_log *srpc_log;
         char *nodeuuid, *oscname, *oscuuid, *lovuuid, *svname;
         char index[5];
         int i, rc;
@@ -1611,6 +1642,16 @@ static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
                mti->mti_svname, logname);
         
+        srpc_log = sptlrpc_conf_log_alloc();
+        if (IS_ERR(srpc_log))
+                RETURN(PTR_ERR(srpc_log));
+        srpc_log->scl_part = sec_part;
+
+        rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
+                                   sec_part, LUSTRE_SP_OST, srpc_log);
+        if (rc)
+                goto out_srpc;
+
         if (mgs_log_is_empty(obd, logname)) {
                 /* The first item in the log must be the lov, so we have
                    somewhere to add our osc. */
@@ -1647,7 +1688,7 @@ static int mgs_write_log_osc_to_lov(struct obd_device *obd, struct fs_db *fsdb,
         }
         rc = record_attach(obd, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
         rc = record_setup(obd, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
-        rc = record_sec_flavor(obd, llh, oscname, sec_conf);
+        rc = record_sptlrpc_conf(obd, llh, oscname, srpc_log);
         rc = mgs_write_log_failnids(obd, mti, llh, oscname);
         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
         rc = record_lov_add(obd, llh, lovname, mti->mti_uuid, index, "1");
@@ -1660,6 +1701,8 @@ out:
         name_destroy(&oscname);
         name_destroy(&svname);
         name_destroy(&nodeuuid);
+out_srpc:
+        sptlrpc_conf_log_free(srpc_log);
         RETURN(rc);
 }
 
@@ -1667,10 +1710,10 @@ static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
                              struct mgs_target_info *mti)
 {
         struct llog_handle *llh = NULL;
-        char *logname, *lovname, *sec;
+        struct sptlrpc_conf_log *srpc_log;
+        char *logname, *lovname;
         char mdt_index[9];
         char *ptr = mti->mti_params;
-        struct sec_flavor_config sec_conf_mdt, sec_conf_cli;
         int rc, flags = 0, failout = 0, i;
         ENTRY;
         
@@ -1689,18 +1732,15 @@ static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
                 RETURN(-EALREADY);
         }
 
-        /* security flavors */
-        extract_sec_flavor(mti->mti_params, PARAM_SEC_RPC_MDT, &sec);
-        rc = sptlrpc_parse_flavor(LUSTRE_MDT, LUSTRE_OST, sec, &sec_conf_mdt);
-        name_destroy(&sec);
-        if (rc)
-                RETURN(rc);
+        srpc_log = sptlrpc_conf_log_alloc();
+        if (IS_ERR(srpc_log))
+                RETURN(PTR_ERR(srpc_log));
+        srpc_log->scl_part = LUSTRE_SP_OST;
 
-        extract_sec_flavor(mti->mti_params, PARAM_SEC_RPC_CLI, &sec);
-        rc = sptlrpc_parse_flavor(LUSTRE_CLI, LUSTRE_OST, sec, &sec_conf_cli);
-        name_destroy(&sec);
+        rc = mgs_get_srpc_conf_log(fsdb, mti->mti_svname,
+                                   LUSTRE_SP_ANY, LUSTRE_SP_OST, srpc_log);
         if (rc)
-                RETURN(rc);
+                goto out_srpc;
 
         /*
         attach obdfilter ost1 ost1_UUID
@@ -1721,6 +1761,7 @@ static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
         rc = record_setup(obd, llh, mti->mti_svname,
                           "dev"/*ignored*/, "type"/*ignored*/,
                           failout ? "n" : "f", 0/*options*/);
+        rc = record_sptlrpc_conf(obd, llh, mti->mti_svname, srpc_log);
         rc = record_marker(obd, llh, fsdb, CM_END, mti->mti_svname, "add ost"); 
         rc = record_end_log(obd, &llh);
 
@@ -1751,7 +1792,7 @@ static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
                         name_create(&lovname, logname, "-mdtlov");
                         mgs_write_log_osc_to_lov(obd, fsdb, mti, logname,
                                                  mdt_index, lovname,
-                                                 &sec_conf_mdt, flags);
+                                                 LUSTRE_SP_MDT, flags);
                         name_destroy(&logname);
                         name_destroy(&lovname);
                 }
@@ -1760,9 +1801,10 @@ static int mgs_write_log_ost(struct obd_device *obd, struct fs_db *fsdb,
         /* Append ost info to the client log */
         name_create(&logname, mti->mti_fsname, "-client");
         mgs_write_log_osc_to_lov(obd, fsdb, mti, logname, "",
-                                 fsdb->fsdb_clilov, &sec_conf_cli, 0);
+                                 fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
         name_destroy(&logname);
-        
+out_srpc:
+        sptlrpc_conf_log_free(srpc_log);
         RETURN(rc);
 }
 
@@ -1865,6 +1907,662 @@ static int mgs_wlp_lcfg(struct obd_device *obd, struct fs_db *fsdb,
         return rc;
 }
 
+/*
+ * populate rules which applied to a target device
+ */
+static int mgs_get_srpc_conf_log(struct fs_db *fsdb, const char *tgt,
+                                 enum lustre_sec_part from,
+                                 enum lustre_sec_part to,
+                                 struct sptlrpc_conf_log *log)
+{
+        struct mgs_tgt_srpc_conf *tgtconf;
+        struct sptlrpc_rule_set  *tgt_rset;
+        int                       found_tgt = 0, rc;
+
+        for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf;
+             tgtconf = tgtconf->mtsc_next) {
+                if (!strcmp(tgt, tgtconf->mtsc_tgt)) {
+                        found_tgt = 1;
+                        break;
+                }
+        }
+
+        if (found_tgt)
+                tgt_rset = &tgtconf->mtsc_rset;
+        else
+                tgt_rset = NULL;
+
+        rc = sptlrpc_conf_log_populate(&fsdb->fsdb_srpc_gen, tgt_rset,
+                                       from, to, fsdb->fsdb_srpc_fl_udesc, log);
+        if (rc)
+                CERROR("failed to populate srpc log for %s: %d\n", tgt, rc);
+
+        return rc;
+}
+
+struct mgs_msl_data {
+        struct obd_device      *mmd_obd;
+        struct fs_db           *mmd_fsdb;
+        struct mgs_target_info *mmd_mti;
+        int                     mmd_skip;
+        int                     mmd_attached;
+        int                     mmd_server;
+        enum lustre_sec_part    mmd_tgtpart;
+        char                    mmd_tgtname[MTI_NAME_MAXLEN];
+};
+
+static void mgs_msl_data_cleanup(struct mgs_msl_data *mmd)
+{
+        mmd->mmd_attached = 0;
+        mmd->mmd_tgtname[0] = '\0';
+}
+
+static int mgs_msl_tgt_uuid2name(char *tgtname, char *tgtuuid)
+{
+        char    *ptr;
+
+        if (tgtuuid == NULL) {
+                CERROR("missing target UUID???\n");
+                return -EINVAL;
+        }
+
+        ptr = strstr(tgtuuid, "_UUID");
+        if (ptr == NULL) {
+                CERROR("unrecognized UUID: %s\n", tgtuuid);
+                return -EINVAL;
+        }
+
+        *ptr = '\0';;
+        strncpy(tgtname, tgtuuid, MTI_NAME_MAXLEN);
+        tgtname[MTI_NAME_MAXLEN - 1] = '\0';
+
+        return 0;
+}
+
+static int mgs_modify_srpc_log_handler(struct llog_handle *llh,
+                                       struct llog_rec_hdr *rec, 
+                                       void *data)
+{
+        struct mgs_msl_data *mmd = (struct mgs_msl_data *)data;
+        struct cfg_marker   *marker;
+        struct lustre_cfg   *lcfg = (struct lustre_cfg *)(rec + 1);
+        int                  cfg_len, rc;
+        ENTRY;
+
+        if (rec->lrh_type != OBD_CFG_REC) {
+                CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
+                RETURN(-EINVAL);
+        }
+
+        cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - 
+                  sizeof(struct llog_rec_tail);
+
+        rc = lustre_cfg_sanity_check(lcfg, cfg_len);
+        if (rc) {
+                CERROR("Insane cfg\n");
+                RETURN(rc);
+        }
+
+        if (lcfg->lcfg_command == LCFG_MARKER) {
+                marker = lustre_cfg_buf(lcfg, 1);
+
+                if (marker->cm_flags & CM_START &&
+                    marker->cm_flags & CM_SKIP)
+                        mmd->mmd_skip = 1;
+                if (marker->cm_flags & CM_END)
+                        mmd->mmd_skip = 0;
+
+                RETURN(0);
+        }
+
+        if (mmd->mmd_skip)
+                RETURN(0);
+
+        switch (lcfg->lcfg_command) {
+        case LCFG_ATTACH:
+                mmd->mmd_attached = 1;
+
+                if (!strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OST_NAME)) {
+                        mmd->mmd_server = 1;
+                        mmd->mmd_tgtpart = LUSTRE_SP_OST;
+                } else if (!strcmp(lustre_cfg_string(lcfg, 1),
+                                   LUSTRE_MDT_NAME)) {
+                        mmd->mmd_server = 1;
+                        mmd->mmd_tgtpart = LUSTRE_SP_MDT;
+                } else if (!strcmp(lustre_cfg_string(lcfg, 1),
+                                   LUSTRE_OSC_NAME)) {
+                        mmd->mmd_server = 0;
+                        mmd->mmd_tgtpart = LUSTRE_SP_OST;
+                } else if (!strcmp(lustre_cfg_string(lcfg, 1),
+                                   LUSTRE_MDC_NAME)) {
+                        mmd->mmd_server = 0;
+                        mmd->mmd_tgtpart = LUSTRE_SP_MDT;
+                } else {
+                        mmd->mmd_attached = 0;
+                }
+
+                if (mmd->mmd_attached && mmd->mmd_server) {
+                        rc = mgs_msl_tgt_uuid2name(mmd->mmd_tgtname,
+                                                   lustre_cfg_string(lcfg, 2));
+                        if (rc) {
+                                mgs_msl_data_cleanup(mmd);
+                                break;
+                        }
+                }
+
+                break;
+        case LCFG_SETUP:
+                if (!mmd->mmd_attached)
+                        break;
+
+                /* already got tgtname at LCFG_ATTACH */
+                if (mmd->mmd_server)
+                        break;
+
+                rc = mgs_msl_tgt_uuid2name(mmd->mmd_tgtname,
+                                           lustre_cfg_string(lcfg, 1));
+                if (rc) {
+                        mgs_msl_data_cleanup(mmd);
+                        break;
+                }
+
+                break;
+        case LCFG_SPTLRPC_CONF: {
+                struct sptlrpc_conf_log *log;
+                enum lustre_sec_part from;
+
+                if (!mmd->mmd_attached)
+                        break;
+
+                log = sptlrpc_conf_log_extract(lcfg);
+                if (log == NULL) {
+                        CERROR("missing sptlrpc config log???\n");
+                        mgs_msl_data_cleanup(mmd);
+                        break;
+                }
+
+                if (mmd->mmd_server)
+                        from = LUSTRE_SP_ANY;
+                else
+                        from = log->scl_part;
+
+                /* cleanup the old log */
+                sptlrpc_conf_log_cleanup(log);
+
+                /* populate new log */
+                rc = mgs_get_srpc_conf_log(mmd->mmd_fsdb, mmd->mmd_tgtname,
+                                           from, mmd->mmd_tgtpart, log);
+                if (rc) {
+                        mgs_msl_data_cleanup(mmd);
+                        break;
+                }
+
+                /* Overwrite the log */
+                rec->lrh_len = cfg_len; 
+                rc = llog_write_rec(llh, rec, NULL, 0, (void *)lcfg, 
+                                    rec->lrh_index);
+                if (rc)
+                        CERROR("overwrite sptlrpc conf log failed: %d\n", rc);
+
+                /* append new one */
+                rc = record_marker(mmd->mmd_obd, llh, mmd->mmd_fsdb, CM_START,
+                                   mmd->mmd_mti->mti_svname, "sptlrpc config");
+                rc = record_sptlrpc_conf(mmd->mmd_obd, llh,
+                                         lustre_cfg_string(lcfg, 0), log);
+                rc = record_marker(mmd->mmd_obd, llh, mmd->mmd_fsdb, CM_END,
+                                   mmd->mmd_mti->mti_svname, "sptlrpc config");
+
+                mgs_msl_data_cleanup(mmd);
+                break;
+        }
+        default:
+                /* ignore all others */
+                break;
+        }
+
+        RETURN(rc);
+}
+
+static int mgs_modify_srpc_log(struct obd_device *obd,
+                               struct fs_db *fsdb,
+                               struct mgs_target_info *mti,
+                               char *logname)
+{
+        struct llog_handle   *llh;
+        struct lvfs_run_ctxt  saved;
+        struct mgs_msl_data  *mmd;
+        int rc, rc2;
+        ENTRY;
+
+        CDEBUG(D_MGS, "modify sptlrpc log for %s\n", logname);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        
+        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
+                         &llh, NULL, logname);
+        if (rc)
+                GOTO(out_pop, rc);
+
+        rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
+        if (rc)
+                GOTO(out_close, rc);
+
+        if (llog_get_size(llh) <= 1)
+                GOTO(out_close, rc = 0);
+
+        OBD_ALLOC_PTR(mmd);
+        if (!mmd) 
+                GOTO(out_close, rc = -ENOMEM);
+
+        mmd->mmd_obd = obd;
+        mmd->mmd_fsdb = fsdb;
+        mmd->mmd_mti = mti;
+
+        rc = llog_process(llh, mgs_modify_srpc_log_handler, (void *) mmd, NULL);
+
+        OBD_FREE_PTR(mmd);
+
+out_close:
+        rc2 = llog_close(llh);
+        if (!rc)
+                rc = rc2;
+
+out_pop:
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc) 
+                CERROR("modify sptlrpc log %s failed %d\n", logname, rc);
+
+        RETURN(rc);
+}
+
+/*
+ * for each of log, remove old conf at first
+ */
+static int mgs_modify_srpc_log_all(struct obd_device *obd,
+                                   struct fs_db *fsdb,
+                                   struct mgs_target_info *mti)
+{
+        char             tgt_index[9];
+        char            *logname;
+        int              i, rc = 0, rc2;
+        ENTRY;
+
+        for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
+                if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
+                        sprintf(tgt_index,"-MDT%04x",i);
+
+                        name_create(&logname, mti->mti_fsname, tgt_index);
+                        rc2 = mgs_modify(obd, fsdb, mti, logname,
+                                         mti->mti_fsname, "sptlrpc config",
+                                         CM_SKIP);
+                        rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
+                        name_destroy(&logname);
+
+                        if (rc2 && rc == 0)
+                                rc = rc2;
+                }
+        }
+
+        for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
+                if (test_bit(i,  fsdb->fsdb_ost_index_map)) {
+                        sprintf(tgt_index,"-OST%04x",i);
+
+                        name_create(&logname, mti->mti_fsname, tgt_index);
+                        rc2 = mgs_modify(obd, fsdb, mti, logname,
+                                         mti->mti_fsname, "sptlrpc config",
+                                         CM_SKIP);
+                        rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
+                        name_destroy(&logname);
+
+                        if (rc2 && rc == 0)
+                                rc = rc2;
+                }
+        }
+
+        name_create(&logname, mti->mti_fsname, "-client");
+        rc2 = mgs_modify(obd, fsdb, mti, logname,
+                         mti->mti_fsname, "sptlrpc config", CM_SKIP);
+        rc2 = mgs_modify_srpc_log(obd, fsdb, mti, logname);
+        name_destroy(&logname);
+
+        if (rc2 && rc == 0)
+                rc = rc2;
+
+        RETURN(rc);
+}
+
+static int mgs_srpc_set_param_disk(struct obd_device *obd,
+                                   struct fs_db *fsdb,
+                                   struct mgs_target_info *mti,
+                                   char *param)
+{
+        struct llog_handle     *llh = NULL;
+        char                   *logname;
+        char                   *comment, *ptr;
+        struct lustre_cfg_bufs  bufs;
+        struct lustre_cfg      *lcfg;
+        int                     rc, len;
+        ENTRY;
+
+        /* get comment */
+        ptr = strchr(param, '=');
+        LASSERT(ptr);
+        len = ptr - param;
+
+        OBD_ALLOC(comment, len + 1);
+        if (comment == NULL)
+                RETURN(-ENOMEM);
+        strncpy(comment, param, len);
+        comment[len] = '\0';
+
+        /* prepare lcfg */
+        lustre_cfg_bufs_reset(&bufs, mti->mti_svname);
+        lustre_cfg_bufs_set_string(&bufs, 1, param);
+        lcfg = lustre_cfg_new(0, &bufs);
+        if (lcfg == NULL)
+                GOTO(out_comment, rc = -ENOMEM);
+
+        /* construct log name */
+        rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
+        if (rc)
+                GOTO(out_lcfg, rc);
+
+        if (mgs_log_is_empty(obd, logname)) {
+                rc = record_start_log(obd, &llh, logname);
+                record_end_log(obd, &llh);
+                if (rc) 
+                        GOTO(out, rc);
+        }
+
+        /* obsolete old one */
+        mgs_modify(obd, fsdb, mti, logname, mti->mti_svname, comment, CM_SKIP);
+
+        /* write the new one */
+        rc = mgs_write_log_direct(obd, fsdb, logname, lcfg,
+                                  mti->mti_svname, comment);
+        if (rc)
+                CERROR("err %d writing log %s\n", rc, logname);
+
+out:
+        name_destroy(&logname);
+out_lcfg:
+        lustre_cfg_free(lcfg);
+out_comment:
+        OBD_FREE(comment, len + 1);
+        RETURN(rc);
+}
+
+static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
+                                        char *param)
+{
+        char    *ptr;
+
+        /* disable the adjustable udesc parameter for now, i.e. use default
+         * setting that client always ship udesc to MDT if possible. to enable
+         * it simply remove the following line */
+        goto error_out;
+
+        ptr = strchr(param, '=');
+        if (ptr == NULL)
+                goto error_out;
+        *ptr++ = '\0';
+
+        if (strcmp(param, PARAM_SRPC_UDESC))
+                goto error_out;
+
+        if (strcmp(ptr, "yes") == 0) {
+                fsdb->fsdb_srpc_fl_udesc = 1;
+                CWARN("Enable user descriptor shipping from client to MDT\n");
+        } else if (strcmp(ptr, "no") == 0) {
+                fsdb->fsdb_srpc_fl_udesc = 0;
+                CWARN("Disable user descriptor shipping from client to MDT\n");
+        } else {
+                *(ptr - 1) = '=';
+                goto error_out;
+        }
+        return 0;
+
+error_out:
+        CERROR("Invalid param: %s\n", param);
+        return -EINVAL;
+}
+
+static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
+                                  const char *svname,
+                                  char *param)
+{
+        struct sptlrpc_rule      rule;
+        struct sptlrpc_rule_set *rset;
+        int                      rc;
+        ENTRY;
+
+        if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
+                CERROR("Invalid sptlrpc parameter: %s\n", param);
+                RETURN(-EINVAL);
+        }
+
+        if (strncmp(param, PARAM_SRPC_UDESC,
+                    sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
+                RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
+        }
+
+        if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
+                CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
+                RETURN(-EINVAL);
+        }
+
+        param += sizeof(PARAM_SRPC_FLVR) - 1;
+
+        rc = sptlrpc_parse_rule(param, &rule);
+        if (rc)
+                RETURN(rc);
+
+        /* preapre room for this coming rule. svcname format should be:
+         * - fsname: general rule
+         * - fsname-tgtname: target-specific rule
+         */
+        if (strchr(svname, '-')) {
+                struct mgs_tgt_srpc_conf *tgtconf;
+                int                       found = 0;
+
+                for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
+                     tgtconf = tgtconf->mtsc_next) {
+                        if (!strcmp(tgtconf->mtsc_tgt, svname)) {
+                                found = 1;
+                                break;
+                        }
+                }
+
+                if (!found) {
+                        int name_len;
+
+                        OBD_ALLOC_PTR(tgtconf);
+                        if (tgtconf == NULL)
+                                RETURN(-ENOMEM);
+
+                        name_len = strlen(svname);
+
+                        OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
+                        if (tgtconf->mtsc_tgt == NULL) {
+                                OBD_FREE_PTR(tgtconf);
+                                RETURN(-ENOMEM);
+                        }
+                        memcpy(tgtconf->mtsc_tgt, svname, name_len);
+
+                        tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
+                        fsdb->fsdb_srpc_tgt = tgtconf;
+                }
+
+                rset = &tgtconf->mtsc_rset;
+        } else {
+                rset = &fsdb->fsdb_srpc_gen;
+        }
+
+        /* limit the maximum number of rules, but allow deletion in any case */
+        if (rset->srs_nrule >= SPTLRPC_CONF_LOG_MAX / 2 &&
+            rule.sr_flvr.sf_rpc != SPTLRPC_FLVR_INVALID) {
+                CERROR("too many (%d) rules already for %s\n",
+                       rset->srs_nrule, svname);
+                RETURN(-E2BIG);
+        }
+
+        rc = sptlrpc_rule_set_merge(rset, &rule, 1);
+
+        RETURN(rc);
+}
+
+static int mgs_srpc_set_param(struct obd_device *obd,
+                              struct fs_db *fsdb,
+                              struct mgs_target_info *mti,
+                              char *param)
+{
+        char                    *copy;
+        int                      rc, copy_size;
+        ENTRY;
+
+        /* keep a copy of original param, which could be destroied
+         * during parsing */
+        copy_size = strlen(param) + 1;
+        OBD_ALLOC(copy, copy_size);
+        if (copy == NULL)
+                return -ENOMEM;
+        memcpy(copy, param, copy_size);
+
+        rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
+        if (rc)
+                goto out_free;
+
+        /* previous steps guaranteed the syntax is correct */
+        rc = mgs_srpc_set_param_disk(obd, fsdb, mti, copy);
+        if (rc)
+                goto out_free;
+
+        /* now apply the new rules to all existing config logs */
+        rc = mgs_modify_srpc_log_all(obd, fsdb, mti);
+
+out_free:
+        OBD_FREE(copy, copy_size);
+        RETURN(rc);
+}
+
+struct mgs_srpc_read_data {
+        struct fs_db   *msrd_fsdb;
+        int             msrd_skip;
+};
+
+static int mgs_srpc_read_handler(struct llog_handle *llh,
+                                 struct llog_rec_hdr *rec, 
+                                 void *data)
+{
+        struct mgs_srpc_read_data *msrd = (struct mgs_srpc_read_data *) data;
+        struct cfg_marker         *marker;
+        struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
+        char                      *svname, *param;
+        int                        cfg_len, rc;
+        ENTRY;
+
+        if (rec->lrh_type != OBD_CFG_REC) {
+                CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
+                RETURN(-EINVAL);
+        }
+
+        cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) - 
+                  sizeof(struct llog_rec_tail);
+
+        rc = lustre_cfg_sanity_check(lcfg, cfg_len);
+        if (rc) {
+                CERROR("Insane cfg\n");
+                RETURN(rc);
+        }
+
+        if (lcfg->lcfg_command == LCFG_MARKER) {
+                marker = lustre_cfg_buf(lcfg, 1);
+
+                if (marker->cm_flags & CM_START &&
+                    marker->cm_flags & CM_SKIP)
+                        msrd->msrd_skip = 1;
+                if (marker->cm_flags & CM_END)
+                        msrd->msrd_skip = 0;
+
+                RETURN(0);
+        }
+
+        if (msrd->msrd_skip)
+                RETURN(0);
+
+        if (lcfg->lcfg_command != 0) {
+                CERROR("invalid command (%x)\n", lcfg->lcfg_command);
+                RETURN(0);
+        }
+
+        svname = lustre_cfg_string(lcfg, 0);
+        if (svname == NULL) {
+                CERROR("svname is empty\n");
+                RETURN(0);
+        }
+
+        param = lustre_cfg_string(lcfg, 1);
+        if (param == NULL) {
+                CERROR("param is empty\n");
+                RETURN(0);
+        }
+
+        rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
+        if (rc)
+                CERROR("read sptlrpc record error (%d): %s\n", rc, param);
+
+        RETURN(0);
+}
+
+static int mgs_get_fsdb_srpc_from_llog(struct obd_device *obd,
+                                       struct fs_db *fsdb)
+{
+        struct llog_handle        *llh = NULL;
+        struct lvfs_run_ctxt       saved;
+        char                      *logname;
+        struct mgs_srpc_read_data  msrd;
+        int                        rc;
+        ENTRY;
+
+        /* construct log name */
+        rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
+        if (rc)
+                RETURN(rc);
+
+        if (mgs_log_is_empty(obd, logname))
+                GOTO(out, rc = 0);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
+                         &llh, NULL, logname);
+        if (rc)
+                GOTO(out_pop, rc);
+
+        rc = llog_init_handle(llh, LLOG_F_IS_PLAIN, NULL);
+        if (rc)
+                GOTO(out_close, rc);
+
+        if (llog_get_size(llh) <= 1)
+                GOTO(out_close, rc = 0);
+
+        msrd.msrd_fsdb = fsdb;
+        msrd.msrd_skip = 0;
+
+        rc = llog_process(llh, mgs_srpc_read_handler, (void *) &msrd, NULL);
+
+out_close:
+        llog_close(llh);
+out_pop:
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+out:
+        name_destroy(&logname);
+
+        if (rc)
+                CERROR("failed to read sptlrpc config database: %d\n", rc);
+        RETURN(rc);
+}
+
 static int mgs_write_log_params(struct obd_device *obd, struct fs_db *fsdb,
                                 struct mgs_target_info *mti)
 {
@@ -1908,10 +2606,11 @@ static int mgs_write_log_params(struct obd_device *obd, struct fs_db *fsdb,
                         }
                         GOTO(end_while, rc);
                 }
-                /* Processed in mgs_write_log_mdt/mgs_write_log_ost */
-                if (class_match_param(ptr, PARAM_SEC_RPC_MDT, NULL) == 0 ||
-                    class_match_param(ptr, PARAM_SEC_RPC_CLI, NULL) == 0)
+
+                if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
+                        rc = mgs_srpc_set_param(obd, fsdb, mti, ptr);
                         GOTO(end_while, rc);
+                }
 
                 if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
                         /* Add a failover nidlist */