Whamcloud - gitweb
- landed b_hd_mdref (mostly WB cache fixes)
authoryury <yury>
Mon, 8 Aug 2005 18:40:52 +0000 (18:40 +0000)
committeryury <yury>
Mon, 8 Aug 2005 18:40:52 +0000 (18:40 +0000)
68 files changed:
lustre/cmobd/cm_internal.h
lustre/cmobd/cm_mds_reint.c
lustre/cmobd/cm_obd.c
lustre/cmobd/cm_oss_reint.c
lustre/cmobd/cm_reint.c
lustre/cobd/cache_obd.c
lustre/include/linux/lustre_cmobd.h
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_mgmt.h
lustre/include/linux/lustre_net.h
lustre/include/linux/lustre_smfs.h
lustre/include/linux/obd.h
lustre/include/linux/obd_cache.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_echo.h
lustre/include/linux/obd_lmv.h
lustre/include/linux/obd_lov.h
lustre/include/linux/obd_ost.h
lustre/include/linux/obd_ptlbd.h
lustre/ldlm/ldlm_lib.c
lustre/llite/dcache.c
lustre/llite/llite_close.c
lustre/llite/llite_gns.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_obd.c
lustre/lvfs/fsfilt_ext3.c
lustre/lvfs/lvfs_linux.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_lib.c
lustre/mds/mds_lmv.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/osc/osc_request.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/sec/gss/sec_gss.c
lustre/sec/sec.c
lustre/smfs/dir.c
lustre/smfs/file.c
lustre/smfs/fsfilt.c
lustre/smfs/kml.c
lustre/smfs/mds_kml.c
lustre/smfs/ost_kml.c
lustre/smfs/smfs_api.h
lustre/smfs/smfs_internal.h
lustre/smfs/smfs_lib.c
lustre/smfs/smfs_llog.c
lustre/tests/cmobd.sh
lustre/tests/lmv.sh
lustre/tests/test45-mountain.sh [new file with mode: 0644]
lustre/tests/test45.sh [new file with mode: 0644]
lustre/utils/lconf
lustre/utils/lmc
lustre/utils/obd.c

index fabdb0b..987eae3 100644 (file)
@@ -35,13 +35,6 @@ void cmobd_cleanup_write_srv(struct obd_device *);
 int cmobd_reint_mds(struct obd_device*obd, void *record, int opcode);
 int cmobd_reint_oss(struct obd_device *obd, void *record, int opcode);
 
-/* methods for updating/reading master lustre_id from local MDS inode EA.*/
-int mds_update_mid(struct obd_device *obd, struct lustre_id *id,
-                   void *data, int data_len);
-
-int mds_read_mid(struct obd_device *obd, struct lustre_id *id,
-                 void *data, int data_len);
-
 int mds_read_md(struct obd_device *obd, struct lustre_id *id, 
                 char **data, int *datalen);
 #endif /* CM_INTERNAL_H */
index c51923e..34f51a5 100644 (file)
@@ -49,7 +49,7 @@ static inline void cmobd_rec2iattr(struct mds_rec_setattr *rec,
 static void
 cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
                        struct lustre_id *id2, const char *name,
-                       int namelen, __u32 mode)
+                       int namelen, __u32 mode, __u32 flags)
 {
         LASSERT(id1);
         LASSERT(data);
@@ -59,14 +59,18 @@ cmobd_prepare_mdc_data(struct mdc_op_data *data, struct lustre_id *id1,
         data->id1 = *id1;
         if (id2)
                 data->id2 = *id2;
-       else
-               memset(&data->id2, 0, sizeof(data->id2));
 
        data->valid = 0;
         data->name = name;
+       data->flags = flags;
         data->namelen = namelen;
         data->create_mode = mode;
         data->mod_time = LTIME_S(CURRENT_TIME);
+
+        /* zeroing out store cookie, as it makes no sense on master MDS and may
+         * also confuse it as may be considered as recovery case. */
+        memset(&data->id1.li_stc, 0, sizeof(data->id1.li_stc));
+        memset(&data->id2.li_stc, 0, sizeof(data->id2.li_stc));
 }
 
 /* If mdc_setattr() is called with an 'iattr', then it is a normal RPC that
@@ -102,20 +106,11 @@ static int cmobd_reint_setattr(struct obd_device *obd, void *record)
         /* FIXME-UMKA: here should be handling of setattr() from open. Bug
          * #249. Will be fixed later. */
 
-        /* converting localstore cookie to remote lustre_id. */
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->sa_id,
-                          &rec->sa_id, sizeof(rec->sa_id));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
-
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 RETURN(-ENOMEM);
         cmobd_prepare_mdc_data(op_data, &rec->sa_id, NULL,
-                               NULL, 0, 0);
+                               NULL, 0, 0, MDS_REINT_REQ);
 
         /* handling possible EAs. */
         ea1 = lustre_msg_buf(msg, 1, 0);
@@ -138,12 +133,10 @@ static int cmobd_reint_create(struct obd_device *obd, void *record)
         struct cm_obd *cmobd = &obd->u.cm;
         struct ptlrpc_request *req = NULL;
         struct mds_kml_pack_info *mkpi;
-        int rc = 0, namelen, datalen, alloc = 0;
-        struct mds_rec_create *rec;
+        int rc = 0, namelen, datalen;
         struct mdc_op_data *op_data;
+        struct mds_rec_create *rec;
         struct lustre_msg *msg;
-        struct mds_body *body;
-        struct lustre_id lid;
         char *name, *data;
         ENTRY;
 
@@ -153,17 +146,6 @@ static int cmobd_reint_create(struct obd_device *obd, void *record)
         rec = lustre_msg_buf(msg, 0, 0);
         if (!rec) 
                 RETURN(-EINVAL);
-        
-        lid = rec->cr_replayid;
-
-        /* converting local inode store cookie to remote lustre_id. */
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->cr_id,
-                          &rec->cr_id, sizeof(rec->cr_id));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
 
         /* getting name to be created and its length */
         name = lustre_msg_string(msg, 1, 0);
@@ -176,36 +158,27 @@ static int cmobd_reint_create(struct obd_device *obd, void *record)
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL) 
                 GOTO(exit, rc = -ENOMEM);
+
+        /* XXX: here is the issue preventing LMV from being used as master
+         * device for flushing cache to it. It is allusive to the fact that
+         * cache MDS parent id with wrong group component is used for forwarding
+         * reint requests to some MDS from those LMV knows about. As group is
+         * wrong - LMV forwards reqs to wrong MDS. Do not know how to fix it
+         * yet.  --umka */
  
-        /* zeroing @rec->cr_replayid out in request, as master MDS should create
-         * own inode (with own store cookie). */
-        memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
-       
         /* prepare mdc request data. */
         cmobd_prepare_mdc_data(op_data, &rec->cr_id, &rec->cr_replayid,
-                               name, namelen, rec->cr_mode);
+                               name, namelen, rec->cr_mode, MDS_REINT_REQ);
 
         /* requesting to master to create object with passed attributes. */
         rc = md_create(cmobd->master_exp, op_data, data, datalen,
                        rec->cr_mode, current->fsuid, current->fsgid,
                        rec->cr_rdev, &req);
         OBD_FREE(op_data, sizeof(*op_data));
-
-        if (!rc) {
-                /* here we save store cookie from master MDS to local 
-                 * inode EA. */
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
-
-                rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
-                                    &body->id1, sizeof(body->id1));
-        }
 exit:
         if (req)
                 ptlrpc_req_finished(req);
         
-        if (alloc == 1)
-                OBD_FREE(data, datalen);
-        
         RETURN(rc);
 }
 
@@ -228,15 +201,6 @@ static int cmobd_reint_unlink(struct obd_device *obd, void *record)
         if (!rec) 
                 RETURN(-EINVAL);
 
-        /* converting local store cookie to remote lustre_id. */
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->ul_id1,
-                          &rec->ul_id1, sizeof(rec->ul_id1));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
-
         /* getting name to be created and its length */
         name = lustre_msg_string(msg, 1, 0);
         namelen = name ? msg->buflens[1] - 1 : 0;
@@ -247,7 +211,8 @@ static int cmobd_reint_unlink(struct obd_device *obd, void *record)
 
         /* prepare mdc request data. */
         cmobd_prepare_mdc_data(op_data, &rec->ul_id1, NULL,
-                               name, namelen, rec->ul_mode);
+                               name, namelen, rec->ul_mode,
+                               MDS_REINT_REQ);
 
         rc = md_unlink(cmobd->master_exp, op_data, &req);
         OBD_FREE(op_data, sizeof(*op_data));
@@ -276,23 +241,6 @@ static int cmobd_reint_link(struct obd_device *obd, void *record)
         if (!rec) 
                 RETURN(-EINVAL);
 
-        /* converting local store cookie for both ids to remote lustre_id. */
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id1,
-                          &rec->lk_id1, sizeof(rec->lk_id1));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
-        
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->lk_id2,
-                          &rec->lk_id2, sizeof(rec->lk_id2));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
-        
         /* getting name to be created and its length */
         name = lustre_msg_string(msg, 1, 0);
         namelen = name ? msg->buflens[1] - 1: 0;
@@ -303,7 +251,7 @@ static int cmobd_reint_link(struct obd_device *obd, void *record)
 
         /* prepare mdc request data. */
         cmobd_prepare_mdc_data(op_data, &rec->lk_id1, &rec->lk_id2,
-                               name, namelen, 0);
+                               name, namelen, 0, MDS_REINT_REQ);
 
         rc = md_link(cmobd->master_exp, op_data, &req);
         OBD_FREE(op_data, sizeof(*op_data));
@@ -331,23 +279,6 @@ static int cmobd_reint_rename(struct obd_device *obd, void *record)
         rec = lustre_msg_buf(msg, 0, 0);
         if (!rec) 
                 RETURN(-EINVAL);
-        
-        /* converting local store cookie for both ids to remote lustre_id. */
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id1,
-                          &rec->rn_id1, sizeof(rec->rn_id1));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
-        
-        rc = mds_read_mid(cmobd->cache_exp->exp_obd, &rec->rn_id2,
-                          &rec->rn_id2, sizeof(rec->rn_id2));
-        if (rc) {
-                CERROR("Can't read master MDS store cookie "
-                       "from local inode EA, err = %d.\n", rc);
-                RETURN(rc);
-        }
 
         /* getting old name and its length */
         old = lustre_msg_string(msg, 1, 0);
@@ -363,7 +294,7 @@ static int cmobd_reint_rename(struct obd_device *obd, void *record)
         
         /* prepare mdc request data. */
         cmobd_prepare_mdc_data(op_data, &rec->rn_id1, &rec->rn_id1,
-                               NULL, 0, 0);
+                               NULL, 0, 0, MDS_REINT_REQ);
 
         rc = md_rename(cmobd->master_exp, op_data, old, oldlen,
                        new, newlen, &req);
index c3065df..fe384a9 100644 (file)
@@ -51,24 +51,6 @@ static int cmobd_detach(struct obd_device *obd)
         return lprocfs_obd_detach(obd);
 }
 
-static inline int cmobd_md_obd(struct obd_device *obd)
-{
-        if (!strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) ||
-            !strcmp(obd->obd_type->typ_name, OBD_LMV_DEVICENAME))
-                return 1;
-
-        return 0;
-}
-
-static inline int cmobd_dt_obd(struct obd_device *obd)
-{
-        if (!strcmp(obd->obd_type->typ_name, OBD_LOV_DEVICENAME) ||
-            !strcmp(obd->obd_type->typ_name, OBD_OSC_DEVICENAME))
-                return 1;
-
-        return 0;
-}
-
 static int cmobd_init_dt_desc(struct obd_device *obd)
 {
         struct cm_obd *cmobd = &obd->u.cm;
@@ -139,8 +121,6 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
         struct lustre_handle conn = { 0 };
         struct cm_obd *cmobd = &obd->u.cm;
         struct lustre_cfg* lcfg = buf;
-        struct lustre_id mid, lid;
-        __u32 valsize;
         int rc;
         ENTRY;
 
@@ -200,7 +180,7 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(put_cache, rc);
         }
 
-        if (cmobd_dt_obd(cmobd->master_exp->exp_obd)) {
+        if (obd_dt_type(cmobd->master_exp->exp_obd)) {
                 /* for master dt device remove the recovery flag. */
                 rc = obd_set_info(cmobd->master_exp, strlen("unrecovery"),
                                   "unrecovery", 0, NULL); 
@@ -212,7 +192,10 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
                         GOTO(put_cache, rc);
         }
 
-        if (cmobd_md_obd(cmobd->master_exp->exp_obd)) {
+        if (obd_md_type(cmobd->master_exp->exp_obd)) {
+                __u32 size = sizeof(struct fid_extent);
+                struct fid_extent ext;
+                
                 rc = cmobd_init_ea_size(obd);
                 if (rc) {
                         CERROR("can't init MD layer EA size, "
@@ -221,38 +204,31 @@ static int cmobd_setup(struct obd_device *obd, obd_count len, void *buf)
                 }
                 cmobd->write_srv = NULL;
 
-                /* requesting master obd to have its root inode store cookie to
-                 * be able to save it to local root inode EA. */
-                valsize = sizeof(struct lustre_id);
-        
-                rc = obd_get_info(cmobd->master_exp, strlen("rootid"),
-                                  "rootid", &valsize, &mid);
+                /* getting fid pool from master to set it on cache */
+                rc = obd_get_info(cmobd->master_exp, strlen("getext"),
+                                  "getext", &size, &ext);
                 if (rc) {
-                        CERROR("can't get rootid from master MDS %s, "
-                               "err= %d.\n", master_uuid.uuid, rc);
+                        CERROR("can't get fids extent from master, "
+                               "err %d\n", rc);
                         GOTO(put_cache, rc);
                 }
 
-                /* getting rootid from cache MDS. It is needed to update local
-                 * (cache) root inode by rootid value from master obd. */
-                rc = obd_get_info(cmobd->cache_exp, strlen("rootid"),
-                                  "rootid", &valsize, &lid);
-                if (rc) {
-                        CERROR("can't get rootid from local MDS %s, "
-                               "err= %d.\n", cache_uuid.uuid, rc);
-                        GOTO(put_cache, rc);
+                /* simple checks for validness */
+                if (!ext.fe_start || !ext.fe_width || ext.fe_start == ext.fe_width) {
+                        CERROR("invalid fids extent from master, ["LPD64"-"LPD64"]\n", 
+                              ext.fe_start, ext.fe_width);
+                        GOTO(put_cache, rc = -EINVAL);
                 }
 
-                /* storing master MDS rootid to local root inode EA. */
-                CWARN("storing "DLID4" to local inode "DLID4".\n",
-                      OLID4(&mid), OLID4(&lid));
-
-                rc = mds_update_mid(cmobd->cache_exp->exp_obd, &lid,
-                                    &mid, sizeof(mid));
+                CWARN("setting master fids extent ["LPD64"-"LPD64
+                      "] -> %s\n", ext.fe_start, ext.fe_width,
+                      cmobd->cache_exp->exp_obd->obd_name);
+                
+                rc = obd_set_info(cmobd->cache_exp, strlen("setext"),
+                                  "setext", size, &ext); 
                 if (rc) {
-                        CERROR("can't update local root inode by ID "
-                               "from master MDS %s, err = %d.\n",
-                               master_uuid.uuid, rc);
+                        CERROR("can't set fids extent to cache, "
+                               "err %d\n", rc);
                         GOTO(put_cache, rc);
                 }
         }
@@ -297,7 +273,11 @@ static int cmobd_iocontrol(unsigned int cmd, struct obd_export *exp,
         ENTRY;
         
         switch (cmd) {
-        case OBD_IOC_CMOBD_SYNC: /* trigger reintegration */
+        case OBD_IOC_CMOBD_SYNC:
+                /* here would be nice to make sure somehow that all data is in
+                 * cache and there are no outstanding requests, as otherwise
+                 * cahce is not coherent. But how to check that from CMOBD? I do
+                 * not know. --umka */
                 rc = cmobd_reintegrate(obd);
                 break;
         default:
@@ -334,8 +314,8 @@ static int __init cmobd_init(void)
         if (rc)
                 RETURN(rc);
         cmobd_extent_slab = kmem_cache_create("cmobd_extents",
-                                               sizeof(struct cmobd_extent_info), 0,
-                                               SLAB_HWCACHE_ALIGN, NULL, NULL);
+                                              sizeof(struct cmobd_extent_info), 0,
+                                              SLAB_HWCACHE_ALIGN, NULL, NULL);
         if (cmobd_extent_slab == NULL) {
                 class_unregister_type(OBD_CMOBD_DEVICENAME);
                 RETURN(-ENOMEM);
index 3db865a..8f0dade 100644 (file)
@@ -99,11 +99,11 @@ out:
 
 static int cmobd_create_reint(struct obd_device *obd, void *rec)
 {
-        struct obdo *oa = (struct obdo *)rec;
         struct cm_obd *cmobd = &obd->u.cm;
         struct obd_export *exp = cmobd->master_exp;
-        struct lov_stripe_md *lsm;
+        struct obdo *oa = (struct obdo *)rec;
         struct obd_trans_info oti = { 0 };
+        struct lov_stripe_md *lsm;
         int rc;
         ENTRY;
          
@@ -114,17 +114,20 @@ static int cmobd_create_reint(struct obd_device *obd, void *rec)
         if (cmobd->master_group != oa->o_gr) {
                 int group = oa->o_gr;
                 int valsize = sizeof(group);
-                rc = obd_set_info(exp, strlen("mds_conn"), "mds_conn",
-                                  valsize, &group);
+
+                rc = obd_set_info(exp, strlen("mds_conn"),
+                                  "mds_conn", valsize, &group);
                 if (rc)
-                        GOTO(out, rc = -EINVAL);
+                        GOTO(out, rc);
                 cmobd->master_group = oa->o_gr;
         }
-        rc = obd_create(exp, oa, NULL, 0, &lsm, &oti);
 
+        oti.oti_flags |= OBD_MODE_CROW;
+        rc = obd_create(exp, oa, NULL, 0, &lsm, &oti);
         cmobd_free_lsm(&lsm);
+        EXIT;
 out:
-        RETURN(rc);
+        return rc;
 }
 
 /* direct cut-n-paste of filter_blocking_ast() */
@@ -132,7 +135,7 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
                               struct ldlm_lock_desc *desc,
                               void *data, int flag)
 {
-        int do_ast;
+        int rc, do_ast;
         ENTRY;
 
         if (flag == LDLM_CB_CANCELING) {
@@ -143,7 +146,7 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
         /* XXX layering violation!  -phil */
         lock_res_and_lock(lock);
         
-        /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
+        /* get this: if filter_blocking_ast() is racing with ldlm_intent_policy,
          * such that filter_blocking_ast is called just before l_i_p takes the
          * ns_lock, then by the time we get the lock, we might not be the
          * correct blocking function anymore.  So check, and return early, if
@@ -159,8 +162,6 @@ static int cache_blocking_ast(struct ldlm_lock *lock,
 
         if (do_ast) {
                 struct lustre_handle lockh;
-                int rc;
-
                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
                 ldlm_lock2handle(lock, &lockh);
                 rc = ldlm_cli_cancel(&lockh);
@@ -227,12 +228,6 @@ static int cmobd_write_extents(struct obd_device *obd, struct obdo *oa,
                 RETURN(rc);
         
         /* construct the pseudo lsm */
-
-        /*
-         * it is not good to access lov fields like @desc directly. This is
-         * layering violation. It should be accessed via some interface method,
-         * like llite does. --umka
-         */
         rc = cmobd_dummy_lsm(&lsm, cmobd->master_desc.ld_tgt_count, oa,
                              (__u32)cmobd->master_desc.ld_default_stripe_size);
         if (rc)
@@ -250,8 +245,8 @@ static int cmobd_write_extents(struct obd_device *obd, struct obdo *oa,
         rc = obd_cancel(cmobd->master_exp, lsm, LCK_PW, &lockh_dst);
         if (rc)
                 GOTO(out_lsm, rc);
-        /* XXX in fact, I just want to cancel the only lockh_dst 
-         *     instantly. */
+
+        /* XXX in fact, I just want to cancel the only lockh_dst instantly. */
         rc = obd_cancel_unused(cmobd->master_exp, lsm, 0, NULL);
         if (err)
                 rc = err;
@@ -264,8 +259,8 @@ out_lock:
 
 static int cmobd_write_reint(struct obd_device *obd, void *rec)
 {
-        struct cm_obd *cmobd = &obd->u.cm;
         struct obdo *oa = (struct obdo *)rec;
+        struct cm_obd *cmobd = &obd->u.cm;
         struct ldlm_extent *extent = NULL; 
         char *extents_buf = NULL;
         struct obd_device *cache;
index a06c533..ddebc7f 100644 (file)
@@ -110,14 +110,16 @@ int cmobd_reintegrate(struct obd_device *obd)
         if (rc)
                 RETURN(rc);
 
-        /* use the already opened log handle instead of 
-         * reopen a new log handle */
+        /* use the already opened log handle instead of reopen a new log
+         * handle */
         llh = ctxt ? ctxt->loc_handle : NULL;
-        if (llh == NULL)
-                RETURN(-EFAULT);
+        if (llh == NULL) {
+                CERROR("reint log is not found, wrong fstype "
+                       "or smfs plugin is used.\n");
+                RETURN(-EINVAL);
+        }
 
-        /* FIXME should we insert a LLOG_GEN_REC before process log ? */
+        /* FIXME: should we insert a LLOG_GEN_REC before process log? */
         rc = llog_cat_process(llh, (llog_cb_t)cmobd_reint_cb, obd);
-
         RETURN(rc);
 }
index ab37de8..dca4582 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/obd_cache.h>
 #include <linux/obd_lmv.h>
+#include <linux/obd_lov.h>
 
 static int cobd_attach(struct obd_device *obd,
                        obd_count len, void *buf)
 {
         struct lprocfs_static_vars lvars;
+        int rc = 0;
+        ENTRY;
         
         lprocfs_init_vars(cobd, &lvars);
-        return lprocfs_obd_attach(obd, lvars.obd_vars);
+        rc = lprocfs_obd_attach(obd, lvars.obd_vars);
+        
+        RETURN(rc);
 }
 
 static int cobd_detach(struct obd_device *obd)
 {
-        return lprocfs_obd_detach(obd);
+        ENTRY;
+        RETURN(lprocfs_obd_detach(obd));
 }
 
 static int cobd_setup(struct obd_device *obd, obd_count len, void *buf)
@@ -141,8 +147,15 @@ static int cobd_setup(struct obd_device *obd, obd_count len, void *buf)
         }
         cobd->cache_exp = class_conn2export(&conn);
         
-        /* default set cache on */
+        /* default set cache on, but nothins is realy connected yet, will be
+         * done in cobd_connect() time. */
         cobd->cache_on = 1;
+
+        /* nothing is connected, make exports reflect this state to not confuse
+         * cobd_switch() later. */
+        cobd->cache_real_exp = NULL;
+        cobd->master_real_exp = NULL;
+        
         EXIT;
 put_names:
         if (rc) {
@@ -192,24 +205,64 @@ static inline struct obd_export *
 cobd_get_exp(struct obd_device *obd)
 {
         struct cache_obd *cobd = &obd->u.cobd;
+        ENTRY;
+        
         if (cobd->cache_on) {
                 CDEBUG(D_TRACE, "get cache exp %p \n", cobd->cache_exp); 
                 if (cobd->cache_real_exp)
-                       return cobd->cache_real_exp;
-                return cobd->cache_exp;
+                        RETURN(cobd->cache_real_exp);
+                RETURN(cobd->cache_exp);
         }
+        
         CDEBUG(D_TRACE, "get master exp %p \n", cobd->master_exp);
         if (cobd->master_real_exp)
-                return cobd->master_real_exp; 
-        return cobd->master_exp;
+                RETURN(cobd->master_real_exp); 
+        RETURN(cobd->master_exp);
+}
+
+static int cobd_init_dt_desc(struct obd_device *obd)
+{
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        __u32 valsize;
+        int rc = 0;
+        ENTRY;
+        
+        valsize = sizeof(cobd->dt_desc);
+        memset(&cobd->dt_desc, 0, sizeof(cobd->dt_desc));
+
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_get_info(cobd_exp, strlen("lovdesc") + 1,
+                          "lovdesc", &valsize, &cobd->dt_desc);
+        RETURN(rc);
+}
+        
+static int cobd_init_ea_size(struct obd_device *obd)
+{
+        int rc = 0, tgt_count, easize, cookiesize;
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        ENTRY;
+
+        tgt_count = cobd->dt_desc.ld_tgt_count;
+
+        /* no EA setup is needed as there is single OST with no LOV */
+        if (tgt_count == 0)
+                RETURN(0);
+
+        cobd_exp = cobd_get_exp(obd);
+        easize = lov_mds_md_size(tgt_count);
+        cookiesize = tgt_count * sizeof(struct llog_cookie);
+        rc = obd_init_ea_size(cobd_exp, easize, cookiesize);
+        RETURN(rc);
 }
 
 static int
-client_obd_connect(struct obd_device *obd,
-                   struct obd_export *exp,
-                   struct lustre_handle *conn,
-                   struct obd_connect_data *data,
-                   unsigned long flags)
+cobd_connect_client(struct obd_device *obd,
+                    struct obd_export *exp,
+                    struct lustre_handle *conn,
+                    struct obd_connect_data *data,
+                    unsigned long flags)
 { 
         struct obd_device *cli_obd;
         int rc = 0;
@@ -222,17 +275,18 @@ client_obd_connect(struct obd_device *obd,
         if (cli_obd == NULL) 
                 RETURN(-EINVAL);
 
-        rc = obd_connect(conn, cli_obd, &obd->obd_uuid, data, flags);
+        rc = obd_connect(conn, cli_obd, &obd->obd_uuid,
+                         data, flags);
         if (rc) 
                 CERROR("error connecting err %d\n", rc);
-        
+
         RETURN(rc);
 }
 
 static int
-client_obd_disconnect(struct obd_device *obd,
-                      struct obd_export *exp,
-                      unsigned long flags)
+cobd_disconnect_client(struct obd_device *obd,
+                       struct obd_export *exp,
+                       unsigned long flags)
 {
         struct obd_device *cli_obd;
         int rc = 0;
@@ -250,34 +304,116 @@ client_obd_disconnect(struct obd_device *obd,
         RETURN(rc);
 }
 
+#define COBD_CONNECT (1 << 0)
+#define COBD_DISCON (1 << 1)
+#define COBD_BOTH (1 << 2)
+
+/* magic function for switching cobd between two exports cache and master in
+ * strong correspondence with passed @cache_on. It also may perform partial
+ * actions like only turn off old export or only turn on new one.
+ *
+ * bias == COBD_CONNECT only connect new export (used in cobd_connect())
+ * bias == COBD_DISCON only disconnect old export (used in cobd_disconnect())
+ * bias == COBD_BOTH do both (disconnect old and connect new) (used in
+ * cobd_iocontrol())
+ */
+static int cobd_switch(struct obd_device *obd,
+                       int cache_on, int bias)
+{
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_device *cli_obd = NULL;
+        struct lustre_handle conn = {0,};
+        struct obd_export *discon_exp;
+        struct obd_export *conn_exp;
+        int rc = 0;
+        ENTRY;
+
+        if (cache_on) {
+                discon_exp = cobd->master_real_exp;
+                conn_exp = cobd->cache_exp;
+        } else {
+                discon_exp = cobd->cache_real_exp;
+                conn_exp = cobd->master_exp;
+        }
+
+        /* disconnect old export */
+        if (bias == COBD_BOTH || bias == COBD_DISCON) {
+                if (discon_exp) {
+                        rc = cobd_disconnect_client(obd, discon_exp, 0);
+                        if (rc) {
+                                CWARN("can't disconnect export %p, err %d\n",
+                                      discon_exp, rc);
+                        }
+                }
+                
+                if (cache_on)
+                        cobd->master_real_exp = NULL;
+                else
+                        cobd->cache_real_exp = NULL; 
+        }
+
+        /* connect new export */
+        if (bias == COBD_BOTH || bias == COBD_CONNECT) {
+                rc = cobd_connect_client(obd, conn_exp, &conn,
+                                         NULL, OBD_OPT_REAL_CLIENT);
+                if (rc) {
+                        CERROR("can't connect export %p, err %d\n",
+                               conn_exp, rc);
+                        RETURN(rc);
+                }
+
+                if (cache_on) {
+                        cobd->cache_real_exp = class_conn2export(&conn);
+                        cli_obd = class_exp2obd(cobd->cache_exp);
+                } else {
+                        cobd->master_real_exp = class_conn2export(&conn);
+                        cli_obd = class_exp2obd(cobd->master_exp);
+                }
+        }
+
+        cobd->cache_on = cache_on;
+        
+        if (bias == COBD_BOTH || bias == COBD_CONNECT) {
+                /* re-init EA size for new selected export. This should be done after
+                 * assigining new state to @cobd->cache_on to not call not connened
+                 * already old export. */
+                if (obd_md_type(cli_obd)) {
+                        rc = cobd_init_dt_desc(obd);
+                        if (rc == 0) {
+                                rc = cobd_init_ea_size(obd);
+                                if (rc) {
+                                        CERROR("can't initialize EA size, "
+                                               "err %d\n", rc);
+                                }
+                        } else {
+                                /* ignore cases when we di dnot manage to init
+                                 * lovdesc. This is because some devices may not know
+                                 * "lovdesc" info command. */
+                                rc = 0;
+                        }
+                }
+        }
+
+        RETURN(rc);
+}
+
 static int
 cobd_connect(struct lustre_handle *conn, struct obd_device *obd,
              struct obd_uuid *cluuid, struct obd_connect_data *data,
              unsigned long flags)
 {
-        struct lustre_handle cache_conn = { 0 };
         struct cache_obd *cobd = &obd->u.cobd;
-        struct obd_export *exp, *cobd_exp;
+        struct obd_export *exp;
         int rc = 0;
         ENTRY;
 
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
-        exp = class_conn2export(conn);
 
-        cobd_exp = cobd_get_exp(obd);
-        
-        /* connecting cache */
-        rc = client_obd_connect(obd, cobd_exp, &cache_conn, 
-                                data, flags);
-        if (rc)
-                GOTO(err_discon, rc);
-       
-        cobd->cache_real_exp = class_conn2export(&cache_conn);
-        cobd->cache_on = 1;
-        EXIT;
-err_discon:
+        exp = class_conn2export(conn);
+        rc = cobd_switch(obd, cobd->cache_on,
+                         COBD_CONNECT);
         if (rc)
                 class_disconnect(exp, 0);
         else
@@ -288,8 +424,8 @@ err_discon:
 static int
 cobd_disconnect(struct obd_export *exp, unsigned long flags)
 {
+        struct cache_obd *cobd;
         struct obd_device *obd;
-        struct obd_export *cobd_exp;
         int rc = 0;
         ENTRY;
         
@@ -300,12 +436,15 @@ cobd_disconnect(struct obd_export *exp, unsigned long flags)
                        LPX64"\n", exp->exp_handle.h_cookie);
                 RETURN(-EINVAL);
         }
-        cobd_exp = cobd_get_exp(obd);
-        
-        rc = client_obd_disconnect(obd, cobd_exp, flags);
 
+        /* here would be nice also to check that disconnect goes to the same
+         * export as connect did. But as now we are accepting the notion that
+         * cache should be switched after client umount this is not needed.
+         * --umka. */
+        cobd = &obd->u.cobd;
+        rc = cobd_switch(obd, cobd->cache_on, COBD_DISCON);
         class_disconnect(exp, flags);
-        
+
         RETURN(rc);
 }
 
@@ -314,16 +453,19 @@ static int cobd_get_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
         
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
 
         /* intercept cache utilisation info? */
-        return obd_get_info(cobd_exp, keylen, key, vallen, val);
+        rc = obd_get_info(cobd_exp, keylen, key, vallen, val);
+        RETURN(rc);
 }
 
 static int cobd_set_info(struct obd_export *exp, obd_count keylen,
@@ -331,17 +473,21 @@ static int cobd_set_info(struct obd_export *exp, obd_count keylen,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
        
-        LASSERT(cobd_exp); 
+        LASSERT(cobd_exp);
+        
         /* intercept cache utilisation info? */
-        return obd_set_info(cobd_exp, keylen, key, vallen, val);
+        rc = obd_set_info(cobd_exp, keylen, key, vallen, val);
+        RETURN(rc);
 }
 
 static int cobd_statfs(struct obd_device *obd,
@@ -349,9 +495,44 @@ static int cobd_statfs(struct obd_device *obd,
                        unsigned long max_age)
 {
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
-        return obd_statfs(class_exp2obd(cobd_exp), osfs, max_age);
+        rc = obd_statfs(class_exp2obd(cobd_exp), osfs, max_age);
+        RETURN(rc);
+}
+
+static int cobd_iocontrol(unsigned int cmd, struct obd_export *exp,
+                          int len, void *karg, void *uarg)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct cache_obd  *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
+
+        down(&cobd->sem);
+        
+        /* here would be nice also to make sure somehow that there are no
+         * out-standing requests which go to wrong MDS after cache switch (close
+         * RPCs). But how to check that from COBD? I do not know. --umka */
+        switch (cmd) {
+        case OBD_IOC_COBD_CON:
+                if (!cobd->cache_on)
+                        rc = cobd_switch(obd, 1, COBD_BOTH);
+                break;
+        case OBD_IOC_COBD_COFF: 
+                if (cobd->cache_on)
+                        rc = cobd_switch(obd, 0, COBD_BOTH);
+                break;
+        default:
+                cobd_exp = cobd_get_exp(obd);
+                rc = obd_iocontrol(cmd, cobd_exp, len, karg, uarg);
+        }
+
+        up(&cobd->sem);
+        RETURN(rc);
 }
 
 static int cobd_dt_packmd(struct obd_export *exp,
@@ -360,14 +541,17 @@ static int cobd_dt_packmd(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_packmd(cobd_exp, disk_tgt, mem_src);
+        rc = obd_packmd(cobd_exp, disk_tgt, mem_src);
+        RETURN(rc);
 }
 
 static int cobd_dt_unpackmd(struct obd_export *exp,
@@ -377,14 +561,17 @@ static int cobd_dt_unpackmd(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_unpackmd(cobd_exp, mem_tgt, disk_src, disk_len);
+        rc = obd_unpackmd(cobd_exp, mem_tgt, disk_src, disk_len);
+        RETURN(rc);
 }
 
 static int cobd_dt_create(struct obd_export *exp,
@@ -395,14 +582,17 @@ static int cobd_dt_create(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_create(cobd_exp, obdo, acl, acl_size, ea, oti);
+        rc = obd_create(cobd_exp, obdo, acl, acl_size, ea, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_destroy(struct obd_export *exp,
@@ -412,20 +602,24 @@ static int cobd_dt_destroy(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_destroy(cobd_exp, obdo, ea, oti); 
+        rc = obd_destroy(cobd_exp, obdo, ea, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_precleanup(struct obd_device *obd, int flags)
 {
-        /* FIXME-WANGDI: do we need some cleanup here? */
-        return 0;
+        /* FIXME: do we need some cleanup here? */
+        ENTRY;
+        RETURN(0);
 }
 
 static int cobd_dt_getattr(struct obd_export *exp, struct obdo *oa,
@@ -433,14 +627,17 @@ static int cobd_dt_getattr(struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_getattr(cobd_exp, oa, ea);
+        rc = obd_getattr(cobd_exp, oa, ea);
+        RETURN(rc);
 }
 
 static int cobd_dt_getattr_async(struct obd_export *exp,
@@ -449,14 +646,17 @@ static int cobd_dt_getattr_async(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_getattr_async(cobd_exp, obdo, ea, set);
+        rc = obd_getattr_async(cobd_exp, obdo, ea, set);
+        RETURN(rc);
 }
 
 static int cobd_dt_setattr(struct obd_export *exp, struct obdo *obdo,
@@ -465,14 +665,17 @@ static int cobd_dt_setattr(struct obd_export *exp, struct obdo *obdo,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_setattr(cobd_exp, obdo, ea, oti);
+        rc = obd_setattr(cobd_exp, obdo, ea, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_brw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -481,14 +684,17 @@ static int cobd_dt_brw(int cmd, struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_brw(cmd, cobd_exp, oa, ea, oa_bufs, pg, oti);
+        rc = obd_brw(cmd, cobd_exp, oa, ea, oa_bufs, pg, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_brw_async(int cmd, struct obd_export *exp,
@@ -499,15 +705,18 @@ static int cobd_dt_brw_async(int cmd, struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_brw_async(cmd, cobd_exp, oa, ea, oa_bufs, 
-                             pg, set, oti);
+        rc = obd_brw_async(cmd, cobd_exp, oa, ea, oa_bufs, 
+                           pg, set, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_prep_async_page(struct obd_export *exp, 
@@ -519,15 +728,18 @@ static int cobd_dt_prep_async_page(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_prep_async_page(cobd_exp, lsm, loi, page, offset,
-                                   ops, data, res);
+        rc = obd_prep_async_page(cobd_exp, lsm, loi, page,
+                                 offset, ops, data, res);
+        RETURN(rc);
 }
 
 static int cobd_dt_queue_async_io(struct obd_export *exp,
@@ -538,15 +750,18 @@ static int cobd_dt_queue_async_io(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_queue_async_io(cobd_exp, lsm, loi, cookie, cmd, off, count,
-                                  brw_flags, async_flags);
+        rc = obd_queue_async_io(cobd_exp, lsm, loi, cookie, cmd, off,
+                                count, brw_flags, async_flags);
+        RETURN(rc);
 }
 
 static int cobd_dt_set_async_flags(struct obd_export *exp,
@@ -556,14 +771,18 @@ static int cobd_dt_set_async_flags(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_set_async_flags(cobd_exp, lsm, loi, cookie, async_flags);
+        rc = obd_set_async_flags(cobd_exp, lsm, loi, cookie,
+                                 async_flags);
+        RETURN(rc);
 }
 
 static int cobd_dt_queue_group_io(struct obd_export *exp, 
@@ -576,15 +795,19 @@ static int cobd_dt_queue_group_io(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_queue_group_io(cobd_exp, lsm, loi, oig, cookie,
-                                  cmd, off, count, brw_flags, async_flags);
+        rc = obd_queue_group_io(cobd_exp, lsm, loi, oig, cookie,
+                                cmd, off, count, brw_flags,
+                                async_flags);
+        RETURN(rc);
 }
 
 static int cobd_dt_trigger_group_io(struct obd_export *exp, 
@@ -594,30 +817,37 @@ static int cobd_dt_trigger_group_io(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_trigger_group_io(cobd_exp, lsm, loi, oig); 
+        rc = obd_trigger_group_io(cobd_exp, lsm, loi, oig);
+        RETURN(rc);
 }
 
 static int cobd_dt_teardown_async_page(struct obd_export *exp,
                                        struct lov_stripe_md *lsm,
-                                       struct lov_oinfo *loi, void *cookie)
+                                       struct lov_oinfo *loi,
+                                       void *cookie)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_teardown_async_page(cobd_exp, lsm, loi, cookie);
+        rc = obd_teardown_async_page(cobd_exp, lsm, loi, cookie);
+        RETURN(rc);
 }
 
 static int cobd_dt_punch(struct obd_export *exp, struct obdo *oa,
@@ -626,14 +856,17 @@ static int cobd_dt_punch(struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_punch(cobd_exp, oa, ea, start, end, oti);
+        rc = obd_punch(cobd_exp, oa, ea, start, end, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_sync(struct obd_export *exp, struct obdo *oa,
@@ -642,14 +875,17 @@ static int cobd_dt_sync(struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_sync(cobd_exp, oa, ea, start, end);
+        rc = obd_sync(cobd_exp, oa, ea, start, end);
+        RETURN(rc);
 }
 
 static int cobd_dt_enqueue(struct obd_export *exp, struct lov_stripe_md *ea,
@@ -660,16 +896,19 @@ static int cobd_dt_enqueue(struct obd_export *exp, struct lov_stripe_md *ea,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_enqueue(cobd_exp, ea, type, policy, mode, flags, 
-                           bl_cb, cp_cb, gl_cb, data, lvb_len,
-                           lvb_swabber, lockh);
+        rc = obd_enqueue(cobd_exp, ea, type, policy, mode, flags, 
+                         bl_cb, cp_cb, gl_cb, data, lvb_len,
+                         lvb_swabber, lockh);
+        RETURN(rc);
 }
 
 static int cobd_dt_match(struct obd_export *exp, struct lov_stripe_md *ea,
@@ -678,15 +917,18 @@ static int cobd_dt_match(struct obd_export *exp, struct lov_stripe_md *ea,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_match(cobd_exp, ea, type, policy, mode, flags, data,
-                         lockh); 
+        rc = obd_match(cobd_exp, ea, type, policy, mode, flags,
+                       data, lockh);
+        RETURN(rc);
 }
 static int cobd_dt_change_cbdata(struct obd_export *exp,
                                  struct lov_stripe_md *lsm, 
@@ -694,14 +936,17 @@ static int cobd_dt_change_cbdata(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_change_cbdata(cobd_exp, lsm, it, data);
+        rc = obd_change_cbdata(cobd_exp, lsm, it, data);
+        RETURN(rc);
 }
 
 static int cobd_dt_cancel(struct obd_export *exp,
@@ -710,14 +955,17 @@ static int cobd_dt_cancel(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_cancel(cobd_exp, ea, mode, lockh);
+        rc = obd_cancel(cobd_exp, ea, mode, lockh);
+        RETURN(rc);
 }
 
 static int cobd_dt_cancel_unused(struct obd_export *exp,
@@ -726,14 +974,17 @@ static int cobd_dt_cancel_unused(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
-
+        int rc = 0;
+        ENTRY;
+        
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_cancel_unused(cobd_exp, ea, flags, opaque);
+        rc = obd_cancel_unused(cobd_exp, ea, flags, opaque);
+        RETURN(rc);
 }
 
 static int cobd_dt_preprw(int cmd, struct obd_export *exp,
@@ -745,15 +996,18 @@ static int cobd_dt_preprw(int cmd, struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_preprw(cmd, cobd_exp, oa, objcount, obj,
-                          niocount, nb, res, oti);
+        rc = obd_preprw(cmd, cobd_exp, oa, objcount, obj,
+                        niocount, nb, res, oti);
+        RETURN(rc);
 }
 
 static int cobd_dt_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -763,99 +1017,38 @@ static int cobd_dt_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int err = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_commitrw(cmd, cobd_exp, oa, objcount, obj,
-                            niocount, local, oti, rc);
-}
-
-static int cobd_flush(struct obd_device *obd)
-{
-        return 0; 
+        err = obd_commitrw(cmd, cobd_exp, oa, objcount, obj,
+                           niocount, local, oti, rc);
+        RETURN(err);
 }
 
-static int cobd_dt_iocontrol(unsigned int cmd, struct obd_export *exp,
-                             int len, void *karg, void *uarg)
+static int cobd_dt_adjust_kms(struct obd_export *exp,
+                              struct lov_stripe_md *lsm,
+                              obd_off size, int shrink)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct cache_obd  *cobd = &obd->u.cobd;
         struct obd_export *cobd_exp;
         int rc = 0;
         ENTRY;
 
-        down(&cobd->sem);
-        
-        switch (cmd) {
-        case OBD_IOC_COBD_CON:
-                if (!cobd->cache_on) {
-                        struct lustre_handle conn = {0};
-
-                        rc = client_obd_disconnect(obd, cobd->master_real_exp, 0);
-                        if (rc) {
-                                CWARN("can't disconnect master export, err %d\n",
-                                      rc);
-                        }
-                        
-                        rc = client_obd_connect(obd, cobd->cache_exp, &conn,
-                                                NULL, OBD_OPT_REAL_CLIENT);
-                        if (rc)
-                                GOTO(out, rc);
-
-                        cobd->cache_real_exp = class_conn2export(&conn);
-                        cobd->cache_on = 1;
-                }
-                break;
-        case OBD_IOC_COBD_COFF: 
-                if (cobd->cache_on) {
-                        struct lustre_handle conn = {0,};
-                        struct obd_device *master = NULL;
-                        struct obd_device *cache = NULL;
-                        int easize, cooksize;
-
-                        cache = class_exp2obd(cobd->cache_exp); 
-                        easize = cache->u.cli.cl_max_mds_easize; 
-                        cooksize = cache->u.cli.cl_max_mds_cookiesize;
-
-                        rc = client_obd_disconnect(obd, cobd->cache_real_exp, 0);
-                        if (rc) {
-                                CWARN("can't disconnect cache export, err %d\n",
-                                      rc);
-                        }
-                        rc = client_obd_connect(obd, cobd->master_exp, &conn,
-                                                NULL, OBD_OPT_REAL_CLIENT);
-                        if (rc)
-                                GOTO(out, rc);
-                        cobd->master_real_exp = class_conn2export(&conn);
-
-                        master = class_exp2obd(cobd->master_exp);
-                        master->u.cli.cl_max_mds_easize = easize;
-                        master->u.cli.cl_max_mds_cookiesize = cooksize;
-                        cobd->cache_on = 0;
-                }
-                break;
-        case OBD_IOC_COBD_CFLUSH:
-                if (cobd->cache_on) {
-                        cobd->cache_on = 0;
-                        cobd_flush(obd);
-                        cobd->cache_on = 1;
-                } else {
-                        CERROR("%s: cache is turned off\n", obd->obd_name);
-                }
-                break;
-        default:
-                cobd_exp = cobd_get_exp(obd);
-                rc = obd_iocontrol(cmd, cobd_exp, len, karg, uarg);
+        if (obd == NULL) {
+                CERROR("invalid client cookie "LPX64"\n", 
+                       exp->exp_handle.h_cookie);
+                RETURN(-EINVAL);
         }
-
-        EXIT;
-out:
-        up(&cobd->sem);
-        return rc;
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_adjust_kms(cobd_exp, lsm, size, shrink);
+        
+        RETURN(rc);
 }
 
 static int cobd_dt_llog_init(struct obd_device *obd,
@@ -865,12 +1058,15 @@ static int cobd_dt_llog_init(struct obd_device *obd,
 {
         struct obd_export *cobd_exp;
         struct obd_device *cobd_obd;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
         cobd_obd = class_exp2obd(cobd_exp);
         
-        return obd_llog_init(cobd_obd, &cobd_obd->obd_llogs, 
-                             disk_obd, count, logid);
+        rc = obd_llog_init(cobd_obd, &cobd_obd->obd_llogs, 
+                           disk_obd, count, logid);
+        RETURN(rc);
 }
 
 static int cobd_dt_llog_finish(struct obd_device *obd,
@@ -879,21 +1075,26 @@ static int cobd_dt_llog_finish(struct obd_device *obd,
 {
         struct obd_export *cobd_exp;
         struct obd_device *cobd_obd;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
         cobd_obd = class_exp2obd(cobd_exp);
 
-        return obd_llog_finish(cobd_obd, &cobd_obd->obd_llogs, count);
+        rc = obd_llog_finish(cobd_obd, &cobd_obd->obd_llogs, count);
+        RETURN(rc);
 }
 
 static int cobd_dt_notify(struct obd_device *obd, struct obd_device *watched,
                           int active, void *data)
 {
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
-
-        return obd_notify(class_exp2obd(cobd_exp), watched, active, data);
+        rc = obd_notify(class_exp2obd(cobd_exp), watched, active, data);
+        RETURN(rc);
 }
 
 static int cobd_dt_pin(struct obd_export *exp, obd_id ino, __u32 gen,
@@ -902,15 +1103,17 @@ static int cobd_dt_pin(struct obd_export *exp, obd_id ino, __u32 gen,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return obd_pin(cobd_exp, ino, gen, type, handle, flag);
+        rc = obd_pin(cobd_exp, ino, gen, type, handle, flag);
+        RETURN(rc);
 }
 
 static int cobd_dt_unpin(struct obd_export *exp,
@@ -919,24 +1122,29 @@ static int cobd_dt_unpin(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return obd_unpin(cobd_exp, handle, flag);
+        rc = obd_unpin(cobd_exp, handle, flag);
+        RETURN(rc);
 }
 
 static int cobd_dt_init_ea_size(struct obd_export *exp, int easize,
                                 int cookiesize)
 {
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(exp->exp_obd);
-        return obd_init_ea_size(cobd_exp, easize, cookiesize);
+        rc = obd_init_ea_size(cobd_exp, easize, cookiesize);
+        RETURN(rc);
 }
 
 static int cobd_dt_import_event(struct obd_device *obd,
@@ -944,10 +1152,11 @@ static int cobd_dt_import_event(struct obd_device *obd,
                                 enum obd_import_event event)
 {
         struct obd_export *cobd_exp;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
         obd_import_event(class_exp2obd(cobd_exp), imp, event);
-        return 0
+        RETURN(0)
 }
 
 static int cobd_md_getstatus(struct obd_export *exp,
@@ -955,14 +1164,17 @@ static int cobd_md_getstatus(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getstatus(cobd_exp, rootid);
+        rc = md_getstatus(cobd_exp, rootid);
+        RETURN(rc);
 }
 
 static int cobd_md_getattr(struct obd_export *exp, struct lustre_id *id,
@@ -973,15 +1185,18 @@ static int cobd_md_getattr(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getattr(cobd_exp, id, valid, xattr_name,
-                          xattr_data, xattr_datalen, ea_size, request);
+        rc = md_getattr(cobd_exp, id, valid, xattr_name,
+                        xattr_data, xattr_datalen, ea_size, request);
+        RETURN(rc);
 }
 
 static int cobd_md_req2lustre_md(struct obd_export *mdc_exp, 
@@ -992,14 +1207,17 @@ static int cobd_md_req2lustre_md(struct obd_export *mdc_exp,
 {
         struct obd_device *obd = class_exp2obd(mdc_exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        mdc_exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_req2lustre_md(cobd_exp, req, offset, osc_exp, md);
+        rc = md_req2lustre_md(cobd_exp, req, offset, osc_exp, md);
+        RETURN(rc);
 }
 
 static int cobd_md_change_cbdata(struct obd_export *exp, struct lustre_id *id, 
@@ -1007,14 +1225,17 @@ static int cobd_md_change_cbdata(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_change_cbdata(cobd_exp, id, it, data);
+        rc = md_change_cbdata(cobd_exp, id, it, data);
+        RETURN(rc);
 }
 
 static int cobd_md_getattr_lock(struct obd_export *exp, struct lustre_id *id,
@@ -1023,15 +1244,18 @@ static int cobd_md_getattr_lock(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getattr_lock(cobd_exp, id, filename, namelen, valid,
-                               ea_size, request);
+        rc = md_getattr_lock(cobd_exp, id, filename, namelen,
+                             valid, ea_size, request);
+        RETURN(rc);
 }
 
 static int cobd_md_create(struct obd_export *exp, struct mdc_op_data *op_data,
@@ -1041,15 +1265,18 @@ static int cobd_md_create(struct obd_export *exp, struct mdc_op_data *op_data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_create(cobd_exp, op_data, data, datalen, mode,
-                         uid, gid, rdev, request);
+        rc = md_create(cobd_exp, op_data, data, datalen, mode,
+                       uid, gid, rdev, request);
+        RETURN(rc);
 }
 
 static int cobd_md_unlink(struct obd_export *exp,
@@ -1058,14 +1285,17 @@ static int cobd_md_unlink(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_unlink(cobd_exp, data, request);
+        rc = md_unlink(cobd_exp, data, request);
+        RETURN(rc);
 }
 
 static int cobd_md_valid_attrs(struct obd_export *exp,
@@ -1073,14 +1303,17 @@ static int cobd_md_valid_attrs(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_valid_attrs(cobd_exp, id);
+        rc = md_valid_attrs(cobd_exp, id);
+        RETURN(rc);
 }
 
 static int cobd_md_rename(struct obd_export *exp, struct mdc_op_data *data,
@@ -1089,14 +1322,17 @@ static int cobd_md_rename(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_rename(cobd_exp, data, old, oldlen, new, newlen, request);
+        rc = md_rename(cobd_exp, data, old, oldlen, new, newlen, request);
+        RETURN(rc);
 }
 
 static int cobd_md_link(struct obd_export *exp, struct mdc_op_data *data,
@@ -1104,14 +1340,17 @@ static int cobd_md_link(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_link(cobd_exp, data, request);
+        rc = md_link(cobd_exp, data, request);
+        RETURN(rc);
 }
 
 static int cobd_md_setattr(struct obd_export *exp, struct mdc_op_data *data,
@@ -1121,15 +1360,18 @@ static int cobd_md_setattr(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_setattr(cobd_exp, data, iattr, ea,
+        rc = md_setattr(cobd_exp, data, iattr, ea,
                           ealen, ea2, ea2len, ea3, ea3len, request);
+        RETURN(rc);
 }
 
 static int cobd_md_readpage(struct obd_export *exp,
@@ -1139,14 +1381,17 @@ static int cobd_md_readpage(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_readpage(cobd_exp, mdc_id, offset, page, request);
+        rc = md_readpage(cobd_exp, mdc_id, offset, page, request);
+        RETURN(rc);
 }
 
 static int cobd_md_close(struct obd_export *exp, struct obdo *obdo,
@@ -1155,14 +1400,17 @@ static int cobd_md_close(struct obd_export *exp, struct obdo *obdo,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_close(cobd_exp, obdo, och, request);
+        rc = md_close(cobd_exp, obdo, och, request);
+        RETURN(rc);
 }
 
 static int cobd_md_done_writing(struct obd_export *exp,
@@ -1170,14 +1418,17 @@ static int cobd_md_done_writing(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_done_writing(cobd_exp, obdo);
+        rc = md_done_writing(cobd_exp, obdo);
+        RETURN(rc);
 }
 
 static int cobd_md_sync(struct obd_export *exp, struct lustre_id *id,
@@ -1185,15 +1436,17 @@ static int cobd_md_sync(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        
-        return md_sync(cobd_exp, id, request);
+        rc = md_sync(cobd_exp, id, request);
+        RETURN(rc);
 }
 
 static int cobd_md_set_open_replay_data(struct obd_export *exp,
@@ -1202,15 +1455,17 @@ static int cobd_md_set_open_replay_data(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        
-        return md_set_open_replay_data(cobd_exp, och, open_req);
+        rc = md_set_open_replay_data(cobd_exp, och, open_req);
+        RETURN(rc);
 }
 
 static int cobd_md_clear_open_replay_data(struct obd_export *exp,
@@ -1218,15 +1473,17 @@ static int cobd_md_clear_open_replay_data(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_clear_open_replay_data(cobd_exp, och);
+        rc = md_clear_open_replay_data(cobd_exp, och);
+        RETURN(rc);
 }
 
 static int cobd_md_store_inode_generation(struct obd_export *exp,
@@ -1235,15 +1492,17 @@ static int cobd_md_store_inode_generation(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return md_store_inode_generation(cobd_exp, req, reqoff, repoff);
+        rc = md_store_inode_generation(cobd_exp, req, reqoff, repoff);
+        RETURN(rc);
 }
 
 static int cobd_md_set_lock_data(struct obd_export *exp,
@@ -1251,15 +1510,17 @@ static int cobd_md_set_lock_data(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return md_set_lock_data(cobd_exp, l, data);
+        rc = md_set_lock_data(cobd_exp, l, data);
+        RETURN(rc);
 }
 
 static int cobd_md_enqueue(struct obd_export *exp, int lock_type,
@@ -1271,16 +1532,19 @@ static int cobd_md_enqueue(struct obd_export *exp, int lock_type,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_enqueue(cobd_exp, lock_type, it, lock_mode, data,
-                          lockh, lmm, lmmsize, cb_completion, cb_blocking,
-                          cb_data);
+        rc = md_enqueue(cobd_exp, lock_type, it, lock_mode, data,
+                        lockh, lmm, lmmsize, cb_completion, cb_blocking,
+                        cb_data);
+        RETURN(rc);
 }
 
 static int cobd_md_intent_lock(struct obd_export *exp, struct lustre_id *pid, 
@@ -1291,17 +1555,20 @@ static int cobd_md_intent_lock(struct obd_export *exp, struct lustre_id *pid,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         lookup_flags |= LOOKUP_COBD;
         cobd_exp = cobd_get_exp(obd);
         
-        return md_intent_lock(cobd_exp, pid, name, len, lmm, lmmsize,
-                              cid, it, lookup_flags, reqp, cb_blocking);
+        rc = md_intent_lock(cobd_exp, pid, name, len, lmm, lmmsize,
+                            cid, it, lookup_flags, reqp, cb_blocking);
+        RETURN(rc);
 }
 
 static struct obd_device *cobd_md_get_real_obd(struct obd_export *exp,
@@ -1309,14 +1576,15 @@ static struct obd_device *cobd_md_get_real_obd(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return NULL;
+                RETURN(NULL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_get_real_obd(cobd_exp, id);
+        RETURN(md_get_real_obd(cobd_exp, id));
 }
 
 static int cobd_md_change_cbdata_name(struct obd_export *exp,
@@ -1326,16 +1594,19 @@ static int cobd_md_change_cbdata_name(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_change_cbdata_name(cobd_exp, id, name, namelen,
-                                     id2, it, data);
+        rc = md_change_cbdata_name(cobd_exp, id, name, namelen,
+                                   id2, it, data);
+        RETURN(rc);
 }
+
 static struct obd_ops cobd_obd_ops = {
         .o_owner                  = THIS_MODULE,
         .o_attach                 = cobd_attach,
@@ -1347,6 +1618,7 @@ static struct obd_ops cobd_obd_ops = {
         .o_set_info               = cobd_set_info,
         .o_get_info               = cobd_get_info,
         .o_statfs                 = cobd_statfs,
+        .o_iocontrol              = cobd_iocontrol,
 
         .o_packmd                 = cobd_dt_packmd,
         .o_unpackmd               = cobd_dt_unpackmd,
@@ -1372,7 +1644,6 @@ static struct obd_ops cobd_obd_ops = {
         .o_change_cbdata          = cobd_dt_change_cbdata,
         .o_cancel                 = cobd_dt_cancel,
         .o_cancel_unused          = cobd_dt_cancel_unused,
-        .o_iocontrol              = cobd_dt_iocontrol,
         .o_commitrw               = cobd_dt_commitrw,
         .o_llog_init              = cobd_dt_llog_init,
         .o_llog_finish            = cobd_dt_llog_finish,
@@ -1381,6 +1652,7 @@ static struct obd_ops cobd_obd_ops = {
         .o_unpin                  = cobd_dt_unpin,
         .o_import_event           = cobd_dt_import_event,
         .o_init_ea_size           = cobd_dt_init_ea_size,
+        .o_adjust_kms             = cobd_dt_adjust_kms,
 };
 
 struct md_ops cobd_md_ops = {
index 71dc9f1..5b966e3 100644 (file)
 
 #include <linux/obd_class.h>
 
-#define OBD_CMOBD_DEVICENAME    "cmobd"
-
 #define CMOBD_MAX_THREADS       32UL
-
 #define CMOBD_NUM_THREADS       max(min_t(unsigned long, num_physpages / 8192, \
                                           CMOBD_MAX_THREADS), 2UL)
 
 #define CMOBD_MAX_EXTENT_SZ     PTLRPC_MAX_BRW_PAGES * PAGE_SIZE
-
 #define CMOBD_MAX_EXTENTS       1024
 
 /* for keeping the capacity of handle multi extents simultaneously */
index dad6885..adfae7f 100644 (file)
@@ -19,8 +19,6 @@
 struct obd_ops;
 struct obd_device;
 
-#define OBD_LDLM_DEVICENAME  "ldlm"
-
 #define LDLM_DEFAULT_LRU_SIZE 100
 
 typedef enum {
index 2aacd72..2461eaf 100644 (file)
@@ -44,11 +44,16 @@ enum ea_type {
         EA_LOV   = (1 << 0),
         EA_MEA   = (1 << 1),
         EA_SID   = (1 << 2),
-        EA_MID   = (1 << 3),
-        EA_KEY   = (1 << 4),
-        EA_PID   = (1 << 5),
+        EA_KEY   = (1 << 3),
+        EA_PID   = (1 << 4),
 };
 
+#define XATTR_LUSTRE_MDS_LOV_EA "lov"
+#define XATTR_LUSTRE_MDS_MEA_EA "mea"
+#define XATTR_LUSTRE_MDS_SID_EA "sid"
+#define XATTR_LUSTRE_MDS_KEY_EA "key"
+#define XATTR_LUSTRE_MDS_PID_EA "pid"
+
 struct fsfilt_operations {
         struct list_head fs_list;
         struct module *fs_owner;
index 23902f4..fa924b2 100644 (file)
@@ -305,6 +305,7 @@ typedef uint32_t        obd_count;
 #define OBD_FL_IDONLY        (0x00000010) // if set in o_flags only adjust obj id
 #define OBD_FL_RECREATE_OBJS (0x00000020) // recreate missing obj
 #define OBD_FL_DEBUG_CHECK   (0x00000040) /* echo client/server debug check */
+#define OBD_FL_REINT         (0x00000080) /* reint during cache flush */
 
 /* this should be sizeof(struct lustre_handle) + sizeof(struct llog_cookie) +
  * sizeof(struct lustre_id). */
@@ -617,6 +618,11 @@ struct lustre_stc {
         } u;
 };
 
+struct fid_extent {
+        __u64 fe_start;
+        __u64 fe_width;
+};
+
 /* lustre file id */
 struct lustre_fid {
         __u64 lf_id;                   /* fid counter maintained on per 
@@ -745,25 +751,12 @@ struct mdc_op_data {
         struct mea      *mea1;       /* mea of inode1 */
         struct mea      *mea2;       /* mea of inode2 */
         __u64            valid;
+        __u32            flags;
 };
 
 #define MDS_MODE_DONT_LOCK      (1 << 30)
 #define MDS_MODE_REPLAY         (1 << 31)
 
-struct mds_rec_setattr {
-        __u32            sa_opcode;
-        __u32            sa_valid;
-        struct lustre_id sa_id;
-        __u32            sa_mode;
-        __u32            sa_uid;
-        __u32            sa_gid;
-        __u32            sa_attr_flags;
-        __u64            sa_size;
-        __u64            sa_atime;
-        __u64            sa_mtime;
-        __u64            sa_ctime;
-};
-
 /* XXX Following ATTR_XXX should go to vfs patch...  */
 #ifdef ATTR_CTIME_SET
 #error "ATTR_CTIME_SET has been defined somewhere else"
@@ -781,7 +774,6 @@ struct mds_rec_setattr {
 #define ATTR_EA_CMOBD   0x00100000
 #define ATTR_KEY        0x00200000
 #define ATTR_MAC        0x00400000
-extern void lustre_swab_mds_rec_setattr (struct mds_rec_setattr *sa);
 
 #ifndef FMODE_READ
 #define FMODE_READ               00000001
@@ -796,6 +788,7 @@ extern void lustre_swab_mds_rec_setattr (struct mds_rec_setattr *sa);
 #define MDS_OPEN_APPEND          00002000
 #define MDS_OPEN_SYNC            00010000
 #define MDS_OPEN_DIRECTORY       00200000
+#define MDS_REINT_REQ            01000000
 
 #define MDS_OPEN_DELAY_CREATE    0100000000   /* delay initial object create */
 #define MDS_OPEN_HAS_KEY         01000000000 /* just set the EA the obj exist */
@@ -819,6 +812,7 @@ extern void lustre_swab_mds_rec_create (struct mds_rec_create *cr);
 
 struct mds_rec_link {
         __u32            lk_opcode;
+        __u32            lk_flags;
         __u32            lk_padding;
         struct lustre_id lk_id1;
         struct lustre_id lk_id2;
@@ -829,6 +823,7 @@ extern void lustre_swab_mds_rec_link (struct mds_rec_link *lk);
 
 struct mds_rec_unlink {
         __u32            ul_opcode;
+        __u32            ul_flags;
         __u32            ul_mode;
         struct lustre_id ul_id1;
         struct lustre_id ul_id2;
@@ -839,12 +834,28 @@ extern void lustre_swab_mds_rec_unlink (struct mds_rec_unlink *ul);
 
 struct mds_rec_rename {
         __u32            rn_opcode;
+        __u32            rn_flags;
         __u32            rn_padding;
         struct lustre_id rn_id1;
         struct lustre_id rn_id2;
         __u64            rn_time;
 };
 
+struct mds_rec_setattr {
+        __u32            sa_opcode;
+        __u32            sa_flags;
+        __u32            sa_valid;
+        struct lustre_id sa_id;
+        __u32            sa_mode;
+        __u32            sa_uid;
+        __u32            sa_gid;
+        __u32            sa_attr_flags;
+        __u64            sa_size;
+        __u64            sa_atime;
+        __u64            sa_mtime;
+        __u64            sa_ctime;
+};
+
 extern void lustre_swab_mds_rec_rename (struct mds_rec_rename *rn);
 
 /*
@@ -1278,9 +1289,13 @@ typedef int (*crypt_cb_t)(struct page *page, __u64 offset, __u64 count,
 extern void lustre_swab_key_context (struct key_context *kctxt);
 extern void lustre_swab_key_perms (struct key_perm *kperm);
 #endif /*for define __KERNEL*/
+
 extern void lustre_swab_lustre_id(struct lustre_id *id);
 extern void lustre_swab_lov_desc(struct lov_desc *desc);
+extern void lustre_swab_fid_extent(struct fid_extent *ext);
 extern void lustre_swab_lustre_stc(struct lustre_stc *stc);
 extern void lustre_swab_lustre_fid(struct lustre_fid *fid);
 extern void lustre_swab_mds_status_req(struct mds_status_req *r);
+extern void lustre_swab_mds_rec_setattr(struct mds_rec_setattr *sa);
+
 #endif
index 6eada1e..3c9f081 100644 (file)
@@ -468,7 +468,6 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 
 #define OBD_IOC_COBD_CON               _IOWR('f', 220, long)
 #define OBD_IOC_COBD_COFF              _IOWR('f', 221, long)
-#define OBD_IOC_COBD_CFLUSH            _IOWR('f', 222, long)
 
 #define OBD_IOC_SMFS_SNAP_ADD          _IOWR('f', 230, long)
 
index 290711e..3b413db 100644 (file)
@@ -49,10 +49,6 @@ struct ptlrpc_request;
 struct obd_device;
 struct ll_file_data;
 
-#define OBD_MDS_DEVICENAME "mds"
-#define OBD_MDT_DEVICENAME "mdt"
-#define OBD_MDC_DEVICENAME "mdc"
-
 struct mds_update_record {
         __u32               ur_opcode;
         struct lustre_id   *ur_id1;
@@ -288,6 +284,7 @@ mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id,
                      struct vfsmount **mnt, int lock_mode,
                      struct lustre_handle *lockh, int *pmode,
                      char *name, int namelen, __u64 lockpart);
+                    
 int mds_update_server_data(struct obd_device *, int force_sync);
 int mds_update_last_fid(struct obd_device *obd, void *handle, 
                        int force_sync);
index feec630..085014e 100644 (file)
@@ -10,8 +10,6 @@
 #ifndef LUSTRE_MGMT_H
 #define LUSTRE_MGMT_H
 
-#define OBD_MGMTCLI_DEVICENAME "mgmt_cli"
-
 /* For the convenience and type-safety of inter_module_getters. */
 
 struct obd_device;
index 0e47bd7..9b64f63 100644 (file)
@@ -759,7 +759,7 @@ void *lustre_swab_repbuf (struct ptlrpc_request *req, int n, int minlen,
 void lustre_init_msg (struct lustre_msg *msg, int count, 
                       int *lens, char **bufs);
 void *mdc_setattr_pack(struct lustre_msg *msg, int offset,
-                       struct mdc_op_data *data, struct iattr *iattr,
+                       struct mdc_op_data *op_data, struct iattr *iattr,
                        void *ea, int ealen, void *ea2, int ea2len, 
                        void *ea3, int ea3len);
 void *mdc_create_pack(struct lustre_msg *msg, int offset,
@@ -853,33 +853,6 @@ void mdc_pack_id(struct lustre_id *id, obd_id ino,
                 id_type((id)) = cpu_to_le32(id_type((id)));             \
         } while (0)
 
-#ifdef __KERNEL__
-static inline void
-mdc_inode2id(struct lustre_id *id, struct inode *inode)
-{
-        mdc_pack_id(id, inode->i_ino, inode->i_generation,
-                    (inode->i_mode & S_IFMT), 0, 0);
-}
-
-static inline void 
-mdc_prepare_mdc_data(struct mdc_op_data *data, struct inode *i1,
-                     struct inode *i2, const char *name, int namelen,
-                     int mode)
-{
-        LASSERT(i1);
-
-        mdc_inode2id(&data->id1, i1);
-        if (i2)
-                mdc_inode2id(&data->id2, i2);
-
-       data->valid = 0;
-        data->name = name;
-        data->namelen = namelen;
-        data->create_mode = mode;
-        data->mod_time = LTIME_S(CURRENT_TIME);
-}
-#endif
-
 /* ldlm/ldlm_lib.c */
 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf);
 int client_obd_cleanup(struct obd_device * obddev, int flags);
index e6d907c..e6cede1 100644 (file)
@@ -94,13 +94,13 @@ struct smfs_super_info {
         char                     *smsi_ftype;       /* file system type */
        struct obd_export        *smsi_exp;         /* file system obd exp */
        struct snap_super_info   *smsi_snap_info;   /* snap table cow */
-        //smfs_pack_rec_func            smsi_pack_rec[PACK_MAX]; /* sm_pack_rec type ops */
-        struct rw_semaphore      plg_sem; /*rw semaphore to protect plg operations */
-        __u32                    plg_flags;        /* flags */
+
+        struct rw_semaphore      plg_sem;           /* rw semaphore to protect
+                                                     * plg operations */
+        __u32                    plg_flags;         /* flags */
         __u32                    smsi_flags;
         __u32                    smsi_ops_check;
         struct list_head         smsi_plg_list;
-        //kmem_cache_t *           smsi_inode_cachep;  /*inode_cachep*/
 };
 
 
@@ -469,12 +469,13 @@ static inline struct dentry *pre_smfs_dentry(struct dentry *parent_dentry,
         if (!parent_dentry)
                 cache_dentry->d_parent = cache_dentry;
         
+        //defines d_delete op to force killing dentry
+        cache_dentry->d_op = &sm_dop;
+
         if (cache_inode) {
                 atomic_inc(&cache_inode->i_count); //d_instantiate suppose that
                 d_add(cache_dentry, cache_inode);
         }
-        //defines d_delete op to force killing dentry
-        cache_dentry->d_op = &sm_dop;
         
         RETURN(cache_dentry);
 }
index b53949a..784fc49 100644 (file)
 # include <linux/mount.h>
 #endif
 
+#define OBD_MDS_DEVICENAME         "mds"
+#define OBD_MDT_DEVICENAME         "mdt"
+#define OBD_MDC_DEVICENAME         "mdc"
+#define OBD_LMV_DEVICENAME         "lmv"
+#define OBD_LOV_DEVICENAME         "lov"
+#define OBD_OST_DEVICENAME         "ost"
+#define OBD_OSC_DEVICENAME         "osc"
+
+#define OBD_LDLM_DEVICENAME        "ldlm"
+#define OBD_CACHE_DEVICENAME       "cobd"
+#define OBD_CMOBD_DEVICENAME       "cmobd"
+#define OBD_CONF_DEVICENAME        "confobd"
+
+#define OBD_SANOSC_DEVICENAME      "sanosc"
+#define OBD_SANOST_DEVICENAME      "sanost"
+
+#define OBD_ECHO_DEVICENAME        "obdecho"
+#define OBD_ECHO_CLIENT_DEVICENAME "echo_client"
+
+#define OBD_FILTER_DEVICENAME      "obdfilter"
+#define OBD_FILTER_SAN_DEVICENAME  "sanobdfilter"
+
+#define OBD_MGMTCLI_DEVICENAME     "mgmt_cli"
+#define OBD_PTLBD_SV_DEVICENAME    "ptlbd_server"
+#define OBD_PTLBD_CL_DEVICENAME    "ptlbd_client"
+
 #include <linux/lvfs.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_idl.h>
@@ -182,7 +208,6 @@ struct obd_service_time {
         __u64           st_total_us;
 };
 
-
 struct ost_server_data;
 
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
@@ -407,10 +432,19 @@ struct mds_obd {
         /* security related */
         char                            *mds_mds_sec;
         char                            *mds_ost_sec;
+
         /* which secure flavor from remote to this mds is denied */
         spinlock_t                      mds_denylist_lock;
         struct list_head                mds_denylist;
-        struct semaphore                mds_create_sem;
+
+        /* fid->ino mapping related fields */
+        spinlock_t                      mds_fidmap_lock;
+        struct hlist_head              *mds_fidmap_table;
+        unsigned long                   mds_fidmap_size;
+
+        /* cache fid extents stuff */
+        spinlock_t                      mds_fidext_lock;
+        __u64                           mds_fidext_thumb;
         int                             mds_crypto_type;
 };
 
@@ -478,6 +512,7 @@ struct cache_obd {
         int                     refcount;
         int                     cache_on;
         struct semaphore        sem;
+        struct lov_desc         dt_desc; /* data lovdesc */
 };
 
 struct cm_obd {
@@ -987,4 +1022,22 @@ static inline void obd_transno_commit_cb(struct obd_device *obd,
         }
 }
 
+static inline int obd_md_type(struct obd_device *obd)
+{
+        if (!strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) ||
+            !strcmp(obd->obd_type->typ_name, OBD_LMV_DEVICENAME))
+                return 1;
+
+        return 0;
+}
+
+static inline int obd_dt_type(struct obd_device *obd)
+{
+        if (!strcmp(obd->obd_type->typ_name, OBD_LOV_DEVICENAME) ||
+            !strcmp(obd->obd_type->typ_name, OBD_OSC_DEVICENAME))
+                return 1;
+
+        return 0;
+}
+
 #endif /* __OBD_H */
index e75b9f4..c5ec326 100644 (file)
@@ -7,7 +7,5 @@
 
 #ifdef __KERNEL__
 
-#define OBD_CACHE_DEVICENAME "cobd"
-
 #endif
 #endif
index d208c2b..f5182dd 100644 (file)
@@ -44,8 +44,6 @@
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_log.h>
 
-#define OBD_CONF_DEVICENAME "confobd"
-
 /* OBD Device Declarations */
 #define MAX_OBD_DEVICES 256
 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
index d885579..d46d7ef 100644 (file)
@@ -10,9 +10,6 @@
 #ifndef _OBD_ECHO_H
 #define _OBD_ECHO_H
 
-#define OBD_ECHO_DEVICENAME "obdecho"
-#define OBD_ECHO_CLIENT_DEVICENAME "echo_client"
-
 /* The persistent object (i.e. actually stores stuff!) */
 #define ECHO_PERSISTENT_OBJID    1ULL
 #define ECHO_PERSISTENT_SIZE     ((__u64)(1<<20))
index ad5d653..5327e7a 100644 (file)
@@ -5,6 +5,4 @@
 #ifndef _OBD_LMV_H__
 #define _OBD_LMV_H__
 
-#define OBD_LMV_DEVICENAME "lmv"
-
 #endif
index 9692a9b..1e9ec7c 100644 (file)
@@ -5,8 +5,6 @@
 #ifndef _OBD_LOV_H__
 #define _OBD_LOV_H__
 
-#define OBD_LOV_DEVICENAME "lov"
-
 static inline int lov_stripe_md_size(int stripes)
 {
         return sizeof(struct lov_stripe_md) + stripes*sizeof(struct lov_oinfo);
index d1af9df..fbff6b3 100644 (file)
 
 #include <linux/obd_class.h>
 
-#define OBD_OST_DEVICENAME "ost"
-#define OBD_OSC_DEVICENAME "osc"
-#define OBD_SANOSC_DEVICENAME "sanosc"
-#define OBD_SANOST_DEVICENAME "sanost"
-
 struct osc_brw_async_args {
         struct obdo     *aa_oa;
         int              aa_requested_nob;
index c213dac..c371af9 100644 (file)
@@ -9,9 +9,6 @@
  * See the file COPYING in this distribution
  */
 
-#define OBD_PTLBD_SV_DEVICENAME "ptlbd_server"
-#define OBD_PTLBD_CL_DEVICENAME "ptlbd_client"
-
 /* XXX maybe this isn't the best header to be dumping all this in.. */
 
 extern int ptlbd_blk_init(void);
index adf05f0..5738725 100644 (file)
@@ -571,7 +571,7 @@ int target_check_deny_sec(struct obd_device *target, struct ptlrpc_request *req)
                 spin_lock(&target->u.mds.mds_denylist_lock);
                 rc = check_deny_list(&target->u.mds.mds_denylist, flavor);
                 spin_unlock(&target->u.mds.mds_denylist_lock);
-        } else if (!strcmp(target->obd_type->typ_name, "obdfilter")) {
+        } else if (!strcmp(target->obd_type->typ_name, OBD_FILTER_DEVICENAME)) {
                 spin_lock(&target->u.filter.fo_denylist_lock);
                 rc = check_deny_list(&target->u.filter.fo_denylist, flavor);
                 spin_unlock(&target->u.filter.fo_denylist_lock);
index a40f558..35217be 100644 (file)
@@ -657,6 +657,7 @@ static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
 }
 #endif
 
+#if 0
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 static void ll_dentry_iput(struct dentry *dentry, struct inode *inode)
 {
@@ -692,6 +693,7 @@ static void ll_dentry_iput(struct dentry *dentry, struct inode *inode)
 
 }
 #endif
+#endif
 
 struct dentry_operations ll_d_ops = {
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
@@ -700,7 +702,9 @@ struct dentry_operations ll_d_ops = {
         .d_revalidate_it = ll_revalidate_it,
 #endif
         .d_release = ll_release,
-        /*.d_iput = ll_dentry_iput,*/
+#if 0
+        .d_iput = ll_dentry_iput,
+#endif
         .d_delete = ll_ddelete,
         .d_compare = ll_dcompare,
 #if 0
index 31219db..31ac436 100644 (file)
@@ -195,8 +195,10 @@ static int ll_close_thread(void *arg)
                 iput(inode);
         }
 
-        complete(&lcq->lcq_comp);
-        RETURN(0);
+        EXIT;
+
+        /* SMF-safe way to finish threads */
+        complete_and_exit(&lcq->lcq_comp, 0);
 }
 
 int ll_close_thread_start(struct ll_close_queue **lcq_ret)
@@ -224,7 +226,7 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret)
         return 0;
 }
 
-void ll_close_thread_shutdown(struct ll_close_queue *lcq)
+void ll_close_thread_stop(struct ll_close_queue *lcq)
 {
         init_completion(&lcq->lcq_comp);
         lcq->lcq_list.next = NULL;
index 0b88b4e..00f4b17 100644 (file)
@@ -453,7 +453,7 @@ static int inline ll_gns_check_stop(void)
 }
 
 /* GNS control thread function. */
-static int ll_gns_thread_main(void *arg)
+static int ll_gns_thread(void *arg)
 {
         struct ll_gns_ctl *ctl = arg;
         unsigned long flags;
@@ -532,7 +532,7 @@ int ll_gns_start_thread(void)
         init_completion(&gns_ctl.gc_finishing);
         init_waitqueue_head(&gns_thread.t_ctl_waitq);
         
-        rc = kernel_thread(ll_gns_thread_main, &gns_ctl,
+        rc = kernel_thread(ll_gns_thread, &gns_ctl,
                            (CLONE_VM | CLONE_FILES));
         if (rc < 0) {
                 CERROR("cannot start GNS control thread, "
index a9104cd..a71ac76 100644 (file)
@@ -444,7 +444,7 @@ void ll_open_complete(struct inode *inode);
 int ll_is_inode_dirty(struct inode *inode);
 void ll_try_done_writing(struct inode *inode);
 void ll_queue_done_writing(struct inode *inode);
-void ll_close_thread_shutdown(struct ll_close_queue *lcq);
+void ll_close_thread_stop(struct ll_close_queue *lcq);
 int ll_close_thread_start(struct ll_close_queue **lcq_ret);
 
 
index f506717..b23522e 100644 (file)
@@ -452,7 +452,7 @@ void lustre_common_put_super(struct super_block *sb)
         ENTRY;
 
         ll_gns_del_timer(sbi);
-        ll_close_thread_shutdown(sbi->ll_lcq);
+        ll_close_thread_stop(sbi->ll_lcq);
 
         lustre_destroy_crypto(sb);
 
index 79d2aff..994e002 100644 (file)
@@ -1869,13 +1869,13 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
         }
 
         lmv = &obd->u.lmv;
-        if (keylen == 6 && memcmp(key, "mdsize", 6) == 0) {
+        if (keylen == strlen("mdsize") && !strcmp(key, "mdsize")) {
                 __u32 *mdsize = val;
                 *vallen = sizeof(__u32);
                 *mdsize = sizeof(struct lustre_id) * lmv->desc.ld_tgt_count
                         + sizeof(struct mea);
                 RETURN(0);
-        } else if (keylen == 6 && memcmp(key, "mdsnum", 6) == 0) {
+        } else if (keylen == strlen("mdsnum") && !strcmp(key, "mdsnum")) {
                 struct obd_uuid *cluuid = &lmv->cluuid;
                 struct lmv_tgt_desc *tgts;
                 __u32 *mdsnum = val;
@@ -1889,7 +1889,7 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                         }
                 }
                 LASSERT(0);
-        } else if (keylen == 6 && memcmp(key, "rootid", 6) == 0) {
+        } else if (keylen == strlen("rootid") && !strcmp(key, "rootid")) {
                 rc = lmv_check_connect(obd);
                 if (rc)
                         RETURN(rc);
@@ -1898,12 +1898,11 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
                                   vallen, val);
                 RETURN(rc);
-        } else if (keylen >= strlen("lmvdesc") && strcmp(key, "lmvdesc") == 0) {
+        } else if (keylen >= strlen("lmvdesc") && !strcmp(key, "lmvdesc")) {
                 struct lmv_desc *desc_ret = val;
                 *desc_ret = lmv->desc;
                 RETURN(0);
-        } else if (keylen == strlen("remote_flag") &&
-                   !strcmp(key, "remote_flag")) {
+        } else if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
                 struct lmv_tgt_desc *tgts;
                 int i;
 
@@ -1926,7 +1925,7 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                                 RETURN(0);
                 }
                 RETURN(-EINVAL);
-        } else if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) {
+        } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) {
                 rc = lmv_check_connect(obd);
                 if (rc)
                         RETURN(rc);
@@ -1936,6 +1935,30 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
                                   vallen, val);
                 RETURN(rc);
+        } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
+                struct lmv_tgt_desc *tgts;
+                int i;
+
+                rc = lmv_check_connect(obd);
+                if (rc)
+                        RETURN(rc);
+
+                LASSERT(*vallen == sizeof(struct fid_extent));
+                for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
+                     i++, tgts++) {
+
+                        /* all tgts should be connected when this get called. */
+                        if (!tgts || !tgts->ltd_exp) {
+                                CERROR("target not setup?\n");
+                                continue;
+                        }
+
+                        rc = obd_get_info(tgts->ltd_exp, keylen, key,
+                                          vallen, val);
+                        if (rc)
+                                RETURN(rc);
+                }
+                RETURN(0);
         }
 
         CDEBUG(D_IOCTL, "invalid key\n");
index e1c0226..1ed048b 100644 (file)
@@ -2222,6 +2222,21 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 }
 
                 RETURN(0);
+        } else if (KEY_IS("setext")) {
+                struct lov_tgt_desc *tgt;
+                int rc = 0, i;
+
+                for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
+                     i++, tgt++) {
+                        if (!tgt->ltd_exp)
+                                continue;
+                        rc = obd_set_info(tgt->ltd_exp,
+                                          keylen, key, vallen, val);
+                        if (rc)
+                                RETURN(rc);
+                }
+
+                RETURN(0);
         } else {
                 RETURN(-EINVAL);
         }
index bdb6252..8e40cde 100644 (file)
@@ -51,7 +51,6 @@
 #include <linux/iobuf.h>
 #endif
 
-
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,7))
 # define lock_24kernel() lock_kernel()
 # define unlock_24kernel() unlock_kernel()
@@ -75,13 +74,6 @@ struct fsfilt_cb_data {
 #define EXT3_XATTR_INDEX_TRUSTED        4
 #endif
 
-#define XATTR_LUSTRE_MDS_LOV_EA         "lov"
-#define XATTR_LUSTRE_MDS_MEA_EA         "mea"
-#define XATTR_LUSTRE_MDS_MID_EA         "mid"
-#define XATTR_LUSTRE_MDS_SID_EA         "sid"
-#define XATTR_LUSTRE_MDS_PID_EA         "pid"
-#define XATTR_LUSTRE_MDS_KEY_EA         "key"
-
 /*
  * We don't currently need any additional blocks for rmdir and
  * unlink transactions because we are storing the OST oa_id inside
@@ -509,11 +501,6 @@ static int fsfilt_ext3_set_md(struct inode *inode, void *handle,
                                            XATTR_LUSTRE_MDS_SID_EA,
                                            lmm, lmm_size);
                 break;
-        case EA_MID:
-                rc = fsfilt_ext3_set_xattr(inode, handle,
-                                           XATTR_LUSTRE_MDS_MID_EA,
-                                           lmm, lmm_size);
-                break;
         case EA_PID:
                  rc = fsfilt_ext3_set_xattr(inode, handle,
                                             XATTR_LUSTRE_MDS_PID_EA,
@@ -552,11 +539,6 @@ static int fsfilt_ext3_get_md(struct inode *inode, void *lmm,
                                            XATTR_LUSTRE_MDS_SID_EA,
                                            lmm, lmm_size);
                 break;
-        case EA_MID:
-                rc = fsfilt_ext3_get_xattr(inode,
-                                           XATTR_LUSTRE_MDS_MID_EA,
-                                           lmm, lmm_size);
-                break;
         case EA_PID:
                 rc = fsfilt_ext3_get_xattr(inode,
                                            XATTR_LUSTRE_MDS_PID_EA,
index 2db772c..008c7ef 100644 (file)
@@ -444,7 +444,7 @@ static int lvfs_memdbg_init(int size)
         LASSERT(size > sizeof(sizeof(struct hlist_head)));
         obd_memtable_size = size / sizeof(struct hlist_head);
 
-        CWARN("allocating %lu malloc entries...\n",
+        CWARN("allocating %lu malloc entries\n",
               (unsigned long)obd_memtable_size);
 
         obd_memtable = kmalloc(size, GFP_KERNEL);
index 6a0abe9..c2faf05 100644 (file)
@@ -28,7 +28,8 @@ int mdc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                  struct lov_mds_md *lmm, int lmm_size);
 
 void mdc_getattr_pack(struct lustre_msg *msg, int offset,
-                      __u64 valid, int flags, struct mdc_op_data *data);
+                      __u64 valid, int flags,
+                      struct mdc_op_data *op_data);
 
 void mdc_open_pack(struct lustre_msg *msg, int offset,
                    struct mdc_op_data *op_data, __u32 mode,
index 901e4a1..5967d97 100644 (file)
@@ -66,7 +66,7 @@ void mdc_open_pack(struct lustre_msg *msg, int offset,
         if (op_data != NULL)
                 rec->cr_id = op_data->id1;
         memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
-        rec->cr_flags = mds_pack_open_flags(flags);
+        rec->cr_flags = mds_pack_open_flags(flags) | op_data->flags ;
         rec->cr_time = op_data->mod_time;
         rec->cr_mode = mode;
         rec->cr_rdev = rdev;
index 550fbd5..01fd020 100644 (file)
@@ -602,8 +602,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
 static void mdc_commit_close(struct ptlrpc_request *req)
 {
         struct mdc_open_data *mod = req->rq_cb_data;
-        struct ptlrpc_request *open_req;
         struct obd_import *imp = req->rq_import;
+        struct ptlrpc_request *open_req;
 
         DEBUG_REQ(D_HA, req, "close req committed");
         if (mod == NULL)
@@ -936,6 +936,20 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                 CDEBUG(D_HA, "%s: set async = %d\n",
                        exp->exp_obd->obd_name, cl->cl_async);
                 RETURN(0);
+        } else if (keylen == strlen("setext") && memcmp(key, "setext", keylen) == 0) {
+                struct ptlrpc_request *req;
+                char *bufs[2] = {key, val};
+                int rc, size[2] = {keylen, vallen};
+
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
+                                      OST_SET_INFO, 2, size, bufs);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                req->rq_replen = lustre_msg_size(0, NULL);
+                rc = ptlrpc_queue_wait(req);
+                ptlrpc_req_finished(req);
+                RETURN(rc);
         } else if (keylen == 5 && strcmp(key, "audit") == 0) {
                 struct ptlrpc_request *req;
                 char *bufs[2] = {key, val};
@@ -1322,7 +1336,7 @@ static int mdc_get_info(struct obd_export *exp, __u32 keylen,
         if (!valsize || !val)
                 RETURN(-EFAULT);
 
-        if (keylen == strlen("remote_flag") && !strcmp(key, "remote_flag")) {
+        if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
                 struct obd_import *imp;
                 struct obd_connect_data *data;
 
@@ -1351,6 +1365,8 @@ static int mdc_get_info(struct obd_export *exp, __u32 keylen,
 
         if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
             (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
+            (keylen < strlen("lovdesc") || strcmp(key, "lovdesc") != 0) &&
+            (keylen < strlen("getext") || strcmp(key, "getext") != 0) &&
             (keylen < strlen("rootid") || strcmp(key, "rootid") != 0) &&
             (keylen < strlen("auditid") || strcmp(key, "auditid") != 0))
                 RETURN(-EPROTO);
@@ -1389,6 +1405,18 @@ static int mdc_get_info(struct obd_export *exp, __u32 keylen,
 
                 *(struct lov_desc *)val = *reply;
                 RETURN(0);
+        } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
+                struct fid_extent *reply;
+                
+                reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
+                                           lustre_swab_fid_extent);
+                if (reply == NULL) {
+                        CERROR("Can't unpack %s\n", (char *)key);
+                        GOTO(out_req, rc = -EPROTO);
+                }
+
+                *(struct fid_extent *)val = *reply;
+                RETURN(0);
         } else {
                 __u32 *reply;
                 
index 84cc5d1..2a9fda7 100644 (file)
@@ -330,13 +330,36 @@ struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id
 struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
                              struct vfsmount **mnt)
 {
-        unsigned long ino = (unsigned long)id_ino(id);
-        __u32 generation = (__u32)id_gen(id);
         struct mds_obd *mds = &obd->u.mds;
         struct dentry *result;
         struct inode *inode;
+        unsigned long ino;
+        __u32 generation;
         char idname[32];
 
+        if (!id_ino(id) && id_fid(id)) {
+                struct lustre_id *lid;
+                
+                /* if this is reint case we should use fidmap for resolving
+                 * correct local store cookie. */
+                lid = mds_fidmap_lookup(obd, id);
+                if (!lid)
+                        RETURN(ERR_PTR(-ENOENT));
+
+                ino = (unsigned long)id_ino(lid);
+                generation = (__u32)id_gen(lid);
+
+                CDEBUG(D_DENTRY, "fidmap resolved "DLID4"->"DLID4"\n",
+                       OLID4(id), OLID4(lid));
+
+        } else if (id_ino(id)) {
+                ino = (unsigned long)id_ino(id);
+                generation = (__u32)id_gen(id);
+        } else {
+                CERROR("invalid id for lookup "
+                       DLID4"\n", OLID4(id));
+        }
+
         if (ino == 0)
                 RETURN(ERR_PTR(-ESTALE));
 
@@ -346,7 +369,7 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
                ino, generation, mds->mds_sb);
 
         /* under ext3 this is neither supposed to return bad inodes nor NULL
-           inodes. */
+         * inodes. */
         result = ll_lookup_one_len(idname, mds->mds_id_de, 
                                    strlen(idname));
         if (IS_ERR(result))
@@ -365,7 +388,7 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
 
         /* here we disabled generation check, as root inode i_generation
          * of cache mds and real mds are different. */
-        if (inode->i_ino != id_ino(&mds->mds_rootid) && generation &&
+        if (id_fid(id) != id_fid(&mds->mds_rootid) && generation != 0 &&
             inode->i_generation != generation) {
                 /* we didn't find the right inode.. */
                 if (id_group(id) != mds->mds_num) {
@@ -1577,8 +1600,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
 
                         dchild = mds_id2locked_dentry(obd, &body->id1, NULL,
                                                       LCK_PR, parent_lockh,
-                                                      &update_mode,
-                                                      NULL, 0, 
+                                                      &update_mode, NULL, 0, 
                                                       MDS_INODELOCK_UPDATE);
                         if (IS_ERR(dchild)) {
                                 CERROR("can't find inode with id "DLID4", err = %d\n", 
@@ -1988,92 +2010,6 @@ out:
         return 0;
 }
 
-/* update master MDS ID, which is stored in local inode EA. */
-int mds_update_mid(struct obd_device *obd, struct lustre_id *id,
-                   void *data, int data_len)
-{
-        struct mds_obd *mds = &obd->u.mds;
-        struct dentry *dentry;
-        void *handle;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(id);
-        LASSERT(obd);
-        
-        dentry = mds_id2dentry(obd, id, NULL);
-        if (IS_ERR(dentry))
-                GOTO(out, rc = PTR_ERR(dentry));
-
-        if (!dentry->d_inode) {
-                CERROR("Can't find object "DLID4".\n",
-                       OLID4(id));
-                GOTO(out_dentry, rc = -EINVAL);
-        }
-
-        handle = fsfilt_start(obd, dentry->d_inode,
-                              FSFILT_OP_SETATTR, NULL);
-        if (IS_ERR(handle))
-                GOTO(out_dentry, rc = PTR_ERR(handle));
-
-        rc = mds_update_inode_mid(obd, dentry->d_inode, handle,
-                                  (struct lustre_id *)data);
-        if (rc) {
-                CERROR("Can't update inode "DLID4" master id, "
-                       "error = %d.\n", OLID4(id), rc);
-                GOTO(out_commit, rc);
-        }
-
-        EXIT;
-out_commit:
-        fsfilt_commit(obd, mds->mds_sb, dentry->d_inode,
-                      handle, 0);
-out_dentry:
-        l_dput(dentry);
-out:
-        return rc;
-}
-EXPORT_SYMBOL(mds_update_mid);
-
-/* read master MDS ID, which is stored in local inode EA. */
-int mds_read_mid(struct obd_device *obd, struct lustre_id *id,
-                 void *data, int data_len)
-{
-        struct dentry *dentry;
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(id);
-        LASSERT(obd);
-        
-        dentry = mds_id2dentry(obd, id, NULL);
-        if (IS_ERR(dentry))
-                GOTO(out, rc = PTR_ERR(dentry));
-
-        if (!dentry->d_inode) {
-                CERROR("Can't find object "DLID4".\n",
-                       OLID4(id));
-                GOTO(out_dentry, rc = -EINVAL);
-        }
-
-        down(&dentry->d_inode->i_sem);
-        rc = mds_read_inode_mid(obd, dentry->d_inode,
-                                (struct lustre_id *)data);
-        up(&dentry->d_inode->i_sem);
-        if (rc) {
-                CERROR("Can't read inode "DLID4" master id, "
-                       "error = %d.\n", OLID4(id), rc);
-                GOTO(out_dentry, rc);
-        }
-
-        EXIT;
-out_dentry:
-        l_dput(dentry);
-out:
-        return rc;
-}
-EXPORT_SYMBOL(mds_read_mid);
-
 int mds_read_md(struct obd_device *obd, struct lustre_id *id, 
                 char **data, int *datalen)
 {
@@ -2517,6 +2453,7 @@ repeat:
         new->d_fsdata = (void *)&dp;
         dp.p_inum = 0;
         dp.p_ptr = req;
+        
         dp.p_fid = fid;
         dp.p_group = mds->mds_num;
 
@@ -2648,6 +2585,16 @@ repeat:
 
         EXIT;
 cleanup:
+        if (rc == 0 && (body->oa.o_flags & OBD_FL_REINT)) {
+                rc = mds_fidmap_add(obd, &id);
+                if (rc < 0) {
+                        CERROR("can't create fid->ino mapping, "
+                               "err %d\n", rc);
+                } else {
+                        rc = 0;
+                }
+        }
+                
         switch (cleanup_phase) {
         case 2: /* object has been created, but we'll may want to replay it later */
                 if (rc == 0)
@@ -2678,6 +2625,8 @@ static int mdt_get_info(struct ptlrpc_request *req)
 
         if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
             (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
+            (keylen < strlen("lovdesc") || strcmp(key, "lovdesc") != 0) &&
+            (keylen < strlen("getext") || strcmp(key, "getext") != 0) &&
             (keylen < strlen("rootid") || strcmp(key, "rootid") != 0))
                 RETURN(-EPROTO);
 
@@ -2691,6 +2640,26 @@ static int mdt_get_info(struct ptlrpc_request *req)
 
                 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
                 rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
+        } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) {
+                struct lov_desc *reply;
+                int size = sizeof(*reply);
+                
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc)
+                        RETURN(rc);
+
+                reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+                rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
+        } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) {
+                struct fid_extent *reply;
+                int size = sizeof(*reply);
+                
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc)
+                        RETURN(rc);
+
+                reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+                rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
         } else {
                 obd_id *reply;
                 int size = sizeof(*reply);
@@ -2723,23 +2692,7 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
         }
 
         mds = &obd->u.mds;
-        if (keylen >= strlen("mds_type") &&
-             memcmp(key, "mds_type", keylen) == 0) {
-                int valsize;
-                __u32 group;
-                
-                CDEBUG(D_IOCTL, "set mds type to %x\n", *(int*)val);
-                
-                mds->mds_obd_type = *(int*)val;
-                group = FILTER_GROUP_FIRST_MDS + mds->mds_obd_type;
-                valsize = sizeof(group);
-                
-                /* mds number has been changed, so the corresponding obdfilter
-                 * exp need to be changed too. */
-                rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"),
-                                  "mds_conn", valsize, &group);
-                RETURN(rc);
-        } else if (keylen == 5 && memcmp(key, "audit", 5) == 0) {
+        if (keylen == 5 && memcmp(key, "audit", 5) == 0) {
                 rc = mds_set_audit(obd, val);
                 RETURN(rc);
         } else if (keylen >= strlen("ids") && memcmp(key, "ids", keylen) == 0) {
@@ -2785,7 +2738,28 @@ out_put:
                 rc = mds_set_crypto_type(obd, val, vallen); 
                 RETURN(rc);
         }
+        
+        if (keylen >= strlen("setext") && !memcmp(key, "setext", keylen)) {
+                struct fid_extent *ext = val;
+
+                CDEBUG(D_IOCTL, "set last fid to extent ["LPD64"-"LPD64"]\n",
+                       ext->fe_start, ext->fe_width);
 
+                /* set lastfid into fid extent start. All next object creates
+                 * will use that fid. */
+                mds_set_last_fid(obd, ext->fe_start);
+
+                /* setting the same extent to OSC to avoid ids intersecting in
+                 * object ids, as all cache MDSs have the same group 0. */
+                rc = obd_set_info(mds->mds_dt_exp, strlen("setext"),
+                                  "setext", sizeof(*ext), ext);
+                if (rc) {
+                        CERROR("can't set extent ["LPD64"-"LPD64"] to %s, "
+                               "err %d\n", ext->fe_start, ext->fe_width,
+                               mds->mds_dt_exp->exp_obd->obd_name, rc);
+                }
+                RETURN(rc);
+        }
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
@@ -2804,9 +2778,7 @@ static int mdt_set_info(struct ptlrpc_request *req)
         }
         keylen = req->rq_reqmsg->buflens[0];
 
-        if ((keylen == strlen("mds_type") &&
-            memcmp(key, "mds_type", keylen) == 0) ||
-            (keylen == strlen("crypto_type") &&
+        if ((keylen == strlen("crypto_type") &&
             memcmp(key, "crypto_type", keylen) == 0)) {
                 rc = lustre_pack_reply(req, 0, NULL, NULL);
                 if (rc)
@@ -2852,6 +2824,18 @@ static int mdt_set_info(struct ptlrpc_request *req)
                 rc = obd_set_info(exp, keylen, key, vallen, ids);
                 req->rq_repmsg->status = rc;
                 RETURN(rc);
+        } else if (keylen == strlen("setext") && 
+                   memcmp(key, "setext", keylen) == 0) {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        RETURN(rc);
+                
+                val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
+                vallen = req->rq_reqmsg->buflens[1];
+
+                rc = obd_set_info(exp, keylen, key, vallen, val);
+                req->rq_repmsg->status = 0;
+                RETURN(rc);
         }
 
         CDEBUG(D_IOCTL, "invalid key\n");
@@ -3548,61 +3532,6 @@ int mds_update_inode_ids(struct obd_device *obd, struct inode *inode,
         RETURN(rc);
 }
 
-/* 
- * reads inode id on master MDS. This is usualy done by CMOBD to update requests
- * to master MDS by correct store cookie, needed to find inode on master MDS
- * quickly.
- */
-int mds_read_inode_mid(struct obd_device *obd, struct inode *inode,
-                       struct lustre_id *id)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(id != NULL);
-        LASSERT(obd != NULL);
-        LASSERT(inode != NULL);
-
-        rc = fsfilt_get_md(obd, inode, id, sizeof(*id), EA_MID);
-        if (rc < 0) {
-                CERROR("fsfilt_get_md() failed, rc = %d\n", rc);
-                RETURN(rc);
-        } else if (!rc) {
-                rc = -ENODATA;
-                RETURN(rc);
-        } else {
-                rc = 0;
-        }
-
-        RETURN(rc);
-}
-
-/*
- * updates master inode id. Usualy this is done by CMOBD after an inode is
- * created and relationship between cache MDS and master one should be
- * established.
- */
-int mds_update_inode_mid(struct obd_device *obd, struct inode *inode,
-                         void *handle, struct lustre_id *id)
-{
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(id != NULL);
-        LASSERT(obd != NULL);
-        LASSERT(inode != NULL);
-        
-        rc = fsfilt_set_md(obd, inode, handle, id,
-                           sizeof(*id), EA_MID);
-        if (rc) {
-                CERROR("fsfilt_set_md() failed, "
-                       "rc = %d\n", rc);
-                RETURN(rc);
-        }
-
-        RETURN(rc);
-}
-
 /* mount the file system (secretly) */
 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
 {
@@ -3647,7 +3576,6 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         /* we have to know mdsnum before touching underlying fs -bzzz */
         atomic_set(&mds->mds_open_count, 0);
         sema_init(&mds->mds_md_sem, 1);
-        sema_init(&mds->mds_create_sem, 1);
         mds->mds_md_connected = 0;
         mds->mds_md_name = NULL;
 
@@ -3706,8 +3634,11 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
 
         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
 
+        mds->mds_fidext_thumb = 0;
         sema_init(&mds->mds_epoch_sem, 1);
         atomic_set(&mds->mds_real_clients, 0);
+        spin_lock_init(&mds->mds_fidext_lock);
+        spin_lock_init(&mds->mds_fidmap_lock);
         spin_lock_init(&mds->mds_transno_lock);
         spin_lock_init(&mds->mds_last_fid_lock);
         sema_init(&mds->mds_orphan_recovery_sem, 1);
@@ -3734,7 +3665,6 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
 
                 GOTO(err_fs, rc);
 
-
         if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && lustre_cfg_buf(lcfg, 3) &&
             strncmp(lustre_cfg_string(lcfg, 3), "dumb", 
                     LUSTRE_CFG_BUFLEN(lcfg, 3))) {
@@ -4598,6 +4528,19 @@ static int mds_get_info(struct obd_export *exp, __u32 keylen,
                 RETURN(0);
         }
         
+        if (keylen >= strlen("getext") && strcmp(key, "getext") == 0) {
+                struct fid_extent *ext = val;
+                *valsize = sizeof(*ext);
+
+                spin_lock(&mds->mds_fidext_lock);
+                ext->fe_width = MDS_FIDEXT_SIZE;
+                ext->fe_start = mds->mds_fidext_thumb + 1;
+                mds->mds_fidext_thumb += MDS_FIDEXT_SIZE;
+                spin_unlock(&mds->mds_fidext_lock);
+                
+                RETURN(0);
+        }
+        
         rc = fsfilt_get_info(obd, mds->mds_sb, NULL, keylen, key, valsize, val);
         if (rc)
                 CDEBUG(D_IOCTL, "invalid key\n");
@@ -4610,10 +4553,10 @@ struct lvfs_callback_ops mds_lvfs_ops = {
 };
 
 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
-                int objcount, struct obd_ioobj *obj,
-                int niocount, struct niobuf_remote *nb,
-                struct niobuf_local *res,
-                struct obd_trans_info *oti);
+               int objcount, struct obd_ioobj *obj,
+               int niocount, struct niobuf_remote *nb,
+               struct niobuf_local *res,
+               struct obd_trans_info *oti);
 
 int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                  int objcount, struct obd_ioobj *obj, int niocount,
index 59a5d98..ee59aa0 100644 (file)
 #define LAST_FID  "last_fid"
 #define VIRT_FID  "virt_fid"
 
+struct fidmap_entry {
+        struct hlist_node fm_hash;
+        struct lustre_id  fm_id;
+};
+
+int mds_fidmap_init(struct obd_device *obd, int size)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct hlist_head *head;
+        int i = 0;
+        ENTRY;
+
+        LASSERT(size > sizeof(sizeof(struct hlist_head)));
+        mds->mds_fidmap_size = size / sizeof(struct hlist_head);
+
+        CWARN("allocating %lu fid mapping entries\n",
+              (unsigned long)mds->mds_fidmap_size);
+
+        OBD_ALLOC(mds->mds_fidmap_table, size);
+        if (!mds->mds_fidmap_table)
+                RETURN(-ENOMEM);
+
+        i = mds->mds_fidmap_size;
+        head = mds->mds_fidmap_table;
+        do {
+                INIT_HLIST_HEAD(head);
+                head++;
+                i--;
+        } while(i);
+
+        RETURN(0);
+}
+
+int mds_fidmap_cleanup(struct obd_device *obd)
+{
+        struct hlist_node *node = NULL, *tmp = NULL;
+        struct mds_obd *mds = &obd->u.mds;
+        struct fidmap_entry *entry;
+        struct hlist_head *head;
+        int i = 0;
+        ENTRY;
+
+        spin_lock(&mds->mds_fidmap_lock);
+        for (i = 0, head = mds->mds_fidmap_table;
+             i < mds->mds_fidmap_size; i++, head++) {
+                hlist_for_each_safe(node, tmp, head) {
+                        entry = hlist_entry(node, struct fidmap_entry, fm_hash);
+                        hlist_del_init(&entry->fm_hash);
+                        OBD_FREE(entry, sizeof(*entry));
+                }
+        }
+        spin_unlock(&mds->mds_fidmap_lock);
+        OBD_FREE(mds->mds_fidmap_table, mds->mds_fidmap_size *
+                 sizeof(struct hlist_head));
+        RETURN(0);
+}
+
+static inline unsigned long
+const hashfn(struct obd_device *obd, __u64 fid)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        return (unsigned long)(fid & (mds->mds_fidmap_size - 1));
+}
+
+static struct fidmap_entry *
+__mds_fidmap_find(struct obd_device *obd, __u64 fid)
+{
+        struct fidmap_entry *entry = NULL;
+        struct mds_obd *mds = &obd->u.mds;
+        struct hlist_node *node = NULL;
+        struct hlist_head *head;
+        ENTRY;
+
+        head = mds->mds_fidmap_table + hashfn(obd, fid);
+        hlist_for_each(node, head) {
+                entry = hlist_entry(node, struct fidmap_entry, fm_hash);
+                if (id_fid(&entry->fm_id) == fid)
+                        RETURN(entry);
+        }
+        RETURN(NULL);
+}
+
+struct fidmap_entry *
+mds_fidmap_find(struct obd_device *obd, __u64 fid)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct fidmap_entry *entry;
+        ENTRY;
+
+        spin_lock(&mds->mds_fidmap_lock);
+        entry = __mds_fidmap_find(obd, fid);
+        spin_unlock(&mds->mds_fidmap_lock);
+        
+        RETURN(entry);
+}
+
+static void __mds_fidmap_insert(struct obd_device *obd,
+                                struct fidmap_entry *entry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct hlist_head *head;
+        unsigned long idx;
+        ENTRY;
+
+        idx = hashfn(obd, id_fid(&entry->fm_id));
+        head = mds->mds_fidmap_table + idx;
+        hlist_add_head(&entry->fm_hash, head);
+        
+        EXIT;
+}
+
+void mds_fidmap_insert(struct obd_device *obd,
+                       struct fidmap_entry *entry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        ENTRY;
+        
+        spin_lock(&mds->mds_fidmap_lock);
+        __mds_fidmap_insert(obd, entry);
+        spin_unlock(&mds->mds_fidmap_lock);
+        
+        EXIT;
+}
+
+static void __mds_fidmap_remove(struct obd_device *obd,
+                                struct fidmap_entry *entry)
+{
+        ENTRY;
+        hlist_del_init(&entry->fm_hash);
+        EXIT;
+}
+
+void mds_fidmap_remove(struct obd_device *obd,
+                       struct fidmap_entry *entry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        ENTRY;
+        
+        spin_lock(&mds->mds_fidmap_lock);
+        __mds_fidmap_remove(obd, entry);
+        spin_unlock(&mds->mds_fidmap_lock);
+
+        EXIT;
+}
+
+/* creates new mapping remote fid -> local inode store cookie. Both are saved in
+ * lustre_id for better usability, as all mds function use lustre_id as input
+ * params.*/
+int mds_fidmap_add(struct obd_device *obd,
+                   struct lustre_id *id)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct fidmap_entry *entry;
+        ENTRY;
+
+        OBD_ALLOC(entry, sizeof(*entry));
+        if (!entry)
+                RETURN(-ENOMEM);
+
+        entry->fm_id = *id;
+        
+        spin_lock(&mds->mds_fidmap_lock);
+        if (!__mds_fidmap_find(obd, id_fid(id))) {
+                __mds_fidmap_insert(obd, entry);
+                spin_unlock(&mds->mds_fidmap_lock);
+                CDEBUG(D_INODE, "added mapping to "DLID4"\n",
+                       OLID4(id));
+                RETURN(1);
+        }
+        spin_unlock(&mds->mds_fidmap_lock);
+        OBD_FREE(entry, sizeof(*entry));
+        
+        RETURN(0);
+}
+
+/* removes mapping using fid component from passed @id */
+void mds_fidmap_del(struct obd_device *obd,
+                    struct lustre_id *id)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct fidmap_entry *entry;
+        ENTRY;
+
+        spin_lock(&mds->mds_fidmap_lock);
+        entry = __mds_fidmap_find(obd, id_fid(id));
+        if (entry) {
+                __mds_fidmap_remove(obd, entry);
+                spin_unlock(&mds->mds_fidmap_lock);
+                OBD_FREE(entry, sizeof(*entry));
+                CDEBUG(D_INODE, "removed mapping to "DLID4"\n",
+                       OLID4(id));
+                goto out;
+        }
+        spin_unlock(&mds->mds_fidmap_lock);
+out:
+        EXIT;
+}
+
+struct lustre_id *mds_fidmap_lookup(struct obd_device *obd,
+                                    struct lustre_id *id)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct fidmap_entry *entry;
+        ENTRY;
+
+        spin_lock(&mds->mds_fidmap_lock);
+        entry = __mds_fidmap_find(obd, id_fid(id));
+        spin_unlock(&mds->mds_fidmap_lock);
+
+        if (!entry)
+                RETURN(NULL);
+        
+        RETURN(&entry->fm_id);
+}
+
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
@@ -454,17 +669,22 @@ int mds_fs_setup_rootid(struct obd_device *obd)
         LASSERT(dentry->d_inode);
 
         rc = mds_pack_inode2id(obd, &mds->mds_rootid, inode, 1);
-        if (rc < 0) {
+        if (rc && rc != -ENODATA)
+                GOTO(out_dentry, rc);
+
+        if (rc) {
                 if (rc != -ENODATA)
                         GOTO(out_dentry, rc);
         } else {
-                /*
-                 * rootid is filled by mds_read_inode_sid(), so we do not need
-                 * to allocate it and update. The only thing we need to check is
-                 * mds_num.
-                 */
+                /* rootid is filled by mds_read_inode_sid(), so we do not need
+                 * to allocate it and update. */
                 LASSERT(id_group(&mds->mds_rootid) == mds->mds_num);
                 mds_set_last_fid(obd, id_fid(&mds->mds_rootid));
+
+                rc = mds_fidmap_add(obd, &mds->mds_rootid);
+                if (rc > 0)
+                        rc = 0;
+                
                 GOTO(out_dentry, rc);
         }
 
@@ -485,6 +705,12 @@ int mds_fs_setup_rootid(struct obd_device *obd)
                 GOTO(out_dentry, rc);
         }
 
+        rc = mds_fidmap_add(obd, &mds->mds_rootid);
+        if (rc < 0)
+                GOTO(out_dentry, rc);
+       else
+               rc = 0;
+        
         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
         if (rc)
                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
@@ -587,6 +813,12 @@ int mds_fs_setup_virtid(struct obd_device *obd)
                 RETURN(rc);
         }
 
+        rc = mds_fidmap_add(obd, &sid);
+        if (rc < 0)
+                RETURN(rc);
+       else
+               rc = 0;
+        
         rc = fsfilt_commit(obd, mds->mds_sb, inode, handle, 0);
         if (rc) {
                 CERROR("fsfilt_commit() failed, rc = %d\n", rc);
@@ -596,6 +828,8 @@ int mds_fs_setup_virtid(struct obd_device *obd)
         RETURN(rc);
 }
 
+#define MDS_FIDMAP_SIZE (2*PAGE_SIZE)
+
 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -764,6 +998,16 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
                        file->f_dentry->d_inode->i_mode);
                 GOTO(err_lov_objid, rc = -ENOENT);
         }
+
+        /* reint fidext thumb by last fid after root and virt are initialized */
+        mds->mds_fidext_thumb = mds->mds_last_fid;
+                
+        rc = mds_fidmap_init(obd, MDS_FIDMAP_SIZE);
+        if (rc) {
+                CERROR("cannot init fid mapping tables, err %d\n", rc);
+                GOTO(err_lov_objid, rc);
+        }
+        
 err_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         return rc;
@@ -817,7 +1061,8 @@ int mds_fs_cleanup(struct obd_device *obd, int flags)
         class_disconnect_exports(obd, flags); /* cleans up client info too */
         target_cleanup_recovery(obd);
         mds_server_free_data(mds);
-
+        mds_fidmap_cleanup(obd);
+        
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (mds->mds_virtid_filp) {
                 rc = filp_close(mds->mds_virtid_filp, 0);
index fd5eaec..f707e98 100644 (file)
@@ -23,6 +23,9 @@ struct mds_filter_data {
 #define MDS_EXPECT_SPLIT        1
 #define MDS_NO_SPLITTABLE       2
 
+/* 1048576 should be enough for one client pool */
+#define MDS_FIDEXT_SIZE (1 << 20)
+
 static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 {
         return &req->rq_export->exp_obd->u.mds;
@@ -178,11 +181,10 @@ void mds_dt_update_objids(struct obd_device *obd, obd_id *ids);
 void mds_dt_save_objids(struct obd_device *obd, obd_id *ids);
 
 /* mds/mds_open.c */
-int
-mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
-                  int offset, struct mds_update_record *rec,
-                  struct dentry *dchild, void **handle,
-                  obd_id *ids);
+int mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
+                      int offset, struct mds_update_record *rec,
+                      struct dentry *dchild, void **handle,
+                      obd_id *ids);
 int mds_destroy_object(struct obd_device *obd,
                        struct inode *inode, int async);
 int mds_query_write_access(struct inode *inode);
@@ -198,6 +200,28 @@ int mds_validate_size(struct obd_device *obd, struct inode *inode,
                       struct mds_body *body, struct iattr *iattr);
 
 /* mds/mds_fs.c */
+int mds_fidmap_init(struct obd_device *obd, int size);
+int mds_fidmap_cleanup(struct obd_device *obd);
+
+struct fidmap_entry *
+mds_fidmap_find(struct obd_device *obd, __u64 fid);
+
+struct lustre_id *
+mds_fidmap_lookup(struct obd_device *obd,
+                  struct lustre_id *id);
+
+void mds_fidmap_insert(struct obd_device *obd,
+                       struct fidmap_entry *entry);
+
+void mds_fidmap_remove(struct obd_device *obd,
+                       struct fidmap_entry *entry);
+
+int mds_fidmap_add(struct obd_device *obd,
+                   struct lustre_id *id);
+
+void mds_fidmap_del(struct obd_device *obd,
+                    struct lustre_id *id);
+
 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
                    struct mds_export_data *med, int cl_off);
 int mds_client_free(struct obd_export *exp, int clear_client);
@@ -240,12 +264,6 @@ int mds_read_inode_sid(struct obd_device *, struct inode *,
 int mds_read_inode_pid(struct obd_device *, struct inode *,
                        struct lustre_id *);
 
-int mds_update_inode_mid(struct obd_device *, struct inode *,
-                         void *, struct lustre_id *);
-
-int mds_read_inode_mid(struct obd_device *, struct inode *,
-                       struct lustre_id *);
-
 void mds_commit_last_fid_cb(struct obd_device *, __u64 fid,
                             void *data, int error);
 
@@ -290,7 +308,8 @@ int mds_md_disconnect(struct obd_device *obd, int flags);
 int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
                          int, int);
 int mds_md_get_attr(struct obd_device *, struct inode *, struct mea **, int *);
-int mds_choose_mdsnum(struct obd_device *, const char *, int, int, struct ptlrpc_peer *, struct inode *);
+int mds_choose_mdsnum(struct obd_device *, const char *, int, int,
+                      struct ptlrpc_peer *, struct inode *, int);
 int mds_md_postsetup(struct obd_device *);
 int mds_splitting_expected(struct obd_device *, struct dentry *);
 int mds_lock_slave_objs(struct obd_device *, struct dentry *,
index 5477e25..4584f1c 100644 (file)
@@ -422,6 +422,7 @@ static int mds_setattr_unpack(struct ptlrpc_request *req, int offset,
                 RETURN (-EFAULT);
 
         r->ur_id1 = &rec->sa_id;
+        r->ur_flags = rec->sa_flags;
         attr->ia_valid = rec->sa_valid;
         attr->ia_mode = rec->sa_mode;
         attr->ia_uid = rec->sa_uid;
@@ -432,10 +433,10 @@ static int mds_setattr_unpack(struct ptlrpc_request *req, int offset,
         LTIME_S(attr->ia_ctime) = rec->sa_ctime;
         attr->ia_attr_flags = rec->sa_attr_flags;
 
-        LASSERT_REQSWAB (req, offset + 1);
+        LASSERT_REQSWAB(req, offset + 1);
         if (req->rq_reqmsg->bufcount > offset + 1) {
-                r->ur_eadata = lustre_msg_buf (req->rq_reqmsg,
-                                               offset + 1, 0);
+                r->ur_eadata = lustre_msg_buf(req->rq_reqmsg,
+                                              offset + 1, 0);
                 if (r->ur_eadata == NULL)
                         RETURN (-EFAULT);
                 r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 1];
@@ -466,8 +467,8 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset,
         struct mds_rec_create *rec;
         ENTRY;
 
-        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
-                                  lustre_swab_mds_rec_create);
+        rec = lustre_swab_reqbuf(req, offset, sizeof(*rec),
+                                 lustre_swab_mds_rec_create);
         if (rec == NULL)
                 RETURN (-EFAULT);
 
@@ -478,19 +479,19 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset,
         r->ur_time = rec->cr_time;
         r->ur_flags = rec->cr_flags;
 
-        LASSERT_REQSWAB (req, offset + 1);
-        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        LASSERT_REQSWAB(req, offset + 1);
+        r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
         if (r->ur_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
 
-        LASSERT_REQSWAB (req, offset + 2);
+        LASSERT_REQSWAB(req, offset + 2);
         if (req->rq_reqmsg->bufcount > offset + 2) {
                 if (S_ISLNK(r->ur_mode)) {
                         r->ur_tgt = lustre_msg_string(req->rq_reqmsg,
                                                       offset + 2, 0);
                         if (r->ur_tgt == NULL)
-                                RETURN (-EFAULT);
+                                RETURN(-EFAULT);
                         r->ur_tgtlen = req->rq_reqmsg->buflens[offset + 2];
                 } else if (S_ISDIR(r->ur_mode) ) {
                         /* Stripe info for mkdir - just a 16bit integer */
@@ -498,14 +499,15 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset,
                                 CERROR("mkdir stripe info does not match "
                                        "expected size %d vs 2\n",
                                        req->rq_reqmsg->buflens[offset + 2]);
-                                RETURN (-EINVAL);
+                                RETURN(-EINVAL);
                         }
-                        r->ur_eadata = lustre_swab_buf (req->rq_reqmsg,
-                                               offset + 2, 2, __swab16s);
+                        r->ur_eadata = lustre_swab_buf(req->rq_reqmsg,
+                                                       offset + 2, 2,
+                                                       __swab16s);
                         r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 2];
                 } else if (S_ISREG(r->ur_mode)){
-                        r->ur_eadata = lustre_msg_buf (req->rq_reqmsg, 
-                                                       offset + 2, 0);
+                        r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, 
+                                                      offset + 2, 0);
                         r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 2];
                 } else {
                         /* Hm, no other users so far? */
@@ -521,19 +523,20 @@ static int mds_link_unpack(struct ptlrpc_request *req, int offset,
         struct mds_rec_link *rec;
         ENTRY;
 
-        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
-                                  lustre_swab_mds_rec_link);
+        rec = lustre_swab_reqbuf(req, offset, sizeof(*rec),
+                                 lustre_swab_mds_rec_link);
         if (rec == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         r->ur_id1 = &rec->lk_id1;
         r->ur_id2 = &rec->lk_id2;
         r->ur_time = rec->lk_time;
+        r->ur_flags = rec->lk_flags;
 
-        LASSERT_REQSWAB (req, offset + 1);
-        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        LASSERT_REQSWAB(req, offset + 1);
+        r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
         if (r->ur_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
         RETURN(0);
 }
@@ -544,8 +547,8 @@ static int mds_unlink_unpack(struct ptlrpc_request *req, int offset,
         struct mds_rec_unlink *rec;
         ENTRY;
 
-        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
-                                  lustre_swab_mds_rec_unlink);
+        rec = lustre_swab_reqbuf(req, offset, sizeof (*rec),
+                                 lustre_swab_mds_rec_unlink);
         if (rec == NULL)
                 RETURN(-EFAULT);
 
@@ -553,8 +556,9 @@ static int mds_unlink_unpack(struct ptlrpc_request *req, int offset,
         r->ur_id1 = &rec->ul_id1;
         r->ur_id2 = &rec->ul_id2;
         r->ur_time = rec->ul_time;
+        r->ur_flags = rec->ul_flags;
 
-        LASSERT_REQSWAB (req, offset + 1);
+        LASSERT_REQSWAB(req, offset + 1);
         r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
         if (r->ur_name == NULL)
                 RETURN(-EFAULT);
@@ -568,22 +572,23 @@ static int mds_rename_unpack(struct ptlrpc_request *req, int offset,
         struct mds_rec_rename *rec;
         ENTRY;
 
-        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
-                                  lustre_swab_mds_rec_rename);
+        rec = lustre_swab_reqbuf(req, offset, sizeof (*rec),
+                                 lustre_swab_mds_rec_rename);
         if (rec == NULL)
                 RETURN(-EFAULT);
 
         r->ur_id1 = &rec->rn_id1;
         r->ur_id2 = &rec->rn_id2;
         r->ur_time = rec->rn_time;
+        r->ur_flags = rec->rn_flags;
 
-        LASSERT_REQSWAB (req, offset + 1);
+        LASSERT_REQSWAB(req, offset + 1);
         r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
         if (r->ur_name == NULL)
                 RETURN(-EFAULT);
         r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
 
-        LASSERT_REQSWAB (req, offset + 2);
+        LASSERT_REQSWAB(req, offset + 2);
         r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0);
         if (r->ur_tgt == NULL)
                 RETURN(-EFAULT);
@@ -597,8 +602,8 @@ static int mds_open_unpack(struct ptlrpc_request *req, int offset,
         struct mds_rec_create *rec;
         ENTRY;
 
-        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
-                                  lustre_swab_mds_rec_create);
+        rec = lustre_swab_reqbuf(req, offset, sizeof (*rec),
+                                 lustre_swab_mds_rec_create);
         if (rec == NULL)
                 RETURN(-EFAULT);
 
@@ -608,16 +613,16 @@ static int mds_open_unpack(struct ptlrpc_request *req, int offset,
         r->ur_rdev = rec->cr_rdev;
         r->ur_time = rec->cr_time;
         r->ur_flags = rec->cr_flags;
+
+        LASSERT_REQSWAB(req, offset + 1);
+        r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
  
-        LASSERT_REQSWAB (req, offset + 1);
-        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
         if (r->ur_name == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
         r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
 
-        LASSERT_REQSWAB (req, offset + 2);
-       
-        if (req->rq_reqmsg->bufcount > offset + 2) { 
+        LASSERT_REQSWAB(req, offset + 2);
+        if (req->rq_reqmsg->bufcount > offset + 2) {
                 r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0);
                 if (r->ur_eadata == NULL)
                         RETURN(-EFAULT);
@@ -652,22 +657,20 @@ int mds_update_unpack(struct ptlrpc_request *req, int offset,
         int rc;
         ENTRY;
 
-        /*
-         * NB don't lustre_swab_reqbuf() here. We're just taking a peek and we
+        /* NB don't lustre_swab_reqbuf() here. We're just taking a peek and we
          * want to leave it to the specific unpacker once we've identified the
-         * message type.
-         */
-        opcodep = lustre_msg_buf (req->rq_reqmsg, offset, sizeof(*opcodep));
+         * message type. */
+        opcodep = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*opcodep));
         if (opcodep == NULL)
                 RETURN(-EFAULT);
 
         opcode = *opcodep;
-        if (lustre_msg_swabbed (req->rq_reqmsg))
-                __swab32s (&opcode);
+        if (lustre_msg_swabbed(req->rq_reqmsg))
+                __swab32s(&opcode);
 
         if (opcode > REINT_MAX ||
             mds_unpackers[opcode] == NULL) {
-                CERROR ("Unexpected opcode %d\n", opcode);
+                CERROR("Unexpected opcode %d\n", opcode);
                 RETURN(-EFAULT);
         }
 
index f40db1c..c7ee9cd 100644 (file)
@@ -920,13 +920,18 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
 }
 
 int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags,
-                        struct ptlrpc_peer *peer, struct inode *parent)
+                      struct ptlrpc_peer *peer, struct inode *parent, int local)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct lmv_obd *lmv;
-        int i = mds->mds_num;
         char peer_str[PTL_NALFMT_SIZE];
-        if (flags & REC_REINT_CREATE) { 
+        int i = mds->mds_num;
+        struct lmv_obd *lmv;
+        ENTRY;
+
+        if (local)
+                RETURN(mds->mds_num);
+        
+        if (flags & REC_REINT_CREATE) {
                 i = mds->mds_num;
         } else if (mds->mds_md_exp != NULL && peer != NULL) {
                 LASSERT(parent != NULL);
index ad4f02c..7486ce1 100644 (file)
@@ -831,10 +831,8 @@ static int mds_open_by_id(struct ptlrpc_request *req,
         }
         l_dput(dchild);
 
-        /*
-         * we didn't find it in PENDING so it isn't an orphan.  See if it was a
-         * regular inode that was previously created.
-         */
+        /* we didn't find it in PENDING so it isn't an orphan.  See if it was a
+         * regular inode that was previously created. */
         dchild = mds_id2dentry(req2obd(req), id, NULL);
         if (IS_ERR(dchild))
                 RETURN(PTR_ERR(dchild));
@@ -1168,15 +1166,17 @@ got_child:
                         handle = NULL;
                         GOTO(cleanup, rc);
                 }
-                if (ino)
+                if (id_fid(rec->ur_id2))
                         fid = id_fid(rec->ur_id2); 
                 else 
                         fid = mds_alloc_fid(obd);
+                
                 dchild->d_fsdata = (void *) &dp;
                 dp.p_ptr = req;
                 dp.p_inum = ino;
+                
                 dp.p_fid = fid;
-                dp.p_group = mds->mds_num; 
+                dp.p_group = mds->mds_num;
 
                 rc = ll_vfs_create(dparent->d_inode, dchild, rec->ur_mode, NULL);
                 if (dchild->d_fsdata == (void *)(unsigned long)ino)
@@ -1186,7 +1186,9 @@ got_child:
                         CDEBUG(D_INODE, "error during create: %d\n", rc);
                         GOTO(cleanup, rc);
                 }
+
                 inode = dchild->d_inode;
+
                 if (ino) {
                         LASSERT(ino == inode->i_ino);
                         
@@ -1362,6 +1364,17 @@ got_child:
         if (rc)
                 GOTO(cleanup, rc);
 
+        /* reintegration case */
+        if ((rec->ur_flags & MDS_REINT_REQ)) {
+                rc = mds_fidmap_add(obd, &body->id1);
+                if (rc < 0) {
+                        CERROR("can't create fid->ino mapping, err %d\n",
+                               rc);
+                } else {
+                       rc = 0;
+               }
+        }
+        
         /* if this is a writer, we have to invalidate client's
          * update locks in order to make sure they don't use
          * isize/iblocks from mds anymore.
index be146f6..d6e771a 100644 (file)
@@ -453,23 +453,12 @@ out:
         return 0;
 }
 
-/*This is a tmp fix for cmobd setattr reint*/
-
-#define XATTR_LUSTRE_MDS_LOV_EA         "lov"
-#define XATTR_LUSTRE_MDS_MEA_EA         "mea"
-#define XATTR_LUSTRE_MDS_MID_EA         "mid"
-#define XATTR_LUSTRE_MDS_SID_EA         "sid"
-#define XATTR_LUSTRE_MDS_PID_EA         "pid"
-#define XATTR_LUSTRE_MDS_KEY_EA         "key"
-
 static int mds_get_md_type(char *name)
 {
         if (!strcmp(name, XATTR_LUSTRE_MDS_LOV_EA)) 
                 RETURN(EA_LOV);
         if (!strcmp(name, XATTR_LUSTRE_MDS_MEA_EA))
                 RETURN(EA_MEA);
-        if (!strcmp(name, XATTR_LUSTRE_MDS_MID_EA))
-                RETURN(EA_MID);
         if (!strcmp(name, XATTR_LUSTRE_MDS_SID_EA))
                 RETURN(EA_SID);
         if (!strcmp(name, XATTR_LUSTRE_MDS_PID_EA))
@@ -603,16 +592,20 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 } else if (rec->ur_iattr.ia_valid & ATTR_EA_CMOBD) {
                         char *name;
                         int type;
-                        /*tmp fix for cmobd set md reint*/
+                        
+                        /* tmp fix for cmobd set md reint */
                         LASSERT(rec->ur_eadata != NULL);
                         LASSERT(rec->ur_ea2data != NULL);
                         name = rec->ur_eadata;
+                        
                         CDEBUG(D_INFO, "set %s EA for cmobd \n", name);
+
                         type = mds_get_md_type(name);
-                        if (type != 0) 
+                        if (type != 0) {
                                 rc = fsfilt_set_md(obd, inode, handle, 
                                                    rec->ur_ea2data,
                                                    rec->ur_ea2datalen, type);
+                        }
                         if (rc)
                                 GOTO(cleanup, rc);               
                 } else if ((S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) &&
@@ -845,7 +838,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                                        rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
-                CERROR("parent lookup error %d\n", rc);
+                CERROR("parent lookup error %d, id "DLID4"\n",
+                       rc, OLID4(rec->ur_id1));
                 GOTO(cleanup, rc);
         }
         cleanup_phase = 1; /* locked parent dentry */
@@ -915,18 +909,21 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         rec->ur_mode |= S_ISGID;
         }
 
-        /*
-         * here inode number should be used only in the case of replaying. It is
-         * needed to check if object already created in the case of creating
-         * remote inode.
-         */
-        if (id_ino(rec->ur_id2)) 
+        /* for reint case stor ecookie should be zero */
+        if (rec->ur_flags & MDS_REINT_REQ) {
+                LASSERT(id_ino(rec->ur_id1) == 0);
+                LASSERT(id_ino(rec->ur_id2) == 0);
+        }
+        
+        if (id_fid(rec->ur_id2))
                 fid = id_fid(rec->ur_id2);
-        else 
+        else
                 fid = mds_alloc_fid(obd);
+        
         dchild->d_fsdata = (void *)&dp;
         dp.p_inum = (unsigned long)id_ino(rec->ur_id2);
         dp.p_ptr = req;
+
         dp.p_fid = fid;
         dp.p_group = mds->mds_num;
 
@@ -938,15 +935,28 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 rc = ll_vfs_create(dir, dchild, rec->ur_mode, NULL);
+                
+                /* XXX: here we should check what type of EA is in ur_eadata 
+                 * and do appropriate actions. --umka */
                 if (rec->ur_eadata && rec->ur_eadatalen && 
-                    (rc == 0) && (dchild->d_inode != NULL)) {
-                        /*Assumption: When ur_eadata is not NULL, 
-                         *ur_eadata is crypto key, should fix it later, Wangdi*/
+                    rc == 0 && dchild->d_inode != NULL) {
+                    if (rec->ur_flags & MDS_REINT_REQ) {
+                        /* for CMOBD to set lov md info when cmobd reint
+                         * create */
+                        CDEBUG(D_INFO, "set lsm %p, len %d to inode %lu \n", 
+                               rec->ur_eadata, rec->ur_eadatalen, 
+                               dchild->d_inode->i_ino); 
+                        fsfilt_set_md(obd, dchild->d_inode, handle, rec->ur_eadata,
+                                      rec->ur_eadatalen, EA_LOV);  
+                    } else {
+                        /* assumption: when ur_eadata is not NULL, 
+                         * ur_eadata is crypto key, should fix it later, 
+                         * --wangdi */
                         mds_set_gskey(obd, handle, dchild->d_inode, 
                                       rec->ur_eadata, rec->ur_eadatalen, 
                                       ATTR_MAC | ATTR_KEY); 
+                    }
                 }
-                EXIT;
                 break;
         }
         case S_IFDIR: {
@@ -958,8 +968,14 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                  * processing load. first, we calculate which MDS to use to put
                  * new directory's inode in.
                  */
+
+                /* XXX: here we order mds_choose_mdsnum() to use local mdsnum
+                 * for reint requests. This should be gone when real flushing on
+                 * LMV is fixed. --umka */
                 i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1, 
-                                      rec->ur_flags, &req->rq_peer, dir);
+                                      rec->ur_flags, &req->rq_peer, dir,
+                                      (rec->ur_flags & MDS_REINT_REQ));
+                
                 if (i == mds->mds_num) {
                         /* inode will be created locally */
                         handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
@@ -1000,7 +1016,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         oa->o_mode = rec->ur_mode;
                         oa->o_uid = current->fsuid;
                         oa->o_gid = (dir->i_mode & S_ISGID) ?
-                                                dir->i_gid : current->fsgid;
+                                     dir->i_gid : current->fsgid;
+
+                        /* letting remote MDS know that this is reint
+                         * request. */
+                        if (rec->ur_flags & MDS_REINT_REQ)
+                                oa->o_flags |= OBD_FL_REINT;
+
                         /* transfer parent id to remote inode */
                         memcpy(obdo_id(oa), &sid, sizeof(sid));
                         oa->o_valid |= OBD_MD_FLTYPE | OBD_MD_FLUID | 
@@ -1083,7 +1105,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         /* requested name exists in the directory */
                         rc = -EEXIST;
                 }
-                EXIT;
                 break;
         }
         case S_IFLNK:{
@@ -1094,7 +1115,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         rc = -EINVAL;           /* -EPROTO? */
                 else
                         rc = ll_vfs_symlink(dir, dchild, rec->ur_tgt, S_IALLUGO);
-                EXIT;
                 break;
         }
         case S_IFCHR:
@@ -1106,7 +1126,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if (IS_ERR(handle))
                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
-                EXIT;
                 break;
         }
         default:
@@ -1124,8 +1143,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc);
         } else if (dchild->d_inode) {
                 struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
-                struct iattr iattr;
                 struct inode *inode = dchild->d_inode;
+                struct iattr iattr;
 
                 created = 1;
                 iattr.ia_uid = rec->ur_fsuid;
@@ -1207,6 +1226,17 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 
                 mds_pack_inode2body(obd, body, inode, 1);
                 mds_body_do_reverse_map(med, body);
+
+                if (rec->ur_flags & MDS_REINT_REQ) {
+                        LASSERT(body != NULL);
+                        rc = mds_fidmap_add(obd, &body->id1);
+                        if (rc < 0) {
+                                CERROR("can't create fid->ino mapping, "
+                                       "err %d\n", rc);
+                        } else {
+                                rc = 0;
+                        }
+                }
         }
 
         EXIT;
@@ -2097,8 +2127,7 @@ cleanup:
 }
 
 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
-                            struct ptlrpc_request *req,
-                            struct lustre_handle *lh)
+                            struct ptlrpc_request *req, struct lustre_handle *lh)
 {
         struct dentry *dparent = NULL, *dchild;
         struct mds_obd *mds = mds_req2mds(req);
@@ -2177,8 +2206,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                                  &update_mode, rec->ur_name,
                                                  rec->ur_namelen, &child_lockh,
                                                  &dchild, LCK_EX,
-                                                 MDS_INODELOCK_LOOKUP |
-                                                 MDS_INODELOCK_UPDATE);
+                                                 (MDS_INODELOCK_LOOKUP |
+                                                  MDS_INODELOCK_UPDATE));
         }
         if (rc)
                 GOTO(cleanup, rc);
@@ -2331,6 +2360,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         CERROR("can't remove OST object, err %d\n",
                                rc);
                 }
+
+                if (child_inode->i_nlink == 0)
+                        mds_fidmap_del(obd, &body->id1);
         }
 
         GOTO(cleanup, rc);
@@ -2586,8 +2618,7 @@ cleanup:
 }
 
 static int mds_reint_link(struct mds_update_record *rec, int offset,
-                          struct ptlrpc_request *req,
-                          struct lustre_handle *lh)
+                          struct ptlrpc_request *req, struct lustre_handle *lh)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *de_src = NULL;
index 0d825c7..f562487 100644 (file)
@@ -359,8 +359,8 @@ int class_add_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
                 CERROR("invalid priority\n");
                 RETURN(-EINVAL);
         }
-        if (strcmp(obd->obd_type->typ_name, "mdc") &&
-            strcmp(obd->obd_type->typ_name, "osc")) {
+        if (strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) &&
+            strcmp(obd->obd_type->typ_name, OBD_OSC_DEVICENAME)) {
                 CERROR("can't add connection on non-client dev\n");
                 RETURN(-EINVAL);
         }
@@ -376,6 +376,7 @@ int class_add_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
 
         RETURN(rc);
 }
+
 int class_del_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct obd_import *imp;
@@ -389,8 +390,8 @@ int class_del_conn(struct obd_device *obd, struct lustre_cfg *lcfg)
                 RETURN(-EINVAL);
         }
 
-        if (strcmp(obd->obd_type->typ_name, "mdc") &&
-            strcmp(obd->obd_type->typ_name, "osc")) {
+        if (strcmp(obd->obd_type->typ_name, OBD_MDC_DEVICENAME) &&
+            strcmp(obd->obd_type->typ_name, OBD_OSC_DEVICENAME)) {
                 CERROR("can't add connection on non-client dev\n");
                 RETURN(-EINVAL);
         }
index 5b7928d..086ae3c 100644 (file)
@@ -1427,10 +1427,11 @@ static int filter_post_fs_setup(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
         int rc = 0;
+        ENTRY;
         
         rc = fsfilt_post_setup(obd, filter->fo_dentry_O);
 
-        return rc;
+        RETURN(rc);
 }
 
 /* mount the file system (secretly) */
index 0b6af9e..2aa014f 100644 (file)
 #include <linux/lustre_handles.h>
 #include <linux/obd.h>
 
-#ifndef OBD_FILTER_DEVICENAME
-# define OBD_FILTER_DEVICENAME "obdfilter"
-#endif
-
-#ifndef OBD_FILTER_SAN_DEVICENAME
-# define OBD_FILTER_SAN_DEVICENAME "sanobdfilter"
-#endif
-
 #define LAST_RCVD "last_rcvd"
 #define FILTER_INIT_OBJID 0
 
index 369145b..8421c85 100644 (file)
@@ -3041,6 +3041,16 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
+        if (keylen == strlen("setext") &&
+            memcmp(key, "setext", keylen) == 0) {
+                struct client_obd *cli = &exp->exp_obd->u.cli;
+                struct osc_creator *oscc = &cli->cl_oscc;
+                struct fid_extent *ext = val;
+
+                oscc->oscc_next_id = (obd_id)ext->fe_start;
+                RETURN(0);
+        }
+
         if (keylen < strlen("mds_conn") ||
             memcmp(key, "mds_conn", keylen) != 0)
                 RETURN(-EINVAL);
index 2869561..1072c8c 100644 (file)
@@ -432,7 +432,7 @@ void lustre_swab_connect(struct obd_connect_data *ocd)
         __swab32s(&ocd->ocd_nllu[1]);
 }
 
-void lustre_swab_obdo(struct obdo  *o)
+void lustre_swab_obdo(struct obdo *o)
 {
         __swab64s(&o->o_id);
         __swab64s(&o->o_gr);
@@ -468,9 +468,10 @@ void *mdc_create_pack(struct lustre_msg *msg, int offset,
 
         rec->cr_opcode = REINT_CREATE;
         rec->cr_id = op_data->id1;
-        memset(&rec->cr_replayid, 0, sizeof(rec->cr_replayid));
+        rec->cr_replayid = op_data->id2;
         rec->cr_mode = mode;
         rec->cr_rdev = rdev;
+        rec->cr_flags = op_data->flags;
         rec->cr_time = op_data->mod_time;
 
         tmp = lustre_msg_buf(msg, offset + 1, op_data->namelen + 1);
@@ -499,7 +500,7 @@ __u32 mds_pack_open_flags(__u32 flags)
 }
 
 void *mdc_setattr_pack(struct lustre_msg *msg, int offset,
-                       struct mdc_op_data *data, struct iattr *iattr,
+                       struct mdc_op_data *op_data, struct iattr *iattr,
                        void *ea, int ealen, void *ea2, int ea2len, 
                        void *ea3, int ea3len)
 {
@@ -507,7 +508,8 @@ void *mdc_setattr_pack(struct lustre_msg *msg, int offset,
         char *tmp = NULL;
 
         rec->sa_opcode = REINT_SETATTR;
-        rec->sa_id = data->id1;
+        rec->sa_flags = op_data->flags;
+        rec->sa_id = op_data->id1;
 
         if (iattr) {
                 rec->sa_valid = iattr->ia_valid;
@@ -544,7 +546,7 @@ void *mdc_setattr_pack(struct lustre_msg *msg, int offset,
 }
 
 void *mdc_unlink_pack(struct lustre_msg *msg, int offset,
-                      struct mdc_op_data *data)
+                      struct mdc_op_data *op_data)
 {
         struct mds_rec_unlink *rec;
         char *tmp;
@@ -553,19 +555,20 @@ void *mdc_unlink_pack(struct lustre_msg *msg, int offset,
         LASSERT (rec != NULL);
 
         rec->ul_opcode = REINT_UNLINK;
-        rec->ul_mode = data->create_mode;
-        rec->ul_id1 = data->id1;
-        rec->ul_id2 = data->id2;
-        rec->ul_time = data->mod_time;
+        rec->ul_mode = op_data->create_mode;
+        rec->ul_id1 = op_data->id1;
+        rec->ul_id2 = op_data->id2;
+        rec->ul_time = op_data->mod_time;
+        rec->ul_flags = op_data->flags;
 
-        tmp = lustre_msg_buf(msg, offset + 1, data->namelen + 1);
+        tmp = lustre_msg_buf(msg, offset + 1, op_data->namelen + 1);
         LASSERT (tmp != NULL);
-        LOGL0(data->name, data->namelen, tmp);
-        return (void*)tmp;        
+        LOGL0(op_data->name, op_data->namelen, tmp);
+        return (void*)tmp;  
 }
 
 void *mdc_link_pack(struct lustre_msg *msg, int offset,
-                    struct mdc_op_data *data)
+                    struct mdc_op_data *op_data)
 {
         struct mds_rec_link *rec;
         char *tmp;
@@ -573,18 +576,19 @@ void *mdc_link_pack(struct lustre_msg *msg, int offset,
         rec = lustre_msg_buf(msg, offset, sizeof (*rec));
 
         rec->lk_opcode = REINT_LINK;
-        rec->lk_id1 = data->id1;
-        rec->lk_id2 = data->id2;
-        rec->lk_time = data->mod_time;
+        rec->lk_id1 = op_data->id1;
+        rec->lk_id2 = op_data->id2;
+        rec->lk_flags = op_data->flags;
+        rec->lk_time = op_data->mod_time;
 
-        tmp = lustre_msg_buf(msg, offset + 1, data->namelen + 1);
-        LOGL0(data->name, data->namelen, tmp);
+        tmp = lustre_msg_buf(msg, offset + 1, op_data->namelen + 1);
+        LOGL0(op_data->name, op_data->namelen, tmp);
         
         return (void*)tmp; 
 }
 
 void *mdc_rename_pack(struct lustre_msg *msg, int offset,
-                      struct mdc_op_data *data,
+                      struct mdc_op_data *op_data,
                       const char *old, int oldlen,
                       const char *new, int newlen)
 {
@@ -595,9 +599,10 @@ void *mdc_rename_pack(struct lustre_msg *msg, int offset,
 
         /* XXX do something about time, uid, gid */
         rec->rn_opcode = REINT_RENAME;
-        rec->rn_id1 = data->id1;
-        rec->rn_id2 = data->id2;
-        rec->rn_time = data->mod_time;
+        rec->rn_id1 = op_data->id1;
+        rec->rn_id2 = op_data->id2;
+        rec->rn_flags = op_data->flags;
+        rec->rn_time = op_data->mod_time;
 
         tmp = lustre_msg_buf(msg, offset + 1, oldlen + 1);
         LOGL0(old, oldlen, tmp);
@@ -764,6 +769,7 @@ void lustre_swab_mds_body(struct mds_body *b)
 void lustre_swab_mds_rec_setattr(struct mds_rec_setattr *sa)
 {
         __swab32s(&sa->sa_opcode);
+        __swab32s(&sa->sa_flags);
         __swab32s(&sa->sa_valid);
         lustre_swab_lustre_id(&sa->sa_id);
         __swab32s(&sa->sa_mode);
@@ -779,7 +785,7 @@ void lustre_swab_mds_rec_setattr(struct mds_rec_setattr *sa)
 void lustre_swab_mds_rec_create(struct mds_rec_create *cr)
 {
         __swab32s(&cr->cr_opcode);
-        __swab32s(&cr->cr_flags); /* for use with open */
+        __swab32s(&cr->cr_flags);
         __swab32s(&cr->cr_mode);
         lustre_swab_lustre_id(&cr->cr_id);
         lustre_swab_lustre_id(&cr->cr_replayid);
@@ -790,6 +796,7 @@ void lustre_swab_mds_rec_create(struct mds_rec_create *cr)
 void lustre_swab_mds_rec_link(struct mds_rec_link *lk)
 {
         __swab32s(&lk->lk_opcode);
+        __swab32s(&lk->lk_flags);
         lustre_swab_lustre_id(&lk->lk_id1);
         lustre_swab_lustre_id(&lk->lk_id2);
 }
@@ -797,6 +804,7 @@ void lustre_swab_mds_rec_link(struct mds_rec_link *lk)
 void lustre_swab_mds_rec_unlink(struct mds_rec_unlink *ul)
 {
         __swab32s(&ul->ul_opcode);
+        __swab32s(&ul->ul_flags);
         __swab32s(&ul->ul_mode);
         lustre_swab_lustre_id(&ul->ul_id1);
         lustre_swab_lustre_id(&ul->ul_id2);
@@ -805,6 +813,7 @@ void lustre_swab_mds_rec_unlink(struct mds_rec_unlink *ul)
 void lustre_swab_mds_rec_rename (struct mds_rec_rename *rn)
 {
         __swab32s(&rn->rn_opcode);
+        __swab32s(&rn->rn_flags);
         lustre_swab_lustre_id(&rn->rn_id1);
         lustre_swab_lustre_id(&rn->rn_id2);
 }
@@ -820,6 +829,12 @@ void lustre_swab_lov_desc(struct lov_desc *ld)
         /* uuid endian insensitive */
 }
 
+void lustre_swab_fid_extent(struct fid_extent *ext)
+{
+        __swab64s(&ext->fe_start);
+        __swab64s(&ext->fe_width);
+}
+
 void lustre_swab_ldlm_res_id(struct ldlm_res_id *id)
 {
         int  i;
index 5c5be86..8367c3a 100644 (file)
@@ -189,6 +189,7 @@ EXPORT_SYMBOL(lustre_swab_mds_rec_link);
 EXPORT_SYMBOL(lustre_swab_mds_rec_unlink);
 EXPORT_SYMBOL(lustre_swab_mds_rec_rename);
 EXPORT_SYMBOL(lustre_swab_lov_desc);
+EXPORT_SYMBOL(lustre_swab_fid_extent);
 EXPORT_SYMBOL(lustre_swab_ldlm_res_id);
 EXPORT_SYMBOL(lustre_swab_ldlm_policy_data);
 EXPORT_SYMBOL(lustre_swab_ldlm_intent);
index 96f3505..a2e6a83 100644 (file)
@@ -764,9 +764,9 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         gmd.gum_nid = import->imp_connection->c_peer.peer_id.nid;
 
         obdtype = import->imp_obd->obd_type->typ_name;
-        if (!strcmp(obdtype, "mdc"))
+        if (!strcmp(obdtype, OBD_MDC_DEVICENAME))
                 gmd.gum_svc = LUSTRE_GSS_SVC_MDS;
-        else if (!strcmp(obdtype, "osc"))
+        else if (!strcmp(obdtype, OBD_OSC_DEVICENAME))
                 gmd.gum_svc = LUSTRE_GSS_SVC_OSS;
         else {
                 CERROR("gss on %s?\n", obdtype);
index f7e832d..b2e8b64 100644 (file)
@@ -1036,10 +1036,9 @@ int ptlrpcs_import_get_sec(struct obd_import *imp)
                 RETURN(0);
 
         /* find actual flavor for client obd. right now server side
-         * obd (reverse imp, etc) will simply use NULL.
-         */
-        if (!strcmp(imp->imp_obd->obd_type->typ_name, "mdc") ||
-            !strcmp(imp->imp_obd->obd_type->typ_name, "osc")) {
+         * obd (reverse imp, etc) will simply use NULL. */
+        if (!strcmp(imp->imp_obd->obd_type->typ_name, OBD_MDC_DEVICENAME) ||
+            !strcmp(imp->imp_obd->obd_type->typ_name, OBD_OSC_DEVICENAME)) {
                 struct client_obd *cli = &imp->imp_obd->u.cli;
 
                 switch (SEC_FLAVOR_MAJOR(cli->cl_sec_flavor)) {
@@ -1057,7 +1056,8 @@ int ptlrpcs_import_get_sec(struct obd_import *imp)
                         pipedir = imp->imp_obd->obd_name;
                         break;
                 default:
-                        CWARN("unknown security flavor for %s(%s), use null\n",
+                        CWARN("unknown security flavor for %s(%s), "
+                              "use null\n",
                               imp->imp_obd->obd_type->typ_name,
                               imp->imp_obd->obd_name);
                 }
index c45b289..bea797e 100644 (file)
@@ -555,7 +555,6 @@ static int smfs_mkdir(struct inode *dir, struct dentry *dentry, int mode)
         CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s (%p),dir=%lu/%u\n",
                dentry->d_name.len, dentry->d_name.name, dentry,
                dir->i_ino, dir->i_generation);
-
         
         cache_parent = pre_smfs_dentry(NULL, parent, dentry->d_parent);
         cache_dentry = pre_smfs_dentry(cache_parent, NULL, dentry);
index 49763fd..583bff2 100644 (file)
@@ -427,8 +427,8 @@ exit:
         RETURN(rc);
 }
 
-int smfs_setxattr(struct dentry *dentry, const char *name, const void *value,
-                  size_t size, int flags)
+int smfs_setxattr(struct dentry *dentry, const char *name,
+                  const void *value, size_t size, int flags)
 {
         struct inode *cache_inode = I2CI(dentry->d_inode);
         struct dentry *cache_dentry = NULL;
@@ -460,8 +460,8 @@ int smfs_setxattr(struct dentry *dentry, const char *name, const void *value,
         RETURN(rc);
 }
 
-int smfs_getxattr(struct dentry *dentry, const char *name, void *buffer,
-                  size_t size)
+int smfs_getxattr(struct dentry *dentry, const char *name,
+                  void *buffer, size_t size)
 {
         struct inode *cache_inode = I2CI(dentry->d_inode);
         struct dentry *cache_dentry;
index 3d24664..b454b8d 100644 (file)
@@ -698,17 +698,19 @@ static int fsfilt_smfs_setattr(struct dentry *dentry, void *handle,
         post_smfs_dentry(cache_dentry);
         RETURN(rc);
 }
+
 static int fsfilt_smfs_set_xattr(struct inode *inode, void *handle, char *name,
                                  void *buffer, int buffer_size)
 {
-        struct  fsfilt_operations *cache_fsfilt = I2FOPS(inode);
-        struct  inode *cache_inode = NULL;
+        struct fsfilt_operations *cache_fsfilt = I2FOPS(inode);
+        struct inode *cache_inode = NULL;
         struct hook_xattr_msg msg = {
                 .name = name,
                 .buffer = buffer,
                 .buffer_size = buffer_size
         };
-        int     rc = -EIO;
+        int    rc = -EIO;
+        int    lov = 0;
         
         ENTRY;
         
@@ -719,6 +721,7 @@ static int fsfilt_smfs_set_xattr(struct inode *inode, void *handle, char *name,
         if (!cache_inode)
                 RETURN(rc);
         
+        lov = (!strcmp(name, XATTR_LUSTRE_MDS_LOV_EA));
         pre_smfs_inode(inode, cache_inode);
         SMFS_PRE_HOOK(inode, HOOK_F_SETXATTR, &msg);
         if (cache_fsfilt->fs_set_xattr)
@@ -750,17 +753,9 @@ static int fsfilt_smfs_get_xattr(struct inode *inode, char *name,
                 rc = cache_fsfilt->fs_get_xattr(cache_inode, name,
                                                 buffer, buffer_size);
         post_smfs_inode(inode, cache_inode);
-
         RETURN(rc);
 }
 
-#define XATTR_LUSTRE_MDS_LOV_EA         "lov"
-#define XATTR_LUSTRE_MDS_MEA_EA         "mea"
-#define XATTR_LUSTRE_MDS_MID_EA         "mid"
-#define XATTR_LUSTRE_MDS_SID_EA         "sid"
-#define XATTR_LUSTRE_MDS_PID_EA         "pid"
-#define XATTR_LUSTRE_MDS_KEY_EA         "key"
-
 static int fsfilt_smfs_set_md(struct inode *inode, void *handle,
                               void *lmm, int lmm_size, enum ea_type type)
 {
@@ -782,11 +777,6 @@ static int fsfilt_smfs_set_md(struct inode *inode, void *handle,
                                            XATTR_LUSTRE_MDS_SID_EA,
                                            lmm, lmm_size);
                 break;
-        case EA_MID:
-                rc = fsfilt_smfs_set_xattr(inode, handle,
-                                           XATTR_LUSTRE_MDS_MID_EA,
-                                           lmm, lmm_size);
-                break;
         case EA_PID:
                 rc = fsfilt_smfs_set_xattr(inode, handle,
                                            XATTR_LUSTRE_MDS_PID_EA,
@@ -825,11 +815,6 @@ static int fsfilt_smfs_get_md(struct inode *inode, void *lmm,
                                            XATTR_LUSTRE_MDS_SID_EA,
                                            lmm, lmm_size);
                 break;
-        case EA_MID:
-                rc = fsfilt_smfs_get_xattr(inode,
-                                           XATTR_LUSTRE_MDS_MID_EA,
-                                           lmm, lmm_size);
-                break;
         case EA_PID:
                 rc = fsfilt_smfs_get_xattr(inode,
                                            XATTR_LUSTRE_MDS_PID_EA,
index 80996bf..501e0eb 100644 (file)
@@ -105,8 +105,7 @@ static int smfs_llog_process_rec_cb(struct llog_handle *handle,
 exit:
         RETURN(rc);
 }
-#endif
-#if 0
+
 /* not used curently */
 static smfs_pack_rec_func smfs_get_rec_pack_type(struct super_block *sb)
 {
@@ -243,8 +242,8 @@ int smfs_process_rec(struct super_block *sb,
         RETURN(rc);
 }
 
-/*smfs_path is gotten from intermezzo*/
-static charsmfs_path(struct dentry *dentry, struct dentry *root, char *buffer,
+/* smfs_path is gotten from intermezzo */
+static char *smfs_path(struct dentry *dentry, struct dentry *root, char *buffer,
                        int buflen)
 {
         char * end = buffer + buflen;
@@ -424,7 +423,9 @@ static int kml_pack_path (char **buf, struct dentry * dentry)
         return length;
 }
 #endif
-static int kml_create(struct inode * inode, void *arg, struct kml_priv * priv) 
+
+static int kml_create(struct inode *inode, void *arg,
+                      struct kml_priv *priv) 
 {
         struct hook_msg * msg = arg;
         //return smfs_post_rec_create(inode, msg->dentry, NULL, NULL);
@@ -576,7 +577,7 @@ static int kml_setattr(struct inode *inode, void *arg, struct kml_priv *priv)
                 GOTO(exit, rc);
         
         length += rc;
-        rc = smfs_llog_add_rec(S2SMI(inode->i_sb), (void*)buffer, length); 
+        rc = smfs_llog_add_rec(S2SMI(inode->i_sb), (void *)buffer, length); 
         /*
         if (!rc) {
                 if (attr && attr->ia_valid & ATTR_SIZE) {
@@ -610,13 +611,13 @@ static int kml_setxattr(struct inode *inode, void *arg, struct kml_priv *priv)
         kbuf.buf = msg->buffer;
         kbuf.buf_size = msg->buffer_size;
 
-        rc = priv->pack_fn(REINT_SETXATTR, buffer, NULL, inode, msg->name, 
-                           &kbuf);
+        rc = priv->pack_fn(REINT_SETXATTR, buffer, NULL, inode,
+                           msg->name, &kbuf);
         if (rc <= 0) 
                 GOTO(exit, rc);
         
         length += rc;
-        rc = smfs_llog_add_rec(S2SMI(inode->i_sb), (void*)buffer, length); 
+        rc = smfs_llog_add_rec(S2SMI(inode->i_sb), (void *)buffer, length); 
         /*
         if (!rc) {
                 if (attr && attr->ia_valid & ATTR_SIZE) {
@@ -703,7 +704,9 @@ exit:
 }
 */
 
-typedef int (*post_kml_op)(struct inode * inode, void *msg, struct kml_priv * priv);
+typedef int (*post_kml_op)(struct inode *inode, void *msg,
+                           struct kml_priv *priv);
+
 static post_kml_op smfs_kml_post[HOOK_MAX] = {
         [HOOK_CREATE]  kml_create,
         [HOOK_LOOKUP]  NULL,
@@ -736,7 +739,8 @@ static int smfs_kml_post_op(hook_op code, struct inode * inode,
                 RETURN(0);
 
         if (smfs_kml_post[code]) {
-                CDEBUG(D_INODE,"KML: inode %lu, code: %u\n", inode->i_ino, code);
+                CDEBUG(D_INODE,"KML: inode %lu, code: %u\n",
+                       inode->i_ino, code);
                 rc = smfs_kml_post[code](inode, msg, priv);
         }
                 
@@ -747,45 +751,46 @@ static int smfs_kml_post_op(hook_op code, struct inode * inode,
 static int smfs_trans_kml (struct super_block *sb, void *arg,
                            struct kml_priv * priv)
 {
-        int size;
-        
-        //TODO: pass fs opcode and see if kml can participate or not
-        //one record in log per operation
-        size = 1;
+        int size = 1;
+        ENTRY;
         
-        return size;
-}
+        /* FIXME-MIKE: pass fs opcode and see if kml can participate or not one
+         * record in log per operation size = 1 */
 
-extern int mds_rec_pack(int, char*, struct dentry*, struct inode*, void*, void*);
+        RETURN(size);
+}
 
 static int smfs_start_kml(struct super_block *sb, void *arg,
-                          struct kml_priv * kml_p)
+                          struct kml_priv *kml_p)
 {
-        int rc = 0;
-        struct smfs_super_info * smb = S2SMI(sb);
+        struct smfs_super_info *smb = S2SMI(sb);
         struct llog_ctxt **ctxt = &smb->smsi_kml_log;
         struct obd_device *obd = arg;
-
+        int rc = 0;
         ENTRY;
-        //is plugin already activated
+
+        /* is plugin already activated */
         if (SMFS_IS(smb->plg_flags, SMFS_PLG_KML))
                 RETURN(0);
-        
+
         if (obd && obd->obd_type && obd->obd_type->typ_name) {
-                if (strcmp(obd->obd_type->typ_name, "mds"))
-                        RETURN(0);                
+                if (!strcmp(obd->obd_type->typ_name, OBD_MDS_DEVICENAME)) {
+                        kml_p->pack_fn = mds_rec_pack;
+                } else if (!strcmp(obd->obd_type->typ_name, OBD_FILTER_DEVICENAME)) {
+                        kml_p->pack_fn = ost_rec_pack;
+                } else {
+                        CWARN("unexpected device type: %s\n", obd->obd_type->typ_name);
+                }
         }
-        
-        kml_p->pack_fn = mds_rec_pack;
-        
-        //this will do OBD_ALLOC() for ctxt
+
+        LASSERT(kml_p->pack_fn != NULL);
+
+        /* this will do OBD_ALLOC() for ctxt */
         rc = llog_catalog_setup(ctxt, KML_LOG_NAME, smb->smsi_exp,
                                 smb->smsi_ctxt, smb->sm_fsfilt,
-                                smb->smsi_logs_dir,
-                                smb->smsi_objects_dir);
-        
+                                smb->smsi_logs_dir, smb->smsi_objects_dir);
         if (rc) {
-                CERROR("Failed to initialize kml log list catalog %d\n", rc);
+                CERROR("failed to initialize kml log list catalog %d\n", rc);
                 RETURN(rc);
         }
         
@@ -798,7 +803,6 @@ static int smfs_start_kml(struct super_block *sb, void *arg,
         }
 
         SMFS_SET(smb->plg_flags, SMFS_PLG_KML);
-
         RETURN(0);
 }
 
index 196a392..acc0551 100644 (file)
 #include <linux/lustre_smfs.h>
 #include "smfs_internal.h"
 
+/* get lustre_id from "sid" attribute. */
+static int mds_rec_get_id(struct inode *inode, struct lustre_id *id)
+{
+        struct fsfilt_operations *fsfilt = S2SMI(inode->i_sb)->sm_fsfilt;
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(id != NULL);
+        LASSERT(inode != NULL);
+
+        rc = fsfilt->fs_get_xattr(inode, XATTR_LUSTRE_MDS_SID_EA,
+                                  &id->li_fid, sizeof(id->li_fid));
+        
+        if (rc > 0) {
+                LASSERT(id_fid(id) != 0);
+                rc = 0;
+        } else if (rc == 0) {
+                rc = -ENODATA;
+        }
+        
+        RETURN(rc);
+}
+
 static int mds_rec_link_pack(char *buffer, struct dentry *dentry,
                              struct inode *dir, void *data1, void *data2)
 {
@@ -48,14 +71,18 @@ static int mds_rec_link_pack(char *buffer, struct dentry *dentry,
         struct mdc_op_data *op_data;
         void *tmp = NULL;
         int rc = 0;
+        ENTRY;
 
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
         
-        mdc_prepare_mdc_data(op_data, src->d_inode, dir,
-                             (char *)tgt->d_name.name,
-                             tgt->d_name.len, 0);
+        smfs_prepare_mdc_data(op_data, src->d_inode, dir,
+                              (char *)tgt->d_name.name,
+                              tgt->d_name.len, 0);
+
+        mds_rec_get_id(src->d_inode, &op_data->id1);
+        mds_rec_get_id(dir, &op_data->id2);
 
         PACK_KML_REC_INIT(buffer, MDS_REINT);
         mkpi = (struct mds_kml_pack_info *)buffer;
@@ -72,29 +99,32 @@ static int mds_rec_link_pack(char *buffer, struct dentry *dentry,
         OBD_FREE(op_data, sizeof(*op_data));
         mkpi->mpi_total_size = tmp - (void *)msg;
         rc = mkpi->mpi_total_size + sizeof(*mkpi) + sizeof(int);
-        return rc;
+        
+        RETURN(rc);
 }
 
 static int mds_rec_setxattr_pack(char *buffer, struct dentry *dentry,
-                                struct inode *dir, void *data1, void *data2)
+                                 struct inode *dir, void *data1, void *data2)
 {
+        struct kml_buffer *kbuf = (struct kml_buffer *)data2;
         struct mds_rec_setattr *rec = NULL;
         struct mds_kml_pack_info *mkpi;
         struct lustre_msg *msg = NULL;
-        char *name = (char *)data1;
-        struct kml_buffer *kbuf = (struct kml_buffer*)data2;
         struct mdc_op_data *op_data;
-        int rc = 0;
+        char *name = (char *)data1;
         void *tmp = NULL;
+        int rc = 0;
         ENTRY;
 
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
-        mdc_prepare_mdc_data(op_data, dir, NULL, NULL, 0, 0);
+        
+        smfs_prepare_mdc_data(op_data, dir, NULL, NULL, 0, 0);
+        mds_rec_get_id(dir, &op_data->id1);
 
         PACK_KML_REC_INIT(buffer, MDS_REINT);
-        mkpi = (struct mds_kml_pack_info*)buffer;
+        mkpi = (struct mds_kml_pack_info *)buffer;
 
         mkpi->mpi_bufcount = 3;
         mkpi->mpi_size[0] = sizeof(struct mds_rec_setattr);
@@ -111,7 +141,6 @@ static int mds_rec_setxattr_pack(char *buffer, struct dentry *dentry,
         OBD_FREE(op_data, sizeof(*op_data));
 
         rec = (struct mds_rec_setattr *)lustre_msg_buf(msg, 0, 0);
-        
         rec->sa_valid = ATTR_EA_CMOBD;
         
         mkpi->mpi_total_size = tmp - (void *)msg;
@@ -119,6 +148,7 @@ static int mds_rec_setxattr_pack(char *buffer, struct dentry *dentry,
 
         RETURN(rc);
 }
+
 /* FIXME-WANGDI: did not think about EA situation. */
 static int mds_rec_setattr_pack(char *buffer, struct dentry *dentry,
                                 struct inode *dir, void *data1, void *data2)
@@ -131,14 +161,17 @@ static int mds_rec_setattr_pack(char *buffer, struct dentry *dentry,
         int rc = 0, ealen = 0;
         char *ea = NULL;
         void *tmp = NULL;
+        ENTRY;
 
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
-        mdc_prepare_mdc_data(op_data, dir, NULL, NULL, 0, 0);
+
+        smfs_prepare_mdc_data(op_data, dir, NULL, NULL, 0, 0);
+        mds_rec_get_id(dir, &op_data->id1);
 
         PACK_KML_REC_INIT(buffer, MDS_REINT);
-        mkpi = (struct mds_kml_pack_info*)buffer;
+        mkpi = (struct mds_kml_pack_info *)buffer;
 
         mkpi->mpi_bufcount = 1;
         mkpi->mpi_size[0] = sizeof(struct mds_rec_setattr);
@@ -168,8 +201,7 @@ static int mds_rec_setattr_pack(char *buffer, struct dentry *dentry,
 
         mkpi->mpi_total_size = tmp - (void *)msg;
         rc = mkpi->mpi_total_size + sizeof(*mkpi) + sizeof(int);
-
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_rec_create_pack(char *buffer, struct dentry *dentry,
@@ -178,25 +210,27 @@ static int mds_rec_create_pack(char *buffer, struct dentry *dentry,
 {
         struct mds_kml_pack_info *mkpi;
         struct lustre_msg *msg = NULL;
+        struct dentry_params *param;
         struct mdc_op_data *op_data;
         struct mds_rec_create *rec;
-        struct dentry_params *param = 
-                (struct dentry_params *) dentry->d_fsdata;
         int rc = 0, tgt_len = 0;
         void *tmp = NULL;
-
         ENTRY;
         
+        param = (struct dentry_params *)dentry->d_fsdata;
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
         
-        mdc_prepare_mdc_data(op_data, dir, dentry->d_inode,
-                             (char *)dentry->d_name.name,
-                             dentry->d_name.len, 0);
+        smfs_prepare_mdc_data(op_data, dir, dentry->d_inode,
+                              (char *)dentry->d_name.name,
+                              dentry->d_name.len, 0);
 
-        id_fid(&op_data->id1) = param->p_fid;
-        id_group(&op_data->id1) = param->p_group;
+        mds_rec_get_id(dir, &op_data->id1);
+
+        id_fid(&op_data->id2) = param->p_fid;
+        id_group(&op_data->id2) = param->p_group;
+        
         PACK_KML_REC_INIT(buffer, MDS_REINT);
         mkpi = (struct mds_kml_pack_info *)buffer;
 
@@ -209,10 +243,10 @@ static int mds_rec_create_pack(char *buffer, struct dentry *dentry,
                 mkpi->mpi_bufcount++;
         }
 
-        if (data1) {
-                /* for symlink, data1 will be the tgt name. */
+        /* for symlink, data1 will be the tgt name. */
+        if (data1)
                 tgt_len = *(int *)data2;
-        }
+
         msg = (struct lustre_msg *)(buffer + sizeof(*mkpi));
         lustre_init_msg(msg, mkpi->mpi_bufcount, mkpi->mpi_size, NULL);
 
@@ -225,8 +259,7 @@ static int mds_rec_create_pack(char *buffer, struct dentry *dentry,
         mkpi->mpi_total_size = tmp - (void *)msg;
         rc = mkpi->mpi_total_size + sizeof(*mkpi) + sizeof(int);
         OBD_FREE(op_data, sizeof(*op_data));
-        
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_rec_unlink_pack(char *buffer, struct dentry *dentry,
@@ -239,17 +272,20 @@ static int mds_rec_unlink_pack(char *buffer, struct dentry *dentry,
         int mode = *(int*)data1;
         void *tmp = NULL;
         int rc = 0;
+        ENTRY;
 
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
         
-        mdc_prepare_mdc_data(op_data, dir, NULL,
-                             (char *)dentry->d_name.name,
-                             dentry->d_name.len, mode);
+        smfs_prepare_mdc_data(op_data, dir, NULL,
+                              (char *)dentry->d_name.name,
+                              dentry->d_name.len, mode);
+
+        mds_rec_get_id(dir, &op_data->id1);
 
         PACK_KML_REC_INIT(buffer, MDS_REINT);
-        mkpi = (struct mds_kml_pack_info*)buffer;
+        mkpi = (struct mds_kml_pack_info *)buffer;
 
         mkpi->mpi_bufcount = 2;
         mkpi->mpi_size[0] = sizeof(struct mds_rec_unlink);
@@ -260,11 +296,10 @@ static int mds_rec_unlink_pack(char *buffer, struct dentry *dentry,
 
         tmp = mdc_unlink_pack(msg, 0, op_data);
 
-        mkpi->mpi_total_size = tmp - (void*)msg;
+        mkpi->mpi_total_size = tmp - (void *)msg;
         rc = mkpi->mpi_total_size + sizeof(*mkpi) + sizeof(int);
         OBD_FREE(op_data, sizeof(*op_data));
-
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_rec_rename_pack(char *buffer, struct dentry *dentry,
@@ -278,14 +313,18 @@ static int mds_rec_rename_pack(char *buffer, struct dentry *dentry,
         struct mds_rec_rename *rec;
         void *tmp = NULL;
         int rc = 0;
+        ENTRY;
 
         OBD_ALLOC(op_data, sizeof(*op_data));
         if (op_data == NULL)
                 return -ENOMEM;
-        mdc_prepare_mdc_data(op_data, dir, new_dir, NULL, 0, 0);
+        smfs_prepare_mdc_data(op_data, dir, new_dir, NULL, 0, 0);
+
+        mds_rec_get_id(dir, &op_data->id1);
+        mds_rec_get_id(new_dir, &op_data->id2);
 
         PACK_KML_REC_INIT(buffer, MDS_REINT);
-        mkpi = (struct mds_kml_pack_info*)buffer;
+        mkpi = (struct mds_kml_pack_info *)buffer;
 
         mkpi->mpi_bufcount = 3;
         mkpi->mpi_size[0] = sizeof(struct mds_rec_rename);
@@ -294,7 +333,6 @@ static int mds_rec_rename_pack(char *buffer, struct dentry *dentry,
 
         rec = (struct mds_rec_rename *)(buffer + sizeof(*mkpi));
 
-
         msg = (struct lustre_msg *)(buffer + sizeof(*mkpi));
         lustre_init_msg(msg, mkpi->mpi_bufcount, mkpi->mpi_size, NULL);
 
@@ -302,13 +340,14 @@ static int mds_rec_rename_pack(char *buffer, struct dentry *dentry,
                               dentry->d_name.len, (char *)new_dentry->d_name.name,
                               new_dentry->d_name.len);
 
-        mkpi->mpi_total_size = tmp - (void*)msg;
+        mkpi->mpi_total_size = tmp - (void *)msg;
         rc = mkpi->mpi_total_size + sizeof(*mkpi) + sizeof(int);
         OBD_FREE(op_data, sizeof(*op_data));
-        return rc;
+        RETURN(rc);
 }
 
-typedef int (*mds_pack_rec_func)(char *, struct dentry*, struct inode *, void *, void*);
+typedef int (*mds_pack_rec_func)(char *, struct dentry *,
+                                 struct inode *, void *, void *);
 
 static mds_pack_rec_func mds_kml_pack[REINT_MAX + 1] = {
         [REINT_LINK]    mds_rec_link_pack,
@@ -320,7 +359,7 @@ static mds_pack_rec_func mds_kml_pack[REINT_MAX + 1] = {
 };
 
 int mds_rec_pack(int op, char *buffer, struct dentry *dentry, 
-                 struct inode *dir, void * arg, void * arg2)
+                 struct inode *dir, void *arg, void *arg2)
 {
         return mds_kml_pack[op](buffer, dentry, dir, arg, arg2);
 }
index b3a16df..7c0b962 100644 (file)
@@ -173,25 +173,17 @@ out:
 
 typedef int (*ost_pack_rec_func)(char *buffer, struct dentry *dentry,
                                  struct inode *dir, void *data1, void *data2);
+
 static ost_pack_rec_func ost_kml_pack[REINT_MAX + 1] = {
         [REINT_SETATTR] ost_rec_setattr_pack,
         [REINT_CREATE]  ost_rec_create_pack,
         [REINT_WRITE]   ost_rec_write_pack,
 };
 
-int ost_rec_pack(char *buffer, struct dentry *dentry, struct inode *dir,
-                 void *data1, void *data2, int op)
+int ost_rec_pack(int op, char *buffer, struct dentry *dentry,
+                 struct inode *dir, void *data1, void *data2)
 {
-        if (op == REINT_SETATTR || op == REINT_CREATE || op == REINT_WRITE) {
+        if (op == REINT_SETATTR || op == REINT_CREATE || op == REINT_WRITE)
                 return ost_kml_pack[op](buffer, dentry, dir, data1, data2);
-        }
         return 0;
 }
-#if 0
-int ost_rec_pack_init(struct smfs_super_info *smsi)
-{
-
-        smsi->smsi_pack_rec[PACK_OST] = ost_rec_pack;
-        return 0;
-}
-#endif
index 112ecba..3c76c03 100644 (file)
@@ -89,7 +89,7 @@ struct smfs_plugin {
 
 struct kml_priv {
         /* llog pack function */
-        int (* pack_fn)(int, char *, struct dentry*,
+        int (*pack_fn)(int, char *, struct dentry *,
                         struct inode *, void *, void *);
 };
 
@@ -184,7 +184,7 @@ struct plg_info_msg {
         void * val;
 };
 int smfs_helper (struct super_block *, int, void *);
-#define SMFS_PLG_HELP(sb, op, data)  smfs_helper(sb, op, data)
+#define SMFS_PLG_HELP(sb, op, data) smfs_helper(sb, op, data)
 
 int smfs_register_plugin(struct super_block *, struct smfs_plugin *);
 struct smfs_plugin * smfs_deregister_plugin(struct super_block *, int);
index de8d9fe..2c12316 100644 (file)
@@ -150,10 +150,10 @@ extern int smfs_rec_unpack(struct smfs_proc_args *args, char *record,
 extern int smfs_process_rec(struct super_block *sb, int count,
                             char *dir, int flags);
 
-/*mds_kml.c*/
-int mds_rec_pack_init(struct smfs_super_info *smb);
-/*ost_kml.c*/
-int ost_rec_pack_init(struct smfs_super_info *smb);
+extern int mds_rec_pack(int, char *, struct dentry *, struct inode *,
+                        void *, void *);
+extern int ost_rec_pack(int, char *, struct dentry *, struct inode *,
+                        void *, void *);
 
 /*smfs_llog.c*/
 extern int smfs_llog_setup(struct dentry **, struct dentry **);
@@ -247,6 +247,36 @@ struct kml_buffer {
         char *buf;
         int   buf_size;
 }; 
+
+#ifdef __KERNEL__
+static inline void
+smfs_inode2id(struct lustre_id *id, struct inode *inode)
+{
+        mdc_pack_id(id, inode->i_ino, inode->i_generation,
+                    (inode->i_mode & S_IFMT), 0, 0);
+}
+
+static inline void 
+smfs_prepare_mdc_data(struct mdc_op_data *data, struct inode *i1,
+                      struct inode *i2, const char *name, int namelen,
+                      int mode)
+{
+        LASSERT(i1);
+
+        smfs_inode2id(&data->id1, i1);
+        if (i2)
+                smfs_inode2id(&data->id2, i2);
+        else
+                memset(&data->id2, 0, sizeof(data->id2));
+
+       data->valid = 0;
+        data->name = name;
+        data->namelen = namelen;
+        data->create_mode = mode;
+        data->mod_time = LTIME_S(CURRENT_TIME);
+}
+#endif
+
 #if CONFIG_SNAPFS
 int smfs_cow_init(struct super_block *sb);
 int smfs_cow_cleanup(struct smfs_super_info *smb);
index be6bed9..4f7a59a 100644 (file)
@@ -184,28 +184,28 @@ static void smfs_filter_flags(struct filter_obd * filt, struct inode * o_dir)
         }
 }
 
-static void smfs_mds_flags(struct mds_obd * mds, struct inode * root)
+static void smfs_mds_flags(struct mds_obd *mds, struct inode *root)
 {
-        struct inode * pend = mds->mds_pending_dir->d_inode;
+        struct inode *pend = mds->mds_pending_dir->d_inode;
         
         CDEBUG(D_SUPER,"MDS OBD post_setup\n");
+
         /* enable plugins for all in ROOT */        
         SMFS_SET(I2SMI(root)->smi_flags, SMFS_PLG_ALL);
+
         /* the same for PENDING */
         SMFS_SET(I2SMI(pend)->smi_flags, SMFS_PLG_ALL);
 }
-                        
 
 extern int (*audit_id2name_superhack) (struct obd_device *obd, char **name,
                                        int *namelen, struct lustre_id *id);
 
 int smfs_post_setup(struct obd_device *obd, struct vfsmount *mnt,
-                    struct dentry * root_dentry)
+                    struct dentry *root_dentry)
 {
         struct lvfs_run_ctxt saved, *current_ctxt = NULL;
         struct smfs_super_info *smb = S2SMI(mnt->mnt_sb);
         int rc = 0;
-        
         ENTRY;
 
         /* XXX to register id2name function of mds in smfs */
@@ -226,33 +226,29 @@ int smfs_post_setup(struct obd_device *obd, struct vfsmount *mnt,
         push_ctxt(&saved, smb->smsi_ctxt, NULL);
 
         rc = smfs_llog_setup(&smb->smsi_logs_dir, &smb->smsi_objects_dir);
-        if (!rc) {
+        if (!rc)
                 rc = SMFS_PLG_HELP(mnt->mnt_sb, PLG_START, obd);
-        }
 
         pop_ctxt(&saved, smb->smsi_ctxt, NULL);
 
         /* enable plugins for directories on MDS or OST */
         if (obd && obd->obd_type && obd->obd_type->typ_name) {
-                if (!strcmp(obd->obd_type->typ_name, "obdfilter")) {
+                if (!strcmp(obd->obd_type->typ_name, OBD_FILTER_DEVICENAME)) {
                         struct filter_obd *filt = &obd->u.filter;
                         smfs_filter_flags(filt, root_dentry->d_inode);
-                }
-                else if (!strcmp(obd->obd_type->typ_name, "mds")) {
+                } else if (!strcmp(obd->obd_type->typ_name, OBD_MDS_DEVICENAME)) {
                         struct mds_obd * mds = &obd->u.mds;
-                        
                         smfs_mds_flags(mds, root_dentry->d_inode);
                         SMFS_SET_HND_IBLOCKS(smb);
-                }
-                else
+                } else {
                         CDEBUG(D_SUPER,"Unknown OBD (%s) post_setup\n",
                                obd->obd_type->typ_name);
+                }
         }
 
         if (rc)
                 OBD_FREE(current_ctxt, sizeof(*current_ctxt));
-  
+        
         RETURN(rc);
 }
 
index d538eb1..c2d315c 100644 (file)
 #include <linux/lvfs.h>
 
 #include "smfs_internal.h"
-int smfs_llog_setup(struct dentry ** logs, struct dentry ** objects)
+int smfs_llog_setup(struct dentry **logs, struct dentry **objects)
 {
         struct dentry *dentry = NULL;
         int rc = 0;
-
-        /* create OBJECTS and LOGS for writing logs */
         ENTRY;
 
-        //push_ctxt(&saved, S2SMI(sb)->smsi_ctxt, NULL);
+        /* create OBJECTS and LOGS for writing logs */
         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
         if (IS_ERR(dentry)) {
-                rc = PTR_ERR(dentry);
-                CERROR("cannot create LOGS directory: rc = %d\n", rc);
-                //rc = -EINVAL;
-                goto exit;
+                CERROR("cannot create LOGS directory: rc = %d\n",
+                       (int)PTR_ERR(dentry));
+                RETURN(rc);
         }
-
         *logs = dentry;
-                
+        
         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
         if (IS_ERR(dentry)) {
-                rc = PTR_ERR(dentry);
-                CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
-                //rc = -EINVAL;
-                goto exit;
+                CERROR("cannot create OBJECTS directory: rc = %d\n",
+                       (int)PTR_ERR(dentry));
+                RETURN(rc);
         }
-
         *objects = dentry;
         
-exit:
         RETURN(rc);
 }
 
index 22d598a..e456fe9 100644 (file)
@@ -58,12 +58,10 @@ h2tcp () {
 ${LMC} -m $config --add net --node $NODE --nid `h2$NIDTYPE $NODE` \
 --nettype $NETTYPE || exit 1
 
-
 ${LMC} -m $config --add mds --node $NODE --mds $CACHE_MDS \
 --fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $MDS_CACHE_DEV \
 --mountfsoptions $MDS_MOUNT_OPTS --size $MDSSIZE --format || exit 10
 
-#add master lmv
 ${LMC} -m $config --add lmv --lmv $MASTER_LMV || exit 12
 
 ${LMC} -m $config --add mds --node $NODE --mds $MASTER_MDS1 \
@@ -74,7 +72,7 @@ ${LMC} -m $config --add mds --node $NODE --mds $MASTER_MDS2 \
 --fstype $BACK_FSTYPE --dev $MDS2_MASTER_DEV --size $MDSSIZE \
 --lmv $MASTER_LMV --format || exit 10
 
-${LMC} -m $config --add lov --lov $CACHE_LOV --mds $CACHE_MDS --masterlmv $MASTER_LMV \
+${LMC} -m $config --add lov --lov $CACHE_LOV --mds $CACHE_MDS --aware $MASTER_LMV \
 --stripe_sz $STRIPE_SIZE --stripe_cnt $STRIPECNT --stripe_pattern 0 || exit 20
 
 ${LMC} -m $config --add ost --ost $CACHE_OST --node $NODE --lov $CACHE_LOV \
index a402665..18d6937 100755 (executable)
@@ -28,6 +28,7 @@ STRIPE_BYTES=65536
 STRIPES_PER_OBJ=0
 
 MOUNT=${MOUNT:-/mnt/lustre}
+MOUNT2=${MOUNT2:-/mnt/lustre2}
 
 # specific journal size for the ost, in MB
 JSIZE=${JSIZE:-0}
diff --git a/lustre/tests/test45-mountain.sh b/lustre/tests/test45-mountain.sh
new file mode 100644 (file)
index 0000000..e0a3786
--- /dev/null
@@ -0,0 +1,136 @@
+#!/bin/bash
+
+export PATH="$PATH:`dirname $0`/../utils"
+
+config=${1:-test45-mountain.xml}
+LMC=${LMC:-lmc}
+TMP=${TMP:-/tmp}
+
+MDS_MASTER_DEV=/dev/mdtdev1_sh
+OST_MASTER_DEV=/dev/ostdev1_sh
+
+MDS_CACHE_DEV=$TMP/mds-cache-localhost
+OST_CACHE_DEV=$TMP/ost-cache-localhost
+
+MDSSIZE=${MDSSIZE:-200000}
+OSTSIZE=${OSTSIZE:-200000}
+
+FSTYPE=${FSTYPE:-smfs}
+BACK_FSTYPE=${BACK_FSTYPE:-ldiskfs}
+
+NETTYPE=${NETTYPE:-tcp}
+NIDTYPE=${NIDTYPE:-$NETTYPE}
+
+# define clients related stuff
+for i in `seq 0 33`; do
+    CLIENT[$i]="mnt$((i+3))"
+    COBD_MDS[$i]="cobd_mds$((i+1))"
+    CMOBD_MDS[$i]="cmobd_mds$((i+1))"
+    CACHE_MDS[$i]="cache_mds$((i+1))"
+    CACHE_OST[$i]="cache_ost$((i+1))"
+    CACHE_LOV[$i]="cache_lov$((i+1))"
+    CACHE_MDS_MOUNT_OPT[$i]="kml"
+    CACHE_MDS_MKFS_OPT[$i]="-b 4096"
+    CACHE_OST_MKFS_OPT[$i]="-b 4096"
+done
+
+CACHE_LOV_STRIPE_COUNT="1"
+CACHE_LOV_STRIPE_SIZE="1048576"
+
+# define MDS related stuff
+for i in `seq 0 3`; do
+    MDS[$i]="mnt$((i+51))"
+    MASTER_MDS[$i]="master_mds$((i+1))"
+    MASTER_MDS_MKFS_OPT[$i]="-b 4096"
+done
+
+MASTER_LMV="master_lmv"
+
+# define OST related stuff
+for i in `seq 0 7`; do
+    OST[$i]="mnt$((i+61))"
+    MASTER_OST[$i]="master_ost$((i+1))"
+    MASTER_OST_MKFS_OPT[$i]="-b 4096"
+done
+
+MASTER_LOV="master_lov"
+MASTER_LOV_STRIPE_COUNT="8"
+MASTER_LOV_STRIPE_SIZE="1048576"
+
+rm -f $config
+
+h2tcp () {
+       case $1 in
+       client) echo '\*' ;;
+       *) echo $1 ;;
+       esac
+}
+
+# add OST nodes
+echo "adding OST nodes..."
+for node in "${OST[@]}"; do
+    ${LMC} -m $config --add node --node $node || exit 1
+    ${LMC} -m $config --add net --node $node --nid `h2$NIDTYPE $node` --nettype $NETTYPE || exit 1
+done
+
+# add master LMV
+${LMC} -m $config --add lmv --lmv $MASTER_LMV || exit 2
+
+# add master OSTs
+echo "adding master OSTs..."
+${LMC} -m $config --add lov --lov $MASTER_LOV --lmv $MASTER_LMV \
+--stripe_sz $MASTER_LOV_STRIPE_SIZE --stripe_cnt $MASTER_LOV_STRIPE_COUNT \
+--stripe_pattern 0 || exit 3
+
+for ((i=0;i<${#MASTER_OST[@]};i++)); do
+    ${LMC} -m $config --add ost --ost ${MASTER_OST[i]} --node ${OST[i]} --lov $MASTER_LOV \
+--fstype $BACK_FSTYPE --dev $OST_MASTER_DEV --size $OSTSIZE --mkfsoptions "${MASTER_OST_MKFS_OPT[i]}" || exit 4
+done
+
+# add MDS nodes
+echo "adding MDS nodes..."
+for node in "${MDS[@]}"; do 
+    ${LMC} -m $config --add node --node $node || exit 1
+    ${LMC} -m $config --add net --node $node --nid `h2$NIDTYPE $node` --nettype $NETTYPE || exit 5
+done
+
+# add master MDSs
+echo "adding master MDSs..."
+for ((i=0;i<${#MASTER_MDS[@]};i++)); do
+    ${LMC} -m $config --add mds --node ${MDS[i]} --mds ${MASTER_MDS[i]} \
+--fstype $BACK_FSTYPE --dev $MDS_MASTER_DEV --size $MDSSIZE \
+--lmv $MASTER_LMV --format --mkfsoptions "${MASTER_MDS_MKFS_OPT[i]}" || exit 6
+done
+
+# add client nodes
+echo "adding client nodes..."
+for node in "${CLIENT[@]}"; do 
+    ${LMC} -m $config --add node --node $node || exit 1
+    ${LMC} -m $config --add net --node $node --nid `h2$NIDTYPE $node` --nettype $NETTYPE || exit 7
+done
+
+# add cache stuff
+echo "adding cache MDSs, OSTs and clients..."
+for ((i=0;i<${#CACHE_MDS[@]};i++)); do
+    ${LMC} -m $config --add mds --node ${CLIENT[i]} --mds ${CACHE_MDS[i]} \
+--fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $MDS_CACHE_DEV \
+--mountfsoptions ${CACHE_MDS_MOUNT_OPT[i]} --mkfsoptions "${CACHE_MDS_MKFS_OPT[i]}" \
+--size $MDSSIZE --format || exit 8
+
+    ${LMC} -m $config --add cmobd --node ${CLIENT[i]} --cmobd ${CMOBD_MDS[i]} \
+--master_obd $MASTER_LMV --cache_obd ${CACHE_MDS[i]} || exit 9
+
+    ${LMC} -m $config --add lov --lov ${CACHE_LOV[i]} --mds ${CACHE_MDS[i]} \
+--stripe_sz $CACHE_LOV_STRIPE_SIZE --stripe_cnt $CACHE_LOV_STRIPE_COUNT \
+--stripe_pattern 0 || exit 10
+
+    ${LMC} -m $config --add ost --ost ${CACHE_OST[i]} --node ${CLIENT[i]} \
+--lov ${CACHE_LOV[i]} --fstype $BACK_FSTYPE --dev $OST_CACHE_DEV \
+--size $OSTSIZE --mkfsoptions "${CACHE_OST_MKFS_OPT[i]}" || exit 11
+
+    ${LMC} -m $config --add cobd --node ${CLIENT[i]} --cobd ${COBD_MDS[i]} \
+--master_obd $MASTER_LMV --cache_obd ${CACHE_MDS[i]} || exit 12
+
+    ${LMC} -m $config --add mtpt --node ${CLIENT[i]} --path /mnt/lustre \
+--lmv ${COBD_MDS[i]} --lov ${CACHE_LOV[i]} || exit 13
+done
diff --git a/lustre/tests/test45.sh b/lustre/tests/test45.sh
new file mode 100644 (file)
index 0000000..ef1f6f7
--- /dev/null
@@ -0,0 +1,162 @@
+#!/bin/bash
+
+export PATH="$PATH:`dirname $0`/../utils"
+
+config=${1:-test45.xml}
+LMC=${LMC:-lmc}
+TMP=${TMP:-/tmp}
+
+COBD_MDS1=${COBD_MDS1:-"cobd_mds1"}
+COBD_MDS2=${COBD_MDS2:-"cobd_mds2"}
+COBD_LOV1=${COBD_LOV1:-"cobd_lov1"}
+COBD_LOV2=${COBD_LOV2:-"cobd_lov2"}
+CMOBD_MDS1=${CMOBD_MDS1:-"cmobd-mds1"}
+CMOBD_MDS2=${CMOBD_MDS2:-"cmobd-mds2"}
+CMOBD_OST1=${CMOBD_OST1:-"cmobd-ost1"}
+CMOBD_OST2=${CMOBD_OST2:-"cmobd-ost2"}
+
+MASTER_LMV=${MASTER_LMV:-master-lmv1}
+MASTER_MDS1=${MASTER_MDS1:-"master-mds1"}
+MASTER_MDS2=${MASTER_MDS2:-"master-mds2"}
+
+CACHE_MDS1=${CACHE_MDS1:-"cache-mds1"}
+CACHE_MDS2=${CACHE_MDS2:-"cache-mds2"}
+
+MDS1_MASTER_DEV=$TMP/mds1-master-localhost
+MDS2_MASTER_DEV=$TMP/mds2-master-localhost
+
+MDS1_CACHE_DEV=$TMP/mds1-cache-localhost
+MDS2_CACHE_DEV=$TMP/mds2-cache-localhost
+
+MDS_MOUNT_OPTS=${MDS_MOUNT_OPTS:-"kml"}
+OST_MOUNT_OPTS=${OST_MOUNT_OPTS:-"kml"}
+
+MDSSIZE=${MDSSIZE:-100000}
+
+MASTER_LOV=${MASTER_LOV:-"master-lov"}
+MASTER_OST=${MASTER_OST:-"master-ost"}
+OST_MASTER_DEV=$TMP/ost1-master-localhost
+
+CACHE_LOV1=${CACHE_LOV1:-"cache-lov1"}
+CACHE_LOV2=${CACHE_LOV2:-"cache-lov2"}
+CACHE_OST1=${CACHE_OST1:-"cache-ost1"}
+CACHE_OST2=${CACHE_OST2:-"cache-ost2"}
+OST1_CACHE_DEV=$TMP/ost1-cache-localhost
+OST2_CACHE_DEV=$TMP/ost2-cache-localhost
+
+OSTSIZE=${OSTSIZE:-100000}
+
+STRIPECNT=${STRIPECNT:-1}
+OSDTYPE=${OSDTYPE:-obdfilter}
+OSTFAILOVER=${OSTFAILOVER:-}
+
+FSTYPE=${FSTYPE:-smfs}
+BACK_FSTYPE=${BACK_FSTYPE:-ldiskfs}
+
+NETTYPE=${NETTYPE:-tcp}
+NIDTYPE=${NIDTYPE:-$NETTYPE}
+STRIPE_SIZE=${STRIPE_SIZE:-65536}
+
+NODE1=${NODE1:-"node1"}
+NODE2=${NODE2:-"node2"}
+
+FS_NODE1="FS_node1"
+FS_NODE2="FS_node2"
+FS_MASTER="FS_master"
+
+rm -f $config
+
+h2tcp () {
+       case $1 in
+       client) echo '\*' ;;
+       *) echo $1 ;;
+       esac
+}
+
+${LMC} -m $config --add filesystem --filesystem $FS_NODE1 || exit 1
+${LMC} -m $config --add filesystem --filesystem $FS_NODE2 || exit 1
+${LMC} -m $config --add filesystem --filesystem $FS_MASTER || exit 1
+
+# node 1
+${LMC} -m $config --add node --node $NODE1 || exit 1
+
+${LMC} -m $config --add net --node $NODE1 --nid `h2$NIDTYPE $NODE1` \
+--nettype $NETTYPE || exit 1
+
+${LMC} -m $config --add mds --node $NODE1 --mds $CACHE_MDS1 \
+--fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $MDS1_CACHE_DEV \
+--mountfsoptions $MDS_MOUNT_OPTS --size $MDSSIZE --format \
+--filesystem $FS_NODE1 || exit 10
+
+${LMC} -m $config --add lov --lov $CACHE_LOV1 --mds $CACHE_MDS1 \
+--stripe_sz $STRIPE_SIZE --stripe_cnt $STRIPECNT --stripe_pattern 0 || exit 20
+
+${LMC} -m $config --add ost --ost $CACHE_OST1 --node $NODE1 --lov $CACHE_LOV1 \
+--fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $OST1_CACHE_DEV --size $OSTSIZE \
+--filesystem $FS_NODE1 --mountfsoptions $OST_MOUNT_OPTS || exit 21
+
+${LMC} -m $config --add lmv --lmv $MASTER_LMV || exit 12
+
+${LMC} -m $config --add mds --node $NODE1 --mds $MASTER_MDS1 \
+--fstype $BACK_FSTYPE --dev $MDS1_MASTER_DEV --size $MDSSIZE \
+--lmv $MASTER_LMV --format --filesystem $FS_MASTER || exit 10
+
+${LMC} -m $config --add mds --node $NODE1 --mds $MASTER_MDS2 \
+--fstype $BACK_FSTYPE --dev $MDS2_MASTER_DEV --size $MDSSIZE \
+--lmv $MASTER_LMV --format --filesystem $FS_MASTER || exit 10
+
+${LMC} -m $config --add lov --lov $MASTER_LOV --lmv $MASTER_LMV \
+--stripe_sz $STRIPE_SIZE --stripe_cnt $STRIPECNT --stripe_pattern 0 || exit 20
+
+${LMC} -m $config --add ost --ost $MASTER_OST --node $NODE1 --lov $MASTER_LOV \
+--fstype $BACK_FSTYPE --dev $OST_MASTER_DEV --size $OSTSIZE --filesystem $FS_MASTER || exit 21
+
+${LMC} -m $config --add cmobd --node $NODE1 --cmobd $CMOBD_MDS1 \
+--master_obd $MASTER_LMV --cache_obd $CACHE_MDS1 || exit 23
+
+${LMC} -m $config --add cmobd --node $NODE1 --cmobd $CMOBD_OST1 \
+--master_obd $MASTER_LOV --cache_obd $CACHE_OST1 || exit 23
+
+# node 2
+${LMC} -m $config --add node --node $NODE2 || exit 1
+
+${LMC} -m $config --add net --node $NODE2 --nid `h2$NIDTYPE $NODE2` \
+--nettype $NETTYPE || exit 1
+
+${LMC} -m $config --add mds --node $NODE2 --mds $CACHE_MDS2 \
+--fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $MDS2_CACHE_DEV \
+--mountfsoptions $MDS_MOUNT_OPTS --size $MDSSIZE --format \
+--filesystem $FS_NODE2 || exit 10
+
+${LMC} -m $config --add lov --lov $CACHE_LOV2 --mds $CACHE_MDS2 \
+--stripe_sz $STRIPE_SIZE --stripe_cnt $STRIPECNT --stripe_pattern 0 || exit 20
+
+${LMC} -m $config --add ost --ost $CACHE_OST2 --node $NODE2 --lov $CACHE_LOV2 \
+--fstype $FSTYPE --backfstype $BACK_FSTYPE --dev $OST2_CACHE_DEV --size $OSTSIZE \
+--filesystem $FS_NODE2 --mountfsoptions $OST_MOUNT_OPTS || exit 21
+
+${LMC} -m $config --add cmobd --node $NODE2 --cmobd $CMOBD_MDS2 \
+--master_obd $MASTER_LMV --cache_obd $CACHE_MDS2 || exit 23
+
+${LMC} -m $config --add cmobd --node $NODE2 --cmobd $CMOBD_OST2 \
+--master_obd $MASTER_LOV --cache_obd $CACHE_OST2 || exit 23
+
+# client of node1
+${LMC} -m $config --add cobd --node $NODE1 --cobd $COBD_MDS1 \
+--master_obd $MASTER_LMV --cache_obd $CACHE_MDS1 || exit 22
+
+${LMC} -m $config --add cobd --node $NODE1 --cobd $COBD_LOV1 \
+--master_obd $MASTER_LOV --cache_obd $CACHE_LOV1 || exit 22
+
+${LMC} -m $config --add mtpt --filesystem $FS_NODE1 --node $NODE1 \
+--path /mnt/lustre --lmv $COBD_MDS1 --lov $COBD_LOV1 || exit 30
+
+# client of node2
+${LMC} -m $config --add cobd --node $NODE2 --cobd $COBD_MDS2 \
+--master_obd $MASTER_LMV --cache_obd $CACHE_MDS2 || exit 22
+
+${LMC} -m $config --add cobd --node $NODE2 --cobd $COBD_LOV2 \
+--master_obd $MASTER_LOV --cache_obd $CACHE_LOV2 || exit 22
+
+${LMC} -m $config --add mtpt --filesystem $FS_NODE2 --node $NODE2 \
+--path /mnt/lustre --lmv $COBD_MDS2 --lov $COBD_LOV2 || exit 30
index b40a4df..bbc5787 100755 (executable)
@@ -1625,10 +1625,13 @@ class LOV(Module):
                 self.osclist.append((osc, index, gen, active))
             else:
                 panic('osc not found:', obd_uuid)
+    
     def get_uuid(self):
         return self.uuid
+    
     def get_name(self):
         return self.name
+    
     def prepare(self):
         if not config.record and is_prepared(self.name):
             return
@@ -1741,7 +1744,8 @@ class GKD(Module):
         else:
             self.active = 0
       
-       self.uuid = target_uuid  
+       self.uuid = target_uuid
+       
     def prepare(self):
         if is_prepared(self.name):
             return
@@ -1785,7 +1789,6 @@ class GKD(Module):
     def correct_level(self, level, op=None):
         return level
 
-
 class CONFDEV(Module):
     def __init__(self, db, name, target_uuid, uuid):
         Module.__init__(self, 'CONFDEV', db)
@@ -1800,6 +1803,7 @@ class CONFDEV(Module):
        self.target = self.db.lookup(target_uuid)
         self.name = "conf_%s" % self.target.getName()
         self.client_uuids = self.target.get_refs('client')
+        self.fs_uuid = self.db.get_first_ref('filesystem')
         self.obdtype = self.db.get_val('obdtype', '')
        
         self.mds_sec = self.db.get_val('mds_sec', '')
@@ -1967,6 +1971,19 @@ class CONFDEV(Module):
     def add_module(self, manager):
        manager.add_lustre_module('obdclass', 'confobd')
 
+    # this method checks if current OBD belong to the same FS as passed
+    # mount point uuid. If not - do not write mountpoint and echo client 
+    # to log, it is not needed, but take damn long time (WB test case) 
+    # bug #7210  --umka
+    def belong_to_fs(self, mtpt_uuid):
+       mtpt = self.db.lookup(mtpt_uuid)
+        fs_uuid = mtpt.get_first_ref('filesystem')
+       
+       if not self.fs_uuid or self.fs_uuid == "" or fs_uuid == self.fs_uuid:
+           return 1
+    
+        return 0
+    
     def write_conf(self):
         if self.target.get_class() == 'ost':
             config.record = 1
@@ -2048,8 +2065,9 @@ class CONFDEV(Module):
                 # refactor this into a funtion to test "clientness"
                 # of a node.
                 for ref_class, ref_uuid in prof_db.get_all_refs():
-                    if ref_class in ('mountpoint','echoclient'):
-                        debug("recording", client_name)
+                    if ref_class in ('mountpoint','echoclient') and self.belong_to_fs(ref_uuid):
+                        debug("recording:", client_name)
+                       log("recording mountpoint:", ref_uuid)
                         old_noexec = config.noexec
                         config.noexec = 0
                         noexec_opt = ('', '-n')
@@ -2176,7 +2194,7 @@ class MDSDEV(Module):
        if self.master != None:
             self.master.add_module(manager)
  
-       # add CONFOBD modules
+       # add CONFDEV modules
        if self.confobd != None:
             self.confobd.add_module(manager)
            
@@ -2356,7 +2374,7 @@ class OSD(Module):
 
         manager.add_lustre_module(self.osdtype, self.osdtype)
        
-       # add CONFOBD modules
+       # add CONFDEV modules
        if self.confobd != None:
             self.confobd.add_module(manager)
 
@@ -2625,12 +2643,16 @@ class CMOBD(Module):
         master_class = master_obd.get_class()
         cache_class = cache_obd.get_class()
 
-       if master_class == 'ost' or master_class == 'lov':
+       if master_class == 'lov':
            client_uuid = "%s_lov_master_UUID" % (self.name)
             self.master = LOV(master_obd, client_uuid, self.name,
                              "master_%s" % (self.name));
+        elif master_class == 'ost':
+           client_uuid = "%s_ost_master_UUID" % (self.name)
+           self.master = get_osc(master_obd, client_uuid, self.master_uuid) 
         elif master_class == 'mds':
-           self.master = get_mdc(db, self.name, self.master_uuid) 
+           client_uuid = "%s_mds_master_UUID" % (self.name)
+           self.master = get_mdc(master_obd, client_uuid, self.master_uuid) 
         elif master_class == 'lmv':
            client_uuid = "%s_lmv_master_UUID" % (self.name)
             self.master = LMV(master_obd, client_uuid, self.name, 
@@ -2638,24 +2660,20 @@ class CMOBD(Module):
        else:
            panic("unknown master obd class '%s'" %(master_class))
            
-       if cache_class == 'ost' or cache_class == 'lov':
-           client_uuid = "%s_lov_cache_UUID" % (self.name)
-            self.cache = LOV(cache_obd, client_uuid, self.name,
-                            "cache_%s" % (self.name));
+        if cache_class == 'ost':
+           self.cache = get_osc(cache_obd, cache_obd.getUUID(), 
+                                self.cache_uuid)
         elif cache_class == 'mds':
-           self.cache = get_mdc(db, self.name, self.cache_uuid)
-        elif cache_class == 'lmv':
-           client_uuid = "%s_lmv_cache_UUID" % (self.name) 
-            self.cache = LMV(cache_obd, client_uuid, self.name, 
-                            "cache_%s" % (self.name));
+           self.cache = get_mdc(cache_obd, cache_obd.getUUID(), 
+                                self.cache_uuid)
        else:
-           panic("unknown cache obd class '%s'" %(cache_class))
+           panic("invalid cache obd class '%s'" %(cache_class))
 
     def prepare(self):
-        self.master.prepare()
         if not config.record and is_prepared(self.name):
             return
         self.info(self.master_uuid, self.cache_uuid)
+        self.master.prepare()
         lctl.newdev("cmobd", self.name, self.uuid,
                     setup ="%s %s" %(self.master.uuid,
                                      self.cache.uuid))
@@ -2766,7 +2784,7 @@ class COBD(Module):
         manager.add_lustre_module('cobd', 'cobd')
         self.master.add_module(manager)
 
-# virtual interface for  OSC and LOV
+# virtual interface for OSC and LOV
 class VOSC(Module):
     def __init__(self, db, client_uuid, name, name_override = None):
         Module.__init__(self, 'VOSC', db)
@@ -2865,7 +2883,7 @@ def generate_client_uuid(name):
         return client_uuid[:36]
 
 class Mountpoint(Module):
-    def __init__(self,db):
+    def __init__(self, db):
         Module.__init__(self, 'MTPT', db)
         self.path = self.db.get_val('path')
         self.clientoptions = self.db.get_val('clientoptions', '')
@@ -2875,7 +2893,7 @@ class Mountpoint(Module):
        if not self.mds_uuid:
            self.mds_uuid = fs.get_first_ref('mds')
         self.obd_uuid = fs.get_first_ref('obd')
-        self.gks_uuid =fs.get_first_ref('gks')
+        self.gks_uuid = fs.get_first_ref('gks')
        client_uuid = generate_client_uuid(self.name)
 
         self.oss_sec = self.db.get_val('oss_sec','null')
@@ -2904,7 +2922,8 @@ class Mountpoint(Module):
        self.vmdc = VMDC(mds, client_uuid, self.name, self.name)
 
        if self.gks_uuid:
-           self.gkc = get_gkc(db, client_uuid, self.name, self.gks_uuid)      
+           self.gkc = get_gkc(db, client_uuid, self.name, self.gks_uuid)
+           
     def prepare(self):
         if not config.record and fs_is_mounted(self.path):
             log(self.path, "already mounted.")
@@ -2979,6 +2998,7 @@ class Mountpoint(Module):
         manager.add_lustre_module('llite', 'llite')
        if self.gks_uuid:
            manager.add_lustre_module('sec/gks', 'gkc')
+
     def correct_level(self, level, op=None):
         return level
 
@@ -3587,7 +3607,6 @@ def sysctl(path, val):
     except IOError, e:
         panic(str(e))
 
-
 def sys_set_debug_path():
     sysctl('portals/debug_path', config.debug_path)
 
index b50b376..9f4893c 100755 (executable)
@@ -81,6 +81,10 @@ Object creation command summary:
   --irq_affinity 0|1
   --router
 
+--add filesystem
+  --filesystem fs_name
+  --gks gks_name
+
 --add mds
   --node node_name
   --mds mds_name
@@ -102,14 +106,16 @@ Object creation command summary:
   --mds_mds_sec flavor
   --mds_oss_sec flavor
   --mds_deny_sec flavor[,flavor[...]]
+  --filesystem filesystem name
 
 --add lov
   --lov lov_name
   --mds mds_name
+  --lmv lmv_name
+  --aware one or few mds/lmv names separated by comma
   --stripe_sz num
   --stripe_cnt num
   --stripe_pattern num
-  --lmv lmv_name
 
 --add ost
   --node node_name
@@ -130,11 +136,13 @@ Object creation command summary:
   --mountfsoptions options
   --nspath
   --ost_deny_sec flavor[,flavor[...]]
+  --filesystem filesystem name
  
 --delete ost
   --node node_name
   --ost ost_name
   --migrate
+  
 --deactivate ost
   --node node_name
   --ost ost_name
@@ -250,6 +258,7 @@ lmc_options = [
     ('mds_oss_sec', "Specify the secure flavor for connection from this mds to ost.", PARAM, ""),
     ('mds_deny_sec', "Specify the secure flavor which is denied from remote to this mds.", PARAM, ""),
     ('ost_deny_sec', "Specify the secure flavor which is denied from remote to this ost.", PARAM, ""),
+    ('filesystem', "Specify the filesystem name device belong to.", PARAM, ""),
     ('format', ""),
     ('migrate', "used for offline migrate of an ost in conjunctio with add/delete"),
 
@@ -263,25 +272,23 @@ lmc_options = [
 
     # lov
     ('lov', "Specify LOV name.", PARAM,""),
+    ('mds/lmv', "Specify MDS/LMV name using this LOV.", PARAM,""),
+    ('aware', "Specify MDS/LMV aware of this LOV.", PARAM,""),
     ('index', "Specify index for OBD in LOV target table.", PARAM),
     ('stripe_sz', "Specify the stripe size in bytes.", PARAM),
     ('stripe_cnt', "Specify the number of OSTs each file should be striped on.", PARAM, 0),
     ('stripe_pattern', "Specify the stripe pattern. RAID 0 is the only one currently supported.", PARAM, 0),
 
     # cobd
-    
     ('master_obd', "Specify the real device for the cache obd system.", PARAM),
     ('cache_obd', "Specify the cache device for the cache obd system.", PARAM),
     ('cobd', "Specify COBD name", PARAM),
-    ('cachelmv', "Specify cache lmv name", PARAM, ""),
-    ('masterlmv', "Specify master lmv name", PARAM, ""),
 
     # cmobd
     ('master_obd', "Specify the master device for the cmobd system.", PARAM),
     ('cache_obd',  "Specify the cache device for the cmobd obd system.", PARAM),
     ('cmobd',      "Specify COBD name", PARAM),
 
-
     ('mgmt', "Specify management/monitoring service name.", PARAM, ""),
 
     # lmv
@@ -457,7 +464,7 @@ class GenConfig:
     def osd(self, name, uuid, fstype, osdtype, devname, format, ost_uuid,
             node_uuid, dev_size=0, journal_size=0, inode_size=0, nspath="", 
             mkfsoptions="", mountfsoptions="", backfstype="", backdevname="",
-            deny_sec=""):
+            deny_sec="", fs_uuid=""):
         osd = self.newService("osd", name, uuid)
         osd.setAttribute('osdtype', osdtype)
         osd.appendChild(self.ref("target", ost_uuid))
@@ -485,6 +492,8 @@ class GenConfig:
                 self.addElement(osd, "mountfsoptions", mountfsoptions)
             if deny_sec:
                 self.addElement(osd, "deny_sec", deny_sec)
+            if fs_uuid:
+               osd.appendChild(self.ref("filesystem", fs_uuid))
         if nspath:
             self.addElement(osd, "nspath", nspath)
         return osd
@@ -565,7 +574,7 @@ class GenConfig:
                mds_uuid, dev_size=0, journal_size=0, inode_size=256,
                nspath="", mkfsoptions="", mountfsoptions="", backfstype="", 
                backdevname="",lmv_uuid="", root_squash="", no_root_squash="",
-               mds_sec="", oss_sec="", deny_sec=""):
+               mds_sec="", oss_sec="", deny_sec="", fs_uuid=""):
         mdd = self.newService("mdsdev", name, uuid)
         self.addElement(mdd, "fstype", fstype)
         if backfstype:
@@ -596,6 +605,8 @@ class GenConfig:
             self.addElement(mdd, "oss_sec", oss_sec)
         if deny_sec:
             self.addElement(mdd, "deny_sec", deny_sec)
+        if fs_uuid:
+           mdd.appendChild(self.ref("filesystem", fs_uuid))
         mdd.appendChild(self.ref("node", node_uuid))
         mdd.appendChild(self.ref("target", mds_uuid))
        
@@ -605,7 +616,6 @@ class GenConfig:
 
        if lmv_uuid:
             mdd.appendChild(self.ref("lmv", lmv_uuid))
-           self.addElement(mdd, "lmv", lmv_uuid)
 
         return mdd
 
@@ -631,9 +641,13 @@ class GenConfig:
 
     def filesystem(self, name, uuid, mds_uuid, obd_uuid, mgmt_uuid, gks_uuid):
         fs = self.newService("filesystem", name, uuid)
-        fs.appendChild(self.ref("mds", mds_uuid))
-        fs.appendChild(self.ref("obd", obd_uuid))
-        fs.appendChild(self.ref("gks", gks_uuid))
+       
+       if mds_uuid:
+           fs.appendChild(self.ref("mds", mds_uuid))
+       if obd_uuid:
+           fs.appendChild(self.ref("obd", obd_uuid))
+       if gks_uuid:
+           fs.appendChild(self.ref("gks", gks_uuid))
         if mgmt_uuid:
             fs.appendChild(self.ref("mgmt", mgmt_uuid))
         return fs
@@ -745,10 +759,10 @@ def name2uuid(lustre, name, tag="",  fatal=1):
             return ""
     return getUUID(ret)
 
-def lookup_filesystem(lustre, mds_uuid, ost_uuid):
+def lookup_filesystem(lustre, fs_name):
     for n in lustre.childNodes:
         if n.nodeType == n.ELEMENT_NODE and n.nodeName == 'filesystem':
-            if ref_exists(n, mds_uuid) and ref_exists(n, ost_uuid):
+            if getName(n) == fs_name:
                 return getUUID(n)
     return None
 
@@ -796,14 +810,12 @@ def lov_add_obd(gen, lustre, lov, osc_uuid, options):
              uuidref = tgt.getAttribute('uuidref')
              tmp = int(tgt.getAttribute('index'))
              own_lov_uuid = tgt.getAttribute('lov_uuid')
-             if tmp != lov_index:
-                 error('malformed xml: LOV targets are not ordered; found index '+str(tmp)+', expected '+str(lov_index)+'.')
-             uuidref = tgt.getAttribute('uuidref')
+            if lov_uuid != own_lov_uuid:
+               continue
              if uuidref == '':
                  lov_mod_obd(gen, lustre, lov, tgt, osc_uuid, options)
                  return
-            if own_lov_uuid == lov_uuid:  
-               lov_index = lov_index + 1
+            lov_index = lov_index + 1
 
     lov.appendChild(gen.lov_tgt(osc_uuid, lov_uuid, str(lov_index), '1'))
     addUpdate(gen, lustre, gen.add(getUUID(lov), lov_uuid, str(lov_index), '1'))
@@ -926,7 +938,6 @@ def do_add_node(gen, lustre,  options, node_name):
 
     return node
 
-
 def add_node(gen, lustre, options):
     """ create a node with a network config """
 
@@ -1078,6 +1089,7 @@ def add_mds(gen, lustre, options):
     mds_sec = get_option(options, 'mds_mds_sec')
     oss_sec = get_option(options, 'mds_oss_sec')
     deny_sec = get_option(options, 'mds_deny_sec')
+    fs_name = get_option(options, 'filesystem')
 
     node_uuid = name2uuid(lustre, node_name, 'node')
 
@@ -1090,14 +1102,19 @@ def add_mds(gen, lustre, options):
     if lmv_name:
         mds.appendChild(gen.ref("lmv", lmv_uuid))
 
+    if fs_name != "":
+       fs_uuid = name2uuid(lustre, fs_name, 'filesystem', fatal=1)
+    else:
+       fs_uuid = ""
+    
     mdd = gen.mdsdev(mdd_name, mdd_uuid, fstype, devname,
                      get_format_flag(options), node_uuid, mds_uuid,
                      size, journal_size, inode_size, nspath, mkfsoptions, 
                      mountfsoptions, backfstype, backdevname,lmv_uuid, 
-                    root_squash, no_root_squash, mds_sec, oss_sec, deny_sec)
+                    root_squash, no_root_squash, mds_sec, oss_sec, deny_sec, 
+                    fs_uuid)
     lustre.appendChild(mdd)
 
-
 def add_mgmt(gen, lustre, options):
     node_name = get_option(options, 'node')
     node_uuid = name2uuid(lustre, node_name, 'node')
@@ -1145,6 +1162,7 @@ def add_ost(gen, lustre, options):
         mountfsoptions = get_option(options, 'mountfsoptions')
         deny_sec = get_option(options, 'ost_deny_sec')
 
+    fs_name = get_option(options, 'filesystem')
     nspath = get_option(options, 'nspath')
 
     ostname = get_option(options, 'ost')
@@ -1177,20 +1195,18 @@ def add_ost(gen, lustre, options):
     if options.failover:
         ost.setAttribute('failover', "1")
 
-
+    if fs_name != "":
+       fs_uuid = name2uuid(lustre, fs_name, 'filesystem', fatal=1)
+    else:
+       fs_uuid = ""
+       
     osd = gen.osd(osdname, osd_uuid, fstype, osdtype, devname,
                   get_format_flag(options), ost_uuid, node_uuid, size,
                   journal_size, inode_size, nspath, mkfsoptions, 
-                  mountfsoptions, backfstype, backdevname, deny_sec)
+                  mountfsoptions, backfstype, backdevname, deny_sec, 
+                 fs_uuid)
 
     node = findByName(lustre, node_name, "node")
-
-##     if node_add_profile(gen, node, 'oss', oss_uuid):
-##         ossname = 'OSS'
-##         oss_uuid = new_uuid(ossname)
-##         oss = gen.oss(ossname, oss_uuid)
-##         lustre.appendChild(oss)
-
     node_add_profile(gen, node, 'osd', osd_uuid)
     lustre.appendChild(osd)
 
@@ -1262,11 +1278,9 @@ def add_cmobd(gen, lustre, options):
             cache_uuid = name2uuid(lustre, cache_name, tag='mds', fatal=0)
 
     if not master_uuid: 
-       panic("add_cmobd", "cannot find master_uuid by name '" + 
-              master_name + "'")
+       error("cannot find master_uuid by name '" + master_name + "'")
     if not cache_uuid: 
-       panic("add_cmobd", "cannot find cache_uuid by name '" + 
-              cache_name + "'")
+       error("cannot find cache_uuid by name '" + cache_name + "'")
 
     node = findByName(lustre, node_name, "node")
     node_add_profile(gen, node, "cmobd", uuid)
@@ -1274,11 +1288,9 @@ def add_cmobd(gen, lustre, options):
     master_node = lookup(lustre, master_uuid)
     cache_node = lookup(lustre, cache_uuid)
     if not master_node:
-        panic("cmobd_add", "cannot find master node by its uuid " + 
-              master_uuid);
+        error("cannot find master node by its uuid " + master_uuid);
     if not cache_node:
-        panic("cmobd_add", "cannot find cache node by its uuid " + 
-              cache_uuid);
+        error("cannot find cache node by its uuid " + cache_uuid);
 
     active = master_node.getElementsByTagName('active_ref')
     if active:
@@ -1325,7 +1337,7 @@ def add_cobd(gen, lustre, options):
 
     # init cache
     cache_uuid = name2uuid(lustre, cache_name, tag='lov', fatal=0)
-    if not not cache_uuid:
+    if not cache_uuid:
         cache_uuid = name2uuid(lustre, cache_name, tag='ost', fatal=0)
 
     if cache_uuid:
@@ -1394,22 +1406,23 @@ def add_echo_client(gen, lustre, options):
     echo = gen.echo_client(echoname, echo_uuid, lov_uuid)
     lustre.appendChild(echo)
 
-
 def add_lov(gen, lustre, options):
     """ create a lov """
 
-    lmv_name = get_option(options, 'lmv')
-    cache_lmv_name = get_option(options, 'cachelmv')
-    master_lmv_name = get_option(options, 'masterlmv')
     lov_orig = get_option(options, 'lov')
     name = new_name(lov_orig)
     if name != lov_orig:
         warning("name:", lov_orig, "already used. using:", name)
 
+    lmv_name = get_option(options, 'lmv')
     mds_name = get_option(options, 'mds')
-    if not mds_name and not lmv_name and not cache_lmv_name and not master_lmv_name:
-        error("LOV: MDS or LMV must be specified.");
-
+    aware = get_option(options, 'aware')
+    
+    if not mds_name and not lmv_name:
+        error("LOV: either MDS or LMV must be specified.");
+    if mds_name and lmv_name:
+        error("LOV: either MDS or LMV must be specified.");
+    
     stripe_sz = get_option_int(options, 'stripe_sz')
     stripe_cnt = get_option_int(options, 'stripe_cnt')
     pattern = get_option_int(options, 'stripe_pattern')
@@ -1430,34 +1443,34 @@ def add_lov(gen, lustre, options):
     # add an lovconfig entry to the active mdsdev profile
     lovconfig_name = new_name('LVCFG_' + name)
     lovconfig_uuid = new_uuid(lovconfig_name)
-    if mds_name:
-        mds = findByName(lustre, mds_name, "mds")
-        if not mds:
-            panic("add_lov", "can't find MDS '" + mds_name + "'")
-        mds.appendChild(gen.ref("lovconfig", lovconfig_uuid))
-        mds.appendChild(gen.ref("client", uuid))
 
-    if lmv_name:
-        lmv = findByName(lustre, lmv_name, "lmv")
-        if not lmv:
-            panic("add_lov", "can't find LMV '" + lmv_name + "'")
-        lmv.appendChild(gen.ref("lovconfig", lovconfig_uuid))
-        lmv.appendChild(gen.ref("client", uuid))
-       
-    if cache_lmv_name:
-        lmv = findByName(lustre, cache_lmv_name, "lmv")
-        if not lmv:
-            panic("add_lov", "can't find LMV '" + cache_lmv_name + "'")
-        lmv.appendChild(gen.ref("lovconfig", lovconfig_uuid))
-        lmv.appendChild(gen.ref("client", uuid))
+    if mds_name:
+       md_tgt = findByName(lustre, mds_name, "mds")
+       if not md_tgt:
+           error("can't find MDS '" + mds_name + "'")
+    else:
+        md_tgt = findByName(lustre, lmv_name, "lmv")
+       if not md_tgt:
+           error("can't find LMV '" + lmv_name + "'")
     
-    if master_lmv_name:
-        lmv = findByName(lustre, master_lmv_name, "lmv")
-        if not lmv:
-            panic("add_lov", "can't find LMV '" + master_lmv_name + "'")
-        lmv.appendChild(gen.ref("lovconfig", lovconfig_uuid))
-        lmv.appendChild(gen.ref("client", uuid))
-
+    md_tgt.appendChild(gen.ref("lovconfig", lovconfig_uuid))
+    md_tgt.appendChild(gen.ref("client", uuid))
+    
+    # adding lovconfig and client to aware MD targets
+    if aware:
+       md_names = string.split(aware, ',');
+       for md_name in md_names:
+       
+           md_tgt = findByName(lustre, md_name, "mds")
+           if not md_tgt:
+               md_tgt = findByName(lustre, md_name, "lmv")
+
+           if not md_tgt:
+               error("can't find '" + mds_name + "'")
+           
+           md_tgt.appendChild(gen.ref("lovconfig", lovconfig_uuid))
+           md_tgt.appendChild(gen.ref("client", uuid))
+       
     lovconfig = gen.lovconfig(lovconfig_name, lovconfig_uuid, uuid)
     lustre.appendChild(lovconfig)
 
@@ -1501,33 +1514,16 @@ def add_lmv(gen, lustre, options):
     lmv = gen.lmv(name, uuid)
     lustre.appendChild(lmv)
    
-def find_client(lustre, mds_uuid, client_uuid):
-    mds = lookup(lustre, mds_uuid)
-    clients = mds.getElementsByTagName('client_ref')
-    
-    if clients:
-       for client in clients:
-           if client.getAttribute("uuidref") == client_uuid:
-               return 1
-    return 0
-    
-def new_filesystem(gen, lustre, mds_uuid, obd_uuid, mgmt_uuid, gks_uuid):
-    fs_name = new_name("FS_fsname")
+def new_filesystem(gen, lustre, fs_name, mds_uuid, obd_uuid, 
+                  mgmt_uuid, gks_uuid):
     fs_uuid = new_uuid(fs_name)
-    
-    mds = lookup(lustre, mds_uuid)
-    clients = mds.getElementsByTagName('client_ref')
-    
-    if find_client(lustre, mds_uuid, obd_uuid) == 0:
-       mds.appendChild(gen.ref("cl