Whamcloud - gitweb
Branch b1_6
authortianzy <tianzy>
Thu, 11 Sep 2008 14:50:17 +0000 (14:50 +0000)
committertianzy <tianzy>
Thu, 11 Sep 2008 14:50:17 +0000 (14:50 +0000)
This patch includes att18982, att18236, att18237, att18983 and att19061 in bz14840.
Solve "quota recovery deadlock during mds failover"; it includes:
1. fix osts hang when mds does failover with quotaon
2. prevent watchdog storm when osts threads wait for the
   recovery of mds
b=14840
i=johann
i=shadow
i=panda

20 files changed:
lustre/ChangeLog
lustre/include/lustre_net.h
lustre/include/lustre_quota.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_io_24.c
lustre/obdfilter/filter_io_26.c
lustre/ptlrpc/service.c
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/quota/quota_interface.c
lustre/quota/quota_internal.h
lustre/quota/quota_master.c
lustre/tests/sanity-quota.sh
lustre/tests/test-framework.sh

index 1ece8a7..f2a852c 100644 (file)
@@ -122,6 +122,15 @@ Description: Early reply size mismatch, MGC loses connection
 Details    : Apply the MGS_CONNECT_SUPPORTED mask at reconnect time so
              the connect flags are properly negotiated.
 
+Severity   : major
+Bugzilla   : 14840
+Description: quota recovery deadlock during mds failover
+Details    : This patch includes att18982, att18236, att18237 in bz14840.
+             Solve the problems:
+             1. fix osts hang when mds does failover with quotaon
+             2. prevent watchdog storm when osts threads wait for the
+               recovery of mds
+
 --------------------------------------------------------------------------
 
 2008-08-31 Sun Microsystems, Inc.
index cf77abc..000e3e2 100644 (file)
@@ -520,6 +520,8 @@ struct ptlrpc_thread {
         __u32 t_flags;
 
         unsigned int t_id; /* service thread index, from ptlrpc_start_threads */
+        struct lc_watchdog *t_watchdog; /* put watchdog in the structure per
+                                         * thread b=14840 */
         cfs_waitq_t t_ctl_waitq;
 };
 
index d1d2fa1..c2ded55 100644 (file)
@@ -232,6 +232,7 @@ struct lustre_quota_ctxt {
         unsigned long lqc_recovery:1,   /* Doing recovery */
                       lqc_switch_qs:1,  /* the function of change qunit size
                                          * 0:Off, 1:On */
+                      lqc_valid:1,      /* this qctxt is valid or not */
                       lqc_setup:1;      /* tell whether of not quota_type has
                                          * been processed, so that the master
                                          * knows when it can start processing
@@ -264,6 +265,10 @@ struct lustre_quota_ctxt {
                                              * seconds must be waited between
                                              * enlarging and shinking qunit */
         spinlock_t    lqc_lock;         /* guard lqc_imp_valid now */
+        cfs_waitq_t   lqc_wait_for_qmaster; /* when mds isn't connected, threads
+                                             * on osts who send the quota reqs
+                                             * with wait==1 will be put here
+                                             * b=14840 */
         struct proc_dir_entry *lqc_proc_dir;
         struct lprocfs_stats  *lqc_stats; /* lquota statistics */
 };
@@ -355,8 +360,9 @@ struct quotacheck_thread_args {
         atomic_t            *qta_sem;   /* obt_quotachecking */
 };
 
-typedef int (*quota_acquire)(struct obd_device *obd,
-                             unsigned int uid, unsigned int gid);
+struct obd_trans_info;
+typedef int (*quota_acquire)(struct obd_device *obd, unsigned int uid,
+                             unsigned int gid, struct obd_trans_info *oti);
 
 typedef struct {
         int (*quota_init) (void);
@@ -386,13 +392,15 @@ typedef struct {
         int (*quota_getflag) (struct obd_device *, struct obdo *);
 
         /* For quota slave, acquire/release quota from master if needed */
-        int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int);
+        int (*quota_acquire) (struct obd_device *, unsigned int, unsigned int,
+                              struct obd_trans_info *);
 
         /* For quota slave, check whether specified uid/gid's remaining quota
          * can finish a block_write or inode_create rpc. It updates the pending
          * record of block and inode, acquires quota if necessary */
         int (*quota_chkquota) (struct obd_device *, unsigned int, unsigned int,
-                               int, int *, quota_acquire);
+                               int, int *, quota_acquire,
+                               struct obd_trans_info *);
 
         /* For quota client, poll if the quota check done */
         int (*quota_poll_check) (struct obd_export *, struct if_quotacheck *);
@@ -593,20 +601,21 @@ static inline int lquota_getflag(quota_interface_t *interface,
 
 static inline int lquota_acquire(quota_interface_t *interface,
                                  struct obd_device *obd,
-                                 unsigned int uid, unsigned int gid)
+                                 unsigned int uid, unsigned int gid,
+                                 struct obd_trans_info *oti)
 {
         int rc;
         ENTRY;
 
         QUOTA_CHECK_OP(interface, acquire);
-        rc = QUOTA_OP(interface, acquire)(obd, uid, gid);
+        rc = QUOTA_OP(interface, acquire)(obd, uid, gid, oti);
         RETURN(rc);
 }
 
 static inline int lquota_chkquota(quota_interface_t *interface,
                                   struct obd_device *obd,
-                                  unsigned int uid, unsigned int gid,
-                                  int count, int *flag)
+                                  unsigned int uid, unsigned int gid, int count,
+                                  int *flag, struct obd_trans_info *oti)
 {
         int rc;
         ENTRY;
@@ -614,7 +623,7 @@ static inline int lquota_chkquota(quota_interface_t *interface,
         QUOTA_CHECK_OP(interface, chkquota);
         QUOTA_CHECK_OP(interface, acquire);
         rc = QUOTA_OP(interface, chkquota)(obd, uid, gid, count, flag,
-                                           QUOTA_OP(interface, acquire));
+                                           QUOTA_OP(interface, acquire), oti);
         RETURN(rc);
 }
 
index b80d350..9527818 100644 (file)
@@ -687,7 +687,7 @@ struct obd_trans_info {
         int                      oti_numcookies;
 
         /* initial thread handling transaction */
-        int                      oti_thread_id;
+        struct ptlrpc_thread *   oti_thread;
         __u32                    oti_conn_cnt;
 
         struct obd_uuid         *oti_ost_uuid;
@@ -707,7 +707,7 @@ static inline void oti_init(struct obd_trans_info *oti,
 
         if (req->rq_repmsg != NULL)
                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
-        oti->oti_thread_id = req->rq_svc_thread ? req->rq_svc_thread->t_id : -1;
+        oti->oti_thread = req->rq_svc_thread;
         if (req->rq_reqmsg != NULL)
                 oti->oti_conn_cnt = lustre_msg_get_conn_cnt(req->rq_reqmsg);
 }
index 6538c66..e799c54 100644 (file)
@@ -1788,7 +1788,12 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         }
 
         /* we use the observer */
-        LASSERT(obd->obd_observer && obd->obd_observer->obd_observer);
+        if (!obd->obd_observer || !obd->obd_observer->obd_observer) {
+                CERROR("Can't find the observer, it is recovering\n");
+                req->rq_status = -EIO;
+                GOTO(send_reply, rc = -EIO);
+        }
+
         master_obd = obd->obd_observer->obd_observer;
         qctxt = &master_obd->u.obt.obt_qctxt;
 
@@ -1821,7 +1826,7 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
 
         /* Block the quota req. b=14840 */
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_BLOCK_QUOTA_REQ, obd_timeout);
-
+ send_reply:
         rc = ptlrpc_reply(req);
 out:
         OBD_FREE(qdata, sizeof(struct qunit_data));
index cfab6d3..9616308 100644 (file)
@@ -1071,9 +1071,11 @@ int mds_open(struct mds_update_record *rec, int offset,
                         gid = current->fsgid;
 
                 /* we try to get enough quota to write here, and let ldiskfs
-                 * decide if it is out of quota or not b=14783 */
+                 * decide if it is out of quota or not b=14783
+                 * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
+                 * be NULL, b=14840 */
                 lquota_chkquota(mds_quota_interface_ref, obd,
-                                current->fsuid, gid, 1, &rec_pending);
+                                current->fsuid, gid, 1, &rec_pending, NULL);
 
                 intent_set_disposition(rep, DISP_OPEN_CREATE);
                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
index 7d94fe2..c7da418 100644 (file)
@@ -896,9 +896,11 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 gid = current->fsgid;
 
         /* we try to get enough quota to write here, and let ldiskfs
-         * decide if it is out of quota or not b=14783 */
+         * decide if it is out of quota or not b=14783
+         * FIXME: after CMD is used, pointer to obd_trans_info* couldn't
+         * be NULL, b=14840 */
         lquota_chkquota(mds_quota_interface_ref, obd,
-                        current->fsuid, gid, 1, &rec_pending);
+                        current->fsuid, gid, 1, &rec_pending, NULL);
 
         switch (type) {
         case S_IFREG:{
index c9a28be..8e54dbb 100644 (file)
@@ -933,7 +933,7 @@ int echo_client_brw_ioctl(int rw, struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct echo_client_obd *ec = &obd->u.echo_client;
-        struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };
+        struct obd_trans_info dummy_oti = { .oti_thread = NULL };
         struct ec_object *eco;
         int rc;
         ENTRY;
index e611d96..ebbac7e 100644 (file)
@@ -1686,7 +1686,8 @@ static int filter_iobuf_pool_init(struct filter_obd *filter)
  * If we haven't allocated a pool entry for this thread before, do so now. */
 void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
 {
-        int thread_id                    = oti ? oti->oti_thread_id : -1;
+        int thread_id                    = (oti && oti->oti_thread) ?
+                                           oti->oti_thread->t_id : -1;
         struct filter_iobuf  *pool       = NULL;
         struct filter_iobuf **pool_place = NULL;
 
index 3ac9f9d..45fe017 100644 (file)
@@ -396,7 +396,8 @@ void filter_free_iobuf(struct filter_iobuf *buf)
 void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
                       struct obd_trans_info *oti)
 {
-        int thread_id = oti ? oti->oti_thread_id : -1;
+        int thread_id = (oti && oti->oti_thread) ?
+                        oti->oti_thread->t_id : -1;
 
         if (unlikely(thread_id < 0)) {
                 filter_free_iobuf(iobuf);
index 7fc3220..6fd6b3a 100644 (file)
@@ -241,7 +241,8 @@ void filter_free_iobuf(struct filter_iobuf *iobuf)
 void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf,
                       struct obd_trans_info *oti)
 {
-        int thread_id = oti ? oti->oti_thread_id : -1;
+        int thread_id = (oti && oti->oti_thread) ?
+                        oti->oti_thread->t_id : -1;
 
         if (unlikely(thread_id < 0)) {
                 filter_free_iobuf(iobuf);
@@ -661,7 +662,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         /* we try to get enough quota to write here, and let ldiskfs
          * decide if it is out of quota or not b=14783 */
         lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
-                        oa->o_gid, niocount, &rec_pending);
+                        oa->o_gid, niocount, &rec_pending, oti);
 
         iobuf = filter_iobuf_get(&obd->u.filter, oti);
         if (IS_ERR(iobuf))
index 5ec0105..3fbfc44 100644 (file)
@@ -1313,7 +1313,6 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_thread   *thread = data->thread;
         struct obd_device      *dev = data->dev;
         struct ptlrpc_reply_state *rs;
-        struct lc_watchdog     *watchdog;
 #ifdef WITH_GROUP_INFO
         struct group_info *ginfo = NULL;
 #endif
@@ -1371,9 +1370,10 @@ static int ptlrpc_main(void *arg)
          */
         cfs_waitq_signal(&thread->t_ctl_waitq);
 
-        watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
-                                   at_get(&svc->srv_at_estimate)) *
-                                   svc->srv_watchdog_factor, NULL, NULL);
+        thread->t_watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+                                                   at_get(&svc->srv_at_estimate))
+                                             *  svc->srv_watchdog_factor,
+                                             NULL, NULL);
 
         spin_lock(&svc->srv_lock);
         svc->srv_threads_running++;
@@ -1392,7 +1392,7 @@ static int ptlrpc_main(void *arg)
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
 
-                lc_watchdog_disable(watchdog);
+                lc_watchdog_disable(thread->t_watchdog);
 
                 cond_resched();
 
@@ -1409,7 +1409,7 @@ static int ptlrpc_main(void *arg)
                               svc->srv_at_check,
                               &lwi);
 
-                lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
+                lc_watchdog_touch_ms(thread->t_watchdog, max_t(int, obd_timeout,
                                      AT_OFF ? 0 :
                                      at_get(&svc->srv_at_estimate)) *
                                      svc->srv_watchdog_factor);
@@ -1453,7 +1453,8 @@ static int ptlrpc_main(void *arg)
                 }
         }
 
-        lc_watchdog_delete(watchdog);
+        lc_watchdog_delete(thread->t_watchdog);
+        thread->t_watchdog = NULL;
 
 out_srv_init:
         /*
index c959bc9..d2a33dd 100644 (file)
@@ -341,8 +341,8 @@ int filter_quota_adjust_qunit(struct obd_export *exp,
                 uid = oqaq->qaq_id;
 
         if (rc > 0) {
-                rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0);
-                if (rc == -EDQUOT || rc == -EBUSY) {
+                rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0, NULL);
+                if (rc == -EDQUOT || rc == -EBUSY || rc == -EAGAIN) {
                         CDEBUG(D_QUOTA, "rc: %d.\n", rc);
                         rc = 0;
                 }
index 71c3dc5..d2402cc 100644 (file)
@@ -241,6 +241,13 @@ check_cur_qunit(struct obd_device *obd,
         if (!sb_any_quota_enabled(sb))
                 RETURN(0);
 
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid){
+                spin_unlock(&qctxt->lqc_lock);
+                RETURN(0);
+        }
+        spin_unlock(&qctxt->lqc_lock);
+
         OBD_ALLOC_PTR(qctl);
         if (qctl == NULL)
                 RETURN(-ENOMEM);
@@ -458,6 +465,7 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
         struct list_head *head;
 
         LASSERT(list_empty(&qunit->lq_hash));
+        qunit_get(qunit);
         head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data);
         list_add(&qunit->lq_hash, head);
         QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);
@@ -490,6 +498,7 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit)
 
         list_del_init(&qunit->lq_hash);
         QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);
+        qunit_put(qunit);
 }
 
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
@@ -506,11 +515,13 @@ is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-               struct qunit_data *qdata, int opc, int wait);
+               struct qunit_data *qdata, int opc, int wait,
+               struct obd_trans_info *oti);
 
 static int split_before_schedule_dqacq(struct obd_device *obd,
                                        struct lustre_quota_ctxt *qctxt,
-                                       struct qunit_data *qdata, int opc, int wait)
+                                       struct qunit_data *qdata, int opc,
+                                       int wait, struct obd_trans_info *oti)
 {
         int rc = 0;
         unsigned long factor;
@@ -538,10 +549,10 @@ static int split_before_schedule_dqacq(struct obd_device *obd,
                 tmp_qdata.qd_count = factor;
                 qdata->qd_count -= tmp_qdata.qd_count;
                 QDATA_DEBUG((&tmp_qdata), "be split.\n");
-                rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
+                rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait, oti);
         } else{
                 QDATA_DEBUG(qdata, "don't be split.\n");
-                rc = schedule_dqacq(obd, qctxt, qdata, opc, wait);
+                rc = schedule_dqacq(obd, qctxt, qdata, opc, wait, oti);
         }
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
@@ -703,7 +714,8 @@ out:
         if (rc1 > 0) {
                 int opc;
                 opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
-                rc1 = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0);
+                rc1 = split_before_schedule_dqacq(obd, qctxt, qdata, opc,
+                                                  0, NULL);
                 QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
         }
         RETURN(err);
@@ -714,6 +726,10 @@ struct dqacq_async_args {
         struct lustre_qunit *aa_qunit;
 };
 
+#define QUOTA_NOSENT(rc)                                                \
+        (rc == -EIO || rc == -EINTR || rc == -ENOTCONN ||               \
+         rc == -ETIMEDOUT || rc == -EWOULDBLOCK)
+
 static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         struct dqacq_async_args *aa = (struct dqacq_async_args *)data;
@@ -733,14 +749,13 @@ static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc)
         if (!qdata)
                 RETURN(-ENOMEM);
 
-        if (rc == -EIO || rc == -EINTR || rc == -ENOTCONN )
-                /* if a quota req timeouts or is dropped, we should update quota
-                 * statistics which will be handled in dqacq_completion. And in
-                 * this situation we should get qdata from request instead of
-                 * reply */
-                rc1 = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
-        else
-                rc1 = quota_get_qdata(req, qdata, QUOTA_REPLY, QUOTA_IMPORT);
+        /* if a quota req timeouts or is dropped, we should update quota
+         * statistics which will be handled in dqacq_completion. And in
+         * this situation we should get qdata from request instead of
+         * reply */
+        rc1 = quota_get_qdata(req, qdata,
+                              QUOTA_NOSENT(rc) ? QUOTA_REQUEST : QUOTA_REPLY,
+                              QUOTA_IMPORT);
         if (rc1 < 0) {
                 DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data\n");
                 GOTO(exit, rc = -EPROTO);
@@ -783,6 +798,20 @@ exit:
         RETURN(rc);
 }
 
+/* check if quota master is online */
+int check_qm(struct lustre_quota_ctxt *qctxt)
+{
+        int rc;
+        ENTRY;
+
+        spin_lock(&qctxt->lqc_lock);
+        /* quit waiting when mds is back or qctxt is cleaned up */
+        rc = qctxt->lqc_import || !qctxt->lqc_valid;
+        spin_unlock(&qctxt->lqc_lock);
+
+        RETURN(rc);
+}
+
 static int got_qunit(struct lustre_qunit *qunit)
 {
         int rc;
@@ -807,7 +836,8 @@ static int got_qunit(struct lustre_qunit *qunit)
 
 static int
 schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-               struct qunit_data *qdata, int opc, int wait)
+               struct qunit_data *qdata, int opc, int wait,
+               struct obd_trans_info *oti)
 {
         struct lustre_qunit *qunit, *empty;
         struct l_wait_info lwi = { 0 };
@@ -829,7 +859,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 if (wait)
                         qunit_get(qunit);
                 spin_unlock(&qunit_hash_lock);
-                free_qunit(empty);
+                qunit_put(empty);
 
                 goto wait_completion;
         }
@@ -880,6 +910,23 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 wake_up(&qunit->lq_waitq);
 
                 qunit_put(qunit);
+                spin_lock(&qctxt->lqc_lock);
+                if (wait && !qctxt->lqc_import) {
+                        spin_unlock(&qctxt->lqc_lock);
+
+                        LASSERT(oti && oti->oti_thread &&
+                                oti->oti_thread->t_watchdog);
+
+                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        CDEBUG(D_QUOTA, "sleep for quota master\n");
+                        l_wait_event(qctxt->lqc_wait_for_qmaster,
+                                     check_qm(qctxt), &lwi);
+                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                } else {
+                        spin_unlock(&qctxt->lqc_lock);
+                }
+
                 RETURN(-EAGAIN);
         }
         imp = class_import_get(qctxt->lqc_import);
@@ -911,12 +958,14 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
         if (rc < 0) {
                 CDEBUG(D_ERROR, "Can't pack qunit_data\n");
+                class_import_put(imp);
                 RETURN(-EPROTO);
         }
         ptlrpc_req_set_repsize(req, 2, size);
+        req->rq_no_resend = req->rq_no_delay = 1;
         class_import_put(imp);
 
-        if (wait && qunit) 
+        if (wait && qunit)
                 qunit_get(qunit);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -955,9 +1004,10 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait)
+                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   struct obd_trans_info *oti)
 {
-        int ret, rc = 0, i = USRQUOTA;
+        int rc = 0, i = USRQUOTA;
         __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
@@ -973,19 +1023,21 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         QDATA_SET_BLK(&qdata[i]);
                 qdata[i].qd_count = 0;
 
-                ret = check_cur_qunit(obd, qctxt, &qdata[i]);
-                if (ret > 0) {
+                rc = check_cur_qunit(obd, qctxt, &qdata[i]);
+                if (rc > 0) {
                         int opc;
                         /* need acquire or release */
-                        opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
-                        ret = split_before_schedule_dqacq(obd, qctxt, &qdata[i], 
-                                                          opc, wait);
-                        if (!rc)
-                                rc = ret;
+                        opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
+                        rc = split_before_schedule_dqacq(obd, qctxt, &qdata[i],
+                                                         opc, wait, oti);
+                        if (rc < 0)
+                                RETURN(rc);
                 } else if (wait == 1) {
                         /* when wait equates 1, that means mds_quota_acquire
                          * or filter_quota_acquire is calling it. */
-                        qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
+                        rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
+                        if (rc < 0)
+                                RETURN(rc);
                 }
         }
 
@@ -1067,6 +1119,7 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_import = NULL;
         qctxt->lqc_recovery = 0;
         qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */
+        qctxt->lqc_valid = 1;
         qctxt->lqc_cqs_boundary_factor = 4;
         qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE;
         qctxt->lqc_cqs_least_iunit = 2;
@@ -1086,6 +1139,7 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
                        obd->obd_name);
                 lustre_hash_exit(&LQC_HASH_BODY(qctxt));
         }
+        cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
         spin_unlock(&qctxt->lqc_lock);
 
 #ifdef LPROCFS
@@ -1105,6 +1159,10 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 
         INIT_LIST_HEAD(&tmp_list);
 
+        spin_lock(&qctxt->lqc_lock);
+        qctxt->lqc_valid = 0;
+        spin_unlock(&qctxt->lqc_lock);
+
         spin_lock(&qunit_hash_lock);
         for (i = 0; i < NR_DQHASH; i++) {
                 list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
@@ -1127,6 +1185,14 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
         }
 
         lustre_hash_exit(&LQC_HASH_BODY(qctxt));
+        /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
+         * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
+        while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
+                cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
+                cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE,
+                                     cfs_time_seconds(1));
+        }
+
         ptlrpcd_decref();
 
 #ifdef LPROCFS
@@ -1204,7 +1270,7 @@ static int qslave_recovery_main(void *arg)
                                 opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL;
                                 rc = split_before_schedule_dqacq(obd, qctxt,
                                                                  &qdata, opc,
-                                                                 0);
+                                                                 0, NULL);
                                 if (rc == -EDQUOT)
                                         rc = 0;
                         } else {
index 178a54e..1e10b4e 100644 (file)
@@ -232,8 +232,8 @@ adjust:
                 else
                         gid = oqctl->qc_id;
 
-                rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, 
-                                        uid, gid, 1, 0);
+                rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
+                                        uid, gid, 1, 0, NULL);
                 if (rc == -EDQUOT || rc == -EBUSY) {
                         CDEBUG(D_QUOTA, "rc: %d.\n", rc);
                         rc = 0;
index 1c75df7..0097c94 100644 (file)
 #include "quota_internal.h"
 
 #ifdef __KERNEL__
+
+static cfs_time_t last_print = 0;
+static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED;
+
 static int filter_quota_setup(struct obd_device *obd)
 {
         int rc = 0;
@@ -96,11 +100,15 @@ static int filter_quota_cleanup(struct obd_device *obd)
 static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
 {
         struct obd_import *imp;
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        ENTRY;
 
         /* setup the quota context import */
         spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
         obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
         spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
+        CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
+               obd->obd_name,exp->exp_imp_reverse, obd);
 
         /* make imp's connect flags equal relative exp's connect flags
          * adding it to avoid the scan export list
@@ -111,14 +119,16 @@ static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
                         (exp->exp_connect_flags &
                          (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
 
+        cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
         /* start quota slave recovery thread. (release high limits) */
         qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
-        return 0;
+        RETURN(0);
 }
 
 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        ENTRY;
 
         /* lquota may be not set up before destroying export, b=14896 */
         if (!obd->obd_set_up)
@@ -129,10 +139,12 @@ static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd
         if (qctxt->lqc_import == exp->exp_imp_reverse) {
                 spin_lock(&qctxt->lqc_lock);
                 qctxt->lqc_import = NULL;
+                CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
+                       obd->obd_name, obd);
                 spin_unlock(&qctxt->lqc_lock);
         }
 
-        return 0;
+        RETURN(0);
 }
 
 static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
@@ -142,10 +154,12 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
         if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
                 RETURN(0);
 
-        if (ignore)
+        if (ignore) {
+                CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n");
                 cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
-        else
+        } else {
                 cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
+        }
 
         RETURN(0);
 }
@@ -195,13 +209,13 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
 }
 
 static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
-                                unsigned int gid)
+                                unsigned int gid, struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         int rc;
         ENTRY;
 
-        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
+        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1, oti);
         RETURN(rc);
 }
 
@@ -221,6 +235,13 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
         if (!sb_any_quota_enabled(qctxt->lqc_sb))
                 RETURN(rc);
 
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid){
+                spin_unlock(&qctxt->lqc_lock);
+                RETURN(rc);
+        }
+        spin_unlock(&qctxt->lqc_lock);
+
         for (i = 0; i < MAXQUOTAS; i++) {
                 struct lustre_qunit_size *lqs = NULL;
 
@@ -248,7 +269,8 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
                                 lqs->lqs_iwrite_pending += count;
                 }
 
-                CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
+                CDEBUG(D_QUOTA, "count: %d, write pending: %lu, qd_count: "LPU64
+                       ".\n", count,
                        isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
                        qdata[i].qd_count);
                 if (rc2[i] == QUOTA_RET_OK) {
@@ -280,13 +302,15 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
 
 static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                 unsigned int gid, int count, int *pending,
-                                int isblk, quota_acquire acquire)
+                                int isblk, quota_acquire acquire,
+                                struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
-        int rc = 0, cycle = 0, count_err = 0;
+        struct l_wait_info lwi = { 0 };
+        int rc = 0, cycle = 0, count_err = 1;
         ENTRY;
 
         CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
@@ -298,6 +322,23 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
         while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
                QUOTA_RET_ACQUOTA) {
 
+                spin_lock(&qctxt->lqc_lock);
+                if (!qctxt->lqc_import && oti) {
+                        spin_unlock(&qctxt->lqc_lock);
+
+                        LASSERT(oti && oti->oti_thread &&
+                                oti->oti_thread->t_watchdog);
+
+                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        CDEBUG(D_QUOTA, "sleep for quota master\n");
+                        l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt),
+                                     &lwi);
+                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                } else {
+                        spin_unlock(&qctxt->lqc_lock);
+                }
+
                 if (rc & QUOTA_RET_INC_PENDING)
                         *pending = 1;
 
@@ -306,7 +347,7 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
                 /* after acquire(), we should run quota_check_common again
                  * so that we confirm there are enough quota to finish write */
-                rc = acquire(obd, uid, gid);
+                rc = acquire(obd, uid, gid, oti);
 
                 /* please reference to dqacq_completion for the below */
                 /* a new request is finished, try again */
@@ -321,18 +362,34 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                         break;
                 }
 
-                /* -EBUSY and others, try 10 times */
-                if (rc < 0 && count_err < 10) {
-                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
-                        cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
-                        continue;
+                /* -EBUSY and others, wait a second and try again */
+                if (rc < 0) {
+                        cfs_waitq_t        waitq;
+                        struct l_wait_info lwi;
+
+                        if (oti && oti->oti_thread && oti->oti_thread->t_watchdog)
+                                lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc,
+                               count_err++);
+
+                        init_waitqueue_head(&waitq);
+                        lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL,
+                                          NULL);
+                        l_wait_event(waitq, 0, &lwi);
                 }
 
-                if (count_err >= 10 || cycle >= 1000) {
-                        CDEBUG(D_ERROR, "we meet 10 errors or run too many"
-                               " cycles when acquiring quota, quit checking with"
-                               " rc: %d, cycle: %d.\n", rc, cycle);
-                        break;
+                if (rc < 0 || cycle % 10 == 2) {
+                        spin_lock(&last_print_lock);
+                        if (last_print == 0 ||
+                            cfs_time_before((last_print + cfs_time_seconds(30)),
+                                            cfs_time_current())) {
+                                CWARN("still haven't managed to acquire quota "
+                                      "space from the quota master after %d "
+                                      "retries (err=%d, rc=%d)\n",
+                                      cycle, count_err - 1, rc);
+                                last_print = cfs_time_current();
+                        }
+                        spin_unlock(&last_print_lock);
                 }
 
                 CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
@@ -354,10 +411,10 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
 
 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
                               unsigned int gid, int npage, int *flag,
-                              quota_acquire acquire)
+                              quota_acquire acquire, struct obd_trans_info *oti)
 {
         return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
-                                    acquire);
+                                    acquire, oti);
 }
 
 /* when a block_write or inode_create rpc is finished, adjust the record for
@@ -500,19 +557,20 @@ static int mds_quota_fs_cleanup(struct obd_device *obd)
 
 static int mds_quota_check(struct obd_device *obd, unsigned int uid,
                            unsigned int gid, int inodes, int *flag,
-                           quota_acquire acquire)
+                           quota_acquire acquire, struct obd_trans_info *oti)
 {
-        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
+        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0,
+                                    acquire, oti);
 }
 
 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
-                             unsigned int gid)
+                             unsigned int gid, struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         int rc;
         ENTRY;
 
-        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
+        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1, oti);
         RETURN(rc);
 }
 
index 0f0a209..a357cbf 100644 (file)
@@ -99,7 +99,8 @@
 void qunit_cache_cleanup(void);
 int qunit_cache_init(void);
 int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                       uid_t uid, gid_t gid, __u32 isblk, int wait);
+                       uid_t uid, gid_t gid, __u32 isblk, int wait,
+                       struct obd_trans_info *oti);
 int qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                              unsigned short type, int isblk);
 int qctxt_init(struct obd_device *obd, dqacq_handler_t handler);
@@ -109,6 +110,7 @@ void qslave_start_recovery(struct obd_device *obd,
 int compute_remquota(struct obd_device *obd,
                      struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata,
                      int isblk);
+int check_qm(struct lustre_quota_ctxt *qctxt);
 /* quota_master.c */
 int lustre_dquot_init(void);
 void lustre_dquot_exit(void);
index 65b3fd8..10c994b 100644 (file)
@@ -309,7 +309,7 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type,
 
         up(&dquot->dq_sem);
 
-        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
+        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0, NULL);
         if (rc == -EDQUOT || rc == -EBUSY) {
                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
                 rc = 0;
@@ -481,21 +481,26 @@ int mds_quota_adjust(struct obd_device *obd, unsigned int qcids[],
         switch (opc) {
         case FSFILT_OP_RENAME:
                 /* acquire/release block quota on owner of original parent */
-                rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0);
+                rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[2], qpids[3], 1, 0,
+                                         NULL);
                 /* fall-through */
         case FSFILT_OP_SETATTR:
                 /* acquire/release file quota on original owner */
-                rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0);
+                rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 0, 0,
+                                          NULL);
                 /* fall-through */
         case FSFILT_OP_CREATE:
         case FSFILT_OP_UNLINK:
                 /* acquire/release file/block quota on owner of child
                  * (or current owner) */
-                rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0);
-                rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
+                rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 0, 0,
+                                          NULL);
+                rc2 |= qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+                                          NULL);
                 /* acquire/release block quota on owner of parent
                  * (or original owner) */
-                rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
+                rc2 |= qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0,
+                                          NULL);
                 break;
         default:
                 LBUG();
@@ -521,14 +526,17 @@ int filter_quota_adjust(struct obd_device *obd, unsigned int qcids[],
         switch (opc) {
         case FSFILT_OP_SETATTR:
                 /* acquire/release block quota on original & current owner */
-                rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
-                rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0);
+                rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+                                        NULL);
+                rc2 = qctxt_adjust_qunit(obd, qctxt, qpids[0], qpids[1], 1, 0,
+                                         NULL);
                 break;
         case FSFILT_OP_UNLINK:
                 /* release block quota on this owner */
         case FSFILT_OP_CREATE: /* XXX for write operation on obdfilter */
                 /* acquire block quota on this owner */
-                rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0);
+                rc = qctxt_adjust_qunit(obd, qctxt, qcids[0], qcids[1], 1, 0,
+                                        NULL);
                 break;
         default:
                 LBUG();
@@ -1145,7 +1153,8 @@ static int mds_init_slave_ilimits(struct obd_device *obd,
         else
                 gid = oqctl->qc_id;
 
-        rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
+        rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0,
+                                NULL);
         if (rc == -EDQUOT || rc == -EBUSY) {
                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
                 rc = 0;
@@ -1213,7 +1222,8 @@ static int mds_init_slave_blimits(struct obd_device *obd,
         /* initialize all slave's limit */
         rc = obd_quotactl(mds->mds_osc_exp, ioqc);
 
-        rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
+        rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0,
+                                NULL);
         if (rc == -EDQUOT || rc == -EBUSY) {
                 CDEBUG(D_QUOTA, "rc: %d.\n", rc);
                 rc = 0;
index a2841ff..8f328af 100644 (file)
@@ -48,7 +48,7 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 DIRECTIO=${DIRECTIO:-$LUSTRE/tests/directio}
 
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="9 10 11 21"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="9 10 11 18b 21"
 
 QUOTALOG=${TESTSUITELOG:-$TMP/$(basename $0 .sh).log}
 
@@ -1506,6 +1506,105 @@ run_to_block_limit() {
                error "(usr) write success, should be EDQUOT"
 }
 
+# test that when the mds does failover, the ost still works well without
+# triggering the watchdog b=14840
+test_18bc_sub() {
+        type=$1
+
+        LIMIT=$((110 * 1024 )) # 110M
+        TESTFILE="$DIR/$tdir/$tfile"
+        mkdir -p $DIR/$tdir
+
+        wait_delete_completed
+
+        set_blk_tunesz 512
+        set_blk_unitsz 1024
+
+        log "   User quota (limit: $LIMIT kbytes)"
+        $LFS setquota -u $TSTUSR -b 0 -B $LIMIT -i 0 -I 0 $MOUNT
+        $SHOW_QUOTA_USER
+
+        $LFS setstripe $TESTFILE -i 0 -c 1
+        chown $TSTUSR.$TSTUSR $TESTFILE
+
+        timeout=$(sysctl -n lustre.timeout)
+
+       if [ $type = "directio" ]; then
+           log "   write 100M block(directio) ..."
+           $RUNAS $DIRECTIO write $TESTFILE 0 100 $((BLK_SZ * 1024)) &
+       else
+           log "   write 100M block(normal) ..."
+           $RUNAS dd if=/dev/zero of=$TESTFILE bs=$((BLK_SZ * 1024)) count=100 &
+       fi
+
+        DDPID=$!
+        do_facet mds "$LCTL conf_param lustre-MDT0000.mdt.quota_type=ug"
+
+       log "failing mds for $((2 * timeout)) seconds"
+        fail mds $((2 * timeout))
+
+        # check if quotaon successful
+        $LFS quota -u $TSTUSR $MOUNT 2>&1 | grep -q "quotas are not enabled"
+        if [ $? -eq 0 ]; then
+            error "quotaon failed!"
+            rm -rf $TESTFILE
+            return
+        fi
+
+        count=0
+        while [ true ]; do
+            if [ -z `ps -ef | awk '$2 == '${DDPID}' { print $8 }'` ]; then break; fi
+            if [ $((++count % (2 * timeout) )) -eq 0 ]; then
+                log "it took $count second"
+            fi
+            sleep 1
+        done
+        log "(dd_pid=$DDPID, time=$count, timeout=$timeout)"
+        sync; sleep 1; sync
+
+        testfile_size=$(stat -c %s $TESTFILE)
+        [ $testfile_size -ne $((BLK_SZ * 1024 * 100)) ] && \
+            error "verifying file failed!"
+        $SHOW_QUOTA_USER
+        $LFS setquota -u $TSTUSR -b 0 -B 0 -i 0 -I 0 $MOUNT
+        rm -rf $TESTFILE
+        sync; sleep 1; sync
+}
+
+# test when mds does failover, the ost still could work well
+# this test shouldn't trigger watchdog b=14840
+test_18b() {
+       test_18bc_sub normal
+       test_18bc_sub directio
+       # check if watchdog is triggered
+       MSG="test 18b: run for fixing bug14840"
+       do_facet ost1 "dmesg > $TMP/lustre-log-${TESTNAME}.log"
+       do_facet client cat > $TMP/lustre-log-${TESTNAME}.awk <<-EOF
+               /$MSG/ {
+                   start = 1;
+               }
+               /Watchdog triggered/ {
+                   if (start) {
+                       print \$0;
+                   }
+               }
+       EOF
+       watchdog=`do_facet ost1 awk -f $TMP/lustre-log-${TESTNAME}.awk $TMP/lustre-log-${TESTNAME}.log`
+       if [ -n "$watchdog" ]; then error "$watchdog"; fi
+}
+run_test_with_stat 18b "run for fixing bug14840(mds failover, no watchdog) ==========="
+
+# test when mds does failover, the ost still could work well
+# this test will prevent OST_DISCONNECT from happening b=14840
+test_18c() {
+       # define OBD_FAIL_OST_DISCONNECT_NET 0x202(disable ost_disconnect for osts)
+       lustre_fail ost  0x202
+       test_18bc_sub normal
+       test_18bc_sub directio
+       lustre_fail ost  0
+}
+run_test_with_stat 18c "run for fixing bug14840(mds failover, OST_DISCONNECT is disabled) ==========="
+
 test_19() {
        # 1 Mb bunit per each MDS/OSS
        local TESTFILE="$DIR/$tdir/$tfile"
index 6abf04c..628703e 100644 (file)
@@ -590,8 +590,10 @@ client_reconnect() {
 
 facet_failover() {
     facet=$1
+    sleep_time=$2
     echo "Failing $facet on node `facet_active_host $facet`"
     shutdown_facet $facet
+    [ -n "$sleep_time" ] && sleep $sleep_time
     reboot_facet $facet
     client_df &
     DFPID=$!