Whamcloud - gitweb
LU-1842 quota: qsd lock
authorNiu Yawei <niu@whamcloud.com>
Sat, 29 Sep 2012 06:12:43 +0000 (02:12 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 2 Oct 2012 16:58:30 +0000 (12:58 -0400)
qsd lock glimpse/blocking ast, qsd lock cancel/match functions.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Ief424ca77dd9538d56689410fadfd71b01a03974
Reviewed-on: http://review.whamcloud.com/4129
Tested-by: Hudson
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <tappro@whamcloud.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
lustre/quota/Makefile.in
lustre/quota/qsd_entry.c
lustre/quota/qsd_internal.h
lustre/quota/qsd_lib.c
lustre/quota/qsd_lock.c [new file with mode: 0644]
lustre/quota/qsd_reint.c [new file with mode: 0644]
lustre/quota/qsd_request.c
lustre/quota/qsd_writeback.c [new file with mode: 0644]

index 93dc089..398d01b 100644 (file)
@@ -2,7 +2,8 @@ MODULES := lquota
 
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
-qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o
+qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o
+qsd-objs += qsd_reint.o qsd_writeback.o
 
 lquota-objs := $(quota-objs) $(qsd-objs)
 
index b27d329..f66df72 100644 (file)
@@ -93,8 +93,8 @@ static int qsd_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
                        lqe->lqe_enforced = true;
                break;
        default:
-               LQUOTA_ERROR(lqe, "failed to read quota entry from global index"
-                            "copy, rc:%d", rc);
+               LQUOTA_ERROR(lqe, "failed to read quota entry from global "
+                            "index copy, rc:%d", rc);
                return rc;
        }
 
@@ -110,8 +110,8 @@ static int qsd_lqe_read(const struct lu_env *env, struct lquota_entry *lqe,
                lqe->lqe_granted = qti->qti_slv_rec.qsr_granted;
                break;
        default:
-               LQUOTA_ERROR(lqe, "failed to read quota entry from slave index"
-                            "copy, rc:%d", rc);
+               LQUOTA_ERROR(lqe, "failed to read quota entry from slave "
+                            "index copy, rc:%d", rc);
                return rc;
        }
 
@@ -261,7 +261,6 @@ int qsd_update_index(const struct lu_env *env, struct qsd_qtype_info *qqi,
        if (rc)
                GOTO(out, rc);
 
-       /* write lock lquota entry */
        if (global) {
                /* Update record in global index copy */
                struct lquota_glb_rec *glb_rec = (struct lquota_glb_rec *)rec;
index ac39419..c829588 100644 (file)
@@ -65,6 +65,16 @@ struct qsd_instance {
         * future. For the time being, we can just use an array. */
        struct qsd_qtype_info   *qsd_type_array[MAXQUOTAS];
 
+       /* r/w spinlock protecting:
+        * - the state flags
+        * - the qsd update list
+        * - the deferred list
+        * - flags of the qsd_qtype_info
+        *
+        * probably way too much :(
+        */
+       cfs_rwlock_t             qsd_lock;
+
        unsigned long            qsd_is_md:1,    /* managing quota for mdt */
                                 qsd_stopping:1; /* qsd_instance is stopping */
 };
@@ -89,6 +99,9 @@ struct qsd_qtype_info {
         * immutable after creation. */
        struct qsd_instance     *qqi_qsd;
 
+       /* handle of global quota lock */
+       struct lustre_handle     qqi_lockh;
+
        /* Local index files storing quota settings for this quota type */
        struct dt_object        *qqi_acct_obj; /* accounting object */
        struct dt_object        *qqi_slv_obj;  /* slave index copy */
@@ -97,6 +110,17 @@ struct qsd_qtype_info {
        /* Current object versions */
        __u64                    qqi_slv_ver; /* slave index version */
        __u64                    qqi_glb_ver; /* global index version */
+
+       /* Various flags representing the current state of the slave for this
+        * quota type. */
+       unsigned long            qqi_glb_uptodate:1, /* global index uptodate
+                                                       with master */
+                                qqi_slv_uptodate:1, /* slave index uptodate
+                                                       with master */
+                                qqi_reint:1;    /* in reintegration or not */
+
+       /* A list of references to this instance, for debugging */
+       struct lu_ref            qqi_reference;
 };
 
 /*
@@ -122,6 +146,14 @@ static inline void qqi_putref(struct qsd_qtype_info *qqi)
        cfs_atomic_dec(&qqi->qqi_ref);
 }
 
+/* all kind of operations supported by qsd_dqacq() */
+enum qsd_ops {
+       QSD_ADJ, /* adjust quota space based on current qunit */
+       QSD_ACQ, /* acquire space for requests */
+       QSD_REL, /* release all space quota space uncondionnally */
+       QSD_REP, /* report space usage during reintegration */
+};
+
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 
 /* Common data shared by qsd-level handlers. This is allocated per-thread to
@@ -196,6 +228,16 @@ int qsd_update_lqe(const struct lu_env *, struct lquota_entry *, bool,
 int qsd_write_version(const struct lu_env *, struct qsd_qtype_info *,
                      __u64, bool);
 
+/* qsd_lock.c */
+extern struct ldlm_enqueue_info qsd_glb_einfo;
+extern struct ldlm_enqueue_info qsd_id_einfo;
+int qsd_id_lock_match(struct lustre_handle *, struct lustre_handle *);
+int qsd_id_lock_cancel(const struct lu_env *, struct lquota_entry *);
+
+/* qsd_reint.c */
+int qsd_start_reint_thread(struct qsd_qtype_info *);
+void qsd_stop_reint_thread(struct qsd_qtype_info *);
+
 /* qsd_request.c */
 typedef void (*qsd_req_completion_t) (const struct lu_env *,
                                      struct qsd_qtype_info *,
@@ -213,10 +255,16 @@ int qsd_fetch_index(const struct lu_env *, struct obd_export *,
                    struct idx_info *, unsigned int, cfs_page_t **, bool *);
 
 /* qsd_writeback.c */
-/* XXX to be replaced with real function when reintegration landed. */
-static inline void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver,
-                                   bool global)
+void qsd_bump_version(struct qsd_qtype_info *, __u64, bool);
+void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *,
+                     union lquota_id *, union lquota_rec *, __u64, bool);
+
+/* qsd_handler.c */
+/* XXX to be replaced with real function once qsd_handler landed */
+static inline int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
+                           enum qsd_ops op)
 {
+       return 0;
 }
 
 #endif /* _QSD_INTERNAL_H */
index 1430410..16a5cc4 100644 (file)
@@ -88,6 +88,10 @@ static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
        qqi = qsd->qsd_type_array[qtype];
        qsd->qsd_type_array[qtype] = NULL;
 
+       /* by now, all qqi users should have gone away */
+       LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1);
+       lu_ref_fini(&qqi->qqi_reference);
+
        /* release accounting object */
        if (qqi->qqi_acct_obj != NULL && !IS_ERR(qqi->qqi_acct_obj)) {
                lu_object_put(env, &qqi->qqi_acct_obj->do_lu);
@@ -140,12 +144,18 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        if (qqi == NULL)
                RETURN(-ENOMEM);
        qsd->qsd_type_array[qtype] = qqi;
+       cfs_atomic_set(&qqi->qqi_ref, 1); /* referenced from qsd */
 
        /* set backpointer and other parameters */
        qqi->qqi_qsd   = qsd;
        qqi->qqi_qtype = qtype;
+       lu_ref_init(&qqi->qqi_reference);
        lquota_generate_fid(&qqi->qqi_fid, qsd->qsd_pool_id, QSD_RES_TYPE(qsd),
                            qtype);
+       qqi->qqi_glb_uptodate = false;
+       qqi->qqi_slv_uptodate = false;
+       qqi->qqi_reint        = false;
+       memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
 
         /* open accounting object */
         LASSERT(qqi->qqi_acct_obj == NULL);
@@ -274,6 +284,7 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        if (qsd == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
+       cfs_rwlock_init(&qsd->qsd_lock);
        /* copy service name */
        strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME);
 
diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c
new file mode 100644 (file)
index 0000000..679cbb7
--- /dev/null
@@ -0,0 +1,512 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu    Yawei    <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <lustre_dlm.h>
+#include <obd_class.h>
+
+#include "qsd_internal.h"
+
+/*
+ * Return qsd_qtype_info structure associated with a global lock
+ *
+ * \param lock - is the global lock from which we should extract the qqi
+ * \param reset - whether lock->l_ast_data should be cleared
+ */
+static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
+                                                  bool reset) {
+       struct qsd_qtype_info *qqi;
+       ENTRY;
+
+       lock_res_and_lock(lock);
+       qqi = lock->l_ast_data;
+       if (qqi != NULL) {
+               qqi_getref(qqi);
+               lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
+               if (reset)
+                       lock->l_ast_data = NULL;
+       }
+       unlock_res_and_lock(lock);
+
+       if (reset && qqi != NULL) {
+               /* release qqi reference hold for the lock */
+               qqi_putref(qqi);
+               lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
+       }
+       RETURN(qqi);
+}
+
+/*
+ * Return lquota entry structure associated with a per-ID lock
+ *
+ * \param lock - is the per-ID lock from which we should extract the lquota
+ *               entry
+ * \param reset - whether lock->l_ast_data should be cleared
+ */
+static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
+                                               bool reset) {
+       struct lquota_entry *lqe;
+       ENTRY;
+
+       lock_res_and_lock(lock);
+       lqe = lock->l_ast_data;
+       if (lqe != NULL) {
+               lqe_getref(lqe);
+               if (reset)
+                       lock->l_ast_data = NULL;
+       }
+       unlock_res_and_lock(lock);
+
+       if (reset && lqe != NULL)
+               /* release lqe reference hold for the lock */
+               lqe_putref(lqe);
+       RETURN(lqe);
+}
+
+/*
+ * Glimpse callback handler for all quota locks. This function extracts
+ * information from the glimpse request.
+ *
+ * \param lock - is the lock targeted by the glimpse
+ * \param data - is a pointer to the glimpse ptlrpc request
+ * \param req  - is the glimpse request
+ * \param desc - is the glimpse descriptor describing the purpose of the glimpse
+ *               request.
+ * \param lvb  - is the pointer to the lvb in the reply buffer
+ *
+ * \retval 0 on success and \desc, \lvb & \arg point to a valid structures,
+ *         appropriate error on failure
+ */
+static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
+                                 struct ldlm_gl_lquota_desc **desc, void **lvb)
+{
+       int rc;
+       ENTRY;
+
+       LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
+
+       /* glimpse on quota locks always packs a glimpse descriptor */
+       req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
+
+       /* extract glimpse descriptor */
+       *desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
+       if (*desc == NULL)
+               RETURN(-EFAULT);
+
+       /* prepare reply */
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc != 0) {
+               CERROR("Can't pack response, rc %d\n", rc);
+               RETURN(rc);
+       }
+
+       /* extract lvb */
+       *lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+
+       RETURN(0);
+}
+
+/*
+ * Blocking callback handler for global index lock
+ *
+ * \param lock - is the lock for which ast occurred.
+ * \param desc - is the description of a conflicting lock in case of blocking
+ *               ast.
+ * \param data - is the value of lock->l_ast_data
+ * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
+ *               cancellation and blocking ast's.
+ */
+static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
+                               struct ldlm_lock_desc *desc, void *data,
+                               int flag)
+{
+       int rc = 0;
+       ENTRY;
+
+       switch(flag) {
+       case LDLM_CB_BLOCKING: {
+               struct lustre_handle lockh;
+
+               LDLM_DEBUG(lock, "blocking AST on global quota lock");
+               ldlm_lock2handle(lock, &lockh);
+               rc = ldlm_cli_cancel(&lockh);
+               break;
+       }
+       case LDLM_CB_CANCELING: {
+               struct qsd_qtype_info *qqi;
+
+               LDLM_DEBUG(lock, "canceling global quota lock");
+
+               qqi = qsd_glb_ast_data_get(lock, true);
+               if (qqi == NULL)
+                       break;
+
+               /* we are losing the global index lock, so let's mark the
+                * global & slave indexes as not up-to-date any more */
+               cfs_write_lock(&qqi->qqi_qsd->qsd_lock);
+               qqi->qqi_glb_uptodate = false;
+               qqi->qqi_slv_uptodate = false;
+               if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
+                       memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
+               cfs_write_unlock(&qqi->qqi_qsd->qsd_lock);
+
+               CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
+                      qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
+
+               /* kick off reintegration thread if not running already, if
+                * it's just local cancel (for stack clean up or eviction),
+                * don't re-trigger the reintegration. */
+               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+                       qsd_start_reint_thread(qqi);
+
+               lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
+               qqi_putref(qqi);
+               break;
+       }
+       default:
+               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+       }
+
+       RETURN(rc);
+}
+
+/*
+ * Glimpse callback handler for global quota lock.
+ *
+ * \param lock - is the lock targeted by the glimpse
+ * \param data - is a pointer to the glimpse ptlrpc request
+ */
+static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+       struct ptlrpc_request           *req = data;
+       struct qsd_qtype_info           *qqi;
+       struct ldlm_gl_lquota_desc      *desc;
+       struct lquota_lvb               *lvb;
+       struct lquota_glb_rec            rec;
+       int                              rc;
+       ENTRY;
+
+       rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
+       if (rc)
+               GOTO(out, rc);
+
+       qqi = qsd_glb_ast_data_get(lock, false);
+       if (qqi == NULL)
+               /* valid race */
+               GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
+
+       LCONSOLE_INFO("%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
+                     " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+                     desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
+                     desc->gl_softlimit);
+
+       if (desc->gl_ver == 0) {
+               CERROR("%s: invalid global index version "LPU64"\n",
+                      qqi->qqi_qsd->qsd_svname, desc->gl_ver);
+               GOTO(out_qqi, rc = -EINVAL);
+       }
+
+       /* extract new hard & soft limits from the glimpse descriptor */
+       rec.qbr_hardlimit = desc->gl_hardlimit;
+       rec.qbr_softlimit = desc->gl_softlimit;
+       rec.qbr_time      = 0;
+       rec.qbr_granted   = 0;
+
+       /* We can't afford disk io in the context of glimpse callback handling
+        * thread, so the on-disk global limits update has to be deferred. */
+       qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
+                        desc->gl_ver, true);
+       EXIT;
+out_qqi:
+       lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
+       qqi_putref(qqi);
+out:
+       req->rq_status = rc;
+       return rc;
+}
+
+struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
+                                          LCK_CR,
+                                          qsd_glb_blocking_ast,
+                                          ldlm_completion_ast,
+                                          qsd_glb_glimpse_ast,
+                                          NULL, NULL };
+/*
+ * Blocking callback handler for per-ID lock
+ *
+ * \param lock - is the lock for which ast occurred.
+ * \param desc - is the description of a conflicting lock in case of blocking
+ *               ast.
+ * \param data - is the value of lock->l_ast_data
+ * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
+ *               cancellation and blocking ast's.
+ */
+static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                              void *data, int flag)
+{
+       struct lustre_handle    lockh;
+       int                     rc = 0;
+       ENTRY;
+
+       switch(flag) {
+       case LDLM_CB_BLOCKING: {
+
+               LDLM_DEBUG(lock, "blocking AST on ID quota lock");
+               ldlm_lock2handle(lock, &lockh);
+               rc = ldlm_cli_cancel(&lockh);
+               break;
+       }
+       case LDLM_CB_CANCELING: {
+               struct lu_env           *env;
+               struct lquota_entry     *lqe;
+               bool                     rel = false;
+
+               LDLM_DEBUG(lock, "canceling global quota lock");
+               lqe = qsd_id_ast_data_get(lock, true);
+               if (lqe == NULL)
+                       break;
+
+               LQUOTA_DEBUG(lqe, "losing ID lock");
+
+               /* just local cancel (for stack clean up or eviction), don't
+                * release quota space in this case */
+               if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
+                       lqe_putref(lqe);
+                       break;
+               }
+
+               /* allocate environment */
+               OBD_ALLOC_PTR(env);
+               if (env == NULL) {
+                       lqe_putref(lqe);
+                       rc = -ENOMEM;
+                       break;
+               }
+
+               /* initialize environment */
+               rc = lu_env_init(env, LCT_DT_THREAD);
+               if (rc) {
+                       OBD_FREE_PTR(env);
+                       lqe_putref(lqe);
+                       break;
+               }
+
+               ldlm_lock2handle(lock, &lockh);
+               lqe_write_lock(lqe);
+               if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
+                       /* Clear lqe_lockh & reset qunit to 0 */
+                       qsd_set_qunit(lqe, 0);
+                       memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
+                       lqe->lqe_edquot = false;
+                       rel = true;
+               }
+               lqe_write_unlock(lqe);
+
+               /* If there is qqacq inflight, the release will be skipped
+                * at this time, and triggered on dqacq completion later,
+                * which means there could be a short window that slave is
+                * holding spare grant wihtout per-ID lock. */
+               if (rel)
+                       rc = qsd_dqacq(env, lqe, QSD_REL);
+
+               /* release lqe reference grabbed by qsd_id_ast_data_get() */
+               lqe_putref(lqe);
+               lu_env_fini(env);
+               OBD_FREE_PTR(env);
+               break;
+       }
+       default:
+               LASSERTF(0, "invalid flags for blocking ast %d", flag);
+       }
+
+       RETURN(rc);
+}
+
+/*
+ * Glimpse callback handler for per-ID quota locks.
+ *
+ * \param lock - is the lock targeted by the glimpse
+ * \param data - is a pointer to the glimpse ptlrpc request
+ */
+static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+       struct ptlrpc_request           *req = data;
+       struct lquota_entry             *lqe;
+       struct qsd_instance             *qsd;
+       struct ldlm_gl_lquota_desc      *desc;
+       struct lquota_lvb               *lvb;
+       int                              rc;
+       bool                             wakeup = false;
+       ENTRY;
+
+       rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
+       if (rc)
+               GOTO(out, rc);
+
+       lqe = qsd_id_ast_data_get(lock, false);
+       if (lqe == NULL)
+               /* valid race */
+               GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
+
+       LQUOTA_CONSOLE(lqe, "glimpse on quota locks, new qunit:"LPU64,
+                      desc->gl_qunit);
+
+       qsd = lqe2qqi(lqe)->qqi_qsd;
+
+       lqe_write_lock(lqe);
+       lvb->lvb_id_rel = 0;
+       if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
+               long long space;
+
+               /* extract new qunit from glimpse request */
+               qsd_set_qunit(lqe, desc->gl_qunit);
+
+               space  = lqe->lqe_granted - lqe->lqe_pending_rel;
+               space -= lqe->lqe_usage;
+               space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
+               space -= lqe->lqe_qunit;
+
+               if (space > 0) {
+                       if (lqe->lqe_pending_req > 0) {
+                               LQUOTA_ERROR(lqe, "request in flight, postpone "
+                                            "release of "LPD64, space);
+                               lvb->lvb_id_may_rel = space;
+                       } else {
+                               lqe->lqe_pending_req++;
+
+                               /* release quota space in glimpse reply */
+                               LQUOTA_ERROR(lqe, "releasing "LPD64, space);
+                               lqe->lqe_granted -= space;
+                               lvb->lvb_id_rel   = space;
+
+                               lqe_write_unlock(lqe);
+                               /* change the lqe_granted */
+                               qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
+                                                (union lquota_rec *)&lqe->lqe_granted,
+                                                0, false);
+                               lqe_write_lock(lqe);
+
+                               lqe->lqe_pending_req--;
+                               wakeup = true;
+                       }
+               }
+       }
+
+       lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+       lqe_write_unlock(lqe);
+
+       if (wakeup)
+               cfs_waitq_broadcast(&lqe->lqe_waiters);
+       lqe_putref(lqe);
+out:
+       req->rq_status = rc;
+       RETURN(rc);
+}
+
+struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
+                                         LCK_CR,
+                                         qsd_id_blocking_ast,
+                                         ldlm_completion_ast,
+                                         qsd_id_glimpse_ast,
+                                         NULL, NULL };
+
+/*
+ * Check whether a slave already own a ldlm lock for the quota identifier \qid.
+ *
+ * \param lockh  - is the local lock handle from lquota entry.
+ * \param rlockh - is the remote lock handle of the matched lock, if any.
+ *
+ * \retval 0      : on successful look up and \lockh contains the lock handle.
+ * \retval -ENOENT: no lock found
+ */
+int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
+{
+       struct ldlm_lock        *lock;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(lockh);
+
+       if (!lustre_handle_is_used(lockh))
+               RETURN(-ENOENT);
+
+       rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
+       if (rc)
+               RETURN(-ENOENT);
+
+       LASSERT(lustre_handle_is_used(lockh));
+       ldlm_lock_dump_handle(D_QUOTA, lockh);
+
+       if (rlockh == NULL)
+               RETURN(0);
+
+       /* look up lock associated with local handle and extract remote handle
+        * to be packed in quota request */
+       lock = ldlm_handle2lock(lockh);
+       LASSERT(lock != NULL);
+       lustre_handle_copy(rlockh, &lock->l_remote_handle);
+       LDLM_LOCK_PUT(lock);
+
+       RETURN(0);
+}
+
+int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       int                      rc;
+       ENTRY;
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
+           lqe->lqe_usage || lqe->lqe_granted) {
+               lqe_write_unlock(lqe);
+               RETURN(0);
+       }
+
+       lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+       if (lustre_handle_is_used(&qti->qti_lockh)) {
+               memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
+               qsd_set_qunit(lqe, 0);
+               lqe->lqe_edquot = false;
+       }
+       lqe_write_unlock(lqe);
+
+       rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
+       if (rc)
+               RETURN(rc);
+
+       ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
+       RETURN(0);
+}
diff --git a/lustre/quota/qsd_reint.c b/lustre/quota/qsd_reint.c
new file mode 100644 (file)
index 0000000..64405fc
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu    Yawei    <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "qsd_internal.h"
+
+void qsd_stop_reint_thread(struct qsd_qtype_info *qqi)
+{
+}
+
+int qsd_start_reint_thread(struct qsd_qtype_info *qqi)
+{
+       RETURN(0);
+}
index 0bad3de..57574a1 100644 (file)
@@ -186,10 +186,6 @@ static int qsd_intent_interpret(const struct lu_env *env,
        RETURN(rc);
 }
 
-/* XXX to be removed when the qsd lock landed */
-struct ldlm_enqueue_info qsd_glb_einfo;
-struct ldlm_enqueue_info qsd_id_einfo;
-
 /*
  * Get intent per-ID lock or global-index lock from master.
  *
diff --git a/lustre/quota/qsd_writeback.c b/lustre/quota/qsd_writeback.c
new file mode 100644 (file)
index 0000000..40b064e
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu    Yawei    <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include "qsd_internal.h"
+
+/* Bump version of global or slave index copy
+ *
+ * \param qqi    - qsd_qtype_info
+ * \param ver    - version to be bumped to
+ * \param global - global or slave index copy?
+ */
+void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
+{
+}
+
+/*
+ * Schedule a commit of a lquota entry
+ *
+ * \param  qqi   - qsd_qtype_info
+ * \param  lqe   - lquota_entry
+ * \param  qid   - quota id
+ * \param  rec   - global or slave record to be updated to disk
+ * \param  ver   - new index file version
+ * \param  global- ture : master record; false : slave record
+ */
+void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
+                     union lquota_id *qid, union lquota_rec *rec, __u64 ver,
+                     bool global)
+{
+}