Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 6bcbcc5..54a9de4 100644 (file)
@@ -113,15 +113,17 @@ struct lustre_qunit {
         spinlock_t lq_lock;                /** Protect the whole structure */
         enum qunit_state lq_state;         /** Present the status of qunit */
         int lq_rc;                         /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                   \
 do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -131,9 +133,10 @@ do {                                                                    \
         spin_lock(&qunit->lq_lock);                                     \
         qunit->lq_rc = rc;                                              \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "   \
-                    "lq_rc(%d)\n",                                      \
+                    "lq_rc(%d), lq_owner(%d)\n",                        \
                     qunit, qunit_state_names[qunit->lq_state],          \
-                    qunit_state_names[state], qunit->lq_rc);            \
+                    qunit_state_names[state], qunit->lq_rc,             \
+                    qunit->lq_owner);                                   \
         qunit->lq_state = state;                                        \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
@@ -435,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         qunit->lq_opc = opc;
         qunit->lq_lock = SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -499,7 +503,6 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
-
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
@@ -704,30 +707,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
         down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
          * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -761,10 +758,7 @@ static int dqacq_interpret(const struct lu_env *env,
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
         up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
         RETURN(rc);
 }
 
@@ -809,7 +803,7 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
         EXIT;
 }
 
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
 {
         struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
         int rc = 0;
@@ -830,7 +824,9 @@ static int got_qunit(struct lustre_qunit *qunit)
 
         if (!rc) {
                 spin_lock(&qctxt->lqc_lock);
-                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                rc = !qctxt->lqc_valid;
+                if (!is_master)
+                        rc |= !qctxt->lqc_import;
                 spin_unlock(&qctxt->lqc_lock);
         }
 
@@ -941,7 +937,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         spin_unlock(&qctxt->lqc_lock);
                 }
@@ -992,7 +989,8 @@ wait_completion:
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
                 /* rc = -EAGAIN, it means the quota master isn't ready yet
                  * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
@@ -1002,8 +1000,9 @@ wait_completion:
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
 
         qunit_put(qunit);
@@ -1023,11 +1022,10 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   const unsigned int id[], __u32 isblk, int wait,
                    struct obd_trans_info *oti)
 {
         int rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
@@ -1091,9 +1089,10 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
                 spin_lock(&qunit->lq_lock);
                 rc = qunit->lq_rc;
@@ -1158,7 +1157,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
                                                &lqs_hash_ops, 0);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
@@ -1250,11 +1251,20 @@ static int qslave_recovery_main(void *arg)
 
         ptlrpc_daemonize("qslave_recovd");
 
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
+
         complete(&data->comp);
 
-        if (qctxt->lqc_recovery)
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -1314,7 +1324,10 @@ free:
                 }
         }
 
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -1387,7 +1400,7 @@ lqs_compare(void *key, struct hlist_node *hnode)
 static void *
 lqs_get(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1401,7 +1414,7 @@ lqs_get(struct hlist_node *hnode)
 static void *
 lqs_put(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
             hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1420,12 +1433,12 @@ lqs_exit(struct hlist_node *hnode)
         ENTRY;
 
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
+        /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
          * so we should have 0 refs.
          */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
                  "Busy lqs %p with %d refs\n", q,
                  atomic_read(&q->lqs_refcount));
         OBD_FREE_PTR(q);