Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 917a162..54a9de4 100644 (file)
@@ -113,15 +113,17 @@ struct lustre_qunit {
         spinlock_t lq_lock;                /** Protect the whole structure */
         enum qunit_state lq_state;         /** Present the status of qunit */
         int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                   \
 do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -131,9 +133,10 @@ do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         qunit->lq_rc = rc;                                              \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -423,7 +426,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         struct lustre_qunit *qunit = NULL;
         ENTRY;
 
-        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
         if (qunit == NULL)
                 RETURN(NULL);
 
@@ -435,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         qunit->lq_opc = opc;
         qunit->lq_lock = SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -499,7 +503,6 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
-
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
@@ -541,6 +544,10 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
 
+        /* do it only when a releasing quota req more than 5MB b=18491 */
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
         /* update local operational quota file */
         if (rc == 0) {
                 __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
@@ -636,8 +643,10 @@ out:
 
         compute_lqs_after_removing_qunit(qunit);
 
-        /* wake up all waiters */
+        if (rc == 0)
+                rc = QUOTA_REQ_RETURNED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        /* wake up all waiters */
         wake_up_all(&qunit->lq_waitq);
 
         /* this is for dqacq_in_flight() */
@@ -664,8 +673,12 @@ out:
                  CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
                  RETURN(rc1);
          }
-         if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                RETURN(err);
+         if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+                 RETURN(err);
+
+         if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+             OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+                 RETURN(err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -694,30 +707,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
         down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
          * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -751,10 +758,7 @@ static int dqacq_interpret(const struct lu_env *env,
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
         up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
         RETURN(rc);
 }
 
@@ -799,7 +803,7 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
         EXIT;
 }
 
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
 {
         struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
         int rc = 0;
@@ -820,7 +824,9 @@ static int got_qunit(struct lustre_qunit *qunit)
 
         if (!rc) {
                 spin_lock(&qctxt->lqc_lock);
-                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                rc = !qctxt->lqc_valid;
+                if (!is_master)
+                        rc |= !qctxt->lqc_import;
                 spin_unlock(&qctxt->lqc_lock);
         }
 
@@ -931,7 +937,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         spin_unlock(&qctxt->lqc_lock);
                 }
@@ -982,20 +989,20 @@ wait_completion:
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                /* rc = -EAGAIN, it means a quota req is finished;
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                /* rc = -EAGAIN, it means the quota master isn't ready yet
+                 * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
 
         qunit_put(qunit);
@@ -1015,11 +1022,10 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   const unsigned int id[], __u32 isblk, int wait,
                    struct obd_trans_info *oti)
 {
         int rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
@@ -1083,15 +1089,13 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
                 /* this is for dqacq_in_flight() */
                 qunit_put(qunit);
@@ -1153,7 +1157,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
                                                &lqs_hash_ops, 0);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
@@ -1245,11 +1251,20 @@ static int qslave_recovery_main(void *arg)
 
         ptlrpc_daemonize("qslave_recovd");
 
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
+
         complete(&data->comp);
 
-        if (qctxt->lqc_recovery)
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -1309,7 +1324,10 @@ free:
                 }
         }
 
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -1382,7 +1400,7 @@ lqs_compare(void *key, struct hlist_node *hnode)
 static void *
 lqs_get(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1396,7 +1414,7 @@ lqs_get(struct hlist_node *hnode)
 static void *
 lqs_put(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1415,12 +1433,12 @@ lqs_exit(struct hlist_node *hnode)
         ENTRY;
 
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
+        /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
          * so we should have 0 refs.
          */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
                  "Busy lqs %p with %d refs\n", q,
                  atomic_read(&q->lqs_refcount));
         OBD_FREE_PTR(q);