Whamcloud - gitweb
Do proper setup/cleanup of MDS exports and client data.
authoradilger <adilger>
Sat, 10 Aug 2002 10:05:34 +0000 (10:05 +0000)
committeradilger <adilger>
Sat, 10 Aug 2002 10:05:34 +0000 (10:05 +0000)
This also changes the behaviour of MDS connections so that the export
and client data is set up immediately at connect time, rather than at
"getinfo" time.  I am ASSUMING that at recovery time the client does
another connect to the MDS, or that some other mechanism is in place
so that it will get the correct export back (it looks like this is
correct, but I didn't follow the whole code path through for recovery).

I was torn on whether to zap the on-disk MDS client record in the case
where it does a proper disconnect.  In the end I decided against it,
because it was too difficult to pass a parameter to the mds_disconnect()
call telling whether we should zap or not.  We don't want to change the
disk if there is some error in restarting after a failure or if we are
forcibly shutting down the MDS, but only on a clean disconnect by the
client.

So far, the only potential harm that comes from not doing the zapping
of the client record is that we get an (empty) export for each client
that shut down cleanly (and was not overwritten) on the last MDS incarnation.
On the following MDS incarnation this client export will be dropped
because the incarnation number is too low (assuming it remains unused).

Another thing of note is that we pass "struct file *" back to the client
upon open, and dereference this at close time.  We need to move this
into the export struct and pass a cookie to the client (and validate
the cookie) instead of (or in addition to) passing the pointer directly.
This is needed for recovery of the client open state anyways...

lustre/mdc/mdc_reint.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_reint.c

index 621fb35..81cb40f 100644 (file)
@@ -58,8 +58,7 @@ int mdc_setattr(struct lustre_handle *conn,
         ENTRY;
 
         mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(conn, 
-                               MDS_REINT, 1, &size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index 5362336..70116ff 100644 (file)
@@ -93,7 +93,15 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         return rc;
 }
 
-/* 'dir' is a inode for which a lock has already been taken */
+/*
+ * Look up a named entry in a directory, and get an LDLM lock on it.
+ * 'dir' is a inode for which an LDLM lock has already been taken.
+ *
+ * If we do not need an exclusive or write lock on this entry (e.g.
+ * a read lock for attribute lookup only) then we do not hold the
+ * directory on return.  It is up to the caller to know what type
+ * of lock it is getting, and clean up appropriately.
+ */
 struct dentry *mds_name2locked_dentry(struct obd_device *obd,
                                       struct dentry *dir, struct vfsmount **mnt,
                                       char *name, int namelen, int lock_mode,
@@ -161,6 +169,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         RETURN(retval);
 }
 
+/* Look up an entry by inode number. */
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt)
 {
@@ -221,50 +230,85 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         return result;
 }
 
+/* Establish a connection to the MDS.
+ *
+ * This will set up an export structure for the client to hold state data
+ * about that client, like open files, the last operation number it did
+ * on the server, etc.
+ */
 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                        char *cluuid)
 {
+        struct obd_export *exp;
+        struct mds_client_data *mcd;
         int rc;
-        struct list_head *p, *n;
 
         MOD_INC_USE_COUNT;
         if (cluuid) {
-                list_for_each_safe(p, n, &obd->obd_exports) {
-                        struct obd_export *exp;
-                        struct mds_client_data *mcd;
-                        
+                struct list_head *p;
+                list_for_each(p, &obd->obd_exports) {
                         exp = list_entry(p, struct obd_export, exp_chain);
                         mcd = exp->exp_mds_data.med_mcd;
                         if (mcd && !memcmp(cluuid, mcd->mcd_uuid,
                                            sizeof(mcd->mcd_uuid))) {
-                                CDEBUG(D_INFO, 
+                                CDEBUG(D_INFO,
                                        "existing export for UUID '%s' at %p\n",
                                        cluuid, exp);
-                                if (exp->exp_obd != obd) 
-                                        LBUG();
+                                LASSERT(exp->exp_obd == obd);
                                 exp->exp_rconnh.addr = conn->addr;
                                 exp->exp_rconnh.cookie = conn->cookie;
                                 conn->addr = (__u64) (unsigned long)exp;
                                 conn->cookie = exp->exp_cookie;
-                                CDEBUG(D_IOCTL, "connect: addr %Lx cookie %Lx\n",
+                                CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
                                        (long long)conn->addr,
                                        (long long)conn->cookie);
-                                return 0;
+                                RETURN(0);
                         }
                 }
         }
+#warning shaver: we might need a real cluuid here
         rc = class_connect(conn, obd, NULL);
+        if (rc)
+                GOTO(out_dec, rc);
+        exp = class_conn2export(conn);
+        LASSERT(exp);
 
+        OBD_ALLOC(mcd, sizeof(*mcd));
+        if (!mcd) {
+                CERROR("mds: out of memory for client data\n");
+                GOTO(out_export, rc = -ENOMEM);
+        }
+        memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+        exp->exp_mds_data.med_mcd = mcd;
+        rc = mds_client_add(&exp->exp_mds_data, -1);
         if (rc)
-                MOD_DEC_USE_COUNT;
+                GOTO(out_mdc, rc);
 
-        return rc;
+        return 0;
+
+out_mdc:
+        OBD_FREE(mcd, sizeof(*mcd));
+out_export:
+        class_disconnect(conn);
+out_dec:
+        MOD_DEC_USE_COUNT;
+
+        RETURN(rc);
 }
 
 static int mds_disconnect(struct lustre_handle *conn)
 {
+        struct obd_export *exp;
         int rc;
 
+        exp = class_conn2export(conn);
+        if (!exp)
+                RETURN(-EINVAL);
+
+        rc = mds_client_free(&exp->exp_mds_data);
+        if (rc)
+                CERROR("error freeing client data: rc = %d\n", rc);
+
         rc = class_disconnect(conn);
         if (!rc)
                 MOD_DEC_USE_COUNT;
@@ -272,12 +316,10 @@ static int mds_disconnect(struct lustre_handle *conn)
         return rc;
 }
 
-/* FIXME: the error cases need fixing to avoid leaks */
 static int mds_getstatus(struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_body *body;
-        struct mds_client_data *mcd;
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -296,34 +338,11 @@ static int mds_getstatus(struct ptlrpc_request *req)
         body = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
 
-        mcd = med->med_mcd;
-        if (!mcd) {
-                int rc;
-                
-                CDEBUG(D_INFO, "allocating new client data for UUID '%s'\n",
-                        ptlrpc_req_to_uuid(req));
-                OBD_ALLOC(mcd, sizeof(*mcd));
-                if (!mcd) {
-                        CERROR("mds: out of memory for client data\n");
-                        req->rq_status = -ENOMEM;
-                        RETURN(0);
-                }
-                memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req),
-                       sizeof(mcd->mcd_uuid));
-                rc = mds_client_add(mds, med, -1);
-                if (rc) {
-                        req->rq_status = rc;
-                        OBD_FREE(mcd, sizeof(*mcd));
-                        RETURN(0);
-                }
-                med->med_mcd = mcd;
-        } else {
-                CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
-                       mcd->mcd_uuid, med->med_off);
-        }
+        LASSERT(med->med_mcd);
+
         /* mcd_last_xid is is stored in little endian on the disk and
            mds_pack_rep_body converts it to network order */
-        body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
+        body->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
         mds_pack_rep_body(req);
         RETURN(0);
 }
@@ -332,7 +351,7 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_status_req *streq;
-        struct lov_desc *desc; 
+        struct lov_desc *desc;
         int tgt_count;
         int rc, size[2] = {sizeof(*desc)};
         ENTRY;
@@ -401,8 +420,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         __u64 res_id[3] = {0, 0, 0};
         ENTRY;
 
-        if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0)
-                LBUG();
+        LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
 
         if (req->rq_reqmsg->bufcount <= offset + 1) {
                 LBUG();
@@ -424,7 +442,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         }
 
         dir = de->d_inode;
-        CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+        CDEBUG(D_INODE, "parent ino %ld, name %*s\n", dir->i_ino,namelen,name);
 
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
         res_id[0] = dir->i_ino;
@@ -694,6 +712,7 @@ static int mds_open(struct ptlrpc_request *req)
         list_add(&mfd->mfd_list, &med->med_open_head);
 
         body = lustre_msg_buf(req->rq_repmsg, 0);
+        /* FIXME: need to have cookies involved here */
         body->extra = (__u64) (unsigned long)file;
         RETURN(0);
 }
@@ -723,6 +742,7 @@ static int mds_close(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
+        /* FIXME: need to have cookies involved here */
         file = (struct file *)(unsigned long)body->extra;
         if (!file->f_dentry)
                 LBUG();
@@ -872,7 +892,7 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
                                      &req->rq_repmsg);
                 if (rc) {
-                        rc = req->rq_status = -ENOMEM;
+                        req->rq_status = rc;
                         break;
                 }
                 rc = mds_reint(0, req);
@@ -959,11 +979,12 @@ int mds_handle(struct ptlrpc_request *req)
  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
  * then the server last_rcvd value may be less than that of the clients.
  * This will alert us that we may need to do client recovery.
+ *
+ * Assumes we are already in the server filesystem context.
  */
 static
 int mds_update_server_data(struct mds_obd *mds)
 {
-        struct obd_run_ctxt saved;
         struct mds_server_data *msd = mds->mds_server_data;
         struct file *filp = mds->mds_rcvd_filp;
         loff_t off = 0;
@@ -975,7 +996,6 @@ int mds_update_server_data(struct mds_obd *mds)
         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
                (unsigned long long)mds->mds_mount_count,
                (unsigned long long)mds->mds_last_rcvd);
-        push_ctxt(&saved, &mds->mds_ctxt);
         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
         if (rc != sizeof(*msd)) {
                 CERROR("error writing MDS server data: rc = %d\n", rc);
@@ -984,7 +1004,6 @@ int mds_update_server_data(struct mds_obd *mds)
                 RETURN(rc);
         }
         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
-        pop_ctxt(&saved);
         if (rc)
                 CERROR("error flushing MDS server data: rc = %d\n", rc);
 
@@ -995,11 +1014,14 @@ int mds_update_server_data(struct mds_obd *mds)
 static int mds_recover(struct obd_device *obddev)
 {
         struct mds_obd *mds = &obddev->u.mds;
+        struct obd_run_ctxt saved;
         int rc;
 
         /* This happens at the end when recovery is complete */
         ++mds->mds_mount_count;
+        push_ctxt(&saved, &mds->mds_ctxt);
         rc = mds_update_server_data(mds);
+        pop_ctxt(&saved);
 
         return rc;
 }
@@ -1049,13 +1071,11 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_fs, rc = -EINVAL);
         }
 
-        rc = -ENOENT;
         obddev->obd_namespace =
                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL) {
-                LBUG();
                 mds_cleanup(obddev);
-                GOTO(err_svc, rc);
+                GOTO(err_svc, rc = -ENOMEM);
         }
 
         for (i = 0; i < MDS_NUM_THREADS; i++) {
@@ -1064,7 +1084,6 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
                 if (rc) {
                         CERROR("cannot start MDS thread #%d: rc %d\n", i, rc);
-                        LBUG();
                         GOTO(err_thread, rc);
                 }
         }
@@ -1097,18 +1116,9 @@ static int mds_cleanup(struct obd_device *obddev)
 {
         struct super_block *sb;
         struct mds_obd *mds = &obddev->u.mds;
-
+        struct obd_run_ctxt saved;
         ENTRY;
 
-        if (!list_empty(&obddev->obd_exports)) {
-                CERROR("still has exports; forcing cleanup\n");
-                class_disconnect_all(obddev);
-                if (!list_empty(&obddev->obd_exports)) {
-                        CERROR("still has exports after forced cleanup?\n");
-                        RETURN(-EBUSY);
-                }
-        }
-
         ptlrpc_stop_all_threads(mds->mds_service);
         ptlrpc_unregister_service(mds->mds_service);
 
@@ -1116,6 +1126,7 @@ static int mds_cleanup(struct obd_device *obddev)
         if (!mds->mds_sb)
                 RETURN(0);
 
+        push_ctxt(&saved, &mds->mds_ctxt);
         mds_update_server_data(mds);
 
         if (mds->mds_rcvd_filp) {
@@ -1125,6 +1136,7 @@ static int mds_cleanup(struct obd_device *obddev)
                 if (rc)
                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
         }
+        pop_ctxt(&saved);
 
         unlock_kernel();
         mntput(mds->mds_vfsmnt);
index b161f6a..2524218 100644 (file)
@@ -42,11 +42,8 @@ static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
  * Otherwise, we have just read the data from the last_rcvd file and
  * we know its offset.
  */
-int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
+int mds_client_add(struct mds_export_data *med, int cl_off)
 {
-        CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
-               cl_off, med->med_mcd->mcd_uuid);
-
         if (cl_off == -1) {
                 unsigned long *word;
                 int bit;
@@ -56,13 +53,14 @@ int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
                 while(*word == ~0UL)
                         ++word;
                 if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
-                        CERROR("no room in client MDS bitmap - fix code\n");
+                        CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
                         return -ENOMEM;
                 }
                 bit = ffz(*word);
                 if (test_and_set_bit(bit, word)) {
-                        CERROR("found bit %d set for word %d - fix code\n",
+                        CERROR("found bit %d set for word %d - fix locking\n",
                                bit, word - last_rcvd_slots);
+                        LBUG();
                         goto repeat;
                 }
                 cl_off = word - last_rcvd_slots + bit;
@@ -74,45 +72,32 @@ int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
                 }
         }
 
-        med->med_off = cl_off;
-        mds->mds_client_count++;
+        CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
+               cl_off, med->med_mcd->mcd_uuid);
 
+        med->med_off = cl_off;
         return 0;
 }
 
-static int mds_client_free_all(struct obd_device *obddev)
+int mds_client_free(struct mds_export_data *med)
 {
-        struct mds_obd *mds = &obddev->u.mds;
-        struct list_head *p, *n;
+        unsigned long *word;
+        int bit;
 
-        list_for_each_safe(p, n, &obddev->obd_exports) {
-                struct obd_export *exp;
-                struct mds_export_data *med;
-                unsigned long *word;
-                int bit;
-
-                exp = list_entry(p, struct obd_export, exp_chain);
-                med = &exp->exp_mds_data;
-                
-                word = last_rcvd_slots + med->med_off / sizeof(unsigned long);
-                bit = med->med_off % sizeof(unsigned long);
+        CDEBUG(D_INFO, "freeing client at offset %d with UUID '%s'\n",
+               med->med_off, med->med_mcd->mcd_uuid);
 
-                if (!test_and_clear_bit(bit, word)) {
-                        CERROR("bit %d already clear in word %d - bad bad\n",
-                               bit, word - last_rcvd_slots);
-                        LBUG();
-                }
+        word = last_rcvd_slots + med->med_off / sizeof(unsigned long);
+        bit = med->med_off % sizeof(unsigned long);
 
-                OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
-                --mds->mds_client_count;
+        if (!test_and_clear_bit(bit, word)) {
+                CERROR("bit %d already clear in word %d - bad bad\n",
+                       bit, word - last_rcvd_slots);
+                LBUG();
         }
 
-        if (mds->mds_client_count) {
-                CERROR("%d mds clients remaining after cleanup\n",
-                       mds->mds_client_count);
-                /* LBUG()? */
-        }
-        
+        OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
+
         return 0;
 }
 
@@ -136,6 +121,7 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
         int cl_off;
         __u64 last_rcvd = 0;
         __u64 last_mount;
+        int clients = 0;
         int rc = 0;
 
         OBD_ALLOC(msd, sizeof(*msd));
@@ -175,10 +161,11 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
         for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
              off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
-                if (!mcd)
+                if (!mcd) {
                         OBD_ALLOC(mcd, sizeof(*mcd));
-                if (!mcd)
-                        GOTO(err_msd, rc = -ENOMEM);
+                        if (!mcd)
+                                GOTO(err_msd, rc = -ENOMEM);
+                }
 
                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
                 if (rc != sizeof(*mcd)) {
@@ -191,6 +178,7 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
 
                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
 
+                /* Do client recovery here (open files, etc) */
                 if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
                                   < MDS_MOUNT_RECOV)) {
                         struct obd_export *export = class_new_export(obddev);
@@ -199,11 +187,14 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
                                 break;
                         }
                         export->exp_mds_data.med_mcd = mcd;
+                        mds_client_add(&export->exp_mds_data, cl_off);
                         mcd = NULL;
+                        clients++;
                 } else {
                         CDEBUG(D_INFO,
-                               "client at offset %d with UUID '%s' ignored\n",
-                               cl_off, mcd->mcd_uuid);
+                               "ignored client %d, UUID '%s', last_mount %Ld\n",
+                               cl_off, mcd->mcd_uuid,
+                               (long long)le64_to_cpu(mcd->mcd_mount_count));
                 }
 
                 if (last_rcvd > mds->mds_last_rcvd) {
@@ -213,8 +204,8 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
                         mds->mds_last_rcvd = last_rcvd;
                 }
         }
-        CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
-               (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
+        CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n",
+               (unsigned long long)mds->mds_last_rcvd, clients, cl_off);
 
         if (mcd)
                 OBD_FREE(mcd, sizeof(*mcd));
@@ -244,25 +235,12 @@ static int mds_fs_prep(struct obd_device *obddev)
                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
                 GOTO(err_pop, rc);
         }
-        /* XXX probably want to hold on to this later... */
-        dput(dentry);
-        f = filp_open("ROOT", O_RDONLY, 0);
-        if (IS_ERR(f)) {
-                rc = PTR_ERR(f);
-                CERROR("cannot open ROOT: rc = %d\n", rc);
-                LBUG();
-                GOTO(err_pop, rc);
-        }
 
-        mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
-        mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
+        mds->mds_rootfid.id = dentry->d_inode->i_ino;
+        mds->mds_rootfid.generation = dentry->d_inode->i_generation;
         mds->mds_rootfid.f_type = S_IFDIR;
 
-        rc = filp_close(f, 0);
-        if (rc) {
-                CERROR("cannot close ROOT: rc = %d\n", rc);
-                LBUG();
-        }
+        dput(dentry);
 
         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
         if (IS_ERR(dentry)) {
@@ -297,19 +275,17 @@ static int mds_fs_prep(struct obd_device *obddev)
                 GOTO(err_client, rc);
         }
         mds->mds_rcvd_filp = f;
+err_pop:
         pop_ctxt(&saved);
 
-        RETURN(0);
+        return rc;
 
 err_client:
-        mds_client_free_all(obddev);
+        class_disconnect_all(obddev);
 err_filp:
         if (filp_close(f, 0))
                 CERROR("can't close %s after error\n", LAST_RCVD);
-err_pop:
-        pop_ctxt(&saved);
-
-        return rc;
+        goto err_pop;
 }
 
 static struct mds_fs_operations *mds_search_fs_type(const char *name)
@@ -466,7 +442,8 @@ out_dec:
 void mds_fs_cleanup(struct obd_device *obddev)
 {
         struct mds_obd *mds = &obddev->u.mds;
-        mds_client_free_all(obddev);
+
+        class_disconnect_all(obddev); /* this cleans up client info too */
         mds_server_free_data(mds);
 
         OBD_FREE(mds->mds_sop, sizeof(*mds->mds_sop));
index c63ebc6..0642aa7 100644 (file)
@@ -128,11 +128,11 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         }
 
         EXIT;
-      out_unlock:
+out_unlock:
         unlock_kernel();
-      out_setattr_de:
+out_setattr_de:
         l_dput(de);
-      out_setattr:
+out_setattr:
         req->rq_status = rc;
         return (0);
 }
@@ -177,10 +177,10 @@ static int mds_reint_recreate(struct mds_update_record *rec, int offset,
                 LBUG();
         }
 
-      out_create_dchild:
+out_create_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-      out_create_de:
+out_create_de:
         l_dput(de);
         req->rq_status = rc;
         return 0;
@@ -352,24 +352,24 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 body->valid = OBD_MD_FLID | OBD_MD_FLGENER;
         }
         EXIT;
-      out_create_commit:
+out_create_commit:
         err = mds_fs_commit(mds, dir, handle);
         if (err) {
                 CERROR("error on commit: err = %d\n", err);
                 if (!rc)
                         rc = err;
         }
-      out_create_dchild:
+out_create_dchild:
         l_dput(dchild);
         ldlm_lock_decref(&lockh, lock_mode);
-      out_create_de:
+out_create_de:
         up(&dir->i_sem);
         l_dput(de);
-      out_create:
+out_create:
         req->rq_status = rc;
         return 0;
 
-      out_create_unlink:
+out_create_unlink:
         /* Destroy the file we just created.  This should not need extra
          * journal credits, as we have already modified all of the blocks
          * needed in order to create the file in the first place.
@@ -482,7 +482,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         EXIT;
 
-      out_unlink_cancel:
+out_unlink_cancel:
         ldlm_lock_decref(&child_lockh, LCK_EX);
         err = ldlm_cli_cancel(&child_lockh);
         if (err < 0) {
@@ -490,10 +490,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 if (!rc)
                         rc = -ENOLCK;   /*XXX translate LDLM lock error */
         }
-      out_unlink_dchild:
+out_unlink_dchild:
         l_dput(dchild);
         up(&dir->i_sem);
-      out_unlink:
+out_unlink:
         ldlm_lock_decref(&lockh, lock_mode);
         l_dput(de);
         req->rq_status = rc;
@@ -555,14 +555,14 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         }
         EXIT;
 
-      out_link_dchild:
+out_link_dchild:
         l_dput(dchild);
-      out_link_de_tgt_dir:
+out_link_de_tgt_dir:
         up(&de_tgt_dir->d_inode->i_sem);
         l_dput(de_tgt_dir);
-      out_link_de_src:
+out_link_de_src:
         l_dput(de_src);
-      out_link:
+out_link:
         req->rq_status = rc;
         return 0;
 }
@@ -692,16 +692,16 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                         CERROR("failed to cancel child inode lock ino "
                                "%Ld: %d\n", res_id[0], rc);
         }
-      out_rename_tgtdir:
+out_rename_tgtdir:
         double_up(&de_srcdir->d_inode->i_sem, &de_tgtdir->d_inode->i_sem);
         ldlm_lock_decref(&tgtlockh, lock_mode);
-      out_rename_tgtput:
+out_rename_tgtput:
         l_dput(de_tgtdir);
-      out_rename_srcdir:
+out_rename_srcdir:
         ldlm_lock_decref(&srclockh, lock_mode);
-      out_rename_srcput:
+out_rename_srcput:
         l_dput(de_srcdir);
-      out_rename:
+out_rename:
         req->rq_status = rc;
         return 0;
 }