Whamcloud - gitweb
LU-8726 osd-ldiskfs: bypass read for benchmarking
[fs/lustre-release.git] / lustre / quota / qsd_lock.c
index c63d033..1109414 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_dlm.h>
 #include <obd_class.h>
+#include <lustre_swab.h>
 
 #include "qsd_internal.h"
 
+typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *desc, void *data,
+                           int flag);
+static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
+
+typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
+static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
+
+struct ldlm_enqueue_info qsd_glb_einfo = {
+       .ei_type        = LDLM_PLAIN,
+       .ei_mode        = LCK_CR,
+       .ei_cb_bl       = qsd_glb_blocking_ast,
+       .ei_cb_cp       = ldlm_completion_ast,
+       .ei_cb_gl       = qsd_glb_glimpse_ast,
+};
+
+struct ldlm_enqueue_info qsd_id_einfo = {
+       .ei_type        = LDLM_PLAIN,
+       .ei_mode        = LCK_CR,
+       .ei_cb_bl       = qsd_id_blocking_ast,
+       .ei_cb_cp       = ldlm_completion_ast,
+       .ei_cb_gl       = qsd_id_glimpse_ast,
+};
+
 /*
  * Return qsd_qtype_info structure associated with a global lock
  *
@@ -128,6 +149,9 @@ static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
        if (*desc == NULL)
                RETURN(-EFAULT);
 
+       if (ptlrpc_req_need_swab(req))
+               lustre_swab_gl_lquota_desc(*desc);
+
        /* prepare reply */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             sizeof(struct lquota_lvb));
@@ -166,7 +190,7 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
 
                LDLM_DEBUG(lock, "blocking AST on global quota lock");
                ldlm_lock2handle(lock, &lockh);
-               rc = ldlm_cli_cancel(&lockh);
+               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
@@ -193,7 +217,7 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                /* kick off reintegration thread if not running already, if
                 * it's just local cancel (for stack clean up or eviction),
                 * don't re-trigger the reintegration. */
-               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+               if (!ldlm_is_local_only(lock))
                        qsd_start_reint_thread(qqi);
 
                lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
@@ -201,7 +225,7 @@ static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
                break;
        }
        default:
-               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+               LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }
 
        RETURN(rc);
@@ -232,13 +256,13 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
-              " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+       CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:%llu ver:%llu"
+              " hard:" "%llu soft:%llu\n", qqi->qqi_qsd->qsd_svname,
               desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
               desc->gl_softlimit);
 
        if (desc->gl_ver == 0) {
-               CERROR("%s: invalid global index version "LPU64"\n",
+               CERROR("%s: invalid global index version %llu\n",
                       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
                GOTO(out_qqi, rc = -EINVAL);
        }
@@ -262,13 +286,7 @@ out:
        return rc;
 }
 
-struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
-                                          LCK_CR,
-                                          qsd_glb_blocking_ast,
-                                          ldlm_completion_ast,
-                                          qsd_glb_glimpse_ast,
-                                          NULL, NULL };
-/*
+/**
  * Blocking callback handler for per-ID lock
  *
  * \param lock - is the lock for which ast occurred.
@@ -290,70 +308,66 @@ static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *de
 
                LDLM_DEBUG(lock, "blocking AST on ID quota lock");
                ldlm_lock2handle(lock, &lockh);
-               rc = ldlm_cli_cancel(&lockh);
+               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
                break;
        }
        case LDLM_CB_CANCELING: {
                struct lu_env           *env;
                struct lquota_entry     *lqe;
-               bool                     rel = false;
 
-               LDLM_DEBUG(lock, "canceling global quota lock");
+               LDLM_DEBUG(lock, "canceling ID quota lock");
                lqe = qsd_id_ast_data_get(lock, true);
                if (lqe == NULL)
                        break;
 
                LQUOTA_DEBUG(lqe, "losing ID lock");
 
-               /* just local cancel (for stack clean up or eviction), don't
-                * release quota space in this case */
-               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
-                       lqe_putref(lqe);
-                       break;
-               }
-
-               /* allocate environment */
-               OBD_ALLOC_PTR(env);
-               if (env == NULL) {
-                       lqe_putref(lqe);
-                       rc = -ENOMEM;
-                       break;
-               }
-
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_DT_THREAD);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       lqe_putref(lqe);
-                       break;
-               }
-
                ldlm_lock2handle(lock, &lockh);
                lqe_write_lock(lqe);
                if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
                        /* Clear lqe_lockh & reset qunit to 0 */
                        qsd_set_qunit(lqe, 0);
                        memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
-                       lqe->lqe_edquot = false;
-                       rel = true;
+                       qsd_set_edquot(lqe, false);
                }
                lqe_write_unlock(lqe);
 
-               /* If there is qqacq inflight, the release will be skipped
+               /* If there is dqacq inflight, the release will be skipped
                 * at this time, and triggered on dqacq completion later,
                 * which means there could be a short window that slave is
                 * holding spare grant wihtout per-ID lock. */
-               if (rel)
+
+               /* don't release quota space for local cancel (stack clean
+                * up or eviction) */
+               if (!ldlm_is_local_only(lock)) {
+                       /* allocate environment */
+                       OBD_ALLOC_PTR(env);
+                       if (env == NULL) {
+                               lqe_putref(lqe);
+                               rc = -ENOMEM;
+                               break;
+                       }
+
+                       /* initialize environment */
+                       rc = lu_env_init(env, LCT_DT_THREAD);
+                       if (rc) {
+                               OBD_FREE_PTR(env);
+                               lqe_putref(lqe);
+                               break;
+                       }
+
                        rc = qsd_adjust(env, lqe);
 
+                       lu_env_fini(env);
+                       OBD_FREE_PTR(env);
+               }
+
                /* release lqe reference grabbed by qsd_id_ast_data_get() */
                lqe_putref(lqe);
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
                break;
        }
        default:
-               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+               LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
        }
 
        RETURN(rc);
@@ -369,7 +383,6 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
        struct ptlrpc_request           *req = data;
        struct lquota_entry             *lqe;
-       struct qsd_instance             *qsd;
        struct ldlm_gl_lquota_desc      *desc;
        struct lquota_lvb               *lvb;
        int                              rc;
@@ -385,11 +398,9 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
+       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu",
                     desc->gl_qunit);
 
-       qsd = lqe2qqi(lqe)->qqi_qsd;
-
        lqe_write_lock(lqe);
        lvb->lvb_id_rel = 0;
        if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
@@ -406,13 +417,13 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
                                LQUOTA_DEBUG(lqe, "request in flight, postpone "
-                                            "release of "LPD64, space);
+                                            "release of %lld", space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;
 
                                /* release quota space in glimpse reply */
-                               LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
+                               LQUOTA_DEBUG(lqe, "releasing %lld", space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;
 
@@ -429,25 +440,18 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                }
        }
 
-       lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+       qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
        lqe_write_unlock(lqe);
 
        if (wakeup)
-               cfs_waitq_broadcast(&lqe->lqe_waiters);
+               wake_up_all(&lqe->lqe_waiters);
        lqe_putref(lqe);
 out:
        req->rq_status = rc;
        RETURN(rc);
 }
 
-struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
-                                         LCK_CR,
-                                         qsd_id_blocking_ast,
-                                         ldlm_completion_ast,
-                                         qsd_id_glimpse_ast,
-                                         NULL, NULL };
-
-/*
+/**
  * Check whether a slave already own a ldlm lock for the quota identifier \qid.
  *
  * \param lockh  - is the local lock handle from lquota entry.
@@ -505,7 +509,7 @@ int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
        if (lustre_handle_is_used(&qti->qti_lockh)) {
                memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
                qsd_set_qunit(lqe, 0);
-               lqe->lqe_edquot = false;
+               qsd_set_edquot(lqe, false);
        }
        lqe_write_unlock(lqe);