Whamcloud - gitweb
b=20500
[fs/lustre-release.git] / lustre / quota / quota_context.c
index 82a5762..440ddf8 100644 (file)
@@ -141,7 +141,6 @@ do {                                                                    \
         spin_unlock(&qunit->lq_lock);                                   \
 } while(0)
 
-
 int should_translate_quota (struct obd_import *imp)
 {
         ENTRY;
@@ -288,17 +287,12 @@ check_cur_qunit(struct obd_device *obd,
         if (!limit)
                 GOTO(out, ret = 0);
 
- search_lqs:
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (!lqs) {
-                CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
-                ret = quota_create_lqs(qdata, NULL, qctxt, &lqs);
-                if (ret == -EALREADY) {
-                        ret = 0;
-                        goto search_lqs;
-                }
-                if (ret < 0)
-                        GOTO (out, ret);
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (IS_ERR(lqs) || lqs == NULL) {
+                CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+                       QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
+                GOTO (out, ret = 0);
         }
         spin_lock(&lqs->lqs_lock);
 
@@ -358,6 +352,7 @@ check_cur_qunit(struct obd_device *obd,
 
         spin_unlock(&lqs->lqs_lock);
         lqs_putref(lqs);
+
         EXIT;
  out:
         OBD_FREE_PTR(qctl);
@@ -488,10 +483,12 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
 
 static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
 {
-        struct lustre_qunit_size *lqs = NULL;
+        struct lustre_qunit_size *lqs;
 
-        quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs);
-        if (lqs) {
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
+                                       qunit->lq_data.qd_id),
+                               qunit->lq_ctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
                 spin_lock(&lqs->lqs_lock);
                 if (qunit->lq_opc == QUOTA_DQACQ)
                         quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
@@ -515,6 +512,70 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit)
         qunit_put(qunit);
 }
 
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+                    struct obd_quotactl *oqctl, int isblk)
+{
+        struct lustre_qunit *qunit, *find_qunit;
+        int cycle = 1;
+
+        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        if (qunit == NULL) {
+                CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
+                       oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+                qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                         oqctl->qc_type, isblk);
+                return NULL;
+        }
+
+        INIT_LIST_HEAD(&qunit->lq_hash);
+        qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+        init_waitqueue_head(&qunit->lq_waitq);
+        atomic_set(&qunit->lq_refcnt, 1);
+        qunit->lq_ctxt = qctxt;
+        qunit->lq_data.qd_id = oqctl->qc_id;
+        qunit->lq_data.qd_flags =  oqctl->qc_type;
+        if (isblk)
+                QDATA_SET_BLK(&qunit->lq_data);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        /* it means it is only an invalid qunit for barrier */
+        qunit->lq_opc = QUOTA_LAST_OPC;
+
+        while (1) {
+                spin_lock(&qunit_hash_lock);
+                find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+                if (find_qunit) {
+                        spin_unlock(&qunit_hash_lock);
+                        qunit_put(find_qunit);
+                        qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+                                                 oqctl->qc_type, isblk);
+                        CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+                        continue;
+                }
+                break;
+        }
+        insert_qunit_nolock(qctxt, qunit);
+        spin_unlock(&qunit_hash_lock);
+        return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+        struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+        if (qunit == NULL) {
+                CERROR("handle is NULL\n");
+                return;
+        }
+
+        LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+        spin_lock(&qunit_hash_lock);
+        remove_qunit_nolock(qunit);
+        spin_unlock(&qunit_hash_lock);
+        QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+        wake_up(&qunit->lq_waitq);
+        qunit_put(qunit);
+}
+
 #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
                                  (limit = count) : (limit += count)
 
@@ -529,6 +590,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                struct qunit_data *qdata, int opc, int wait,
                struct obd_trans_info *oti);
 
+static inline void qdata_to_oqaq(struct qunit_data *qdata,
+                                 struct quota_adjust_qunit *oqaq)
+{
+        LASSERT(qdata);
+        LASSERT(oqaq);
+
+        oqaq->qaq_flags = qdata->qd_flags;
+        oqaq->qaq_id    = qdata->qd_id;
+        if (QDATA_IS_ADJBLK(qdata))
+                oqaq->qaq_bunit_sz = qdata->qd_qunit;
+        if (QDATA_IS_ADJINO(qdata))
+                oqaq->qaq_iunit_sz = qdata->qd_qunit;
+}
+
 static int
 dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                  struct qunit_data *qdata, int rc, int opc)
@@ -803,7 +878,7 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
         EXIT;
 }
 
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
 {
         struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
         int rc = 0;
@@ -824,7 +899,9 @@ static int got_qunit(struct lustre_qunit *qunit)
 
         if (!rc) {
                 spin_lock(&qctxt->lqc_lock);
-                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                rc = !qctxt->lqc_valid;
+                if (!is_master)
+                        rc |= !qctxt->lqc_import;
                 spin_unlock(&qctxt->lqc_lock);
         }
 
@@ -866,8 +943,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        quota_search_lqs(qdata, NULL, qctxt, &lqs);
-        if (lqs) {
+        lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+                               qctxt, 0);
+        if (lqs && !IS_ERR(lqs)) {
                 spin_lock(&lqs->lqs_lock);
                 quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
                 /* when this qdata returned from mds, it will call lqs_putref */
@@ -987,7 +1065,8 @@ wait_completion:
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
                 /* rc = -EAGAIN, it means the quota master isn't ready yet
                  * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
@@ -1019,16 +1098,14 @@ wait_completion:
 
 int
 qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
-                   uid_t uid, gid_t gid, __u32 isblk, int wait,
+                   const unsigned int id[], __u32 isblk, int wait,
                    struct obd_trans_info *oti)
 {
         int rc = 0, i = USRQUOTA;
-        __u32 id[MAXQUOTAS] = { uid, gid };
         struct qunit_data qdata[MAXQUOTAS];
         ENTRY;
 
-        CLASSERT(MAXQUOTAS < 4);
-        if (!sb_any_quota_enabled(qctxt->lqc_sb))
+        if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0)
                 RETURN(0);
 
         for (i = 0; i < MAXQUOTAS; i++) {
@@ -1087,7 +1164,8 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 struct qunit_data *p = &qunit->lq_data;
 
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
-                l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+                l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+                             &lwi);
                 CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
                        "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
                 /* keep same as schedule_dqacq() b=17030 */
@@ -1130,6 +1208,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
                 RETURN(rc);
 
         cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+        cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+        atomic_set(&qctxt->lqc_lqs, 0);
         spin_lock_init(&qctxt->lqc_lock);
         spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_handler = handler;
@@ -1172,10 +1252,27 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         RETURN(rc);
 }
 
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+        int rc;
+        ENTRY;
+
+        rc = !atomic_read(&qctxt->lqc_lqs);
+
+        RETURN(rc);
+}
+
+
+void hash_put_lqs(void *obj, void *data)
+{
+        lqs_putref((struct lustre_qunit_size *)obj);
+}
+
 void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
 {
         struct lustre_qunit *qunit, *tmp;
         struct list_head tmp_list;
+        struct l_wait_info lwi = { 0 };
         struct obd_device_target *obt = qctxt->lqc_obt;
         int i;
         ENTRY;
@@ -1207,11 +1304,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                 qunit_put(qunit);
         }
 
-        down_write(&obt->obt_rwsem);
-        lustre_hash_exit(qctxt->lqc_lqs_hash);
-        qctxt->lqc_lqs_hash = NULL;
-        up_write(&obt->obt_rwsem);
-
         /* after qctxt_cleanup, qctxt might be freed, then check_qm() is
          * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
         while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
@@ -1220,6 +1312,13 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
                                      cfs_time_seconds(1));
         }
 
+        lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+        l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+        down_write(&obt->obt_rwsem);
+        lustre_hash_exit(qctxt->lqc_lqs_hash);
+        qctxt->lqc_lqs_hash = NULL;
+        up_write(&obt->obt_rwsem);
+
         ptlrpcd_decref();
 
 #ifdef LPROCFS
@@ -1246,7 +1345,7 @@ static int qslave_recovery_main(void *arg)
         int rc = 0;
         ENTRY;
 
-        ptlrpc_daemonize("qslave_recovd");
+        cfs_daemonize_ctxt("qslave_recovd");
 
         /* for obdfilter */
         class_incref(obd, "qslave_recovd_filter", obd);
@@ -1317,7 +1416,7 @@ static int qslave_recovery_main(void *arg)
                                        "qslave recovery failed! (id:%d type:%d "
                                        " rc:%d)\n", dqid->di_id, type, rc);
 free:
-                        kfree(dqid);
+                        OBD_FREE_PTR(dqid);
                 }
         }
 
@@ -1352,6 +1451,87 @@ exit:
         EXIT;
 }
 
+int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+{
+        unsigned int type;
+
+        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+                if (!Q_TYPESET(oqctl, type))
+                        continue;
+                if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)))
+                        return 0;
+        }
+        return 1;
+}
+
+int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+{
+        unsigned int type;
+
+        for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+                if (!Q_TYPESET(oqctl, type))
+                        continue;
+                if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))
+                        return 0;
+        }
+        return 1;
+}
+
+/**
+ * When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574
+ *
+ * Should be called with obt->obt_quotachecking held. b=20152 
+ */
+void build_lqs(struct obd_device *obd)
+{
+        struct obd_device_target *obt = &obd->u.obt;
+        struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
+        struct list_head id_list;
+        int i, rc;
+
+        LASSERT_SEM_LOCKED(&obt->obt_quotachecking);
+        INIT_LIST_HEAD(&id_list);
+        for (i = 0; i < MAXQUOTAS; i++) {
+                struct dquot_id *dqid, *tmp;
+
+                if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
+                        continue;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+                rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+                                 i, &id_list);
+#else
+                rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+                                 i, &id_list);
+#endif
+                if (rc) {
+                        CDEBUG(D_ERROR, "fail to get %s qids!\n",
+                               i ? "group" : "user");
+                        continue;
+                }
+
+                list_for_each_entry_safe(dqid, tmp, &id_list,
+                                         di_link) {
+                        struct lustre_qunit_size *lqs;
+
+                        list_del_init(&dqid->di_link);
+                        lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+                                               qctxt, 1);
+                        if (lqs && !IS_ERR(lqs)) {
+                                lqs->lqs_flags |= dqid->di_flag;
+                                lqs_putref(lqs);
+                        } else {
+                                CDEBUG(D_ERROR, "fail to create a lqs"
+                                       "(%s id: %u)!\n", i ? "group" : "user",
+                                       dqid->di_id);
+                        }
+
+                        OBD_FREE_PTR(dqid);
+                }
+        }
+}
 
 /**
  * lqs<->qctxt hash operations
@@ -1377,18 +1557,15 @@ lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
 static int
 lqs_compare(void *key, struct hlist_node *hnode)
 {
-        struct quota_adjust_qunit *lqs_key;
         struct lustre_qunit_size *q;
         int rc;
         ENTRY;
 
         LASSERT(key);
-        lqs_key = (struct quota_adjust_qunit *)key;
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
 
         spin_lock(&q->lqs_lock);
-        rc = ((lqs_key->qaq_id == q->lqs_id) &&
-              (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q)));
+        rc = (q->lqs_key == *((unsigned long long *)key));
         spin_unlock(&q->lqs_lock);
 
         RETURN(rc);
@@ -1398,12 +1575,10 @@ static void *
 lqs_get(struct hlist_node *hnode)
 {
         struct lustre_qunit_size *q =
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        atomic_inc(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+        __lqs_getref(q);
 
         RETURN(q);
 }
@@ -1412,13 +1587,10 @@ static void *
 lqs_put(struct hlist_node *hnode)
 {
         struct lustre_qunit_size *q =
-            hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        LASSERT(atomic_read(&q->lqs_refcount) > 0);
-        atomic_dec(&q->lqs_refcount);
-        CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
-               q, atomic_read(&q->lqs_refcount));
+        __lqs_putref(q, 0);
 
         RETURN(q);
 }
@@ -1426,10 +1598,10 @@ lqs_put(struct hlist_node *hnode)
 static void
 lqs_exit(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q;
+        struct lustre_qunit_size *q =
+                hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
-        q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time