Whamcloud - gitweb
Branch: b1_4
authoradilger <adilger>
Wed, 4 May 2005 08:30:17 +0000 (08:30 +0000)
committeradilger <adilger>
Wed, 4 May 2005 08:30:17 +0000 (08:30 +0000)
Don't clobber last_rcvd upon mds_client_add() failure.  This was caused by
bad error handling in mds_connect() not setting med_mcd = NULL after freeing
it, and then zeroing the "client data" at med_off = 0.  Includes test.
This commit adds some safety checks to {mds,filter}_finish_transno()
and reconciles the differences between the mds and obdfilter last_rcvd
functions.
b=6086
r=nathan

14 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd_support.h
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_quota.c
lustre/osc/osc_request.c
lustre/tests/recovery-small.sh

index 60e97ab..2609f03 100644 (file)
@@ -27,6 +27,7 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - reserve enough journal credits in fsfilt_start_log for setattr (4554)
        - ldlm_enqueue freed-export error path would always LBUG (6149,6184)
        - don't reference lr_lvb_data until after we hold lr_lvb_sem (6170)
+       - don't overwrite last_rcvd if there is a *_client_add() error (6068)
        * miscellania
        - by default create 1 inode per 4kB space on MDS, per 16kB on OSTs
        - allow --write-conf on an MDS with different nettype than client (5619)
index d06af11..2b83f97 100644 (file)
@@ -14,8 +14,8 @@ struct mds_export_data {
         struct list_head        med_open_head;
         spinlock_t              med_open_lock;
         struct mds_client_data *med_mcd;
-        loff_t                  med_off;
-        int                     med_idx;
+        loff_t                  med_lr_off;
+        int                     med_lr_idx;
 };
 
 struct osc_creator {
index 31ad9a5..e45f5fe 100644 (file)
@@ -291,7 +291,7 @@ struct ptlrpc_request {
                 rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
                 rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
                 rq_no_delay:1, rq_net_err:1;
-        int rq_phase;
+        int rq_phase; /* one of RQ_PHASE_* */
         atomic_t rq_refcount;   /* client-side refcount for SENT race */
 
         int rq_request_portal;  /* XXX FIXME bug 249 */
index c7604da..9f22cfe 100644 (file)
@@ -91,6 +91,7 @@ extern wait_queue_head_t obd_race_waitq;
 #define OBD_FAIL_MDS_OST_SETATTR         0x12c
 #define OBD_FAIL_MDS_QUOTACHECK_NET      0x12d
 #define OBD_FAIL_MDS_QUOTACTL_NET        0x12e
+#define OBD_FAIL_MDS_CLIENT_ADD          0x12f
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
index bbc69a3..1781dad 100644 (file)
@@ -366,7 +366,7 @@ static int mds_destroy_export(struct obd_export *export)
         spin_unlock(&med->med_open_lock);
         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 out:
-        mds_client_free(export, !(export->exp_flags & OBD_OPT_FAILOVER));
+        mds_client_free(export);
 
         RETURN(rc);
 }
index a71a5c2..78044f5 100644 (file)
@@ -56,6 +56,9 @@
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
  * we know its offset.
+ *
+ * It should not be possible to fail adding an existing client - otherwise
+ * mds_init_server_data() callsite needs to be fixed.
  */
 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
                    struct mds_export_data *med, int cl_idx)
@@ -65,6 +68,7 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
         ENTRY;
 
         LASSERT(bitmap != NULL);
+        LASSERTF(cl_idx > -2, "%d\n", cl_idx);
 
         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
@@ -76,9 +80,10 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
         if (new_client) {
                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
         repeat:
-                if (cl_idx >= MDS_MAX_CLIENTS) {
+                if (cl_idx >= MDS_MAX_CLIENTS ||
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
-                        return -ENOMEM;
+                        return -EOVERFLOW;
                 }
                 if (test_and_set_bit(cl_idx, bitmap)) {
                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
@@ -96,13 +101,14 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
                cl_idx, med->med_mcd->mcd_uuid);
 
-        med->med_idx = cl_idx;
-        med->med_off = le32_to_cpu(mds->mds_server_data->msd_client_start) +
+        med->med_lr_idx = cl_idx;
+        med->med_lr_off = le32_to_cpu(mds->mds_server_data->msd_client_start) +
                 (cl_idx * le16_to_cpu(mds->mds_server_data->msd_client_size));
+        LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
 
         if (new_client) {
                 struct obd_run_ctxt saved;
-                loff_t off = med->med_off;
+                loff_t off = med->med_lr_off;
                 struct file *file = mds->mds_rcvd_filp;
                 int rc;
 
@@ -114,13 +120,13 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
                 if (rc)
                         return rc;
                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
-                       med->med_idx, med->med_off,
+                       med->med_lr_idx, med->med_lr_off,
                        (unsigned int)sizeof(*med->med_mcd));
         }
         return 0;
 }
 
-int mds_client_free(struct obd_export *exp, int clear_client)
+int mds_client_free(struct obd_export *exp)
 {
         struct mds_export_data *med = &exp->exp_mds_data;
         struct mds_obd *mds = &exp->exp_obd->u.mds;
@@ -128,47 +134,59 @@ int mds_client_free(struct obd_export *exp, int clear_client)
         struct mds_client_data zero_mcd;
         struct obd_run_ctxt saved;
         int rc;
-        unsigned long *bitmap = mds->mds_client_bitmap;
+        loff_t off;
+        ENTRY;
 
         if (!med->med_mcd)
                 RETURN(0);
 
         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
         if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
-                GOTO(free_and_out, 0);
+                GOTO(free, 0);
+
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
+               med->med_lr_idx, med->med_lr_off, med->med_mcd->mcd_uuid);
 
-        CDEBUG(D_INFO, "freeing client at idx %u (%lld)with UUID '%s'\n",
-               med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
+        LASSERT(mds->mds_client_bitmap != NULL);
 
-        LASSERT(bitmap);
+        off = med->med_lr_off;
+
+        /* Don't clear med_lr_idx here as it is likely also unset.  At worst
+         * we leak a client slot that will be cleaned on the next recovery. */
+        if (off <= 0) {
+                CERROR("%s: client idx %d has offset %lld\n",
+                        obd->obd_name, med->med_lr_idx, off);
+                GOTO(free, rc = -EINVAL);
+        }
 
         /* Clear the bit _after_ zeroing out the client so we don't
            race with mds_client_add and zero out new clients.*/
-        if (!test_bit(med->med_idx, bitmap)) {
+        if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
-                       med->med_idx);
+                       med->med_lr_idx);
                 LBUG();
         }
 
-        if (clear_client) {
+        if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
                 memset(&zero_mcd, 0, sizeof zero_mcd);
                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
-                                         sizeof(zero_mcd), &med->med_off, 1);
+                                         sizeof(zero_mcd), &off, 1);
                 pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 
                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
                        "zeroing out client %s idx %u in %s rc %d\n",
-                       med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD, rc);
+                       med->med_mcd->mcd_uuid, med->med_lr_idx, LAST_RCVD, rc);
         }
 
-        if (!test_and_clear_bit(med->med_idx, bitmap)) {
+        if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
-                       med->med_idx);
+                       med->med_lr_idx);
                 LBUG();
         }
 
- free_and_out:
+        EXIT;
+free:
         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
         med->med_mcd = NULL;
 
@@ -323,7 +341,9 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                        sizeof exp->exp_client_uuid.uuid);
                 med = &exp->exp_mds_data;
                 med->med_mcd = mcd;
-                mds_client_add(obd, mds, med, cl_idx);
+                rc = mds_client_add(obd, mds, med, cl_idx);
+                LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
                 /* create helper if export init gets more complex */
                 INIT_LIST_HEAD(&med->med_open_head);
                 spin_lock_init(&med->med_open_lock);
@@ -338,7 +358,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                        cl_idx, last_transno);
 
                 if (last_transno > mds->mds_last_transno)
-                       mds->mds_last_transno = last_transno;
+                        mds->mds_last_transno = last_transno;
         }
 
         if (mcd)
index 792d6d0..421fa75 100644 (file)
@@ -171,7 +171,7 @@ int mds_done_writing(struct ptlrpc_request *req);
 /* mds/mds_fs.c */
 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
                    struct mds_export_data *med, int cl_off);
-int mds_client_free(struct obd_export *exp, int clear_client);
+int mds_client_free(struct obd_export *exp);
 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti);
 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
index b2b9d60..cb09d80 100644 (file)
@@ -295,8 +295,8 @@ cleanup_dentry:
 static int mds_create_objects(struct ptlrpc_request *req, int offset,
                               struct mds_update_record *rec,
                               struct mds_obd *mds, struct obd_device *obd,
-                              struct dentry *dchild, void **handle, obd_id **ids,
-                              struct llog_cookie **ret_logcookies, 
+                              struct dentry *dchild, void **handle,
+                              obd_id **ids, struct llog_cookie **ret_logcookies,
                               int *setattr_async_flag)
 {
         struct obdo *oa;
index 075fcd7..ea46f39 100644 (file)
@@ -126,7 +126,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                 }
         }
 
-        off = med->med_off;
+        off = med->med_lr_off;
 
         transno = req->rq_reqmsg->transno;
         if (rc != 0) {
@@ -147,10 +147,15 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
         mcd->mcd_last_result = cpu_to_le32(rc);
         mcd->mcd_last_data = cpu_to_le32(op_data);
 
-        fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
-                              mds_commit_cb, NULL);
-        err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd, sizeof(*mcd),
-                                  &off, 0);
+        if (off <= 0) {
+                CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
+                err = -EINVAL;
+        } else {
+                fsfilt_add_journal_cb(req->rq_export->exp_obd, transno, handle,
+                                      mds_commit_cb, NULL);
+                err = fsfilt_write_record(obd, mds->mds_rcvd_filp, mcd,
+                                          sizeof(*mcd), &off, 0);
+        }
 
         if (err) {
                 log_pri = D_ERROR;
@@ -160,7 +165,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
 
         DEBUG_REQ(log_pri, req,
                   "wrote trans #"LPU64" rc %d client %s at idx %u: err = %d",
-                  transno, rc, mcd->mcd_uuid, med->med_idx, err);
+                  transno, rc, mcd->mcd_uuid, med->med_lr_idx, err);
 
         err = mds_lov_write_objids(obd);
         if (err) {
@@ -390,7 +395,7 @@ int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
                 CERROR("Error unpack md %p\n", lmm);
                 GOTO(cleanup, rc);
         }
-        
+
         cleanup_phase = 2;
         /* then fill oa */
         oa->o_id = lsm->lsm_object_id;
@@ -401,7 +406,7 @@ int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
                 oa->o_valid |= OBD_MD_FLCOOKIE;
                 oti.oti_logcookies = logcookies;
         }
-                                                                                                                             
+
         /* do setattr from mds to ost asynchronously */
         rc = obd_setattr_async(mds->mds_osc_exp, oa, lsm, &oti);
         if (rc)
@@ -417,7 +422,7 @@ cleanup:
                 if (logcookies)
                         OBD_FREE(logcookies, mds->mds_max_cookiesize);
         }
-                                                                                                                             
+
         RETURN(rc);
 }
 
@@ -484,17 +489,17 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
 
-        /* start a log jounal handle if needed*/
+        /* start a log jounal handle if needed */
         if (S_ISREG(inode->i_mode) &&
             rec->ur_iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
                 lmm_size = mds->mds_max_mdsize;
                 OBD_ALLOC(lmm, lmm_size);
                 if (lmm == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
-                
+
                 cleanup_phase = 2;
                 rc = mds_get_md(obd, inode, lmm, &lmm_size, need_lock);
-                if (rc < 0) 
+                if (rc < 0)
                         GOTO(cleanup, rc);
 
                 handle = fsfilt_start_log(obd, inode, FSFILT_OP_SETATTR, NULL,
@@ -543,20 +548,19 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                         GOTO(cleanup, rc);
 
                 lum = rec->ur_eadata;
-                /* if lmm_stripe_size is -1,  then delete the stripe 
-                        info from the dir */
-                if (S_ISDIR(inode->i_mode) && 
+                /* if lmm_stripe_size is -1 then delete stripe info from dir */
+                if (S_ISDIR(inode->i_mode) &&
                     lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
                         rc = fsfilt_set_md(obd, inode, handle, NULL, 0);
                         if (rc)
                                 GOTO(cleanup, rc);
                 } else {
                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
-                                           mds->mds_osc_exp, 0, 
+                                           mds->mds_osc_exp, 0,
                                            &lsm, rec->ur_eadata);
                         if (rc)
                                 GOTO(cleanup, rc);
-                        
+
                         obd_free_memmd(mds->mds_osc_exp, &lsm);
 
                         rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
@@ -603,7 +607,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                                       mds_cancel_cookies_cb, mlcd);
         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
         /* do mds to ost setattr if needed */
-        if (!rc && !err && lmm_size) 
+        if (!rc && !err && lmm_size)
                 mds_osc_setattr_async(obd, inode, lmm, lmm_size, logcookies);
 
         switch (cleanup_phase) {
@@ -633,7 +637,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
         /* trigger dqrel/dqacq for original owner and new owner */
         if (rec->ur_iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
-                mds_adjust_qunit(obd, rec->ur_iattr.ia_uid, 
+                mds_adjust_qunit(obd, rec->ur_iattr.ia_uid,
                                  rec->ur_iattr.ia_gid, 0, 0, rc);
                 mds_adjust_qunit(obd, child_uid, child_gid, 0, 0, rc);
         }
@@ -846,12 +850,12 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         rc = mds_get_md(obd, dir, &lmm, &lmm_size, 1);
                         if (rc > 0) {
                                 down(&inode->i_sem);
-                                rc = fsfilt_set_md(obd, inode, handle, 
+                                rc = fsfilt_set_md(obd, inode, handle,
                                                    &lmm, lmm_size);
                                 up(&inode->i_sem);
                         }
                         if (rc)
-                                CERROR("error on copy stripe info: rc = %d\n", 
+                                CERROR("error on copy stripe info: rc = %d\n",
                                         rc);
                 }
 
@@ -907,9 +911,9 @@ cleanup:
                 LBUG();
         }
         req->rq_status = rc;
-        
+
         /* trigger dqacq on the owner of child and parent */
-        mds_adjust_qunit(obd, current->fsuid, current->fsgid, 
+        mds_adjust_qunit(obd, current->fsuid, current->fsgid,
                          parent_uid, parent_gid, rc);
         return 0;
 }
@@ -1402,7 +1406,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         child_gid = child_inode->i_gid;
         parent_uid = dparent->d_inode->i_uid;
         parent_gid = dparent->d_inode->i_gid;
-        
+
         cleanup_phase = 2; /* dchild has a lock */
 
         /* We have to do these checks ourselves, in case we are making an
index 093a583..79ba6ed 100644 (file)
@@ -110,10 +110,16 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         fcd->fcd_last_xid = 0;
 
         off = fed->fed_lr_off;
-        fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
-                              filter_commit_cb, NULL);
-        err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
-                                  sizeof(*fcd), &off, 0);
+        if (off <= 0) {
+                CERROR("%s: client idx %d is %lld\n", exp->exp_obd->obd_name,
+                       fed->fed_lr_idx, fed->fed_lr_off);
+                err = -EINVAL;
+        } else {
+                fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
+                                      filter_commit_cb, NULL);
+                err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
+                                          fcd, sizeof(*fcd), &off, 0);
+        }
         if (err) {
                 log_pri = D_ERROR;
                 if (rc == 0)
@@ -148,6 +154,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         ENTRY;
 
         LASSERT(bitmap != NULL);
+        LASSERTF(cl_idx > -2, "%d\n", cl_idx);
 
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
         if (!strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid))
@@ -161,7 +168,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         repeat:
                 if (cl_idx >= FILTER_LR_MAX_CLIENTS) {
                         CERROR("no client slots - fix FILTER_LR_MAX_CLIENTS\n");
-                        RETURN(-ENOMEM);
+                        RETURN(-EOVERFLOW);
                 }
                 if (test_and_set_bit(cl_idx, bitmap)) {
                         cl_idx = find_next_zero_bit(bitmap,
@@ -180,6 +187,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
         fed->fed_lr_idx = cl_idx;
         fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->fsd_client_start) +
                 cl_idx * le16_to_cpu(filter->fo_fsd->fsd_client_size);
+        LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);
 
         CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
@@ -235,19 +243,24 @@ static int filter_client_free(struct obd_export *exp)
         if (fed->fed_fcd == NULL)
                 RETURN(0);
 
-        if (exp->exp_flags & OBD_OPT_FAILOVER)
-                GOTO(free, 0);
-
         /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
         if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid ) == 0)
                 GOTO(free, 0);
 
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
+               fed->fed_lr_idx, off, fed->fed_fcd->fcd_uuid);
+
         LASSERT(filter->fo_last_rcvd_slots != NULL);
 
         off = fed->fed_lr_off;
 
-        CDEBUG(D_INFO, "freeing client at idx %u (%lld) with UUID '%s'\n",
-               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
+        /* Don't clear fed_lr_idx here as it is likely also unset.  At worst
+         * we leak a client slot that will be cleaned on the next recovery. */
+        if (off <= 0) {
+                CERROR("%s: client idx %d has med_off %lld\n",
+                       obd->obd_name, fed->fed_lr_idx, off);
+                GOTO(free, rc = -EINVAL);
+        }
 
         /* Clear the bit _after_ zeroing out the client so we don't
            race with filter_client_add and zero out new clients.*/
@@ -257,21 +270,23 @@ static int filter_client_free(struct obd_export *exp)
                 LBUG();
         }
 
-        memset(&zero_fcd, 0, sizeof zero_fcd);
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
-        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
-                                 sizeof(zero_fcd), &off, 0);
+        if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
+                memset(&zero_fcd, 0, sizeof zero_fcd);
+                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
+                                         sizeof(zero_fcd), &off, 0);
 
-        if (rc == 0)
-                /* update server's transno */
-                filter_update_server_data(obd, filter->fo_rcvd_filp,
-                                          filter->fo_fsd, 1);
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                if (rc == 0)
+                        /* update server's transno */
+                        filter_update_server_data(obd, filter->fo_rcvd_filp,
+                                                  filter->fo_fsd, 1);
+                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 
-        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
-               "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
-               fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
-               LAST_RCVD, rc);
+                CDEBUG(rc == 0 ? D_INFO : D_ERROR,
+                       "zeroing out client %s at idx %u (%llu) in %s rc %d\n",
+                       fed->fed_fcd->fcd_uuid, fed->fed_lr_idx, fed->fed_lr_off,
+                       LAST_RCVD, rc);
+        }
 
         if (!test_and_clear_bit(fed->fed_lr_idx, filter->fo_last_rcvd_slots)) {
                 CERROR("FILTER client %u: bit already clear in bitmap!!\n",
@@ -279,10 +294,12 @@ static int filter_client_free(struct obd_export *exp)
                 LBUG();
         }
 
+        EXIT;
 free:
         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
+        fed->fed_fcd = NULL;
 
-        RETURN(0);
+        return 0;
 }
 
 static int filter_free_server_data(struct filter_obd *filter)
@@ -475,7 +492,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                        sizeof exp->exp_client_uuid.uuid);
                 fed = &exp->exp_filter_data;
                 fed->fed_fcd = fcd;
-                filter_client_add(obd, filter, fed, cl_idx);
+                rc = filter_client_add(obd, filter, fed, cl_idx);
+                LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
                 /* create helper if export init gets more complex */
                 spin_lock_init(&fed->fed_lock);
 
@@ -1431,7 +1450,7 @@ static int filter_cleanup(struct obd_device *obd)
 
 /* nearly identical to mds_connect */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
-                          struct obd_uuid *cluuid, struct obd_connect_data *data)
+                          struct obd_uuid *cluuid,struct obd_connect_data *data)
 {
         struct obd_export *exp;
         struct filter_export_data *fed;
index 61d0fb8..f504922 100644 (file)
@@ -369,7 +369,7 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
                 unsigned long w = filter->fo_w_pages.oh_buckets[i];
                 read_cum += r;
                 write_cum += w;
-                seq_printf(seq, "%d:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
+                seq_printf(seq, "%u:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                                  1 << i, r, pct(r, read_tot),
                                  pct(read_cum, read_tot), w,
                                  pct(w, write_tot),
@@ -393,7 +393,7 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
                 unsigned long w = filter->fo_w_discont_pages.oh_buckets[i];
                 read_cum += r;
                 write_cum += w;
-                seq_printf(seq, "%d:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
+                seq_printf(seq, "%u:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                                  i, r, pct(r, read_tot),
                                  pct(read_cum, read_tot), w,
                                  pct(w, write_tot),
@@ -416,7 +416,7 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
                 unsigned long w = filter->fo_w_discont_blocks.oh_buckets[i];
                 read_cum += r;
                 write_cum += w;
-                seq_printf(seq, "%d:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
+                seq_printf(seq, "%u:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                                  i, r, pct(r, read_tot),
                                  pct(read_cum, read_tot), w,
                                  pct(w, write_tot),
@@ -440,7 +440,7 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
                 unsigned long w = filter->fo_write_rpc_hist.oh_buckets[i];
                 read_cum += r;
                 write_cum += w;
-                seq_printf(seq, "%d:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
+                seq_printf(seq, "%u:\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                                  i, r, pct(r, read_tot),
                                  pct(read_cum, read_tot), w,
                                  pct(w, write_tot),
@@ -463,7 +463,7 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
                 unsigned long w = filter->fo_w_io_time.oh_buckets[i];
                 read_cum += r;
                 write_cum += w;
-                seq_printf(seq, "%d:\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
+                seq_printf(seq, "%10u:\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                                  1 << i, r, pct(r, read_tot),
                                  pct(read_cum, read_tot), w,
                                  pct(w, write_tot),
@@ -487,12 +487,15 @@ static int filter_brw_stats_seq_show(struct seq_file *seq, void *v)
 
                 read_cum += r;
                 write_cum += w;
+                if (read_cum == 0 && write_cum == 0)
+                        continue;
+
                 if (i < 10)
-                        seq_printf(seq, "%d", 1<<i);
+                        seq_printf(seq, "%u", 1<<i);
                 else if (i < 20)
-                        seq_printf(seq, "%dK", 1<<(i-10));
+                        seq_printf(seq, "%uK", 1<<(i-10));
                 else
-                        seq_printf(seq, "%dM", 1<<(i-20));
+                        seq_printf(seq, "%uM", 1<<(i-20));
 
                 seq_printf(seq, ":\t\t%10lu %3lu %3lu   | %10lu %3lu %3lu\n",
                            r, pct(r, read_tot), pct(read_cum, read_tot), 
index b09c2c5..2ace56a 100644 (file)
@@ -100,20 +100,19 @@ static struct osc_quota_info *alloc_qinfo(struct client_obd *cli,
 {
         struct osc_quota_info *oqi;
         ENTRY;
-                                                                                                                             
-        OBD_SLAB_ALLOC(oqi, qinfo_cachep, SLAB_KERNEL,
-                       sizeof(*oqi));
+
+        OBD_SLAB_ALLOC(oqi, qinfo_cachep, SLAB_KERNEL, sizeof(*oqi));
         if(!oqi)
                 RETURN(NULL);
-                                                                                                                             
+
         INIT_LIST_HEAD(&oqi->oqi_hash);
         oqi->oqi_cli = cli;
         oqi->oqi_id = id;
         oqi->oqi_type = type;
-                                                                                                                             
+
         RETURN(oqi);
 }
-                                                                                                                             
+
 static void free_qinfo(struct osc_quota_info *oqi)
 {
         OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi));
index cf888d6..0e21a8c 100644 (file)
@@ -1960,7 +1960,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
         if (cmd == OBD_BRW_WRITE){
                 struct obd_async_page_ops *ops;
                 struct obdo *oa = NULL;
-                                                                                                                             
+
                 oa = obdo_alloc();
                 if (oa == NULL)
                         RETURN(-ENOMEM);
index 2c1fbd9..fb740b3 100755 (executable)
@@ -136,14 +136,14 @@ run_test 9 "pause bulk on OST (bug 1420)"
 
 #bug 1521
 test_10() {
-    do_facet client mcreate $MOUNT/f10        || return 1
-    drop_bl_callback "chmod 0777 $MOUNT/f10"  || return 2
+    do_facet client mcreate $MOUNT/$tfile        || return 1
+    drop_bl_callback "chmod 0777 $MOUNT/$tfile"  || return 2
     # wait for the mds to evict the client
     #echo "sleep $(($TIMEOUT*2))"
     #sleep $(($TIMEOUT*2))
-    do_facet client touch  $MOUNT/f10 || echo "touch failed, evicted"
-    do_facet client checkstat -v -p 0777 $MOUNT/f10  || return 3
-    do_facet client "munlink $MOUNT/f10"
+    do_facet client touch $MOUNT/$tfile || echo "touch failed, evicted"
+    do_facet client checkstat -v -p 0777 $MOUNT/$tfile  || return 3
+    do_facet client "munlink $MOUNT/$tfile"
 }
 run_test 10 "finish request on server after client eviction (bug 1521)"
 
@@ -421,6 +421,18 @@ test_26() {      # bug 5921 - evict dead exports
 }
 run_test 26 "evict dead exports"
 
+test_28() {      # bug 6086 - error adding new clients
+       do_facet client mcreate $MOUNT/$tfile        || return 1
+       drop_bl_callback "chmod 0777 $MOUNT/$tfile"  || return 2
+       #define OBD_FAIL_MDS_ADD_CLIENT 0x12f
+       do_facet mds sysctl -w lustre.fail_loc=0x8000012f
+       # fail once (evicted), reconnect fail (fail_loc), ok
+       df || (sleep 1; df) || (sleep 1; df) || error "reconnect failed"
+       rm -f $MOUNT/$tfile
+       fail mds                # verify MDS last_rcvd can be loaded
+}
+run_test 28 "handle error adding new clients (bug 6086)"
+
 test_50() {     # bug 4834 - failover under load failures
        mkdir -p $DIR/$tdir
        # put a load of file creates/writes/deletes for 10 min.