Whamcloud - gitweb
- possibly (I'm almost sure) fix for FLD lookup err -2. Allocating new seq and setup...
authoryury <yury>
Sun, 15 Oct 2006 19:36:47 +0000 (19:36 +0000)
committeryury <yury>
Sun, 15 Oct 2006 19:36:47 +0000 (19:36 +0000)
- fix in lmv_placement_policy(). Take into account split dirs for creating child dirs too, not only files.

lustre/cmm/cmm_split.c
lustre/cmm/mdc_device.c
lustre/cmm/mdc_internal.h
lustre/include/obd.h
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c

index fe8c0ef..e7e4bc2 100644 (file)
@@ -149,16 +149,20 @@ static int cmm_fid_alloc(const struct lu_env *env,
         LASSERT(cmm != NULL);
         LASSERT(mc != NULL);
         LASSERT(fid != NULL);
-        
+
+        down(&mc->mc_fid_sem);
+
+        /* Alloc new fid on @mc. */
         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
         if (rc > 0) {
-                /* Setup FLD for new sequence. */
-                rc = fld_client_create(cmm->cmm_fld,
-                                       fid_seq(fid),
+                /* Setup FLD for new sequenceif needed. */
+                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
                                        mc->mc_num, env);
                 if (rc)
                         CERROR("Can't create fld entry, rc %d\n", rc);
         }
+        up(&mc->mc_fid_sem);
+        
         RETURN(rc);
 }
 
index 49a902b..c98f10b 100644 (file)
@@ -239,6 +239,8 @@ struct lu_device *mdc_device_alloc(const struct lu_env *env,
                 mc->mc_md_dev.md_ops = &mdc_md_ops;
                ld = mdc2lu_dev(mc);
                 ld->ld_ops = &mdc_lu_ops;
+                sema_init(&mc->mc_fid_sem, 1);
+
         }
 
         RETURN (ld);
index afe9ca4..eb846f9 100644 (file)
@@ -52,6 +52,7 @@ struct mdc_device {
         struct list_head        mc_linkage;
         mdsno_t                 mc_num;
         struct mdc_cli_desc     mc_desc;
+        struct semaphore        mc_fid_sem;
 };
 
 struct mdc_thread_info {
index b32f33e..b1cbee6 100644 (file)
@@ -670,6 +670,7 @@ struct lmv_tgt_desc {
         struct obd_export       *ltd_exp;
         int                     active;   /* is this target up for requests */
         int                     idx;
+        struct semaphore        fid_sem;
 };
 
 struct lmv_obd {
index ddf39d7..76411ce 100644 (file)
@@ -147,35 +147,31 @@ int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
         ENTRY;
 
         obj = lmv_obj_grab(obd, pid);
-        if (!obj)
+        if (!obj) {
+                CERROR("Object "DFID" should be split\n",
+                       PFID(pid));
                 RETURN(0);
+        }
         
         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                            (char *)op->name, op->namelen);
         rpid = &obj->lo_inodes[mds].li_fid;
         rc = lmv_fld_lookup(lmv, rpid, &mds);
+        lmv_obj_put(obj);
         if (rc)
-                GOTO(cleanup, rc);
+                RETURN(rc);
 
-        rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
-        if (rc > 0) {
-                LASSERT(fid_is_sane(fid));
-                rc = fld_client_create(&lmv->lmv_fld,
-                                       fid_seq(fid), mds, NULL);
-                if (rc) {
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        GOTO(cleanup, rc);
-                }
-        }
-        if (rc == 0) {
-                CDEBUG(D_INFO, "Allocate new fid "DFID" for split "
-                       "obj\n", PFID(fid));
+        rc = __lmv_fid_alloc(lmv, fid, mds);
+        if (rc) {
+                CERROR("Can't allocate new fid, rc %d\n",
+                       rc);
+                RETURN(rc);
         }
         
-        EXIT;
-cleanup:
-        lmv_obj_put(obj);
-        return rc;
+        CDEBUG(D_INFO, "Allocate new fid "DFID" for split "
+               "obj\n", PFID(fid));
+        
+        RETURN(rc);
 }
 
 /*
index af1454e..4e5b397 100644 (file)
@@ -138,6 +138,8 @@ int lmv_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                     void *, int);
 int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid,
                    mdsno_t *mds);
+int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
+                    mdsno_t mds);
 int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
                             struct md_op_data *op, struct lu_fid *fid);
 
index a9cae35..1e9d1f5 100644 (file)
@@ -725,6 +725,7 @@ static int lmv_placement_policy(struct obd_device *obd,
                                 mdsno_t *mds)
 {
         struct lmv_obd *lmv = &obd->u.lmv;
+        struct lmv_obj *obj;
         int rc;
         ENTRY;
 
@@ -737,53 +738,42 @@ static int lmv_placement_policy(struct obd_device *obd,
                  * balanced, that is all sequences have more or less equal
                  * number of objects created.
                  */
-                if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
-#if 1
-                        *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
-                                                    hint->ph_cname);
-                        rc = 0;
-#else
-                        /* Stress policy for tests - to use non-parent MDS */
-                        LASSERT(fid_is_sane(hint->ph_pfid));
-                        rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+                obj = lmv_obj_grab(obd, hint->ph_pfid);
+                if (obj) {
+                        /*
+                         * If the dir got split, alloc fid according to its
+                         * hash. No matter what we create, object create should
+                         * go to correct MDS. 
+                         */
+                        struct lu_fid *rpid;
+
+                        *mds = raw_name2idx(obj->lo_hashtype,
+                                            obj->lo_objcount,
+                                            hint->ph_cname->name,
+                                            hint->ph_cname->len);
+                        rpid = &obj->lo_inodes[*mds].li_fid;
+                        rc = lmv_fld_lookup(lmv, rpid, mds);
+                        lmv_obj_put(obj);
                         if (rc)
-                                RETURN(rc);
-                        *mds = (int)(*mds + 1) % lmv->desc.ld_tgt_count;
-
-#endif
-                } else {
-                        struct lmv_obj *obj;
-                        LASSERT(fid_is_sane(hint->ph_pfid));
-
-                        obj = lmv_obj_grab(obd, hint->ph_pfid);
-                        if (obj) {
-                                /*
-                                 * If the dir got split, alloc fid according to
-                                 * its hash
-                                 */
-                                struct lu_fid *rpid;
-
-                                *mds = raw_name2idx(obj->lo_hashtype,
-                                                    obj->lo_objcount,
-                                                    hint->ph_cname->name,
-                                                    hint->ph_cname->len);
-                                rpid = &obj->lo_inodes[*mds].li_fid;
-                                lmv_obj_put(obj);
+                                GOTO(exit, rc);
+                        rc = 0;
                                 
-                                rc = lmv_fld_lookup(lmv, rpid, mds);
-                                if (rc)
-                                        GOTO(exit, rc);
-
-                                CDEBUG(D_INODE, "The obj "DFID" has been"
-                                       "split, got MDS at "LPU64" by name %s\n",
-                                       PFID(hint->ph_pfid), *mds,
-                                       hint->ph_cname->name);
+                        CDEBUG(D_INODE, "The obj "DFID" has been split, got "
+                               "MDS at "LPU64" by name %s\n",PFID(hint->ph_pfid),
+                               *mds, hint->ph_cname->name);
+                } else {
+                        if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
+                                /* Default policy for directories. */
+                                *mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
+                                                            hint->ph_cname);
                                 rc = 0;
                         } else {
-                                /* Default policy is to use parent MDS */
+                                /*
+                                 * Default policy for others is to use parent
+                                 * MDS.
+                                 */
                                 rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
                         }
-
                 }
         } else {
                 /*
@@ -792,7 +782,7 @@ static int lmv_placement_policy(struct obd_device *obd,
                  * yet!
                  */
                 *mds = 0;
-                rc = -EINVAL;
+                rc = -ENOSYS;
         }
 exit:
         if (rc) {
@@ -840,6 +830,34 @@ static int lmv_fid_fini(struct obd_export *exp)
         RETURN(rc);
 }
 
+int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
+                    mdsno_t mds)
+{
+        struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
+        int rc;
+        ENTRY;
+
+        /* New seq alloc and FLD setup should be atomic. */
+        down(&tgt->fid_sem);
+
+        /* Asking underlaying tgt layer to allocate new fid. */
+        rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
+        if (rc > 0) {
+                LASSERT(fid_is_sane(fid));
+
+                /* Client switches to new sequence, setup FLD. */
+                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
+                                       mds, NULL);
+                if (rc) {
+                        CERROR("Can't create fld entry, "
+                               "rc %d\n", rc);
+                }
+        }
+
+        up(&tgt->fid_sem);
+        RETURN(rc);
+}
+
 static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
                          struct lu_placement_hint *hint)
 {
@@ -849,8 +867,8 @@ static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
         int rc;
         ENTRY;
 
-        LASSERT(fid != NULL);
         LASSERT(hint != NULL);
+        LASSERT(fid != NULL);
 
         rc = lmv_placement_policy(obd, hint, &mds);
         if (rc) {
@@ -859,19 +877,11 @@ static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
                 RETURN(rc);
         }
 
-        /* Asking underlaying tgt layer to allocate new fid. */
-        rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, hint);
-
-        /* Client switches to new sequence, setup fld. */
-        if (rc > 0) {
-                LASSERT(fid_is_sane(fid));
-
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        RETURN(rc);
-                }
+        rc = __lmv_fid_alloc(lmv, fid, mds);
+        if (rc) {
+                CERROR("Can't alloc new fid, rc %d\n",
+                       rc);
+                RETURN(rc);
         }
 
         RETURN(rc);
@@ -915,8 +925,10 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (lmv->tgts == NULL)
                 RETURN(-ENOMEM);
 
-        for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
+        for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
+                sema_init(&lmv->tgts[i].fid_sem, 1);
                 lmv->tgts[i].idx = i;
+        }
 
         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);