Whamcloud - gitweb
refine locking for avoid write wrong info into lov_objid file and some races.
authorshadow <shadow>
Thu, 4 Oct 2007 12:59:44 +0000 (12:59 +0000)
committershadow <shadow>
Thu, 4 Oct 2007 12:59:44 +0000 (12:59 +0000)
b=12702
i=nathan
i=tappro

lustre/ChangeLog
lustre/include/obd.h
lustre/mds/handler.c
lustre/mds/mds_lov.c

index 2bce9ed..7faba05 100644 (file)
@@ -703,6 +703,19 @@ Details    : If a OST has no remain object, system will block on the creating
             empty osc while others are not empty.  If we must block, we block
             for the shortest possible period of time.
 
             empty osc while others are not empty.  If we must block, we block
             for the shortest possible period of time.
 
+Severity   : major
+Bugzilla   : 11710
+Description: improve handling recoverable errors
+Details    : if request processig with error which can be recoverable on server
+             request should be resend, otherwise page released from cache and
+             marked as error.
+
+Severity   : enhancement
+Bugzilla   : 12702
+Description: refine locking for avoid write wrong info into lov_objid file
+Details    : fix possible races with add new target and write/update data in 
+             lov_objid file.
+
 --------------------------------------------------------------------------------
 
 2007-05-03  Cluster File Systems, Inc. <info@clusterfs.com>
 --------------------------------------------------------------------------------
 
 2007-05-03  Cluster File Systems, Inc. <info@clusterfs.com>
index e86e153..5d1fe3b 100644 (file)
@@ -523,12 +523,19 @@ struct mds_obd {
         struct obd_export               *mds_osc_exp; /* XXX lov_exp */
         struct lov_desc                  mds_lov_desc;
         __u32                            mds_id;
         struct obd_export               *mds_osc_exp; /* XXX lov_exp */
         struct lov_desc                  mds_lov_desc;
         __u32                            mds_id;
-        obd_id                          *mds_lov_objids;
-        int                              mds_lov_objids_size;
-        __u32                            mds_lov_objids_in_file;
+
         unsigned int                     mds_lov_objids_dirty:1;
         unsigned int                     mds_lov_objids_dirty:1;
-        int                              mds_lov_nextid_set;
         struct file                     *mds_lov_objid_filp;
         struct file                     *mds_lov_objid_filp;
+        /* protect update vs free in lov_add_target */
+        struct rw_semaphore              mds_lov_objids_sem;
+        /* protect update vs update or memmove vs update */
+        spinlock_t                       mds_lov_objids_lock;
+        /* wait for safe free */
+        cfs_waitq_t                      mds_lov_objids_wait;
+        obd_id                          *mds_lov_objids;
+        __u32                            mds_lov_objids_count;
+        int                              mds_lov_nextid_set;
+
         struct file                     *mds_health_check_filp;
         unsigned long                   *mds_client_bitmap;
 //        struct upcall_cache             *mds_group_hash;
         struct file                     *mds_health_check_filp;
         unsigned long                   *mds_client_bitmap;
 //        struct upcall_cache             *mds_group_hash;
@@ -552,6 +559,8 @@ struct mds_obd {
         struct lustre_capa_key          *mds_capa_keys;
 };
 
         struct lustre_capa_key          *mds_capa_keys;
 };
 
+#define mds_lov_objids_size(mds)        ((mds)->mds_lov_objids_count*sizeof(obd_id))
+
 struct echo_obd {
         struct obdo          eo_oa;
         spinlock_t           eo_lock;
 struct echo_obd {
         struct obdo          eo_oa;
         spinlock_t           eo_lock;
index a15c52b..8cbed31 100644 (file)
@@ -2247,7 +2247,7 @@ static int mds_cleanup(struct obd_device *obd)
 
         mds_update_server_data(obd, 1);
         if (mds->mds_lov_objids != NULL)
 
         mds_update_server_data(obd, 1);
         if (mds->mds_lov_objids != NULL)
-                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+                OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
         mds_fs_cleanup(obd);
 
 #if 0
         mds_fs_cleanup(obd);
 
 #if 0
@@ -2846,12 +2846,15 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
                 GOTO(err_fid, rc = PTR_ERR(file));
         }
                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
                 GOTO(err_fid, rc = PTR_ERR(file));
         }
-        mds->mds_lov_objid_filp = file;
         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
                        file->f_dentry->d_inode->i_mode);
                 GOTO(err_lov_objid, rc = -ENOENT);
         }
         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
                        file->f_dentry->d_inode->i_mode);
                 GOTO(err_lov_objid, rc = -ENOENT);
         }
+        init_rwsem(&mds->mds_lov_objids_sem);
+        spin_lock_init(&mds->mds_lov_objids_lock);
+        cfs_waitq_init(&mds->mds_lov_objids_wait);
+        mds->mds_lov_objid_filp = file;
 
         rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
 
         rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
@@ -2911,7 +2914,7 @@ static int mds_cmd_cleanup(struct obd_device *obd)
         }
 
         if (mds->mds_lov_objids != NULL)
         }
 
         if (mds->mds_lov_objids != NULL)
-                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+                OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
 
         shrink_dcache_parent(mds->mds_fid_de);
         dput(mds->mds_fid_de);
 
         shrink_dcache_parent(mds->mds_fid_de);
         dput(mds->mds_fid_de);
index a0ef3ce..7688a2d 100644 (file)
@@ -45,15 +45,24 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
 {
         struct mds_obd *mds = &obd->u.mds;
         int i;
 {
         struct mds_obd *mds = &obd->u.mds;
         int i;
+        int dirty = 0;
         ENTRY;
 
         ENTRY;
 
-        lock_kernel();
-        for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+        down_read(&mds->mds_lov_objids_sem);
+
+        spin_lock(&mds->mds_lov_objids_lock);
+        for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
                 if (ids[i] > (mds->mds_lov_objids)[i]) {
                         (mds->mds_lov_objids)[i] = ids[i];
                 if (ids[i] > (mds->mds_lov_objids)[i]) {
                         (mds->mds_lov_objids)[i] = ids[i];
-                        mds->mds_lov_objids_dirty = 1;
+                        dirty = 1;
                 }
                 }
-        unlock_kernel();
+        }
+        mds->mds_lov_objids_dirty = dirty;
+
+        spin_unlock(&mds->mds_lov_objids_lock);
+
+        up_read(&mds->mds_lov_objids_sem);
+
         EXIT;
 }
 EXPORT_SYMBOL(mds_lov_update_objids);
         EXIT;
 }
 EXPORT_SYMBOL(mds_lov_update_objids);
@@ -66,7 +75,7 @@ static int mds_lov_read_objids(struct obd_device *obd)
         int i, rc, size;
         ENTRY;
 
         int i, rc, size;
         ENTRY;
 
-        LASSERT(!mds->mds_lov_objids_size);
+        LASSERT(!mds->mds_lov_objids_count);
         LASSERT(!mds->mds_lov_objids_dirty);
 
         /* Read everything in the file, even if our current lov desc
         LASSERT(!mds->mds_lov_objids_dirty);
 
         /* Read everything in the file, even if our current lov desc
@@ -74,41 +83,46 @@ static int mds_lov_read_objids(struct obd_device *obd)
            during mds setup may still have valid objids. */
         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
         if (size == 0)
            during mds setup may still have valid objids. */
         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
         if (size == 0)
-                RETURN(0);
+                GOTO(out, rc = 0);
 
         OBD_ALLOC(ids, size);
         if (ids == NULL)
 
         OBD_ALLOC(ids, size);
         if (ids == NULL)
-                RETURN(-ENOMEM);
-        mds->mds_lov_objids = ids;
-        mds->mds_lov_objids_size = size;
+                GOTO(out, rc = -ENOMEM);
 
         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
         if (rc < 0) {
 
         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
         if (rc < 0) {
+               OBD_FREE(ids, size);
                 CERROR("Error reading objids %d\n", rc);
                 CERROR("Error reading objids %d\n", rc);
-                RETURN(rc);
+                GOTO(out, rc);
         }
         }
+        mds->mds_lov_objids = ids;
+        mds->mds_lov_objids_count = size / sizeof(*ids);
 
 
-        mds->mds_lov_objids_in_file = size / sizeof(*ids);
-
-        for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
+        for (i = 0; i < mds->mds_lov_objids_count; i++) {
                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
                        mds->mds_lov_objids[i], i);
         }
                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
                        mds->mds_lov_objids[i], i);
         }
-        RETURN(0);
-}
 
 
+out:
+        RETURN(rc);
+}
+/* must held mds_lov_objids_sem */
 int mds_lov_write_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         loff_t off = 0;
         int i, rc, tgts;
         ENTRY;
 int mds_lov_write_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         loff_t off = 0;
         int i, rc, tgts;
         ENTRY;
-
-        if (!mds->mds_lov_objids_dirty)
+        
+        spin_lock(&mds->mds_lov_objids_lock);
+        if (!mds->mds_lov_objids_dirty) {
+                spin_unlock(&mds->mds_lov_objids_lock);
                 RETURN(0);
                 RETURN(0);
+        }
+        mds->mds_lov_objids_dirty = 0;
+        spin_unlock(&mds->mds_lov_objids_lock);
 
 
-        tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
-
+        tgts = mds->mds_lov_objids_count;
         if (!tgts)
                 RETURN(0);
 
         if (!tgts)
                 RETURN(0);
 
@@ -120,8 +134,10 @@ int mds_lov_write_objids(struct obd_device *obd)
                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
                                  &off, 0);
         if (rc >= 0) {
                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
                                  &off, 0);
         if (rc >= 0) {
-                mds->mds_lov_objids_dirty = 0;
                 rc = 0;
                 rc = 0;
+                cfs_waitq_signal(&mds->mds_lov_objids_wait);
+        } else {
+                mds->mds_lov_objids_dirty = 1;
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
@@ -162,10 +178,15 @@ int mds_lov_set_nextid(struct obd_device *obd)
         ENTRY;
 
         LASSERT(!obd->obd_recovering);
         ENTRY;
 
         LASSERT(!obd->obd_recovering);
+        if (mds->mds_lov_objids_count == 0)
+                RETURN(0);
 
         LASSERT(mds->mds_lov_objids != NULL);
 
 
         LASSERT(mds->mds_lov_objids != NULL);
 
-        rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
+       /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change 
+        * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
+        * target can be connected at start time */
+        rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
                                 KEY_NEXT_ID,
                                 mds->mds_lov_desc.ld_tgt_count *
                                 sizeof(*mds->mds_lov_objids),
                                 KEY_NEXT_ID,
                                 mds->mds_lov_desc.ld_tgt_count *
                                 sizeof(*mds->mds_lov_objids),
@@ -191,36 +212,39 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
         if (!ld)
                 RETURN(-ENOMEM);
 
         if (!ld)
                 RETURN(-ENOMEM);
 
-        rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
+        rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
                           &valsize, ld);
         if (rc)
                 GOTO(out, rc);
 
         /* The size of the LOV target table may have increased. */
                           &valsize, ld);
         if (rc)
                 GOTO(out, rc);
 
         /* The size of the LOV target table may have increased. */
-        size = ld->ld_tgt_count * sizeof(obd_id);
-        if ((mds->mds_lov_objids_size == 0) ||
-            (size > mds->mds_lov_objids_size)) {
+        if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
                 obd_id *ids;
 
                 obd_id *ids;
 
-                /* add room by powers of 2 */
-                size = 1;
-                while (size < ld->ld_tgt_count)
-                        size = size << 1;
-                size = size * sizeof(obd_id);
+                size = ld->ld_tgt_count * sizeof(obd_id);
 
                 OBD_ALLOC(ids, size);
                 if (ids == NULL)
                         GOTO(out, rc = -ENOMEM);
                 memset(ids, 0, size);
 
                 OBD_ALLOC(ids, size);
                 if (ids == NULL)
                         GOTO(out, rc = -ENOMEM);
                 memset(ids, 0, size);
-                if (mds->mds_lov_objids_size) {
+
+                /* write lock is enough for protect from access to
+                 * old pointer in mds_lov_update_objids and dirty == 0
+                 * is enough for protect from access in write_objids */
+                if (mds->mds_lov_objids) {
+                        struct l_wait_info lwi = { 0 };
                         obd_id *old_ids = mds->mds_lov_objids;
                         obd_id *old_ids = mds->mds_lov_objids;
+
                         memcpy(ids, mds->mds_lov_objids,
                         memcpy(ids, mds->mds_lov_objids,
-                               mds->mds_lov_objids_size);
-                        mds->mds_lov_objids = ids;
-                        OBD_FREE(old_ids, mds->mds_lov_objids_size);
+                               mds_lov_objids_size(mds));
+
+                        l_wait_event(mds->mds_lov_objids_wait,
+                                     mds->mds_lov_objids_dirty == 0, &lwi);
+
+                        OBD_FREE(old_ids, mds_lov_objids_size(mds));
                 }
                 mds->mds_lov_objids = ids;
                 }
                 mds->mds_lov_objids = ids;
-                mds->mds_lov_objids_size = size;
+                mds->mds_lov_objids_count = ld->ld_tgt_count;
         }
 
         /* Don't change the mds_lov_desc until the objids size matches the
         }
 
         /* Don't change the mds_lov_desc until the objids size matches the
@@ -231,7 +255,7 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
 
         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
                         max(mds->mds_lov_desc.ld_tgt_count,
 
         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
                         max(mds->mds_lov_desc.ld_tgt_count,
-                            mds->mds_lov_objids_in_file));
+                            mds->mds_lov_objids_count));
         mds->mds_max_mdsize = lov_mds_md_size(stripes);
         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
         mds->mds_max_mdsize = lov_mds_md_size(stripes);
         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
@@ -242,7 +266,7 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
         /* We only _need_ to do this at first add (idx), or the first time
            after recovery.  However, it should now be safe to call anytime. */
         mutex_down(&obd->obd_dev_sem);
         /* We only _need_ to do this at first add (idx), or the first time
            after recovery.  However, it should now be safe to call anytime. */
         mutex_down(&obd->obd_dev_sem);
-        llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+        rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
         mutex_up(&obd->obd_dev_sem);
 
         /*XXX this notifies the MDD until lov handling use old mds code */
         mutex_up(&obd->obd_dev_sem);
 
         /*XXX this notifies the MDD until lov handling use old mds code */
@@ -267,57 +291,103 @@ static int mds_lov_update_mds(struct obd_device *obd,
         struct mds_obd *mds = &obd->u.mds;
         int old_count;
         int rc = 0;
         struct mds_obd *mds = &obd->u.mds;
         int old_count;
         int rc = 0;
+        obd_id lastid;
+        __u32 size = sizeof(lastid);
+
         ENTRY;
 
         old_count = mds->mds_lov_desc.ld_tgt_count;
         ENTRY;
 
         old_count = mds->mds_lov_desc.ld_tgt_count;
+        down_write(&mds->mds_lov_objids_sem);
         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+        downgrade_write(&mds->mds_lov_objids_sem);
         if (rc)
         if (rc)
-                RETURN(rc);
+                GOTO(out, rc);
 
         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
                mds->mds_lov_desc.ld_tgt_count);
 
         /* idx is set as data from lov_notify. */
 
         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
                mds->mds_lov_desc.ld_tgt_count);
 
         /* idx is set as data from lov_notify. */
-        if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
-                if (idx >= mds->mds_lov_desc.ld_tgt_count) {
-                        CERROR("index %d > count %d!\n", idx,
-                               mds->mds_lov_desc.ld_tgt_count);
-                        RETURN(-EINVAL);
-                }
+        if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
+                GOTO(out, rc);
 
 
-                if (idx >= mds->mds_lov_objids_in_file) {
-                        /* We never read this lastid; ask the osc */
-                        obd_id lastid;
-                        __u32 size = sizeof(lastid);
-                        rc = obd_get_info(watched->obd_self_export,
-                                          strlen("last_id"),
-                                          "last_id", &size, &lastid);
-                        if (rc)
-                                RETURN(rc);
-                        mds->mds_lov_objids[idx] = lastid;
-                        mds->mds_lov_objids_dirty = 1;
-                        mds_lov_write_objids(obd);
-                } else {
-                        /* We have read this lastid from disk; tell the osc.
-                           Don't call this during recovery. */
-                        rc = mds_lov_set_nextid(obd);
-                }
+        if (idx >= mds->mds_lov_desc.ld_tgt_count) {
+                CERROR("index %d > count %d!\n", idx,
+                       mds->mds_lov_desc.ld_tgt_count);
+                 GOTO(out, rc = -EINVAL);
+        }
+        rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
+                          "last_id", &size, &lastid);
+        if (rc)
+                GOTO(out, rc);
 
 
-                CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
-                      mds->mds_lov_objids[idx], idx);
+        spin_lock(&mds->mds_lov_objids_lock);
+        if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
+                /* last id not init or corrupted - use data from osc */
+                mds->mds_lov_objids[idx] = lastid;
+                mds->mds_lov_objids_dirty = 1;
+                spin_unlock(&mds->mds_lov_objids_lock);
+                /* not need write immediately, mark for write for avoid
+                 * lock inverse */
+        } else {
+                spin_unlock(&mds->mds_lov_objids_lock);
+                /* We have read this lastid from disk; tell the osc.
+                   Don't call this during recovery. */
+                rc = mds_lov_set_nextid(obd);
         }
 
         }
 
+        CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
+               mds->mds_lov_objids[idx], idx);
+        
+out:
+        up_read(&mds->mds_lov_objids_sem);
+        RETURN(rc);
+}
+
+int mds_update_objids_from_lastid(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int size;
+        int rc, i;
+
+        if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
+                obd_id *ids;
+
+                size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+                OBD_ALLOC(ids, size);
+                if (ids == NULL)
+                        GOTO(out, rc = -ENOMEM);
+
+                OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
+                mds->mds_lov_objids = ids;
+                mds->mds_lov_objids_count = size / sizeof(obd_id);
+        }
+
+        size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+        rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
+                          "last_id", &size, mds->mds_lov_objids);
+        if (!rc) {
+                for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
+                        CWARN("got last object "LPU64" from OST %d\n",
+                              mds->mds_lov_objids[i], i);
+                mds->mds_lov_objids_dirty = 1;
+                rc = mds_lov_write_objids(obd);
+                if (rc)
+                        CERROR("got last objids from OSTs, but error "
+                               "writing objids file: %d\n", rc);
+        }
+out:
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+
 /* update the LOV-OSC knowledge of the last used object id's */
 int mds_lov_connect(struct obd_device *obd, char * lov_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
         struct obd_connect_data *data;
 /* update the LOV-OSC knowledge of the last used object id's */
 int mds_lov_connect(struct obd_device *obd, char * lov_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
         struct obd_connect_data *data;
-        int rc, i;
+        int rc;
         ENTRY;
 
         if (IS_ERR(mds->mds_osc_obd))
         ENTRY;
 
         if (IS_ERR(mds->mds_osc_obd))
@@ -364,6 +434,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         /* Deny new client connections until we are sure we have some OSTs */
         obd->obd_no_conn = 1;
 
         /* Deny new client connections until we are sure we have some OSTs */
         obd->obd_no_conn = 1;
 
+        down_write(&mds->mds_lov_objids_sem);
         rc = mds_lov_read_objids(obd);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
         rc = mds_lov_read_objids(obd);
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
@@ -374,30 +445,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (rc)
                 GOTO(err_reg, rc);
 
         if (rc)
                 GOTO(err_reg, rc);
 
-        /* tgt_count may be 0! */
-        rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
-        if (rc) {
-                CERROR("failed to initialize catalog %d\n", rc);
-                GOTO(err_reg, rc);
-        }
-
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
-        if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
-                int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
-                rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
-                                  "last_id", &size, mds->mds_lov_objids);
-                if (!rc) {
-                        for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
-                                CWARN("got last object "LPU64" from OST %d\n",
-                                      mds->mds_lov_objids[i], i);
-                        mds->mds_lov_objids_dirty = 1;
-                        rc = mds_lov_write_objids(obd);
-                        if (rc)
-                                CERROR("got last objids from OSTs, but error "
-                                       "writing objids file: %d\n", rc);
-                }
+        if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
+                mds_update_objids_from_lastid(obd);
         }
         }
+        up_write(&mds->mds_lov_objids_sem);
 
         /* I want to see a callback happen when the OBD moves to a
          * "For General Use" state, and that's when we'll call
 
         /* I want to see a callback happen when the OBD moves to a
          * "For General Use" state, and that's when we'll call
@@ -411,6 +464,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         RETURN(rc);
 
 err_reg:
         RETURN(rc);
 
 err_reg:
+        up_write(&mds->mds_lov_objids_sem);
         obd_register_observer(mds->mds_osc_obd, NULL);
 err_discon:
         obd_disconnect(mds->mds_osc_exp);
         obd_register_observer(mds->mds_osc_obd, NULL);
 err_discon:
         obd_disconnect(mds->mds_osc_exp);
@@ -822,6 +876,7 @@ int mds_lov_start_synchronize(struct obd_device *obd,
 int mds_notify(struct obd_device *obd, struct obd_device *watched,
                enum obd_notify_event ev, void *data)
 {
 int mds_notify(struct obd_device *obd, struct obd_device *watched,
                enum obd_notify_event ev, void *data)
 {
+        struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
@@ -852,7 +907,9 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 /* We still have to fix the lov descriptor for ost's added
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
                 /* We still have to fix the lov descriptor for ost's added
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
-                rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
+                down_write(&mds->mds_lov_objids_sem);
+                rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
+                up_write(&mds->mds_lov_objids_sem);
                 if (rc)
                         RETURN(rc);
                 /* We should update init llog here too for replay unlink and 
                 if (rc)
                         RETURN(rc);
                 /* We should update init llog here too for replay unlink and