Whamcloud - gitweb
Branch b1_8_gate
authortianzy <tianzy>
Thu, 11 Sep 2008 14:51:41 +0000 (14:51 +0000)
committertianzy <tianzy>
Thu, 11 Sep 2008 14:51:41 +0000 (14:51 +0000)
This patch includes att18982, att18236, att18237,att18983 and att19061 in bz14840.
Slove "quota recovery deadlock during mds failover", it includes:
1. fix osts hang when mds does failover with quotaon
2. prevent watchdog storm when osts threads wait for the
   recovery of mds
b=14840
i=johann
i=shadow
i=panda

lustre/quota/quota_interface.c

index 1c75df7..0097c94 100644 (file)
 #include "quota_internal.h"
 
 #ifdef __KERNEL__
+
+static cfs_time_t last_print = 0;
+static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED;
+
 static int filter_quota_setup(struct obd_device *obd)
 {
         int rc = 0;
@@ -96,11 +100,15 @@ static int filter_quota_cleanup(struct obd_device *obd)
 static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
 {
         struct obd_import *imp;
+        struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        ENTRY;
 
         /* setup the quota context import */
         spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
         obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
         spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
+        CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
+               obd->obd_name,exp->exp_imp_reverse, obd);
 
         /* make imp's connect flags equal relative exp's connect flags
          * adding it to avoid the scan export list
@@ -111,14 +119,16 @@ static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
                         (exp->exp_connect_flags &
                          (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
 
+        cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
         /* start quota slave recovery thread. (release high limits) */
         qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
-        return 0;
+        RETURN(0);
 }
 
 static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+        ENTRY;
 
         /* lquota may be not set up before destroying export, b=14896 */
         if (!obd->obd_set_up)
@@ -129,10 +139,12 @@ static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd
         if (qctxt->lqc_import == exp->exp_imp_reverse) {
                 spin_lock(&qctxt->lqc_lock);
                 qctxt->lqc_import = NULL;
+                CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
+                       obd->obd_name, obd);
                 spin_unlock(&qctxt->lqc_lock);
         }
 
-        return 0;
+        RETURN(0);
 }
 
 static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
@@ -142,10 +154,12 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
         if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
                 RETURN(0);
 
-        if (ignore)
+        if (ignore) {
+                CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n");
                 cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
-        else
+        } else {
                 cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
+        }
 
         RETURN(0);
 }
@@ -195,13 +209,13 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa)
 }
 
 static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
-                                unsigned int gid)
+                                unsigned int gid, struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         int rc;
         ENTRY;
 
-        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
+        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1, oti);
         RETURN(rc);
 }
 
@@ -221,6 +235,13 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
         if (!sb_any_quota_enabled(qctxt->lqc_sb))
                 RETURN(rc);
 
+        spin_lock(&qctxt->lqc_lock);
+        if (!qctxt->lqc_valid){
+                spin_unlock(&qctxt->lqc_lock);
+                RETURN(rc);
+        }
+        spin_unlock(&qctxt->lqc_lock);
+
         for (i = 0; i < MAXQUOTAS; i++) {
                 struct lustre_qunit_size *lqs = NULL;
 
@@ -248,7 +269,8 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
                                 lqs->lqs_iwrite_pending += count;
                 }
 
-                CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
+                CDEBUG(D_QUOTA, "count: %d, write pending: %lu, qd_count: "LPU64
+                       ".\n", count,
                        isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
                        qdata[i].qd_count);
                 if (rc2[i] == QUOTA_RET_OK) {
@@ -280,13 +302,15 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
 
 static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                 unsigned int gid, int count, int *pending,
-                                int isblk, quota_acquire acquire)
+                                int isblk, quota_acquire acquire,
+                                struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         struct timeval work_start;
         struct timeval work_end;
         long timediff;
-        int rc = 0, cycle = 0, count_err = 0;
+        struct l_wait_info lwi = { 0 };
+        int rc = 0, cycle = 0, count_err = 1;
         ENTRY;
 
         CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
@@ -298,6 +322,23 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
         while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
                QUOTA_RET_ACQUOTA) {
 
+                spin_lock(&qctxt->lqc_lock);
+                if (!qctxt->lqc_import && oti) {
+                        spin_unlock(&qctxt->lqc_lock);
+
+                        LASSERT(oti && oti->oti_thread &&
+                                oti->oti_thread->t_watchdog);
+
+                        lc_watchdog_disable(oti->oti_thread->t_watchdog);
+                        CDEBUG(D_QUOTA, "sleep for quota master\n");
+                        l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt),
+                                     &lwi);
+                        CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                } else {
+                        spin_unlock(&qctxt->lqc_lock);
+                }
+
                 if (rc & QUOTA_RET_INC_PENDING)
                         *pending = 1;
 
@@ -306,7 +347,7 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
                 /* after acquire(), we should run quota_check_common again
                  * so that we confirm there are enough quota to finish write */
-                rc = acquire(obd, uid, gid);
+                rc = acquire(obd, uid, gid, oti);
 
                 /* please reference to dqacq_completion for the below */
                 /* a new request is finished, try again */
@@ -321,18 +362,34 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                         break;
                 }
 
-                /* -EBUSY and others, try 10 times */
-                if (rc < 0 && count_err < 10) {
-                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
-                        cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
-                        continue;
+                /* -EBUSY and others, wait a second and try again */
+                if (rc < 0) {
+                        cfs_waitq_t        waitq;
+                        struct l_wait_info lwi;
+
+                        if (oti && oti->oti_thread && oti->oti_thread->t_watchdog)
+                                lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc,
+                               count_err++);
+
+                        init_waitqueue_head(&waitq);
+                        lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL,
+                                          NULL);
+                        l_wait_event(waitq, 0, &lwi);
                 }
 
-                if (count_err >= 10 || cycle >= 1000) {
-                        CDEBUG(D_ERROR, "we meet 10 errors or run too many"
-                               " cycles when acquiring quota, quit checking with"
-                               " rc: %d, cycle: %d.\n", rc, cycle);
-                        break;
+                if (rc < 0 || cycle % 10 == 2) {
+                        spin_lock(&last_print_lock);
+                        if (last_print == 0 ||
+                            cfs_time_before((last_print + cfs_time_seconds(30)),
+                                            cfs_time_current())) {
+                                CWARN("still haven't managed to acquire quota "
+                                      "space from the quota master after %d "
+                                      "retries (err=%d, rc=%d)\n",
+                                      cycle, count_err - 1, rc);
+                                last_print = cfs_time_current();
+                        }
+                        spin_unlock(&last_print_lock);
                 }
 
                 CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
@@ -354,10 +411,10 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
 
 static int filter_quota_check(struct obd_device *obd, unsigned int uid,
                               unsigned int gid, int npage, int *flag,
-                              quota_acquire acquire)
+                              quota_acquire acquire, struct obd_trans_info *oti)
 {
         return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
-                                    acquire);
+                                    acquire, oti);
 }
 
 /* when a block_write or inode_create rpc is finished, adjust the record for
@@ -500,19 +557,20 @@ static int mds_quota_fs_cleanup(struct obd_device *obd)
 
 static int mds_quota_check(struct obd_device *obd, unsigned int uid,
                            unsigned int gid, int inodes, int *flag,
-                           quota_acquire acquire)
+                           quota_acquire acquire, struct obd_trans_info *oti)
 {
-        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
+        return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0,
+                                    acquire, oti);
 }
 
 static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
-                             unsigned int gid)
+                             unsigned int gid, struct obd_trans_info *oti)
 {
         struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
         int rc;
         ENTRY;
 
-        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
+        rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1, oti);
         RETURN(rc);
 }