Whamcloud - gitweb
not send LOV EA under replay, we can't know about they size at this
authorshadow <shadow>
Thu, 20 Nov 2008 05:07:25 +0000 (05:07 +0000)
committershadow <shadow>
Thu, 20 Nov 2008 05:07:25 +0000 (05:07 +0000)
time. Don't allow client connect to mds before any ost connected,
for avoid problems with LOV EA size and returning EIO to client.

Branch HEAD
b=16080
i=umka
i=tappro

14 files changed:
lustre/ChangeLog
lustre/include/lustre_mds.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/lov/lov_obd.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mds/handler.c
lustre/mds/mds_lov.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/tests/conf-sanity.sh

index 1a55ea8..cd15c0d 100644 (file)
@@ -14,6 +14,13 @@ tbd  Sun Microsystems, Inc.
        * File join has been disabled in this release, refer to Bugzilla 16929.
  
 
+Severity   : normal
+Bugzilla   : 16080
+Description: more cleanup in mds_lov
+Details    : not send LOV EA under replay, we can't know about they size at this
+             time. Don't allow client connect to mds before any ost connected,
+             for avoid problems with LOV EA size and returning EIO to client.
+
 Severity   : enhancement
 Bugzilla   : 11826
 Description: Interoperability at server side (Disk interoperability)
@@ -149,6 +156,7 @@ Details    : When connection is reused this not moved from CONN_UNUSED_HASH
             into CONN_USED_HASH and this prodice warning when put connection
             again in unused hash.
 
+
 Severity   : enhancement
 Bugzilla   : 15899
 Description: File striping can now be set to use an arbitrary pool of OSTs.
index 4b81027..fb63c75 100644 (file)
@@ -65,8 +65,9 @@ struct mds_group_info {
         int group;
 };
 
-/* mds/mds_reint.c */
+/* mds/mds_lov.c */
 int mds_lov_write_objids(struct obd_device *obd);
+int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm);
 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm);
 
 
index 5a20550..9202385 100644 (file)
@@ -166,11 +166,12 @@ struct md_op_spec {
                 struct md_spec_reg {
                         /** lov objs exist already */
                         const struct lu_fid   *fid;
-                        int no_lov_create;
                         const void *eadata;
                         int  eadatalen;
                 } sp_ea;
         } u;
+        /** don't create lov objects or llog cookie - this replay */
+        int no_create;
 
         /** Create flag from client: such as MDS_OPEN_CREAT, and others. */
         __u32      sp_cr_flags;
index 2d499b6..d3fc0e8 100644 (file)
@@ -526,6 +526,7 @@ struct mds_obd {
         /* file for store objid */
         struct file                     *mds_lov_objid_filp;
         __u32                            mds_lov_objid_count;
+        __u32                            mds_lov_objid_max_index;
         __u32                            mds_lov_objid_lastpage;
         __u32                            mds_lov_objid_lastidx;
 
@@ -541,7 +542,9 @@ struct mds_obd {
                                          mds_evict_ost_nids:1,
                                          mds_fl_cfglog:1,
                                          mds_fl_synced:1,
-                                         mds_quota:1;
+                                         mds_quota:1,
+                                         mds_fl_target:1; /* mds have one or
+                                                           * more targets */
 
         struct upcall_cache             *mds_identity_cache;
 
@@ -868,8 +871,9 @@ enum obd_notify_event {
 
 /* bit-mask flags for config events */
 enum config_flags {
-        CONFIG_LOG = 0x1,  /* finished processing config log */
-        CONFIG_SYNC = 0x2  /* mdt synced 1 ost */
+        CONFIG_LOG      = 0x1,  /* finished processing config log */
+        CONFIG_SYNC     = 0x2,  /* mdt synced 1 ost */
+        CONFIG_TARGET   = 0x4   /* one target is added */
 };
 
 /*
index 7e49a54..ca0f68c 100644 (file)
@@ -104,6 +104,12 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
+static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
+                              int activate);
+static int lov_notify(struct obd_device *obd, struct obd_device *watched,
+                      enum obd_notify_event ev, void *data);
+
+
 #define MAX_STRING_SIZE 128
 int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                     struct obd_connect_data *data)
@@ -263,6 +269,16 @@ static int lov_connect(const struct lu_env *env,
                                obd->obd_name, i, rc);
                         continue;
                 }
+                /* connect to administrative disabled ost */
+                if (!lov->lov_tgts[i]->ltd_exp)
+                        continue;
+
+                rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
+                                OBD_NOTIFY_ACTIVE, (void *)&i);
+                if (rc) {
+                        CERROR("%s error sending notify %d\n",
+                               obd->obd_name, rc);
+                }
         }
         lov_putref(obd);
 
@@ -368,7 +384,9 @@ out:
 /* Error codes:
  *
  *  -EINVAL  : UUID can't be found in the LOV's target list
- * - any other is lov index
+ *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
+ *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
+ *  any >= 0 : is log target index
  */
 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
                               int activate)
@@ -428,7 +446,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
 {
         int rc = 0;
         ENTRY;
-
+       
         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
                 struct obd_uuid *uuid;
 
@@ -452,6 +470,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                                obd_uuid2str(uuid), rc);
                         RETURN(rc);
                 }
+                /* active event should be pass lov target index as data */
                 data = &rc;
         }
 
@@ -474,6 +493,7 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                                 data = &i;
 
                         tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
+
                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
@@ -587,6 +607,10 @@ int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         if (rc)
                 GOTO(out, rc);
 
+        /* connect to administrative disabled ost */
+        if (!tgt->ltd_exp)
+                GOTO(out, rc = 0);
+
         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
                         active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
index 7fe28f3..fa56fcd 100644 (file)
@@ -1247,7 +1247,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
 
         /* Replay creates has objects already */
 #if 0
-        if (spec->u.sp_ea.no_lov_create) {
+        if (spec->no_create) {
                 CDEBUG(D_INFO, "we already have lov ea\n");
                 rc = mdd_lov_set_md(env, mdd_pobj, son,
                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
index fce9bda..aa2f6f6 100644 (file)
@@ -183,6 +183,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                    struct mdd_object *parent, struct mdd_object *child,
                    struct lov_mds_md **lmm, int *lmm_size,
                    const struct md_op_spec *spec, struct lu_attr *la);
+int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm);
 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm);
 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
                            struct lov_mds_md *lmm, int lmm_size,
index 1659902..6d62c7e 100644 (file)
@@ -206,7 +206,7 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
                 *md_size = 0;
                 rc = 0;
         } else if (rc < 0) {
-                CERROR("Error %d reading eadata \n", rc);
+                CERROR("Error %d reading eadata - %d\n", rc, *md_size);
         } else {
                 /* XXX: Convert lov EA but fixed after verification test. */
                 *md_size = rc;
@@ -356,49 +356,23 @@ static obd_id mdd_lov_create_id(const struct lu_fid *fid)
         return fid_flatten(fid);
 }
 
-static void mdd_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
 {
-        struct mds_obd *mds = &obd->u.mds;
-        int j;
-        struct lov_ost_data_v1 *lmm_objects;
-        ENTRY;
-
-        /* if we create file without objects - lmm is NULL */
-        if (lmm == NULL)
-                return;
-
-        if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3)
-                lmm_objects = ((struct lov_mds_md_v3 *)lmm)->lmm_objects;
-        else
-                lmm_objects = lmm->lmm_objects;
-
-        for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
-                int i = le32_to_cpu(lmm_objects[j].l_ost_idx);
-                obd_id id = le64_to_cpu(lmm_objects[j].l_object_id);
-                int page = i / OBJID_PER_PAGE();
-                int idx = i % OBJID_PER_PAGE();
-                obd_id *data = mds->mds_lov_page_array[page];
-
-                CDEBUG(D_INODE,"update last object for ost %d - new %llu"
-                               " old %llu\n", i, id, data[idx]);
-                if (id > data[idx]) {
-                        data[idx] = id;
-                        cfs_bitmap_set(mds->mds_lov_page_dirty, page);
-                }
-        }
-        EXIT;
+        /* copy mds_lov code is using wrong layer */
+        return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
 }
 
 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
 {
-        mdd_lov_update_objids(mdd->mdd_obd_dev, lmm);
+        /* copy mds_lov code is using wrong layer */
+        mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
 }
 
 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
                            struct lov_mds_md *lmm, int lmm_size,
                            const struct md_op_spec *spec)
 {
-        if (lmm && !spec->u.sp_ea.no_lov_create)
+        if (lmm && !spec->no_create)
                 OBD_FREE(lmm, lmm_size);
 }
 
@@ -424,7 +398,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
         oti_init(oti, NULL);
 
         /* replay case, has objects already, only get lov from eadata */
-        if (spec->u.sp_ea.no_lov_create != 0) {
+        if (spec->no_create != 0) {
                 *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
                 *lmm_size = spec->u.sp_ea.eadatalen;
                 RETURN(0);
@@ -546,6 +520,12 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                 CERROR("Cannot pack lsm, err = %d\n", rc);
                 GOTO(out_oti, rc);
         }
+        if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
+                CERROR("Not have memory for update objid\n");
+                OBD_FREE(*lmm, rc);
+                *lmm = NULL;
+                GOTO(out_oti, rc = -ENOMEM);
+        }
         *lmm_size = rc;
         rc = 0;
         EXIT;
index a3e34df..6e5c40a 100644 (file)
@@ -404,9 +404,6 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         if (rc)
                 GOTO(err_objects, rc);
 
-        mds->mds_max_mdsize = sizeof(struct lov_mds_md_v3);
-        mds->mds_max_cookiesize = sizeof(struct llog_cookie);
-
 err_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         RETURN(rc);
index b623979..0b4120b 100644 (file)
@@ -159,6 +159,174 @@ void mds_lov_destroy_objids(struct obd_device *obd)
         EXIT;
 }
 
+/**
+ * currently exist two ways for know about ost count and max ost index.
+ * first - after ost is connected to mds and sync process finished
+ * second - get from lmm in recovery process, in case when mds not have configs,
+ * and ost isn't registered in mgs.
+ *
+ * \param mds pointer to mds structure
+ * \param index maxium ost index
+ *
+ * \retval -ENOMEM is not hame memory for new page
+ * \retval 0 is update passed
+ */
+static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
+{
+        __u32 page = index / OBJID_PER_PAGE();
+        __u32 off = index % OBJID_PER_PAGE();
+        obd_id *data =  mds->mds_lov_page_array[page];
+
+        if (data == NULL) {
+                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
+                if (data == NULL)
+                        RETURN(-ENOMEM);
+
+                mds->mds_lov_page_array[page] = data;
+        }
+
+        if (index > mds->mds_lov_objid_max_index) {
+                mds->mds_lov_objid_lastpage = page;
+                mds->mds_lov_objid_lastidx = off;
+                mds->mds_lov_objid_max_index = index;
+        }
+
+        /* workaround - New target not in objids file; increase mdsize */
+        /* ld_tgt_count is used as the max index everywhere, despite its name. */
+        if (data[off] == 0) {
+                __u32 stripes;
+
+                data[off] = 1;
+                mds->mds_lov_objid_count++;
+                stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
+                                mds->mds_lov_objid_count);
+
+                mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
+                mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+
+                CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
+                       " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
+                       mds->mds_max_cookiesize);
+        }
+
+        EXIT;
+        return 0;
+}
+
+int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+{
+        struct lov_ost_data_v1 *data;
+        __u32 count;
+        int rc = 0;
+        __u32 j;
+
+        /* if we create file without objects - lmm is NULL */
+        if (lmm == NULL)
+                return 0;
+
+        switch (le32_to_cpu(lmm->lmm_magic)) {
+                case LOV_MAGIC_V1:
+                        count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
+                        data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
+                        break;
+                case LOV_MAGIC_V3:
+                        count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
+                        data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
+                        break;
+                default:
+                        CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
+                        RETURN(-EINVAL);
+        }
+
+
+        mutex_down(&obd->obd_dev_sem);
+        for (j = 0; j < count; j++) {
+                __u32 i = le32_to_cpu(data[j].l_ost_idx);
+                if (mds_lov_update_max_ost(&obd->u.mds, i)) {
+                        rc = -ENOMEM;
+                        break;
+                }
+        }
+        mutex_up(&obd->obd_dev_sem);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(mds_lov_prepare_objids);
+
+void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int j;
+        struct lov_ost_data_v1 *obj;
+        int count;
+        ENTRY;
+
+        /* if we create file without objects - lmm is NULL */
+        if (lmm == NULL)
+                return;
+
+        switch (le32_to_cpu(lmm->lmm_magic)) {
+                case LOV_MAGIC_V1:
+                        count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
+                        obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
+                        break;
+                case LOV_MAGIC_V3:
+                        count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
+                        obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
+                        break;
+                default:
+                        CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
+                        return;
+        }
+
+        for (j = 0; j < count; j++) {
+                __u32 i = le32_to_cpu(obj[j].l_ost_idx);
+                obd_id id = le64_to_cpu(obj[j].l_object_id);
+                __u32 page = i / OBJID_PER_PAGE();
+                __u32 idx = i % OBJID_PER_PAGE();
+                obd_id *data;
+
+                data = mds->mds_lov_page_array[page];
+
+                CDEBUG(D_INODE,"update last object for ost %u"
+                       " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
+                if (id > data[idx]) {
+                        data[idx] = id;
+                        cfs_bitmap_set(mds->mds_lov_page_dirty, page);
+                }
+        }
+        EXIT;
+        return;
+}
+EXPORT_SYMBOL(mds_lov_update_objids);
+
+
+static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
+                                    __u32 count)
+{
+        __u32 i;
+        __u32 stripes;
+
+        for(i = 0; i < count; i++) {
+                if (data[i] == 0)
+                        continue;
+
+                mds->mds_lov_objid_count++;
+        }
+
+        stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
+                         mds->mds_lov_objid_count);
+
+        mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
+        mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
+
+        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
+               "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
+
+        EXIT;
+        return 0;
+}
+
 static int mds_lov_read_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -178,34 +346,33 @@ static int mds_lov_read_objids(struct obd_device *obd)
         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
 
         for (i = 0; i < page; i++) {
-                obd_id *data =  mds->mds_lov_page_array[i];
                 loff_t off_old = off;
 
-                LASSERT(data == NULL);
-                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
-                if (data == NULL)
+                LASSERT(mds->mds_lov_page_array[i] == NULL);
+                OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
+                if (mds->mds_lov_page_array[i] == NULL)
                         GOTO(out, rc = -ENOMEM);
 
-                mds->mds_lov_page_array[i] = data;
-
-                rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
+                rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
                 if (rc < 0) {
                         CERROR("Error reading objids %d\n", rc);
                         GOTO(out, rc);
                 }
-                if (off == off_old)
-                        break; // eof
 
                 count += (off - off_old)/sizeof(obd_id);
+                if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
+                         CERROR("Can't update mds data\n");
+                         GOTO(out, rc = -EIO);
+                 }
+
+                if (off == off_old)
+                        break; // eof
         }
-        mds->mds_lov_objid_count = count;
-       if (count) {
-                count --;
-                mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
-                mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
-       }
-        CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
+        mds->mds_lov_objid_lastpage = i;
+        mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
+
+        CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
 out:
         mds_lov_dump_objids("read",obd);
@@ -249,7 +416,7 @@ int mds_lov_write_objids(struct obd_device *obd)
 EXPORT_SYMBOL(mds_lov_write_objids);
 
 static int mds_lov_get_objid(struct obd_device * obd,
-                             __u32 idx)
+                             obd_id idx)
 {
         struct mds_obd *mds = &obd->u.mds;
         unsigned int page;
@@ -261,14 +428,6 @@ static int mds_lov_get_objid(struct obd_device * obd,
         page = idx / OBJID_PER_PAGE();
         off = idx % OBJID_PER_PAGE();
         data = mds->mds_lov_page_array[page];
-        if (data == NULL) {
-                OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
-                if (data == NULL)
-                        GOTO(out, rc = -ENOMEM);
-
-                mds->mds_lov_page_array[page] = data;
-        }
-
         if (data[off] == 0) {
                 /* We never read this lastid; ask the osc */
                 struct obd_id_info lastid;
@@ -281,11 +440,6 @@ static int mds_lov_get_objid(struct obd_device * obd,
                 if (rc)
                         GOTO(out, rc);
 
-                if (idx > mds->mds_lov_objid_count) {
-                        mds->mds_lov_objid_count = idx;
-                        mds->mds_lov_objid_lastpage = page;
-                        mds->mds_lov_objid_lastidx = off;
-                }
                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
         }
 out:
@@ -326,9 +480,6 @@ static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
 
         LASSERT(!obd->obd_recovering);
 
-        /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
-        LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
-
         info.idx = idx;
         info.data = id;
         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
@@ -346,7 +497,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lov_desc *ld;
-        __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
+        __u32 valsize = sizeof(mds->mds_lov_desc);
         int rc = 0;
         ENTRY;
 
@@ -365,14 +516,12 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
 
-        stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
-                               mds->mds_lov_desc.ld_tgt_count);
+        mutex_down(&obd->obd_dev_sem);
+        rc = mds_lov_update_max_ost(mds, idx);
+        mutex_up(&obd->obd_dev_sem);
+        if (rc != 0)
+                GOTO(out, rc );
 
-        mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
-        mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
-        CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
-               "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
-               stripes);
 
         /* If we added a target we have to reconnect the llogs */
         /* We only _need_ to do this at first add (idx), or the first time
@@ -384,7 +533,7 @@ static int mds_lov_update_desc(struct obd_device *obd, int idx,
         /*XXX this notifies the MDD until lov handling use old mds code */
         if (obd->obd_upcall.onu_owner) {
                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
-                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
                                                  obd->obd_upcall.onu_owner);
         }
 out:
@@ -406,8 +555,6 @@ static int mds_lov_update_mds(struct obd_device *obd,
         ENTRY;
 
         /* Don't let anyone else mess with mds_lov_objids now */
-        mutex_down(&obd->obd_dev_sem);
-
         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
         if (rc)
                 GOTO(out, rc);
@@ -446,7 +593,6 @@ static int mds_lov_update_mds(struct obd_device *obd,
                         data[off], idx, rc);
         }
 out:
-        mutex_up(&obd->obd_dev_sem);
         RETURN(rc);
 }
 
@@ -472,6 +618,21 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 RETURN(-ENOTCONN);
         }
 
+        mutex_down(&obd->obd_dev_sem);
+        rc = mds_lov_read_objids(obd);
+        mutex_up(&obd->obd_dev_sem);
+        if (rc) {
+                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
+                GOTO(err_exit, rc);
+        }
+
+        rc = obd_register_observer(mds->mds_osc_obd, obd);
+        if (rc) {
+                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
+                       lov_name, rc);
+                GOTO(err_exit, rc);
+        }
+
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
                 RETURN(-ENOMEM);
@@ -494,24 +655,6 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         }
         mds->mds_osc_exp = class_conn2export(&conn);
 
-        rc = obd_register_observer(mds->mds_osc_obd, obd);
-        if (rc) {
-                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
-                       lov_name, rc);
-                GOTO(err_discon, rc);
-        }
-
-        /* Deny new client connections until we are sure we have some OSTs */
-        obd->obd_no_conn = 1;
-
-        mutex_down(&obd->obd_dev_sem);
-        rc = mds_lov_read_objids(obd);
-        if (rc) {
-                CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
-                GOTO(err_reg, rc);
-        }
-        mutex_up(&obd->obd_dev_sem);
-
         /* I want to see a callback happen when the OBD moves to a
          * "For General Use" state, and that's when we'll call
          * set_nextid().  The class driver can help us here, because
@@ -523,11 +666,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          */
         RETURN(rc);
 
-err_reg:
-        mutex_up(&obd->obd_dev_sem);
-        obd_register_observer(mds->mds_osc_obd, NULL);
-err_discon:
-        obd_disconnect(mds->mds_osc_exp);
+err_exit:
         mds->mds_osc_exp = NULL;
         mds->mds_osc_obd = ERR_PTR(rc);
         RETURN(rc);
@@ -554,18 +693,6 @@ int mds_lov_disconnect(struct obd_device *obd)
         RETURN(rc);
 }
 
-/* Collect the preconditions we need to allow client connects */
-static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
-{
-        if (flag & CONFIG_LOG)
-                obd->u.mds.mds_fl_cfglog = 1;
-        if (flag & CONFIG_SYNC)
-                obd->u.mds.mds_fl_synced = 1;
-        if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
-                /* Open for clients */
-                obd->obd_no_conn = 0;
-}
-
 struct mds_lov_sync_info {
         struct obd_device *mlsi_obd;     /* the lov device to sync */
         struct obd_device *mlsi_watched; /* target osc */
@@ -673,7 +800,7 @@ static int __mds_lov_synchronize(void *data)
                  * in mdd is removed, This hack should be removed.
                  */
                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
-                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
                                                  obd->obd_upcall.onu_owner);
         }
         EXIT;
@@ -720,6 +847,7 @@ int mds_lov_start_synchronize(struct obd_device *obd,
         if (mlsi == NULL)
                 RETURN(-ENOMEM);
 
+        LASSERT(data);
         mlsi->mlsi_obd = obd;
         mlsi->mlsi_watched = watched;
         mlsi->mlsi_index = *(__u32 *)data;
@@ -761,19 +889,22 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
         int rc = 0;
         ENTRY;
 
+        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
+
         switch (ev) {
         /* We only handle these: */
         case OBD_NOTIFY_ACTIVE:
+                /* lov want one or more _active_ targets for work */
+                /* activate event should be pass lov idx as argument */
         case OBD_NOTIFY_SYNC:
         case OBD_NOTIFY_SYNC_NONBLOCK:
+                /* sync event should be pass lov idx as argument */
                 break;
         case OBD_NOTIFY_CONFIG:
-                mds_allow_cli(obd, (unsigned long)data);
         default:
                 RETURN(0);
         }
 
-        CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
                 CERROR("unexpected notification of %s %s!\n",
                        watched->obd_type->typ_name, watched->obd_name);
@@ -787,12 +918,8 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 /* We still have to fix the lov descriptor for ost's added
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
-                mutex_down(&obd->obd_dev_sem);
                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
                                         &watched->u.cli.cl_target_uuid);
-                mutex_up(&obd->obd_dev_sem);
-                if (rc == 0)
-                        mds_allow_cli(obd, CONFIG_SYNC);
                 RETURN(rc);
         }
 
index 5547e62..451ea3e 100644 (file)
@@ -1480,6 +1480,16 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 GOTO(out_shrink, rc = err_serious(rc));
         }
 
+        /* for replay no cookkie / lmm need, because client have this already */
+        if (info->mti_spec.no_create == 1)  {
+                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
+                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
+
+                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
+                        req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                                             0);
+        }
+
         rc = mdt_init_ucred_reint(info);
         if (rc)
                 GOTO(out_shrink, rc);
@@ -4955,10 +4965,12 @@ static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
 {
         if (flag & CONFIG_LOG)
                 m->mdt_fl_cfglog = 1;
+
+        /* also notify active event */
         if (flag & CONFIG_SYNC)
                 m->mdt_fl_synced = 1;
 
-        if (m->mdt_fl_cfglog /* bz11778: && m->mdt_fl_synced */)
+        if (m->mdt_fl_cfglog && m->mdt_fl_synced)
                 /* Open for clients */
                 m->mdt_md_dev.md_lu_dev.ld_obd->obd_no_conn = 0;
 }
index d3bbed9..fadd75d 100644 (file)
@@ -527,6 +527,12 @@ void mdt_shrink_reply(struct mdt_thread_info *info)
 
         acl_size = body->aclsize;
 
+        /* this replay - not send info to client */
+        if (info->mti_spec.no_create == 1) {
+                md_size = 0;
+                acl_size = 0;
+        }
+
         CDEBUG(D_INFO, "Shrink to md_size = %d cookie/acl_size = %d"
                         " MDSCAPA = "LPX64", OSSCAPA = "LPX64"\n",
                         md_size, acl_size,
@@ -1019,7 +1025,6 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         } else {
                 rr->rr_name = NULL;
                 rr->rr_namelen = 0;
-                
         }
         info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
         if (rec->ul_bias & MDS_VTX_BYPASS)
@@ -1027,6 +1032,9 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
+        if (lustre_msg_get_flags(mdt_info_req(info)->rq_reqmsg) & MSG_REPLAY)
+                info->mti_spec.no_create = 1;
+
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
 }
@@ -1158,7 +1166,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         if (sp->u.sp_ea.eadatalen) {
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
                 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
-                        sp->u.sp_ea.no_lov_create = 1;
+                        sp->no_create = 1;
         }
 
         RETURN(0);
index 48c6af1..e2e8802 100644 (file)
@@ -683,7 +683,10 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
                                                RCL_SERVER);
-        ma->ma_need = MA_INODE | MA_LOV;
+        ma->ma_need = MA_INODE;
+        if (ma->ma_lmm_size > 0)
+                ma->ma_need |= MA_LOV;
+
         ma->ma_valid = 0;
 
         mdt_req_from_lcd(req, med->med_lcd);
@@ -884,7 +887,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
                                                RCL_SERVER);
-        ma->ma_need = MA_INODE | MA_LOV;
+        ma->ma_need = MA_INODE;
+        if (ma->ma_lmm_size > 0)
+                ma->ma_need |= MA_LOV;
+
         ma->ma_valid = 0;
 
         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
index 11b2768..1576482 100644 (file)
@@ -572,10 +572,6 @@ run_test 21c "start mds between two osts, stop mds last"
 
 test_22() {
        start_mds
-       echo Client mount before any osts are in the logs
-       mount_client $MOUNT
-       check_mount && return 41
-       pass
 
        echo Client mount with ost in logs, but none running
        start_ost