Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 6bcbcc5..c96cbc2 100644 (file)
@@ -113,15 +113,17 @@ struct lustre_qunit {
         spinlock_t lq_lock;                /** Protect the whole structure */
         enum qunit_state lq_state;         /** Present the status of qunit */
         int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                   \
 do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -131,9 +133,10 @@ do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         qunit->lq_rc = rc;                                              \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -285,18 +288,10 @@ check_cur_qunit(struct obd_device *obd,
         if (!limit)
                 GOTO(out, ret = 0);
 
- search_lqs:
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (!lqs) {
-                CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
-                ret = quota_create_lqs(qdata, NULL, qctxt, &lqs);
-                if (ret == -EALREADY) {
-                        ret = 0;
-                        goto search_lqs;
-                }
-                if (ret < 0)
-                        GOTO (out, ret);
-        }
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 1);
+        if (IS_ERR(lqs))
+                GOTO (out, ret = PTR_ERR(lqs));
         spin_lock(&lqs->lqs_lock);
 
         if (QDATA_IS_BLK(qdata)) {
@@ -435,6 +430,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         qunit->lq_opc = opc;
         qunit->lq_lock = SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -484,10 +480,12 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 
 static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
 {
-        struct lustre_qunit_size *lqs = NULL;
+        struct lustre_qunit_size *lqs;
 
-        quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs);
-        if (lqs) {
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
+                                       qunit->lq_data.qd_id),
+                               qunit->lq_ctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
                 spin_lock(&lqs->lqs_lock);
                 if (qunit->lq_opc == QUOTA_DQACQ)
                         quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
@@ -499,7 +497,6 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
-
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
@@ -526,6 +523,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
                struct obd_trans_info *oti);
 
+static inline void qdata_to_oqaq(struct qunit_data *qdata,
+                                 struct quota_adjust_qunit *oqaq)
+{
+        LASSERT(qdata);
+        LASSERT(oqaq);
+
+        oqaq->qaq_flags = qdata->qd_flags;
+        oqaq->qaq_id    = qdata->qd_id;
+        if (QDATA_IS_ADJBLK(qdata))
+                oqaq->qaq_bunit_sz = qdata->qd_qunit;
+        if (QDATA_IS_ADJINO(qdata))
+                oqaq->qaq_iunit_sz = qdata->qd_qunit;
+}
+
 static int
 dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                  struct qunit_data *qdata, int rc, int opc)
@@ -704,30 +715,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
         down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
          * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -761,10 +766,7 @@ static int dqacq_interpret(const struct lu_env *env,
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
         up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
         RETURN(rc);
 }
 
@@ -809,7 +811,7 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
         EXIT;
 }
 
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
 {
         struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
         int rc = 0;
@@ -830,7 +832,9 @@ static int got_qunit(struct lustre_qunit *qunit)
 
         if (!rc) {
                 spin_lock(&qctxt->lqc_lock);
-                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                rc = !qctxt->lqc_valid;
+                if (!is_master)
+                        rc |= !qctxt->lqc_import;
                 spin_unlock(&qctxt->lqc_lock);
         }
 
@@ -872,8 +876,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (lqs) {
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
                 spin_lock(&lqs->lqs_lock);
                 quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
                 /* when this qdata returned from mds, it will call lqs_putref */
@@ -941,7 +946,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         spin_unlock(&qctxt->lqc_lock);
                 }
@@ -992,7 +998,8 @@ wait_completion:
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
                 /* rc = -EAGAIN, it means the quota master isn't ready yet
                  * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
@@ -1002,8 +1009,9 @@ wait_completion:
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
 
         qunit_put(qunit);
@@ -1023,11 +1031,10 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   const unsigned int id[], __u32 isblk, int wait,
                    struct obd_trans_info *oti)
 {
         int rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
@@ -1091,9 +1098,10 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
@@ -1158,7 +1166,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
                                                &lqs_hash_ops, 0);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
@@ -1250,11 +1260,20 @@ static int qslave_recovery_main(void *arg)
 
         ptlrpc_daemonize("qslave_recovd");
 
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
+
         complete(&data->comp);
 
-        if (qctxt->lqc_recovery)
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -1314,7 +1333,10 @@ free:
                 }
         }
 
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -1367,18 +1389,15 @@ lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
 static int
 lqs_compare(void *key, struct hlist_node *hnode)
 {
-        struct quota_adjust_qunit *lqs_key;
         struct lustre_qunit_size *q;
         int rc;
         ENTRY;
 
         LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
         spin_lock(&q->lqs_lock);
-        rc = ((lqs_key->qaq_id == q->lqs_id) &&
-              (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q)));
+        rc = (q->lqs_key == *((unsigned long long *)key));
         spin_unlock(&q->lqs_lock);
 
         RETURN(rc);
@@ -1387,7 +1406,7 @@ lqs_compare(void *key, struct hlist_node *hnode)
 static void *
 lqs_get(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1401,7 +1420,7 @@ lqs_get(struct hlist_node *hnode)
 static void *
 lqs_put(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1420,12 +1439,12 @@ lqs_exit(struct hlist_node *hnode)
         ENTRY;
 
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
+        /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
          * so we should have 0 refs.
          */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
                  "Busy lqs %p with %d refs\n", q,
                  atomic_read(&q->lqs_refcount));
         OBD_FREE_PTR(q);