Whamcloud - gitweb
LU-5823 clio: add coo_obd_info_get and coo_data_version
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index 68803d0..16f274c 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2014, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include "qsd_internal.h"
@@ -69,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
        }
        lqe->lqe_pending_req--;
        lqe->lqe_pending_rel = 0;
-       cfs_waitq_broadcast(&lqe->lqe_waiters);
+       wake_up_all(&lqe->lqe_waiters);
 }
 
 /**
@@ -94,10 +90,10 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        struct ldlm_lock        *lock;
        ENTRY;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        /* is the qsd about to shut down? */
        if (qsd->qsd_stopping) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
                /* Target is about to shut down, client will retry */
                RETURN(-EINPROGRESS);
@@ -107,7 +103,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
        if (imp == NULL || imp->imp_invalid) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "connection to master not ready");
                RETURN(-ENOTCONN);
        }
@@ -120,7 +116,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
         * If the previous reintegration failed for some reason, we'll
         * re-trigger it here as well. */
        if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
                             "kicking off reintegration");
                qsd_start_reint_thread(qqi);
@@ -130,7 +126,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        /* Fill the remote global lock handle, master will check this handle
         * to see if the slave is sending request with stale lock */
        lustre_handle_copy(lockh, &qqi->qqi_lockh);
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        if (!lustre_handle_is_used(lockh))
                RETURN(-ENOLCK);
@@ -308,7 +304,7 @@ static void qsd_req_completion(const struct lu_env *env,
                               struct quota_body *reqbody,
                               struct quota_body *repbody,
                               struct lustre_handle *lockh,
-                              union ldlm_wire_lvb *lvb,
+                              struct lquota_lvb *lvb,
                               void *arg, int ret)
 {
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
@@ -389,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env,
        }
 
        /* extract information from lvb */
-       if (ret == 0 && lvb != 0) {
-               if (lvb->l_lquota.lvb_id_qunit != 0)
-                       qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
-               if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
-                       lqe->lqe_edquot = true;
-               else
-                       lqe->lqe_edquot = false;
+       if (ret == 0 && lvb != NULL) {
+               if (lvb->lvb_id_qunit != 0)
+                       qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+               qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }
@@ -407,6 +400,10 @@ static void qsd_req_completion(const struct lu_env *env,
                lqe->lqe_nopreacq = true;
 out:
        adjust = qsd_adjust_needed(lqe);
+       if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
+               lqe->lqe_acq_rc = ret;
+               lqe->lqe_acq_time = cfs_time_current_64();
+       }
 out_noadjust:
        qsd_request_exit(lqe);
        lqe_write_unlock(lqe);
@@ -461,9 +458,15 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
-       } else if (lqe->lqe_edquot) {
+       /* lqe_edquot flag is used to avoid flooding dqacq requests when
+        * the user is over quota, however, the lqe_edquot could be stale
+        * sometimes due to the race reply of dqacq vs. id lock glimpse
+        * (see LU-4505), so we revalidate it every 5 seconds. */
+       } else if (lqe->lqe_edquot &&
+                  cfs_time_before_64(cfs_time_shift_64(-5),
+                                     lqe->lqe_edquot_time)) {
                rc = -EDQUOT;
-       } else {
+       }else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);
@@ -558,6 +561,14 @@ static int qsd_acquire_remote(const struct lu_env *env,
                RETURN(0);
        }
 
+       /* check whether an acquire request completed recently */
+       if (lqe->lqe_acq_rc != 0 &&
+           cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+               lqe_write_unlock(lqe);
+               LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
+               RETURN(lqe->lqe_acq_rc);
+       }
+
        /* only 1 quota request in flight for a given ID is allowed */
        rc = qsd_request_enter(lqe);
        if (rc) {
@@ -578,7 +589,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
        /* check whether we already own a valid lock for this ID */
        rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
        if (rc) {
-               union ldlm_wire_lvb *lvb;
+               struct lquota_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL) {
@@ -823,12 +834,12 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                RETURN(0);
 
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN(0);
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        /* ignore block quota on MDTs, ignore inode quota on OSTs */
        if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
@@ -837,11 +848,13 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* ignore quota enforcement request when:
         *    - quota isn't enforced for this quota type
-        * or - the user/group is root */
-       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+        * or - the user/group is root
+        * or - quota accounting isn't enabled */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+           qsd->qsd_acct_failed)
                RETURN(0);
 
-       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
                 trans->lqt_id_cnt);
        /* check whether we already allocated a slot for this id */
        for (i = 0; i < trans->lqt_id_cnt; i++) {
@@ -898,8 +911,13 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
 
        memset(qbody, 0, sizeof(*qbody));
        rc = qsd_ready(lqe, &qbody->qb_glb_lockh);
-       if (rc)
-               RETURN(rc);
+       if (rc) {
+               /* add to adjust list again to trigger adjustment later when
+                * slave is ready */
+               LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready");
+               qsd_adjust_schedule(lqe, true, false);
+               RETURN(0);
+       }
 
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
@@ -909,7 +927,7 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
        /* fill qb_count & qb_flags */
        if (!qsd_calc_adjust(lqe, qbody)) {
                lqe_write_unlock(lqe);
-               LQUOTA_DEBUG(lqe, "No adjust required");
+               LQUOTA_DEBUG(lqe, "no adjustment required");
                RETURN(0);
        }
 
@@ -965,7 +983,7 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
                                    qsd_req_completion, qqi, &qti->qti_lockh,
                                    lqe);
        } else {
-               union ldlm_wire_lvb *lvb;
+               struct lquota_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL)
@@ -1058,12 +1076,12 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
                RETURN_EXIT;
 
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        LASSERT(trans != NULL);
 
@@ -1110,12 +1128,12 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
                RETURN_EXIT;
 
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        qqi = qsd->qsd_type_array[qtype];
        LASSERT(qqi);
@@ -1124,12 +1142,12 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
            qid->qid_uid == 0)
                RETURN_EXIT;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        lqe = lqe_locate(env, qqi->qqi_site, qid);
        if (IS_ERR(lqe)) {