Whamcloud - gitweb
LU-6142 quota: Fix style issues for qsd_lock.c
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
index 1649d63..f5e24a3 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_dlm.h>
 #include <obd_class.h>
+#include <lustre_swab.h>
 
 #include "qsd_internal.h"
 
 typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
-                           struct ldlm_lock_desc *desc, void *data,
-                           int flag);
+                          struct ldlm_lock_desc *desc, void *data,
+                          int flag);
 static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
 
 typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
@@ -70,24 +67,26 @@ struct ldlm_enqueue_info qsd_id_einfo = {
  * \param reset - whether lock->l_ast_data should be cleared
  */
 static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
-                                                  bool reset) {
+                                                  bool reset)
+{
        struct qsd_qtype_info *qqi;
+
        ENTRY;
 
        lock_res_and_lock(lock);
        qqi = lock->l_ast_data;
-       if (qqi != NULL) {
+       if (qqi) {
                qqi_getref(qqi);
                if (reset)
                        lock->l_ast_data = NULL;
        }
        unlock_res_and_lock(lock);
 
-       if (qqi != NULL)
+       if (qqi)
                /* it is not safe to call lu_ref_add() under spinlock */
                lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
 
-       if (reset && qqi != NULL) {
+       if (reset && qqi) {
                /* release qqi reference hold for the lock */
                lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
                qqi_putref(qqi);
@@ -103,20 +102,22 @@ static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
  * \param reset - whether lock->l_ast_data should be cleared
  */
 static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
-                                               bool reset) {
+                                               bool reset)
+{
        struct lquota_entry *lqe;
+
        ENTRY;
 
        lock_res_and_lock(lock);
        lqe = lock->l_ast_data;
-       if (lqe != NULL) {
+       if (lqe) {
                lqe_getref(lqe);
                if (reset)
                        lock->l_ast_data = NULL;
        }
        unlock_res_and_lock(lock);
 
-       if (reset && lqe != NULL)
+       if (reset && lqe)
                /* release lqe reference hold for the lock */
                lqe_putref(lqe);
        RETURN(lqe);
@@ -140,18 +141,22 @@ static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
                                  struct ldlm_gl_lquota_desc **desc, void **lvb)
 {
        int rc;
+
        ENTRY;
 
        LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
 
        /* glimpse on quota locks always packs a glimpse descriptor */
-       req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
+       req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
 
        /* extract glimpse descriptor */
        *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
-       if (*desc == NULL)
+       if (!*desc)
                RETURN(-EFAULT);
 
+       if (ptlrpc_req_need_swab(req))
+               lustre_swab_gl_lquota_desc(*desc);
+
        /* prepare reply */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(struct lquota_lvb));
@@ -182,9 +187,10 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                                int flag)
 {
        int rc = 0;
+
        ENTRY;
 
-       switch(flag) {
+       switch (flag) {
        case LDLM_CB_BLOCKING: {
                struct lustre_handle lockh;
 
@@ -199,11 +205,13 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                LDLM_DEBUG(lock, "canceling global quota lock");
 
                qqi = qsd_glb_ast_data_get(lock, true);
-               if (qqi == NULL)
+               if (!qqi)
                        break;
 
-               /* we are losing the global index lock, so let's mark the
-                * global & slave indexes as not up-to-date any more */
+               /*
+                * we are losing the global index lock, so let's mark the
+                * global & slave indexes as not up-to-date any more
+                */
                write_lock(&qqi->qqi_qsd->qsd_lock);
                qqi->qqi_glb_uptodate = false;
                qqi->qqi_slv_uptodate = false;
@@ -212,12 +220,14 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                write_unlock(&qqi->qqi_qsd->qsd_lock);
 
                CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
-                      qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
+                      qqi->qqi_qsd->qsd_svname, qtype_name((qqi->qqi_qtype)));
 
-               /* kick off reintegration thread if not running already, if
+               /*
+                * kick off reintegration thread if not running already, if
                 * it's just local cancel (for stack clean up or eviction),
-                * don't re-trigger the reintegration. */
-               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+                * don't re-trigger the reintegration.
+                */
+               if (!ldlm_is_local_only(lock))
                        qsd_start_reint_thread(qqi);
 
                lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
@@ -225,12 +235,55 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                break;
        }
        default:
-               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+               LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }
 
        RETURN(rc);
 }
 
+static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                                struct hlist_node *hnode, void *data)
+{
+       struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)data;
+       struct lquota_entry *lqe;
+
+       lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+       LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+       if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default)
+               return 0;
+
+       lqe_write_lock(lqe);
+       if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0)
+               lqe->lqe_enforced = false;
+       else
+               lqe->lqe_enforced = true;
+       lqe_write_unlock(lqe);
+
+       return 0;
+}
+
+/* Update the quota entries after receiving default quota update
+ *
+ * \param qqi       - is the qsd_qtype_info associated with the quota entries
+ * \param hardlimit - new hardlimit of default quota
+ * \param softlimit - new softlimit of default quota
+ * \param gracetime - new gracetime of default quota
+ */
+void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit,
+                             __u64 softlimit, __u64 gracetime)
+{
+       CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n",
+              qqi->qqi_qsd->qsd_svname);
+
+       qqi->qqi_default_hardlimit = hardlimit;
+       qqi->qqi_default_softlimit = softlimit;
+       qqi->qqi_default_gracetime = gracetime;
+
+       cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash,
+                              qsd_entry_def_iter_cb, qqi);
+}
+
 /*
  * Glimpse callback handler for global quota lock.
  *
@@ -239,12 +292,13 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
  */
 static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-       struct ptlrpc_request           *req = data;
-       struct qsd_qtype_info           *qqi;
-       struct ldlm_gl_lquota_desc      *desc;
-       struct lquota_lvb               *lvb;
-       struct lquota_glb_rec            rec;
-       int                              rc;
+       struct ptlrpc_request *req = data;
+       struct qsd_qtype_info *qqi;
+       struct ldlm_gl_lquota_desc *desc;
+       struct lquota_lvb *lvb;
+       struct lquota_glb_rec rec;
+       int rc;
+
        ENTRY;
 
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
@@ -252,17 +306,18 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
                GOTO(out, rc);
 
        qqi = qsd_glb_ast_data_get(lock, false);
-       if (qqi == NULL)
+       if (!qqi)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
-              " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+       CDEBUG(D_QUOTA,
+              "%s: glimpse on glb quota locks, id:%llu ver:%llu hard:%llu soft:%llu\n",
+              qqi->qqi_qsd->qsd_svname,
               desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
               desc->gl_softlimit);
 
        if (desc->gl_ver == 0) {
-               CERROR("%s: invalid global index version "LPU64"\n",
+               CERROR("%s: invalid global index version %llu\n",
                       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
                GOTO(out_qqi, rc = -EINVAL);
        }
@@ -273,8 +328,14 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
        rec.qbr_time      = desc->gl_time;
        rec.qbr_granted   = 0;
 
-       /* We can't afford disk io in the context of glimpse callback handling
-        * thread, so the on-disk global limits update has to be deferred. */
+       if (desc->gl_id.qid_uid == 0)
+               qsd_update_default_quota(qqi, desc->gl_hardlimit,
+                                        desc->gl_softlimit, desc->gl_time);
+
+       /*
+        * We can't afford disk io in the context of glimpse callback handling
+        * thread, so the on-disk global limits update has to be deferred.
+        */
        qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
                         desc->gl_ver, true);
        EXIT;
@@ -296,14 +357,16 @@ out:
  * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
  *               cancellation and blocking ast's.
  */
-static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+static int qsd_id_blocking_ast(struct ldlm_lock *lock,
+                              struct ldlm_lock_desc *desc,
                               void *data, int flag)
 {
-       struct lustre_handle    lockh;
-       int                     rc = 0;
+       struct lustre_handle lockh;
+       int rc = 0;
+
        ENTRY;
 
-       switch(flag) {
+       switch (flag) {
        case LDLM_CB_BLOCKING: {
 
                LDLM_DEBUG(lock, "blocking AST on ID quota lock");
@@ -312,66 +375,66 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de
                break;
        }
        case LDLM_CB_CANCELING: {
-               struct lu_env           *env;
-               struct lquota_entry     *lqe;
-               bool                     rel = false;
+               struct lu_env *env;
+               struct lquota_entry *lqe;
 
-               LDLM_DEBUG(lock, "canceling global quota lock");
+               LDLM_DEBUG(lock, "canceling ID quota lock");
                lqe = qsd_id_ast_data_get(lock, true);
-               if (lqe == NULL)
+               if (!lqe)
                        break;
 
                LQUOTA_DEBUG(lqe, "losing ID lock");
 
-               /* just local cancel (for stack clean up or eviction), don't
-                * release quota space in this case */
-               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
-                       lqe_putref(lqe);
-                       break;
-               }
-
-               /* allocate environment */
-               OBD_ALLOC_PTR(env);
-               if (env == NULL) {
-                       lqe_putref(lqe);
-                       rc = -ENOMEM;
-                       break;
-               }
-
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_DT_THREAD);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       lqe_putref(lqe);
-                       break;
-               }
-
                ldlm_lock2handle(lock, &lockh);
                lqe_write_lock(lqe);
                if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
                        /* Clear lqe_lockh & reset qunit to 0 */
                        qsd_set_qunit(lqe, 0);
                        memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
-                       lqe->lqe_edquot = false;
-                       rel = true;
+                       qsd_set_edquot(lqe, false);
                }
                lqe_write_unlock(lqe);
 
-               /* If there is qqacq inflight, the release will be skipped
+               /*
+                * If there is dqacq inflight, the release will be skipped
                 * at this time, and triggered on dqacq completion later,
                 * which means there could be a short window that slave is
-                * holding spare grant wihtout per-ID lock. */
-               if (rel)
+                * holding spare grant wihtout per-ID lock.
+                */
+
+               /*
+                * don't release quota space for local cancel (stack clean
+                * up or eviction)
+                */
+               if (!ldlm_is_local_only(lock)) {
+                       /* allocate environment */
+                       OBD_ALLOC_PTR(env);
+                       if (!env) {
+                               lqe_putref(lqe);
+                               rc = -ENOMEM;
+                               break;
+                       }
+
+                       /* initialize environment */
+                       rc = lu_env_init(env, LCT_DT_THREAD);
+                       if (rc) {
+                               OBD_FREE_PTR(env);
+                               lqe_putref(lqe);
+                               break;
+                       }
+
                        rc = qsd_adjust(env, lqe);
 
+                       lu_env_fini(env);
+                       OBD_FREE_PTR(env);
+               }
+
                /* release lqe reference grabbed by qsd_id_ast_data_get() */
                lqe_putref(lqe);
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
                break;
        }
        default:
-               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+               LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }
 
        RETURN(rc);
@@ -385,13 +448,13 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de
  */
 static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-       struct ptlrpc_request           *req = data;
-       struct lquota_entry             *lqe;
-       struct qsd_instance             *qsd;
-       struct ldlm_gl_lquota_desc      *desc;
-       struct lquota_lvb               *lvb;
-       int                              rc;
-       bool                             wakeup = false;
+       struct ptlrpc_request *req = data;
+       struct lquota_entry *lqe;
+       struct ldlm_gl_lquota_desc *desc;
+       struct lquota_lvb *lvb;
+       int rc;
+       bool wakeup = false;
+
        ENTRY;
 
        rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
@@ -399,15 +462,13 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                GOTO(out, rc);
 
        lqe = qsd_id_ast_data_get(lock, false);
-       if (lqe == NULL)
+       if (!lqe)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
+       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu",
                     desc->gl_qunit);
 
-       qsd = lqe2qqi(lqe)->qqi_qsd;
-
        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
        if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
@@ -423,22 +484,24 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
 
                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
-                               LQUOTA_DEBUG(lqe, "request in flight, postpone "
-                                            "release of "LPD64, space);
+                               LQUOTA_DEBUG(lqe,
+                                            "request in flight, postpone release of %lld",
+                                            space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;
 
                                /* release quota space in glimpse reply */
-                               LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
+                               LQUOTA_DEBUG(lqe, "releasing %lld", space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;
 
                                lqe_write_unlock(lqe);
                                /* change the lqe_granted */
-                               qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
-                                                (union lquota_rec *)&lqe->lqe_granted,
-                                                0, false);
+                               qsd_upd_schedule(lqe2qqi(lqe), lqe,
+                                                &lqe->lqe_id,
+                                                (union lquota_rec *)
+                                                 &lqe->lqe_granted, 0, false);
                                lqe_write_lock(lqe);
 
                                lqe->lqe_pending_req--;
@@ -447,11 +510,11 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                }
        }
 
-       lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+       qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
        lqe_write_unlock(lqe);
 
        if (wakeup)
-               cfs_waitq_broadcast(&lqe->lqe_waiters);
+               wake_up_all(&lqe->lqe_waiters);
        lqe_putref(lqe);
 out:
        req->rq_status = rc;
@@ -469,8 +532,9 @@ out:
  */
 int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
 {
-       struct ldlm_lock        *lock;
-       int                      rc;
+       struct ldlm_lock *lock;
+       int rc;
+
        ENTRY;
 
        LASSERT(lockh);
@@ -485,12 +549,14 @@ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
        LASSERT(lustre_handle_is_used(lockh));
        ldlm_lock_dump_handle(D_QUOTA, lockh);
 
-       if (rlockh == NULL)
+       if (!rlockh)
                /* caller not interested in remote handle */
                RETURN(0);
 
-       /* look up lock associated with local handle and extract remote handle
-        * to be packed in quota request */
+       /*
+        * look up lock associated with local handle and extract remote handle
+        * to be packed in quota request
+        */
        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        lustre_handle_copy(rlockh, &lock->l_remote_handle);
@@ -501,8 +567,9 @@ int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
 
 int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
 {
-       struct qsd_thread_info  *qti = qsd_info(env);
-       int                      rc;
+       struct qsd_thread_info *qti = qsd_info(env);
+       int rc;
+
        ENTRY;
 
        lqe_write_lock(lqe);
@@ -516,7 +583,7 @@ int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
        if (lustre_handle_is_used(&qti->qti_lockh)) {
                memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
                qsd_set_qunit(lqe, 0);
-               lqe->lqe_edquot = false;
+               qsd_set_edquot(lqe, false);
        }
        lqe_write_unlock(lqe);