Whamcloud - gitweb
Branch b1_6
authortianzy <tianzy>
Thu, 28 Feb 2008 05:33:09 +0000 (05:33 +0000)
committertianzy <tianzy>
Thu, 28 Feb 2008 05:33:09 +0000 (05:33 +0000)
After an unnecessary block in osts for waiting in-flight quota request is
deleted, a few places should be fixed in order to work with it.
b=14783
i=johann
i=andrew.perepechko

lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter_io_26.c
lustre/quota/quota_adjust_qunit.c
lustre/quota/quota_context.c
lustre/quota/quota_ctl.c
lustre/quota/quota_interface.c
lustre/quota/quota_master.c

index 53c4af6..e879d0e 100644 (file)
@@ -1024,11 +1024,11 @@ int mds_open(struct mds_update_record *rec, int offset,
                         gid = dparent->d_inode->i_gid;
                 else
                         gid = current->fsgid;
-                rc = lquota_chkquota(mds_quota_interface_ref, obd,
-                                     current->fsuid, gid, 1, &rec_pending);
 
-                if (rc < 0)
-                        GOTO(cleanup, rc);
+                /* we try to get enough quota to write here, and let ldiskfs
+                 * decide if it is out of quota or not b=14783 */
+                lquota_chkquota(mds_quota_interface_ref, obd,
+                                current->fsuid, gid, 1, &rec_pending);
 
                 intent_set_disposition(rep, DISP_OPEN_CREATE);
                 handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
index 55a5cd1..f366f12 100644 (file)
@@ -842,11 +842,10 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         else
                 gid = current->fsgid;
 
-        rc = lquota_chkquota(mds_quota_interface_ref, obd,
-                             current->fsuid, gid, 1, &rec_pending);
-
-        if (rc < 0)
-                GOTO(cleanup, rc);
+        /* we try to get enough quota to write here, and let ldiskfs
+         * decide if it is out of quota or not b=14783 */
+        lquota_chkquota(mds_quota_interface_ref, obd,
+                        current->fsuid, gid, 1, &rec_pending);
 
         switch (type) {
         case S_IFREG:{
index 01c1518..cf13af1 100644 (file)
@@ -633,7 +633,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
         void *wait_handle;
-        int total_size = 0, rc2 = 0;
+        int total_size = 0;
         int rec_pending = 0;
         unsigned int qcids[MAXQUOTAS] = {0, 0};
         ENTRY;
@@ -645,15 +645,10 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         if (rc != 0)
                 GOTO(cleanup, rc);
 
-        /* Unfortunately, if quota master is too busy to handle the
-         * pre-dqacq in time and quota hash on ost is used up, we
-         * have to wait for the completion of in flight dqacq/dqrel,
-         * in order not to get enough quota for write b=12588 */
-        rc2 = lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
-                              oa->o_gid, niocount, &rec_pending);
-
-        if (rc2 < 0)
-                GOTO(cleanup, rc = rc2);
+        /* we try to get enough quota to write here, and let ldiskfs
+         * decide if it is out of quota or not b=14783 */
+        lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
+                        oa->o_gid, niocount, &rec_pending);
 
         iobuf = filter_iobuf_get(&obd->u.filter, oti);
         if (IS_ERR(iobuf))
index d22c3f9..bcab209 100644 (file)
@@ -320,6 +320,10 @@ int filter_quota_adjust_qunit(struct obd_export *exp,
 
         if (rc > 0) {
                 rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0);
+                if (rc == -EDQUOT || rc == -EBUSY) {
+                        CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+                        rc = 0;
+                }
                 if (rc)
                         CERROR("slave adjust block quota failed!(rc:%d)\n", rc);
         }
index bb61f07..173a8d7 100644 (file)
@@ -258,12 +258,6 @@ check_cur_qunit(struct obd_device *obd,
                record, qunit_sz, tune_sz, ret);
         LASSERT(ret == 0 || qdata->qd_count);
 
-        if (ret > 0) {
-                quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
-                /* when this qdata returned from mds, it will call lqs_putref */
-                lqs_getref(lqs);
-        }
-
         spin_unlock(&lqs->lqs_lock);
         lqs_putref(lqs);
         EXIT;
@@ -400,7 +394,7 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 spin_unlock(&lqs->lqs_lock);
                 /* this is for quota_search_lqs */
                 lqs_putref(lqs);
-                /* this is for check_cur_qunit */
+                /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
 
@@ -459,7 +453,7 @@ static int split_before_schedule_dqacq(struct obd_device *obd,
             qdata->qd_count > factor) {
                 tmp_qdata = *qdata;
                 tmp_qdata.qd_count = factor;
-                        qdata->qd_count -= tmp_qdata.qd_count;
+                qdata->qd_count -= tmp_qdata.qd_count;
                 QDATA_DEBUG((&tmp_qdata), "be split.\n");
                 rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
         } else{
@@ -699,6 +693,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         int size[2] = { sizeof(struct ptlrpc_body), 0 };
         struct obd_import *imp = NULL;
         unsigned long factor;
+        struct lustre_qunit_size *lqs = NULL;
         int rc = 0;
         ENTRY;
 
@@ -713,27 +708,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                struct lustre_qunit_size *lqs = NULL;
-
                 if (wait)
                         list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
                 spin_unlock(&qunit_hash_lock);
                 free_qunit(empty);
 
-                quota_search_lqs(qdata, NULL, qctxt, &lqs);
-                if (lqs) {
-                        spin_lock(&lqs->lqs_lock);
-                        quota_compute_lqs(qdata, lqs, 0,
-                                          (opc == QUOTA_DQACQ) ? 1 : 0);
-                        spin_unlock(&lqs->lqs_lock);
-                        /* this is for quota_search_lqs */
-                        lqs_putref(lqs);
-                        /* this is for check_cur_qunit */
-                        lqs_putref(lqs);
-                } else {
-                        CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
-                }
-
                 goto wait_completion;
         }
         qunit = empty;
@@ -744,6 +723,19 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
 
         LASSERT(qunit);
 
+        quota_search_lqs(qdata, NULL, qctxt, &lqs);
+        if (lqs) {
+                spin_lock(&lqs->lqs_lock);
+                quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
+                /* when this qdata returned from mds, it will call lqs_putref */
+                lqs_getref(lqs);
+                spin_unlock(&lqs->lqs_lock);
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
+        } else {
+                CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
+        }
+
         QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
         /* master is going to dqacq/dqrel from itself */
@@ -753,7 +745,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                             opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
                 rc = qctxt->lqc_handler(obd, qdata, opc);
                 rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
-                RETURN((rc && rc != -EDQUOT) ? rc : rc2);
+                RETURN(rc ? rc : rc2);
         }
 
         spin_lock(&qctxt->lqc_lock);
@@ -822,8 +814,15 @@ wait_completion:
                 QDATA_DEBUG(p, "wait for dqacq.\n");
 
                 l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
+                /* rc = -EAGAIN, it means a quota req is finished;
+                 * rc = -EDQUOT, it means out of quota
+                 * rc = -EBUSY, it means recovery is happening
+                 * other rc < 0, it means real errors, functions who call
+                 * schedule_dqacq should take care of this */
                 if (qw.qw_rc == 0)
                         rc = -EAGAIN;
+                else
+                        rc = qw.qw_rc;
 
                 CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
         }
@@ -859,6 +858,10 @@ qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                                                           opc, wait);
                         if (!rc)
                                 rc = ret;
+                } else if (wait == 1) {
+                        /* when wait equates 1, that means mds_quota_acquire
+                         * or filter_quota_acquire is calling it. */
+                        qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
                 }
         }
 
@@ -1053,6 +1056,8 @@ static int qslave_recovery_main(void *arg)
                                 rc = split_before_schedule_dqacq(obd, qctxt,
                                                                  &qdata, opc,
                                                                  0);
+                                if (rc == -EDQUOT)
+                                        rc = 0;
                         } else {
                                 rc = 0;
                         }
index 30ae7c9..3bd569b 100644 (file)
@@ -187,6 +187,11 @@ adjust:
 
                 rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, 
                                         uid, gid, 1, 0);
+                if (rc == -EDQUOT || rc == -EBUSY) {
+                        CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+                        rc = 0;
+                }
+
                 break;
                 }
         default:
index 7e0d83e..ada2ce6 100644 (file)
@@ -402,10 +402,15 @@ static int quota_check_common(struct obd_device *obd, unsigned int uid,
                 }
 
                 spin_unlock(&lqs->lqs_lock);
+
                 /* When cycle is zero, lqs_*_pending will be changed. We will
-                 * putref lqs in quota_pending_commit instead of here b=14784 */
-                if (cycle)
-                        lqs_putref(lqs);
+                 * get reference of the lqs here and put reference of lqs in
+                 * quota_pending_commit b=14784 */
+                if (!cycle)
+                        lqs_getref(lqs);
+
+                /* this is for quota_search_lqs */
+                lqs_putref(lqs);
         }
 
         if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
@@ -418,9 +423,13 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                                 unsigned int gid, int count, int *pending,
                                 int isblk, quota_acquire acquire)
 {
-        int rc = 0, cycle = 0;
+        int rc = 0, cycle = 0, count_err = 0;
         ENTRY;
 
+        /* Unfortunately, if quota master is too busy to handle the
+         * pre-dqacq in time and quota hash on ost is used up, we
+         * have to wait for the completion of in flight dqacq/dqrel,
+         * in order to get enough quota for write b=12588 */
         while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
                QUOTA_RET_ACQUOTA) {
 
@@ -430,6 +439,8 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                 cycle++;
                 if (isblk)
                         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
+                /* after acquire(), we should run quota_check_common again
+                 * so that we confirm there are enough quota to finish write */
                 rc = acquire(obd, uid, gid);
 
                 /* please reference to dqacq_completion for the below */
@@ -446,14 +457,21 @@ static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
                 }
 
                 /* -EBUSY and others, try 10 times */
-                if (rc < 0 && cycle < 10) {
-                        CDEBUG(D_QUOTA, "rc: %d, cycle: %d\n", rc, cycle);
+                if (rc < 0 && count_err < 10) {
+                        CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
                         cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
                         continue;
                 }
 
-                CDEBUG(D_QUOTA, "exit with rc: %d\n", rc);
-                break;
+                if (count_err >= 10 || cycle >= 1000) {
+                        CDEBUG(D_ERROR, "we meet 10 errors or run too many"
+                               " cycles when acquiring quota, quit checking with"
+                               " rc: %d, cycle: %d.\n", rc, cycle);
+                        break;
+                }
+
+                CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
+                       cycle);
         }
 
         if (!cycle && rc & QUOTA_RET_INC_PENDING)
index 16bd3a9..42b20a2 100644 (file)
@@ -282,6 +282,10 @@ int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type,
         up(&dquot->dq_sem);
 
         rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
+        if (rc == -EDQUOT || rc == -EBUSY) {
+                CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+                rc = 0;
+        }
         if (rc) {
                 CDEBUG(D_ERROR, "mds fail to adjust file quota! \
                                (rc:%d)\n", rc);
@@ -1093,6 +1097,10 @@ static int mds_init_slave_ilimits(struct obd_device *obd,
                 gid = oqctl->qc_id;
 
         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
+        if (rc == -EDQUOT || rc == -EBUSY) {
+                CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+                rc = 0;
+        }
         if (rc) {
                 CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n",
                        rc);
@@ -1157,6 +1165,10 @@ static int mds_init_slave_blimits(struct obd_device *obd,
         rc = obd_quotactl(mds->mds_osc_exp, ioqc);
 
         rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
+        if (rc == -EDQUOT || rc == -EBUSY) {
+                CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+                rc = 0;
+        }
         if (rc) {
                 CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
                 GOTO(out, rc);