LU-1842 quota: add core QSD code
author    Johann Lombardi <johann.lombardi@intel.com>
          Thu, 4 Oct 2012 14:18:26 +0000 (16:18 +0200)
committer Oleg Drokin <green@whamcloud.com>
          Sat, 6 Oct 2012 19:06:01 +0000 (15:06 -0400)
Add acquire/release logic on the qsd side, as well as reintegration,
which is handled in 4 steps:
- enqueue global index lock
- fetch global index and update local enforce list
- fetch slave index and update local copy
- adjust/release quota space as appropriate

This patch also fixes the ofd code to call ->ldo_recovery_complete
across the stack.
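
For reviewers, the expected call sequence from an OSD/OFD device looks
roughly as follows (an illustrative sketch based on the lquota.h comments
below, not code from this patch; the qsd/svname/dt_dev/proc_entry names are
placeholders and error handling is elided):

	/* device setup */
	qsd = qsd_init(env, svname, dt_dev, proc_entry);
	/* from ->ldo_prepare: set up slave index objects & connection */
	rc = qsd_prepare(env, qsd);
	/* from ->ldo_recovery_complete: enforcement becomes effective */
	rc = qsd_start(env, qsd);
	/* per operation: qsd_op_begin() on declaration, then a single
	 * qsd_op_end() once the transaction has stopped */
	/* device shutdown */
	qsd_fini(env, qsd);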

Signed-off-by: Johann Lombardi <johann.lombardi@intel.com>
Change-Id: Ib4a8910fe34de878159827c76b40a8f899aaad10
Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-on: http://review.whamcloud.com/4182
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
17 files changed:
lustre/include/lquota.h
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_handler.c
lustre/osp/osp_dev.c
lustre/quota/Makefile.in
lustre/quota/lquota_internal.h
lustre/quota/qsd_config.c
lustre/quota/qsd_handler.c [new file with mode: 0644]
lustre/quota/qsd_internal.h
lustre/quota/qsd_lib.c
lustre/quota/qsd_lock.c
lustre/quota/qsd_reint.c
lustre/quota/qsd_request.c
lustre/quota/qsd_writeback.c

diff --git a/lustre/include/lquota.h b/lustre/include/lquota.h
index 7af5baa..043daf1 100644
@@ -122,36 +122,48 @@ struct qsd_instance;
  *                  needed. qsd_prepare() should typically be called when
  *                  ->ldo_prepare is invoked.
  *
+ * - qsd_start(): a qsd instance should be started once recovery is completed
+ *                (i.e. when ->ldo_recovery_complete is called). This is used
+ *                to notify the qsd layer that quota should now be enforced
+ *                again via the qsd_op_begin/end functions. The last step of the
+ *                reintegration procedure (namely usage reconciliation) will be
+ *                completed during start.
+ *
  * - qsd_fini(): is used to release a qsd_instance structure allocated with
  *               qsd_init(). This releases all quota slave objects and frees the
  *               structures associated with the qsd_instance.
  *
+ * - qsd_op_begin(): is used to enforce quota; it must be called in the
+ *                   declaration of each operation. qsd_op_end() should then be
+ *                   invoked later once all operations have been completed in
+ *                   order to release/adjust the quota space.
+ *                   Running qsd_op_begin() before qsd_start() isn't fatal and
+ *                   will return success.
+ *                   Once qsd_start() has been run, qsd_op_begin() will block
+ *                   until the reintegration procedure is completed.
+ *
+ * - qsd_op_end(): performs the post-operation quota processing. It must be
+ *                 called once the operation's transaction has stopped.
+ *                 While qsd_op_begin() must be invoked each time a new
+ *                 operation is declared, qsd_op_end() should be called only
+ *                 once for the whole transaction.
+ *
+ * - qsd_adjust_quota(): triggers pre-acquire/release if necessary.
+ *
  * Below are the function prototypes to be used by OSD layer to manage quota
  * enforcement. Arguments are documented where each function is defined.  */
 
 struct qsd_instance *qsd_init(const struct lu_env *, char *, struct dt_device *,
                              cfs_proc_dir_entry_t *);
-
 int qsd_prepare(const struct lu_env *, struct qsd_instance *);
-
+int qsd_start(const struct lu_env *, struct qsd_instance *);
 void qsd_fini(const struct lu_env *, struct qsd_instance *);
-
-/* XXX: dummy qsd_op_begin() & qsd_op_end(), will be replaced with the real
- *      one once all the enforcement code landed. */
-static inline int qsd_op_begin(const struct lu_env *env,
-                              struct qsd_instance *qsd,
-                              struct lquota_trans *trans,
-                              struct lquota_id_info *qi,
-                              int *flags)
-{
-       return 0;
-}
-
-static inline void qsd_op_end(const struct lu_env *env,
-                             struct qsd_instance *qsd,
-                             struct lquota_trans *trans)
-{
-}
+int qsd_op_begin(const struct lu_env *, struct qsd_instance *,
+                struct lquota_trans *, struct lquota_id_info *, int *);
+void qsd_op_end(const struct lu_env *, struct qsd_instance *,
+               struct lquota_trans *);
+void qsd_adjust_quota(const struct lu_env *, struct qsd_instance *,
+                     union lquota_id *, int);
 
 /*
  * Quota information attached to a transaction
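
As a hedged usage sketch of the new enforcement entry points (illustrative
only; the initialization details of lquota_trans/lquota_id_info below are
assumptions, not mandated by this patch):

	struct lquota_trans     trans;
	struct lquota_id_info   qi;
	int                     flags = 0, rc;

	memset(&trans, 0, sizeof(trans));
	memset(&qi, 0, sizeof(qi));
	qi.lqi_id.qid_uid = uid;      /* ID charged by the operation */
	qi.lqi_type       = USRQUOTA;
	qi.lqi_is_blk     = true;     /* block quota, e.g. on an OST */
	qi.lqi_space      = space;    /* space required by the operation */

	/* called at declare time, may acquire space from the master */
	rc = qsd_op_begin(env, qsd, &trans, &qi, &flags);
	if (rc == 0) {
		/* ... execute the operation, stop its transaction ... */
		qsd_op_end(env, qsd, &trans);  /* once per transaction */
	}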
diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c
index a637f03..302f435 100644
@@ -293,17 +293,35 @@ static struct lu_object *ofd_object_alloc(const struct lu_env *env,
 
 extern int ost_handle(struct ptlrpc_request *req);
 
-static int ofd_start(const struct lu_env *env, struct lu_device *dev)
+static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
+                      struct lu_device *dev)
 {
+       struct ofd_thread_info *info;
        struct ofd_device       *ofd = ofd_dev(dev);
+       struct obd_device       *obd = ofd_obd(ofd);
        struct lu_device        *next = &ofd->ofd_osd->dd_lu_dev;
        int                      rc;
 
        ENTRY;
 
+       rc = lu_env_refill((struct lu_env *)env);
+       if (rc != 0) {
+               CERROR("Failure to refill session: '%d'\n", rc);
+               RETURN(rc);
+       }
+
+       info = ofd_info_init(env, NULL);
+       if (info == NULL)
+               RETURN(-EFAULT);
+
        /* initialize lower device */
        rc = next->ld_ops->ldo_prepare(env, dev, next);
 
+       target_recovery_init(&ofd->ofd_lut, ost_handle);
+
+       if (obd->obd_recovering == 0)
+               ofd_postrecov(env, ofd);
+
        RETURN(rc);
 }
 
@@ -331,6 +349,7 @@ static struct lu_device_operations ofd_lu_ops = {
        .ldo_object_alloc       = ofd_object_alloc,
        .ldo_process_config     = ofd_process_config,
        .ldo_recovery_complete  = ofd_recovery_complete,
+       .ldo_prepare            = ofd_prepare,
 };
 
 static int ofd_procfs_init(struct ofd_device *ofd)
@@ -546,10 +565,6 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        m->ofd_grant_ratio =
                ofd_grant_ratio_conv(m->ofd_dt_conf.ddp_grant_reserved);
 
-       rc = ofd_start(env, &m->ofd_dt_dev.dd_lu_dev);
-       if (rc)
-               GOTO(err_fini_stack, rc);
-
        rc = lut_init(env, &m->ofd_lut, obd, m->ofd_osd);
        if (rc)
                GOTO(err_free_ns, rc);
@@ -558,8 +573,6 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        if (rc)
                GOTO(err_fini_lut, rc);
 
-       target_recovery_init(&m->ofd_lut, ost_handle);
-
        RETURN(0);
 err_fini_lut:
        lut_fini(env, &m->ofd_lut);
diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h
index e130a08..cc5fe41 100644
@@ -308,6 +308,7 @@ void ofd_free_capa_keys(struct ofd_device *ofd);
 
 /* ofd_dev.c */
 extern struct lu_context_key ofd_thread_key;
+int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd);
 
 /* ofd_obd.c */
 extern struct obd_ops ofd_obd_ops;
diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c
index 556eb4b..c44524d 100644
@@ -425,6 +425,14 @@ static int ofd_destroy_export(struct obd_export *exp)
        return 0;
 }
 
+int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd)
+{
+       struct lu_device *ldev = &ofd->ofd_dt_dev.dd_lu_dev;
+
+       CDEBUG(D_HA, "%s: recovery is over\n", ofd_obd(ofd)->obd_name);
+       return ldev->ld_ops->ldo_recovery_complete(env, ldev);
+}
+
 int ofd_obd_postrecov(struct obd_device *obd)
 {
        struct lu_env            env;
@@ -438,7 +446,8 @@ int ofd_obd_postrecov(struct obd_device *obd)
                RETURN(rc);
        ofd_info_init(&env, obd->obd_self_export);
 
-       rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev);
+       rc = ofd_postrecov(&env, ofd_dev(ldev));
+
        lu_env_fini(&env);
        RETURN(rc);
 }
diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c
index e5797c3..66600be 100644
@@ -837,8 +837,24 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 
         osd_index_fini(obj);
         if (inode != NULL) {
+               struct qsd_instance     *qsd = osd_obj2dev(obj)->od_quota_slave;
+               qid_t                    uid = inode->i_uid;
+               qid_t                    gid = inode->i_gid;
+
                 iput(inode);
                 obj->oo_inode = NULL;
+
+               if (qsd != NULL) {
+                       struct osd_thread_info  *info = osd_oti_get(env);
+                       struct lquota_id_info   *qi = &info->oti_qi;
+
+                       /* Release granted quota to master if necessary */
+                       qi->lqi_id.qid_uid = uid;
+                       qsd_adjust_quota(env, qsd, &qi->lqi_id, USRQUOTA);
+
+                       qi->lqi_id.qid_uid = gid;
+                       qsd_adjust_quota(env, qsd, &qi->lqi_id, GRPQUOTA);
+               }
         }
 }
 
@@ -4624,7 +4640,17 @@ static int osd_process_config(const struct lu_env *env,
 static int osd_recovery_complete(const struct lu_env *env,
                                  struct lu_device *d)
 {
-        RETURN(0);
+       struct osd_device       *osd = osd_dev(d);
+       int                      rc = 0;
+       ENTRY;
+
+       if (osd->od_quota_slave == NULL)
+               RETURN(0);
+
+       /* start qsd instance on recovery completion; this notifies the quota
+        * slave code that we are about to process new requests now */
+       rc = qsd_start(env, osd->od_quota_slave);
+       RETURN(rc);
 }
 
 /*
diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c
index 9f0c539..c107ff0 100644
@@ -754,8 +754,17 @@ static int osd_process_config(const struct lu_env *env,
 
 static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d)
 {
+       struct osd_device       *osd = osd_dev(d);
+       int                      rc = 0;
        ENTRY;
-       RETURN(0);
+
+       if (osd->od_quota_slave == NULL)
+               RETURN(0);
+
+       /* start qsd instance on recovery completion; this notifies the quota
+        * slave code that we are about to process new requests now */
+       rc = qsd_start(env, osd->od_quota_slave);
+       RETURN(rc);
 }
 
 /*
diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c
index 78d4943..e99b663 100644
@@ -713,6 +713,10 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
                                 OBD_CONNECT_SKIP_ORPHAN |
                                 OBD_CONNECT_VERSION |
                                 OBD_CONNECT_FID;
+
+       if (is_osp_on_ost(osp->opd_obd->obd_name))
+               ocd->ocd_connect_flags |= OBD_CONNECT_LIGHTWEIGHT;
+
        ocd->ocd_version = LUSTRE_VERSION_CODE;
        LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
        ocd->ocd_index = data->ocd_index;
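
(Reviewer note: a lightweight connection is not tracked in last_rcvd and does
not participate in recovery, which is presumably why the OSP devices running
on OSTs, used by the quota slave to reach the master, now request
OBD_CONNECT_LIGHTWEIGHT.)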
diff --git a/lustre/quota/Makefile.in b/lustre/quota/Makefile.in
index 7b61887..4097c3c 100644
@@ -1,17 +1,15 @@
 MODULES := lquota
 
-qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o
-
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
-qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o
-qsd-objs += qsd_reint.o qsd_writeback.o qsd_config.o
+qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o qmt_entry.o qmt_pool.o
 
-lquota-objs := $(quota-objs) $(qsd-objs)
+qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o
+qsd-objs += qsd_reint.o qsd_writeback.o qsd_config.o qsd_handler.o
 
-lquota-objs += $(qmt-objs)
+lquota-objs := $(quota-objs) $(qsd-objs) $(qmt-objs)
 
-EXTRA_DIST := $(lquota-objs:%.o=%.c) lquota_internal.h qsd_internal.h
-EXTRA_DIST += qmt_internal.h
+EXTRA_DIST := $(lquota-objs:%.o=%.c)
+EXTRA_DIST += lquota_internal.h qsd_internal.h qmt_internal.h
 
 @INCLUDE_RULES@
diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h
index 7b2bc57..6bd6924 100644
@@ -265,6 +265,9 @@ static inline void lqe_read_unlock(struct lquota_entry *lqe)
 #define LQUOTA_LEAST_QUNIT(type) \
        (type == LQUOTA_RES_MD ? (1 << 10) : toqb(PTLRPC_MAX_BRW_SIZE))
 
+#define LQUOTA_OVER_FL(type) \
+       (type == USRQUOTA ? QUOTA_FL_OVER_USRQUOTA : QUOTA_FL_OVER_GRPQUOTA)
+
 /* Common data shared by quota-level handlers. This is allocated per-thread to
  * reduce stack consumption */
 struct lquota_thread_info {
diff --git a/lustre/quota/qsd_config.c b/lustre/quota/qsd_config.c
index 26c06bb..0ad0d64 100644
@@ -136,6 +136,7 @@ int qsd_process_config(struct lustre_cfg *lcfg)
        char                    *cfgstr = lustre_cfg_string(lcfg, 1);
        char                    *keystr, *valstr;
        int                      rc, pool, enabled = 0;
+       bool                     reint = false;
        ENTRY;
 
        CDEBUG(D_QUOTA, "processing quota parameter: fs:%s cfgstr:%s\n", fsname,
@@ -153,7 +154,7 @@ int qsd_process_config(struct lustre_cfg *lcfg)
 
        qfs = qsd_get_fsinfo(fsname, 0);
        if (qfs == NULL) {
-               CERROR("Fail to find quota filesystem information for %s\n",
+               CERROR("failed to find quota filesystem information for %s\n",
                       fsname);
                RETURN(-ENOENT);
        }
@@ -163,12 +164,42 @@ int qsd_process_config(struct lustre_cfg *lcfg)
        if (strchr(valstr, 'g'))
                enabled |= 1 << GRPQUOTA;
 
+       cfs_down(&qfs->qfs_sem);
        if (qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] == enabled)
                /* no change required */
                GOTO(out, rc = 0);
 
+       if ((qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] & enabled) != enabled)
+               reint = true;
+
        qfs->qfs_enabled[pool - LQUOTA_FIRST_RES] = enabled;
+
+       /* trigger reintegration for all qsd */
+       if (reint) {
+               struct qsd_instance     *qsd;
+               struct qsd_qtype_info   *qqi;
+
+               cfs_list_for_each_entry(qsd, &qfs->qfs_qsd_list, qsd_link) {
+                       bool    skip = false;
+                       int     type;
+
+                       /* start reintegration only if qsd_prepare() was
+                        * successfully called */
+                       cfs_read_lock(&qsd->qsd_lock);
+                       if (!qsd->qsd_prepared)
+                               skip = true;
+                       cfs_read_unlock(&qsd->qsd_lock);
+                       if (skip)
+                               continue;
+
+                       for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+                               qqi = qsd->qsd_type_array[type];
+                               qsd_start_reint_thread(qqi);
+                       }
+               }
+       }
 out:
+       cfs_up(&qfs->qfs_sem);
        qsd_put_fsinfo(qfs);
        RETURN(0);
 }
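
(Reviewer note: the cfgstr parsed above follows the usual quota tunable
format, e.g. "<fsname>.quota.dt=ug"; adding a previously-disabled type, such
as going from "u" to "ug", is what sets reint and kicks off reintegration on
every prepared qsd instance.)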
diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c
new file mode 100644
index 0000000..b1ec4a0
--- /dev/null
@@ -0,0 +1,956 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <linux/version.h>
+#include <linux/fs.h>
+#include <asm/unistd.h>
+#include <linux/quotaops.h>
+#include <linux/init.h>
+
+#include <obd_class.h>
+#include <lustre_param.h>
+#include <lprocfs_status.h>
+
+#include "qsd_internal.h"
+
+/*
+ * helper function returning how much space is currently reserved for requests
+ * in flight.
+ */
+static inline int lqe_pending_dqacq(struct lquota_entry *lqe)
+{
+       int     pending;
+
+       lqe_read_lock(lqe);
+       pending = lqe->lqe_pending_req;
+       lqe_read_unlock(lqe);
+
+       return pending;
+}
+
+/*
+ * helper function returning true when the connection to master is ready to be
+ * used.
+ */
+static inline int qsd_ready(struct qsd_instance *qsd)
+{
+       struct obd_import       *imp = NULL;
+
+       cfs_read_lock(&qsd->qsd_lock);
+       if (qsd->qsd_exp_valid)
+               imp = class_exp2cliimp(qsd->qsd_exp);
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       return (imp == NULL || imp->imp_invalid) ? false : true;
+}
+
+/*
+ * Helper function returning true when quota space needs to be adjusted (some
+ * unused space should be released or pre-acquired) and false otherwise.
+ */
+static bool qsd_adjust_needed(struct lquota_entry *lqe)
+{
+       struct qsd_qtype_info   *qqi;
+       __u64                    usage, granted;
+
+       qqi = lqe2qqi(lqe);
+
+       if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping)
+               /* if quota isn't enforced for this id, no need to adjust.
+                * Similarly, no need to perform adjustment if the target is in
+                * the process of shutting down. */
+               return false;
+
+       usage  = lqe->lqe_usage;
+       usage += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+       granted = lqe->lqe_granted - lqe->lqe_pending_rel;
+
+       /* need to re-acquire per-ID lock or release all grant */
+       if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
+           lqe->lqe_granted > lqe->lqe_usage)
+               return true;
+
+       /* good old quota qunit adjustment logic which has been around since
+        * lustre 1.4:
+        * 1. Need to release some space? */
+       if (granted > usage + lqe->lqe_qunit)
+               return true;
+
+       /* 2. Any quota overrun? */
+       if (lqe->lqe_usage > lqe->lqe_granted)
+               /* we ended up consuming more than we own, we need to have this
+                * fixed ASAP */
+               return true;
+
+       /* 3. Time to pre-acquire? */
+       if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 &&
+           granted < usage + lqe->lqe_qtune)
+               /* need to pre-acquire some space if we don't want to block
+                * client's requests */
+               return true;
+
+       return false;
+}
+
+/*
+ * Callback function called when an acquire/release request sent to the master
+ * is completed
+ */
+static void qsd_dqacq_completion(const struct lu_env *env,
+                                struct qsd_qtype_info *qqi,
+                                struct quota_body *reqbody,
+                                struct quota_body *repbody,
+                                struct lustre_handle *lockh,
+                                union ldlm_wire_lvb *lvb,
+                                void *arg, int ret)
+{
+       struct lquota_entry     *lqe = (struct lquota_entry *)arg;
+       struct qsd_thread_info  *qti;
+       int                      rc;
+       bool                     adjust = false, cancel = false;
+       ENTRY;
+
+       LASSERT(qqi != NULL && lqe != NULL);
+
+       /* the environment passed by ptlrpcd is mostly used by CLIO and doesn't
+        * have the DT tags set. */
+       rc = lu_env_refill_by_tags((struct lu_env *)env, LCT_DT_THREAD, 0);
+       if (rc) {
+               LQUOTA_ERROR(lqe, "failed to refill environment %d", rc);
+               lqe_write_lock(lqe);
+               /* can't afford to adjust quota space with no suitable lu_env */
+               GOTO(out_noadjust, rc);
+       }
+       qti = qsd_info(env);
+
+       lqe_write_lock(lqe);
+
+       if (ret != 0 && ret != -EDQUOT && ret != -EINPROGRESS) {
+               LQUOTA_ERROR(lqe, "DQACQ failed with %d, op:%x", ret,
+                            reqbody->qb_flags);
+               GOTO(out, ret);
+       }
+
+       /* despite -EDQUOT & -EINPROGRESS errors, the master might still
+        * grant us back quota space to adjust quota overrun */
+
+       LQUOTA_DEBUG(lqe, "DQACQ returned %d", ret);
+
+       /* Set the lqe_lockh */
+       if (lustre_handle_is_used(lockh) &&
+           !lustre_handle_equal(lockh, &lqe->lqe_lockh))
+               lustre_handle_copy(&lqe->lqe_lockh, lockh);
+
+       /* If the replied qb_count is zero, it means master didn't process
+        * the DQACQ since the limit for this ID has been removed, so we
+        * should not update the quota entry & slave index copy either. */
+       if (repbody != NULL && repbody->qb_count != 0) {
+               LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+
+               if (req_is_rel(reqbody->qb_flags)) {
+                       if (lqe->lqe_granted < repbody->qb_count) {
+                               LQUOTA_ERROR(lqe, "can't release more space "
+                                            "than owned "LPU64"<"LPU64,
+                                            lqe->lqe_granted,
+                                            repbody->qb_count);
+                               lqe->lqe_granted = 0;
+                       } else {
+                               lqe->lqe_granted -= repbody->qb_count;
+                       }
+                       /* Proactively cancel the per-ID lock when there is
+                        * no usage & grant left; this avoids the master
+                        * sending glimpses unnecessarily to this slave when
+                        * revoking quota */
+                       if (!lqe->lqe_pending_write && !lqe->lqe_granted &&
+                           !lqe->lqe_waiting_write && !lqe->lqe_usage)
+                               cancel = true;
+               } else {
+                       lqe->lqe_granted += repbody->qb_count;
+               }
+               qti->qti_rec.lqr_slv_rec.qsr_granted = lqe->lqe_granted;
+               lqe_write_unlock(lqe);
+
+               /* Update the slave index file in the dedicated thread. So far,
+                * we don't update the version of slave index copy on DQACQ.
+                * No locking is necessary since nobody can change
+                * lqe->lqe_granted while lqe->lqe_pending_req > 0 */
+               qsd_upd_schedule(qqi, lqe, &lqe->lqe_id, &qti->qti_rec, 0,
+                                false);
+               lqe_write_lock(lqe);
+       }
+
+       /* extract information from lvb */
+       if (ret == 0 && lvb != NULL) {
+               if (lvb->l_lquota.lvb_id_qunit != 0)
+                       qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
+               if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
+                       lqe->lqe_edquot = true;
+               else
+                       lqe->lqe_edquot = false;
+       } else if (repbody != NULL && repbody->qb_qunit != 0) {
+               qsd_set_qunit(lqe, repbody->qb_qunit);
+       }
+
+       /* turn off pre-acquire if it failed with -EDQUOT. This is done to avoid
+        * flooding the master with acquire requests. Pre-acquire will be turned
+        * on again as soon as qunit is modified */
+       if (req_is_preacq(reqbody->qb_flags) && ret == -EDQUOT)
+               lqe->lqe_nopreacq = true;
+out:
+       adjust = qsd_adjust_needed(lqe);
+out_noadjust:
+       lqe->lqe_pending_req--;
+       lqe->lqe_pending_rel = 0;
+       lqe_write_unlock(lqe);
+
+       cfs_waitq_broadcast(&lqe->lqe_waiters);
+
+       /* release reference on per-ID lock */
+       if (lustre_handle_is_used(lockh))
+               ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode);
+
+       if (cancel) {
+               qsd_adjust_schedule(lqe, false, true);
+       } else if (adjust) {
+               if (!ret || ret == -EDQUOT)
+                       qsd_adjust_schedule(lqe, false, false);
+               else
+                       qsd_adjust_schedule(lqe, true, false);
+       }
+
+       if (lvb)
+               /* free lvb allocated in qsd_dqacq */
+               OBD_FREE_PTR(lvb);
+
+       lqe_putref(lqe);
+       EXIT;
+}
+
+static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
+{
+       __u64   usage;
+       int     rc;
+       ENTRY;
+
+       if (!lqe->lqe_enforced)
+               /* not enforced any more, we are good */
+               RETURN(0);
+
+       lqe_write_lock(lqe);
+       /* use latest usage */
+       usage = lqe->lqe_usage;
+       /* take pending write into account */
+       usage += lqe->lqe_pending_write;
+
+       if (space + usage <= lqe->lqe_granted - lqe->lqe_pending_rel) {
+               /* Yay! we got enough space */
+               lqe->lqe_pending_write += space;
+               lqe->lqe_waiting_write -= space;
+               rc = 0;
+       } else if (lqe->lqe_edquot) {
+               rc = -EDQUOT;
+       } else {
+               rc = -EAGAIN;
+       }
+       lqe_write_unlock(lqe);
+
+       RETURN(rc);
+}
+
+static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op,
+                          struct quota_body *qbody)
+{
+       struct qsd_qtype_info   *qqi;
+       __u64                    usage, granted;
+
+       if (!lqe->lqe_enforced && op != QSD_REL)
+               return false;
+
+       qqi = lqe2qqi(lqe);
+
+       LASSERT(lqe->lqe_pending_rel == 0);
+       usage   = lqe->lqe_usage;
+       usage  += lqe->lqe_pending_write + lqe->lqe_waiting_write;
+       granted = lqe->lqe_granted;
+
+       qbody->qb_flags = 0;
+again:
+       switch (op) {
+       case QSD_ACQ:
+               /* if we overconsumed quota space, we report usage in request
+                * so that master can adjust it unconditionally */
+               if (lqe->lqe_usage > lqe->lqe_granted) {
+                       qbody->qb_usage = lqe->lqe_usage;
+                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                       granted = lqe->lqe_usage;
+               }
+               /* acquire as much as needed, but not more */
+               if (usage > granted) {
+                       qbody->qb_count  = usage - granted;
+                       qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ;
+               }
+               break;
+       case QSD_REP:
+               /* When reporting quota (during reintegration or on setquota
+                * glimpse), we should release granted space if usage is 0.
+                * Otherwise, if the usage is less than granted, we need to
+                * acquire the per-ID lock to make sure the unused grant can be
+                * reclaimed by per-ID lock glimpse. */
+               if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) {
+                       LQUOTA_DEBUG(lqe, "Release on report!");
+                       GOTO(again, op = QSD_REL);
+               } else if (lqe->lqe_usage == lqe->lqe_granted) {
+                       LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do "
+                                    "anything on report!");
+               } else if (lqe->lqe_usage < lqe->lqe_granted) {
+                       LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!");
+                       qbody->qb_count = 0;
+                       qbody->qb_flags = QUOTA_DQACQ_FL_ACQ;
+               } else {
+                       LASSERT(lqe->lqe_usage > lqe->lqe_granted);
+                       LQUOTA_DEBUG(lqe, "Reporting usage");
+                       qbody->qb_usage = lqe->lqe_usage;
+                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+               }
+               break;
+       case QSD_REL:
+               /* release unused quota space unconditionally */
+               if (lqe->lqe_granted > lqe->lqe_usage) {
+                       qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage;
+                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+               }
+               break;
+       case QSD_ADJ: {
+               /* need to re-acquire per-ID lock or release all grant */
+               if (!lustre_handle_is_used(&lqe->lqe_lockh) &&
+                   lqe->lqe_granted > lqe->lqe_usage)
+                       GOTO(again, op = QSD_REP);
+
+               /* release spare grant */
+               if (granted > usage + lqe->lqe_qunit) {
+                       /* pre-release quota space */
+                       qbody->qb_count  = granted - usage;
+                       /* if usage == 0, release all granted space */
+                       if (usage) {
+                               /* try to keep one qunit of quota space */
+                               qbody->qb_count -= lqe->lqe_qunit;
+                               /* but don't release less than qtune to avoid
+                                * releasing space too often */
+                               if (qbody->qb_count < lqe->lqe_qtune)
+                                       qbody->qb_count = lqe->lqe_qtune;
+                       }
+                       qbody->qb_flags = QUOTA_DQACQ_FL_REL;
+                       break;
+               }
+
+               /* if we overconsumed quota space, we report usage in request
+                * so that master can adjust it unconditionally */
+               if (lqe->lqe_usage > lqe->lqe_granted) {
+                       qbody->qb_usage = lqe->lqe_usage;
+                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                       granted         = lqe->lqe_usage;
+               }
+
+               if (!lqe->lqe_edquot && !lqe->lqe_nopreacq &&
+                   lustre_handle_is_used(&lqe->lqe_lockh) &&
+                   lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) {
+                       /* To pre-acquire quota space, we report how much spare
+                        * quota space the slave currently owns, then the master
+                        * will grant back how much we may claim given the
+                        * current state of affairs */
+                       if (granted <= usage)
+                               qbody->qb_count = 0;
+                       else
+                               qbody->qb_count = granted - usage;
+                       qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ;
+               }
+               break;
+       }
+       default:
+               CERROR("Invalid qsd operation:%u\n", op);
+               LBUG();
+               break;
+       }
+       return qbody->qb_flags != 0;
+}
+
+/*
+ * Acquire/release quota space from master.
+ * There is at most one in-flight dqacq/dqrel request per lquota entry.
+ *
+ * \param env    - the environment passed by the caller
+ * \param lqe    - is the qid entry to be processed
+ * \param op     - the operation the caller wants to perform
+ *
+ * \retval 0     - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write/create
+ *         -ve          : other appropriate errors
+ */
+int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
+             enum qsd_ops op)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct quota_body       *qbody = &qti->qti_body;
+       struct qsd_instance     *qsd;
+       struct qsd_qtype_info   *qqi;
+       struct ldlm_lock        *lock;
+       int                      rc;
+       bool                     intent = false, sync;
+       ENTRY;
+
+       qqi = lqe2qqi(lqe);
+       qsd = qqi->qqi_qsd;
+
+       if (qsd->qsd_stopping) {
+               LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping");
+               /* Target is about to shut down, client will retry */
+               RETURN(-EINPROGRESS);
+       }
+
+       if (!qsd_ready(qsd)) {
+               LQUOTA_DEBUG(lqe, "Connection to master not ready");
+               RETURN(-ENOTCONN);
+       }
+
+       /* In most cases, reintegration will already have been triggered (when
+        * quota is enabled or on OST start); however, in a rare race condition
+        * (enabling quota while OSTs are starting), we might miss triggering
+        * reintegration for some qqi.
+        *
+        * If the previous reintegration failed for some reason, we'll
+        * re-trigger it here as well. */
+       if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
+               LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking"
+                            " off reintegration");
+               qsd_start_reint_thread(qqi);
+               RETURN(-EINPROGRESS);
+       }
+
+       LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op);
+
+       /* Fill the remote global lock handle, master will check this handle
+        * to see if the slave is sending a request with a stale lock */
+       cfs_read_lock(&qsd->qsd_lock);
+       lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh);
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       if (!lustre_handle_is_used(&qbody->qb_glb_lockh))
+               RETURN(-ENOLCK);
+
+       lock = ldlm_handle2lock(&qbody->qb_glb_lockh);
+       if (lock == NULL)
+               RETURN(-ENOLCK);
+       lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle);
+       LDLM_LOCK_PUT(lock);
+
+       /* We allow only one in-flight dqacq/dqrel for a given qid; if there
+        * is already an in-flight dqacq/dqrel:
+        *
+        * - For QSD_ADJ: we should just abort it, since local limit is going
+        *   to be changed soon;
+        * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel
+        *   to finish, and return success to the caller. The caller is
+        *   responsible for retrying;
+        * - For QSD_REP: we should just abort it, since slave has already
+        *   acquired/released grant; */
+       sync = (op == QSD_ACQ || op == QSD_REL) ? true : false;
+       LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d",
+                lqe->lqe_pending_req);
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_pending_req != 0) {
+               struct l_wait_info lwi = { 0 };
+
+               lqe_write_unlock(lqe);
+               if (!sync) {
+                       LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op);
+                       RETURN(0);
+               }
+
+               LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel");
+               l_wait_event(lqe->lqe_waiters,
+                            !lqe_pending_dqacq(lqe) || qsd->qsd_stopping,
+                            &lwi);
+               RETURN(0);
+       }
+
+       /* fill qb_count & qb_flags */
+       if (!qsd_calc_space(lqe, op, qbody)) {
+               lqe_write_unlock(lqe);
+               LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op);
+               RETURN(0);
+       }
+       lqe->lqe_pending_req++;
+       lqe_write_unlock(lqe);
+
+       /* fill other quota body fields */
+       qbody->qb_fid = qqi->qqi_fid;
+       qbody->qb_id  = lqe->lqe_id;
+       memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh));
+       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+
+       /* hold a refcount until completion */
+       lqe_getref(lqe);
+
+       if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) {
+               /* check whether we already own a lock for this ID */
+               lqe_read_lock(lqe);
+               lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+               lqe_read_unlock(lqe);
+
+               rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
+               if (rc) {
+                       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+                       if (req_is_preacq(qbody->qb_flags)) {
+                               if (req_has_rep(qbody->qb_flags))
+                                       /* still want to report usage */
+                                       qbody->qb_flags = QUOTA_DQACQ_FL_REPORT;
+                               else
+                                       /* no pre-acquire if no per-ID lock */
+                                       GOTO(out, rc = -ENOLCK);
+                       } else {
+                               /* no lock found, should use intent */
+                               intent = true;
+                       }
+               } else if (req_is_acq(qbody->qb_flags) &&
+                          qbody->qb_count == 0) {
+                       /* found cached lock, no need to acquire */
+                       GOTO(out, rc = 0);
+               }
+       }
+
+       if (!intent) {
+               rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync,
+                                   qsd_dqacq_completion, qqi, &qti->qti_lockh,
+                                   lqe);
+       } else {
+               union ldlm_wire_lvb *lvb;
+
+               OBD_ALLOC_PTR(lvb);
+               if (lvb == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync,
+                                    IT_QUOTA_DQACQ, qsd_dqacq_completion,
+                                    qqi, lvb, (void *)lqe);
+       }
+       /* the completion function will be called by qsd_send_dqacq or
+        * qsd_intent_lock */
+       RETURN(rc);
+out:
+       qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe,
+                            rc);
+       return rc;
+}
+
+/*
+ * Quota enforcement handler. If local quota can satisfy this operation,
+ * return success, otherwise, acquire more quota from master.
+ * (for write operations, if the master isn't available at the moment, return
+ * -EINPROGRESS to inform the client to retry the write)
+ *
+ * \param env   - the environment passed by the caller
+ * \param qsd   - is the qsd instance associated with the device in charge
+ *                of the operation.
+ * \param qid   - is the qid information attached in the transaction handle
+ * \param space - is the space required by the operation
+ * \param flags - if the operation is a write, used to return the over-quota
+ *                (no user/group quota) and sync commit flags to the caller
+ *
+ * \retval 0        - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write
+ *         -ve          : other appropriate errors
+ */
+static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
+                        struct lquota_id_info *qid, long long space,
+                        int *flags)
+{
+       struct lquota_entry *lqe;
+       int                  rc = 0, retry_cnt;
+       ENTRY;
+
+       if (qid->lqi_qentry != NULL) {
+               /* we already had to deal with this id for this transaction */
+               lqe = qid->lqi_qentry;
+               if (!lqe->lqe_enforced)
+                       RETURN(0);
+       } else {
+               /* look up lquota entry associated with qid */
+               lqe = lqe_locate(env, qqi->qqi_site, &qid->lqi_id);
+               if (IS_ERR(lqe))
+                       RETURN(PTR_ERR(lqe));
+               if (!lqe->lqe_enforced) {
+                       lqe_putref(lqe);
+                       RETURN(0);
+               }
+               qid->lqi_qentry = lqe;
+               /* lqe will be released in qsd_op_end() */
+       }
+
+       if (space <= 0) {
+               /* when space is negative or zero, we don't need to consume
+                * quota space. That said, we still want to perform space
+                * adjustments in qsd_op_end, so we return here, but with
+                * a reference on the lqe */
+               if (flags != NULL) {
+                       rc = qsd_refresh_usage(env, lqe);
+                       GOTO(out_flags, rc);
+               }
+               RETURN(0);
+       }
+
+       LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+
+       lqe_write_lock(lqe);
+       lqe->lqe_waiting_write += space;
+       lqe_write_unlock(lqe);
+
+       for (retry_cnt = 0; rc == 0; retry_cnt++) {
+               /* refresh disk usage if required */
+               rc = qsd_refresh_usage(env, lqe);
+               if (rc)
+                       break;
+
+               /* try to consume local quota space */
+               rc = qsd_acquire_local(lqe, space);
+               if (rc != -EAGAIN)
+                       /* rc == 0, Woohoo! enough local quota space
+                        * rc < 0, something bad happened */
+                       break;
+
+               /* need to acquire more quota space from master, this is done
+                * synchronously */
+               rc = qsd_dqacq(env, lqe, QSD_ACQ);
+               LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d",
+                            retry_cnt, rc);
+       }
+
+       if (rc == 0) {
+               qid->lqi_space += space;
+       } else {
+               LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc);
+
+               lqe_write_lock(lqe);
+               lqe->lqe_waiting_write -= space;
+
+               if (flags && lqe->lqe_pending_write != 0)
+                       /* Inform OSD layer that there are pending writes.
+                        * It might want to retry after a sync if appropriate */
+                       *flags |= QUOTA_FL_SYNC;
+               lqe_write_unlock(lqe);
+
+               /* convert recoverable error into -EINPROGRESS, and client will
+                * retry write on -EINPROGRESS. */
+               if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK ||
+                   rc == -EAGAIN || rc == -EINTR)
+                       rc = -EINPROGRESS;
+       }
+
+       if (flags != NULL) {
+out_flags:
+               LASSERT(qid->lqi_is_blk);
+               if (rc != 0) {
+                       *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+               } else {
+                       __u64   usage;
+
+                       lqe_read_lock(lqe);
+                       usage  = lqe->lqe_usage;
+                       usage += lqe->lqe_pending_write;
+                       usage += lqe->lqe_waiting_write;
+                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+                       /* if we should notify client to start sync write */
+                       if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
+                               *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                       else
+                               *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+                       lqe_read_unlock(lqe);
+               }
+       }
+       RETURN(rc);
+}
+
+static inline bool qid_equal(struct lquota_id_info *q1,
+                            struct lquota_id_info *q2)
+{
+       if (q1->lqi_type != q2->lqi_type)
+               return false;
+       return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
+}
+
+/*
+ * Enforce quota; this is called in the declaration of each operation.
+ * qsd_op_end() will then be called later once all the operations have been
+ * completed in order to release/adjust the quota space.
+ *
+ * \param env        - the environment passed by the caller
+ * \param qsd        - is the qsd instance associated with the device in charge
+ *                     of the operation.
+ * \param trans      - is the quota transaction information
+ * \param qi         - qid & space required by current operation
+ * \param flags      - if the operation is a write, used to return the
+ *                     over-quota (no user/group quota) and sync commit flags
+ *                     to the caller
+ *
+ * \retval 0        - success
+ * \retval -EDQUOT      : out of quota
+ *         -EINPROGRESS : inform client to retry write
+ *         -ve          : other appropriate errors
+ */
+int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
+                struct lquota_trans *trans, struct lquota_id_info *qi,
+                int *flags)
+{
+       struct qsd_qtype_info *qqi;
+       int                    i, rc;
+       bool                   found = false;
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               RETURN(0);
+
+       /* We don't enforce quota until the qsd_instance is started */
+       cfs_read_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_started) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               RETURN(0);
+       }
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       /* ignore block quota on MDTs, ignore inode quota on OSTs */
+       if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
+           (qsd->qsd_is_md && qi->lqi_is_blk))
+               RETURN(0);
+
+       qqi = qsd->qsd_type_array[qi->lqi_type];
+
+       /* ignore quota enforcement request when:
+        *    - quota isn't enforced for this quota type
+        * or - we failed to access the accounting object for this quota type
+        * or - the user/group is root */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qqi->qqi_acct_obj == NULL ||
+           qi->lqi_id.qid_uid == 0)
+               RETURN(0);
+
+       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+                trans->lqt_id_cnt);
+       /* check whether we already allocated a slot for this id */
+       for (i = 0; i < trans->lqt_id_cnt; i++) {
+               if (qid_equal(qi, &trans->lqt_ids[i])) {
+                       found = true;
+                       /* make sure we are not mixing inodes & blocks */
+                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
+                       break;
+               }
+       }
+
+       if (!found) {
+               if (unlikely(i >= QUOTA_MAX_TRANSIDS)) {
+                       CERROR("%s: more than %d qids enforced for a "
+                              "transaction?\n", qsd->qsd_svname, i);
+                       RETURN(-EINVAL);
+               }
+
+               /* fill new slot */
+               trans->lqt_ids[i].lqi_id     = qi->lqi_id;
+               trans->lqt_ids[i].lqi_type   = qi->lqi_type;
+               trans->lqt_ids[i].lqi_is_blk = qi->lqi_is_blk;
+               trans->lqt_id_cnt++;
+       }
+
+       /* manage quota enforcement for this ID */
+       rc = qsd_op_begin0(env, qqi, &trans->lqt_ids[i], qi->lqi_space, flags);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_op_begin);
+
+/**
+ * Post quota operation, pre-acquire/release quota from master.
+ *
+ * \param  env  - the environment passed by the caller
+ * \param  qsd  - is the qsd instance attached to the OSD device which
+ *                is handling the operation.
+ * \param  qqi  - is the qsd_qtype_info structure associated with the quota ID
+ *                subject to the operation
+ * \param  qid  - stores information related to this ID for the operation
+ *                which has just completed
+ */
+static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi,
+                       struct lquota_id_info *qid)
+{
+       struct lquota_entry     *lqe;
+       bool                     adjust;
+       ENTRY;
+
+       lqe = qid->lqi_qentry;
+       if (lqe == NULL)
+               RETURN_EXIT;
+       qid->lqi_qentry = NULL;
+
+       /* refresh cached usage if a suitable environment is passed */
+       if (env != NULL)
+               qsd_refresh_usage(env, lqe);
+
+       lqe_write_lock(lqe);
+       if (qid->lqi_space > 0)
+               lqe->lqe_pending_write -= qid->lqi_space;
+       if (env != NULL)
+               adjust = qsd_adjust_needed(lqe);
+       else
+               adjust = true;
+       lqe_write_unlock(lqe);
+
+       if (adjust) {
+               /* pre-acquire/release quota space is needed */
+               if (env != NULL)
+                       qsd_dqacq(env, lqe, QSD_ADJ);
+               else
+                       /* no suitable environment, handle adjustment in
+                        * separate thread context */
+                       qsd_adjust_schedule(lqe, false, false);
+       }
+       lqe_putref(lqe);
+       EXIT;
+}
+
+/*
+ * Post quota operation; called once the operation's transaction has stopped.
+ *
+ * \param  env   - the environment passed by the caller
+ * \param  qsd   - is the qsd instance associated with the device which is
+ *                 handling the operation.
+ * \param  trans - the transaction handle carrying all qid information
+ */
+void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
+               struct lquota_trans *trans)
+{
+       int i;
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               RETURN_EXIT;
+
+       /* We don't enforce quota until the qsd_instance is started */
+       cfs_read_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_started) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               RETURN_EXIT;
+       }
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       LASSERT(trans != NULL);
+
+       for (i = 0; i < trans->lqt_id_cnt; i++) {
+               struct qsd_qtype_info *qqi;
+
+               if (trans->lqt_ids[i].lqi_qentry == NULL)
+                       continue;
+
+               qqi = qsd->qsd_type_array[trans->lqt_ids[i].lqi_type];
+               qsd_op_end0(env, qqi, &trans->lqt_ids[i]);
+       }
+
+       /* reset id_count to 0 so that a second accidental call to qsd_op_end()
+        * does not result in failure */
+       trans->lqt_id_cnt = 0;
+       EXIT;
+}
+EXPORT_SYMBOL(qsd_op_end);
+
+void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd,
+                     union lquota_id *qid, int qtype)
+{
+       struct lquota_entry    *lqe;
+       struct qsd_qtype_info  *qqi;
+       bool                    adjust;
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               RETURN_EXIT;
+
+       /* We don't enforce quota until the qsd_instance is started */
+       cfs_read_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_started) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               RETURN_EXIT;
+       }
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       qqi = qsd->qsd_type_array[qtype];
+       LASSERT(qqi);
+
+       if (!qsd_type_enabled(qsd, qtype) || qqi->qqi_acct_obj == NULL ||
+           qid->qid_uid == 0)
+               RETURN_EXIT;
+
+       lqe = lqe_locate(env, qqi->qqi_site, qid);
+       if (IS_ERR(lqe)) {
+               CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+                      qsd->qsd_svname, qid->qid_uid, qtype);
+               RETURN_EXIT;
+       }
+
+       qsd_refresh_usage(env, lqe);
+
+       lqe_read_lock(lqe);
+       adjust = qsd_adjust_needed(lqe);
+       lqe_read_unlock(lqe);
+
+       if (adjust)
+               qsd_dqacq(env, lqe, QSD_ADJ);
+
+       lqe_putref(lqe);
+       EXIT;
+}
+EXPORT_SYMBOL(qsd_adjust_quota);
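
Taken together, a caller of qsd_op_begin() has three outcomes to handle (a
sketch of the contract documented above; the switch below is illustrative,
not code from this patch):

	rc = qsd_op_begin(env, qsd, &trans, &qi, &flags);
	switch (rc) {
	case 0:            /* enough space, locally or acquired from master */
		break;
	case -EDQUOT:      /* out of quota, flags carries LQUOTA_OVER_FL() */
		break;
	case -EINPROGRESS: /* master unreachable or reintegration running,
			    * the client should retry the write */
		break;
	default:           /* other fatal errors */
		break;
	}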
diff --git a/lustre/quota/qsd_internal.h b/lustre/quota/qsd_internal.h
index f37a5a8..2a84bf6 100644
@@ -55,6 +55,12 @@ struct qsd_instance {
         * are exported */
        cfs_proc_dir_entry_t    *qsd_proc;
 
+       /* export used for the connection to quota master */
+       struct obd_export     *qsd_exp;
+
+       /* ldlm namespace used for quota locks */
+       struct ldlm_namespace *qsd_ns;
+
        /* on-disk directory where to store index files for this qsd instance */
        struct dt_object        *qsd_root;
 
@@ -66,23 +72,41 @@ struct qsd_instance {
         * future. For the time being, we can just use an array. */
        struct qsd_qtype_info   *qsd_type_array[MAXQUOTAS];
 
+       /* per-filesystem quota information */
+       struct qsd_fsinfo       *qsd_fsinfo;
+
+       /* link into qfs_qsd_list of qfs_fsinfo */
+       cfs_list_t               qsd_link;
+
+       /* list of lqe entry which might need quota space adjustment */
+       cfs_list_t               qsd_adjust_list;
+
+       /* lock protecting adjust list */
+       cfs_spinlock_t           qsd_adjust_lock;
+
+       /* dedicated thread for updating slave index files. */
+       struct ptlrpc_thread     qsd_upd_thread;
+
+       /* list of update tasks */
+       cfs_list_t               qsd_upd_list;
+
        /* r/w spinlock protecting:
         * - the state flags
         * - the qsd update list
         * - the deferred list
-        * - flags of the qsd_qtype_info
-        *
-        * probably way too much :(
-        */
+        * - flags of the qsd_qtype_info */
        cfs_rwlock_t             qsd_lock;
 
-       /* per-filesystem quota information */
-       struct qsd_fsinfo       *qsd_fsinfo;
-
-       /* link into qfs_qsd_list of qfs_fsinfo */
-       cfs_list_t               qsd_link;
+       /* Default quota settings which apply to all identifiers */
+       /* when blk qunit reaches this value, later write reqs from client
+        * should be sync. b=16642 */
+       unsigned long            qsd_sync_threshold;
 
        unsigned long            qsd_is_md:1,    /* managing quota for mdt */
+                                qsd_started:1,  /* instance is now started */
+                                qsd_prepared:1, /* qsd_prepare() successfully
+                                                 * called */
+                                qsd_exp_valid:1,/* qsd_exp is now valid */
                                 qsd_stopping:1; /* qsd_instance is stopping */
 };
 
@@ -102,6 +126,9 @@ struct qsd_qtype_info {
        /* Global index FID to use for this quota type */
        struct lu_fid            qqi_fid;
 
+       /* Slave index FID allocated by the master */
+       struct lu_fid            qqi_slv_fid;
+
        /* back pointer to qsd device
         * immutable after creation. */
        struct qsd_instance     *qqi_qsd;
@@ -118,6 +145,21 @@ struct qsd_qtype_info {
        __u64                    qqi_slv_ver; /* slave index version */
        __u64                    qqi_glb_ver; /* global index version */
 
+       /* per quota ID information. All lquota entry are kept in a hash table
+        * and read from disk on cache miss. */
+       struct lquota_site      *qqi_site;
+
+       /* Reintegration thread */
+       struct ptlrpc_thread     qqi_reint_thread;
+
+       /* statistics on operations performed by this slave */
+       struct lprocfs_stats    *qqi_stats;
+
+       /* deferred update for the global index copy */
+       cfs_list_t               qqi_deferred_glb;
+       /* deferred update for the slave index copy */
+       cfs_list_t               qqi_deferred_slv;
+
        /* Various flags representing the current state of the slave for this
         * quota type. */
        unsigned long            qqi_glb_uptodate:1, /* global index uptodate
@@ -185,6 +227,17 @@ enum qsd_ops {
 
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
 
+/* update record for slave & global index copy */
+struct qsd_upd_rec {
+       cfs_list_t              qur_link; /* link into qsd_upd_list */
+       union lquota_id         qur_qid;
+       union lquota_rec        qur_rec;
+       struct qsd_qtype_info  *qur_qqi;
+       struct lquota_entry    *qur_lqe;
+       __u64                   qur_ver;
+       bool                    qur_global;
+};
+
 /* Common data shared by qsd-level handlers. This is allocated per-thread to
  * reduce stack consumption.  */
 struct qsd_thread_info {
@@ -265,6 +318,8 @@ static inline void qsd_set_qunit(struct lquota_entry *lqe, __u64 qunit)
        lqe->lqe_nopreacq = false;
 }
 
+#define QSD_WB_INTERVAL        15 /* 15 seconds */
+
 /* qsd_entry.c */
 extern struct lquota_entry_operations qsd_lqe_ops;
 int qsd_refresh_usage(const struct lu_env *, struct lquota_entry *);
@@ -311,11 +366,14 @@ void qsd_put_fsinfo(struct qsd_fsinfo *);
 int qsd_process_config(struct lustre_cfg *);
 
 /* qsd_handler.c */
-/* XXX to be replaced with real function once qsd_handler landed */
-static inline int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
-                           enum qsd_ops op)
-{
-       return 0;
-}
+int qsd_dqacq(const struct lu_env *, struct lquota_entry *, enum qsd_ops);
+__u64 qsd_calc_grants(struct lquota_entry *, __u64, __u32);
 
+/* qsd_writeback.c */
+void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *,
+                     union lquota_id *, union lquota_rec *, __u64, bool);
+void qsd_bump_version(struct qsd_qtype_info *, __u64, bool);
+int qsd_start_upd_thread(struct qsd_instance *);
+void qsd_stop_upd_thread(struct qsd_instance *);
+void qsd_adjust_schedule(struct lquota_entry *, bool, bool);
 #endif /* _QSD_INTERNAL_H */
index 65208d2..c1ba0c3 100644 (file)
 #include <obd_class.h>
 #include "qsd_internal.h"
 
+cfs_mem_cache_t *upd_kmem;
+
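+/* slab caches used by the quota slave; upd_kmem above backs the qsd_upd_rec
+ * allocations done by the writeback code */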
+struct lu_kmem_descr qsd_caches[] = {
+       {
+               .ckd_cache = &upd_kmem,
+               .ckd_name  = "upd_kmem",
+               .ckd_size  = sizeof(struct qsd_upd_rec)
+       },
+       {
+               .ckd_cache = NULL
+       }
+};
+
 /* define qsd thread key */
 LU_KEY_INIT_FINI(qsd, struct qsd_thread_info);
 LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
@@ -68,16 +81,40 @@ static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
                                int count, int *eof, void *data)
 {
        struct qsd_instance     *qsd = (struct qsd_instance *)data;
+       char                     enabled[5];
+       int                      rc;
 
        LASSERT(qsd != NULL);
 
-       return snprintf(page, count,
-                       "target name:    %s\n"
-                       "pool ID:        %d\n"
-                       "type:           %s\n"
-                       "quota enabled:  none\n",
-                       qsd->qsd_svname, qsd->qsd_pool_id,
-                       qsd->qsd_is_md ? "md" : "dt");
+       memset(enabled, 0, sizeof(enabled));
+       if (qsd_type_enabled(qsd, USRQUOTA))
+               strcat(enabled, "u");
+       if (qsd_type_enabled(qsd, GRPQUOTA))
+               strcat(enabled, "g");
+       if (strlen(enabled) == 0)
+               strcat(enabled, "none");
+
+       rc = snprintf(page, count,
+                     "target name:    %s\n"
+                     "pool ID:        %d\n"
+                     "type:           %s\n"
+                     "quota enabled:  %s\n"
+                     "conn to master: %s\n",
+                     qsd->qsd_svname, qsd->qsd_pool_id,
+                     qsd->qsd_is_md ? "md" : "dt", enabled,
+                     qsd->qsd_exp_valid ? "setup" : "not setup yet");
+
+       if (qsd->qsd_prepared)
+               rc += snprintf(page + rc, count - rc,
+                               "user uptodate:  glb[%d],slv[%d],reint[%d]\n"
+                               "group uptodate: glb[%d],slv[%d],reint[%d]\n",
+                               qsd->qsd_type_array[USRQUOTA]->qqi_glb_uptodate,
+                               qsd->qsd_type_array[USRQUOTA]->qqi_slv_uptodate,
+                               qsd->qsd_type_array[USRQUOTA]->qqi_reint,
+                               qsd->qsd_type_array[GRPQUOTA]->qqi_glb_uptodate,
+                               qsd->qsd_type_array[GRPQUOTA]->qqi_slv_uptodate,
+                               qsd->qsd_type_array[GRPQUOTA]->qqi_reint);
+       return rc;
 }
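+
+/* Example "state" output once the instance is prepared (values are
+ * illustrative):
+ *
+ *   target name:    lustre-MDT0000
+ *   pool ID:        0
+ *   type:           md
+ *   quota enabled:  ug
+ *   conn to master: setup
+ *   user uptodate:  glb[1],slv[1],reint[0]
+ *   group uptodate: glb[1],slv[1],reint[0]
+ */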
 
 static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off,
@@ -105,6 +142,47 @@ static struct lprocfs_vars lprocfs_quota_qsd_vars[] = {
 };
 
 /*
+ * Callback function invoked by the OSP layer when the connection to the master
+ * has been set up.
+ *
+ * \param data - is a pointer to the qsd_instance
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int qsd_conn_callback(void *data)
+{
+       struct qsd_instance *qsd = (struct qsd_instance *)data;
+       int                  type;
+       ENTRY;
+
+       /* qsd_exp should now be valid */
+       LASSERT(qsd->qsd_exp);
+
+       /* grab reference on namespace */
+       ldlm_namespace_get(class_exp2obd(qsd->qsd_exp)->obd_namespace);
+       qsd->qsd_ns = class_exp2obd(qsd->qsd_exp)->obd_namespace;
+
+       cfs_write_lock(&qsd->qsd_lock);
+       /* notify that qsd_exp is now valid */
+       qsd->qsd_exp_valid = true;
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       /* Now that the connection to the master is set up, we can initiate
+        * the reintegration procedure for quota types which are enabled.
+        * It is worth noting that, if the qsd_instance hasn't been started
+        * yet, then only the first three steps of the reintegration
+        * procedure can be completed (i.e. global lock enqueue and transfer
+        * of the global & slave index copies) since the space usage
+        * reconciliation (i.e. step 4) has to wait for qsd_start() to be
+        * called */
+       for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+               struct qsd_qtype_info *qqi = qsd->qsd_type_array[type];
+               cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq);
+       }
+
+       RETURN(0);
+}
+
+/*
  * Release qsd_qtype_info structure which contains data associated with a
  * given quota type. This releases the accounting objects.
  * It's called on OSD cleanup when the qsd instance is released.
@@ -125,6 +203,16 @@ static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd,
        qqi = qsd->qsd_type_array[qtype];
        qsd->qsd_type_array[qtype] = NULL;
 
+       /* all deferred work lists should be empty */
+       LASSERT(cfs_list_empty(&qqi->qqi_deferred_glb));
+       LASSERT(cfs_list_empty(&qqi->qqi_deferred_slv));
+
+       /* shutdown lquota site */
+       if (qqi->qqi_site != NULL && !IS_ERR(qqi->qqi_site)) {
+               lquota_site_free(env, qqi->qqi_site);
+               qqi->qqi_site = NULL;
+       }
+
        /* by now, all qqi users should have gone away */
        LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1);
        lu_ref_fini(&qqi->qqi_reference);
@@ -193,17 +281,22 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        qqi->qqi_glb_uptodate = false;
        qqi->qqi_slv_uptodate = false;
        qqi->qqi_reint        = false;
+       cfs_waitq_init(&qqi->qqi_reint_thread.t_ctl_waitq);
+       thread_set_flags(&qqi->qqi_reint_thread, SVC_STOPPED);
+       CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_glb);
+       CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_slv);
        memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
 
-        /* open accounting object */
-        LASSERT(qqi->qqi_acct_obj == NULL);
-       qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev,
-                                           qtype == USRQUOTA ? ACCT_USER_OID
-                                                             : ACCT_GROUP_OID);
-       /* don't print any error message on failure in order not to confuse
-        * non-OFD user (e.g. 2.3 MDT stack) */
-       if (IS_ERR(qqi->qqi_acct_obj))
-               qqi->qqi_acct_obj = NULL;
+       /* open accounting object */
+       LASSERT(qqi->qqi_acct_obj == NULL);
+       qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, qtype);
+       if (qqi->qqi_acct_obj == NULL) {
+               LCONSOLE_ERROR("%s: No %s space accounting support. Please use "
+                              "tunefs.lustre --quota option to enable quota "
+                              "accounting.\n",
+                              qsd->qsd_svname, QTYPE_NAME(qtype));
+               GOTO(out, rc = -ENOENT);
+       }
 
        /* open global index copy */
        LASSERT(qqi->qqi_glb_obj == NULL);
@@ -233,17 +326,24 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd,
        }
        qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj);
 
+       /* allocate site */
+       qqi->qqi_site = lquota_site_alloc(env, qqi, false, qtype, &qsd_lqe_ops);
+       if (IS_ERR(qqi->qqi_site)) {
+               CERROR("%s: can't allocate site "DFID" %ld\n", qsd->qsd_svname,
+                      PFID(&qqi->qqi_fid), PTR_ERR(qqi->qqi_site));
+               GOTO(out, rc = PTR_ERR(qqi->qqi_site));
+       }
+
        /* register proc entry for accounting object */
        rc = lprocfs_seq_create(qsd->qsd_proc,
                                qtype == USRQUOTA ? "acct_user" : "acct_group",
                                0444, &lprocfs_quota_seq_fops,
                                qqi->qqi_acct_obj);
        if (rc) {
-               CWARN("%s: can't add procfs entry for accounting file %d\n",
-                     qsd->qsd_svname, rc);
+               CERROR("%s: can't add procfs entry for accounting file %d\n",
+                      qsd->qsd_svname, rc);
                GOTO(out, rc);
        }
-
        EXIT;
 out:
        if (rc)
@@ -253,7 +353,8 @@ out:
 
 /*
  * Release a qsd_instance. Companion of qsd_init(). This releases all data
- * structures associated with the quota slave.
+ * structures associated with the quota slave (on-disk objects, lquota entry
+ * tables, ...).
  * This function should be called when the OSD is shutting down.
  *
  * \param env - is the environment passed by the caller
@@ -265,7 +366,9 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
        ENTRY;
 
        CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname);
+       cfs_write_lock(&qsd->qsd_lock);
        qsd->qsd_stopping = true;
+       cfs_write_unlock(&qsd->qsd_lock);
 
        /* remove from the list of fsinfo */
        if (!cfs_list_empty(&qsd->qsd_link)) {
@@ -281,10 +384,30 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
                qsd->qsd_proc = NULL;
        }
 
+       /* stop the writeback thread */
+       qsd_stop_upd_thread(qsd);
+
+       /* shutdown the reintegration threads */
+       for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+               if (qsd->qsd_type_array[qtype] == NULL)
+                       continue;
+               qsd_stop_reint_thread(qsd->qsd_type_array[qtype]);
+       }
+
+       /* release reference on namespace */
+       if (qsd->qsd_ns != NULL) {
+               ldlm_namespace_put(qsd->qsd_ns);
+               qsd->qsd_ns = NULL;
+       }
+
        /* free per-quota type data */
        for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
                qsd_qtype_fini(env, qsd, qtype);
 
+       /* deregister connection to the quota master */
+       qsd->qsd_exp_valid = false;
+       lustre_deregister_osp_item(&qsd->qsd_exp);
+
        /* release per-filesystem information */
        if (qsd->qsd_fsinfo != NULL)
                qsd_put_fsinfo(qsd->qsd_fsinfo);
@@ -302,6 +425,7 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
                qsd->qsd_dev = NULL;
        }
 
+       CDEBUG(D_QUOTA, "%s: QSD shutdown completed\n", qsd->qsd_svname);
        OBD_FREE_PTR(qsd);
        EXIT;
 }
@@ -334,8 +458,17 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
        if (qsd == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
+       /* generic initializations */
        cfs_rwlock_init(&qsd->qsd_lock);
        CFS_INIT_LIST_HEAD(&qsd->qsd_link);
+       thread_set_flags(&qsd->qsd_upd_thread, SVC_STOPPED);
+       cfs_waitq_init(&qsd->qsd_upd_thread.t_ctl_waitq);
+       CFS_INIT_LIST_HEAD(&qsd->qsd_upd_list);
+       cfs_spin_lock_init(&qsd->qsd_adjust_lock);
+       CFS_INIT_LIST_HEAD(&qsd->qsd_adjust_list);
+       qsd->qsd_prepared = false;
+       qsd->qsd_started = false;
+
        /* copy service name */
        strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME);
 
@@ -378,6 +511,7 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname,
                       svname, rc);
                GOTO(out, rc);
         }
+       EXIT;
 out:
        if (rc) {
                qsd_fini(env, qsd);
@@ -404,15 +538,29 @@ EXPORT_SYMBOL(qsd_init);
  */
 int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
 {
-       int     rc, qtype;
+       struct qsd_thread_info  *qti = qsd_info(env);
+       int                      qtype, rc = 0;
        ENTRY;
 
        LASSERT(qsd != NULL);
 
+       cfs_read_lock(&qsd->qsd_lock);
+       if (qsd->qsd_prepared) {
+               CERROR("%s: qsd instance already prepared\n", qsd->qsd_svname);
+               rc = -EALREADY;
+       }
+       cfs_read_unlock(&qsd->qsd_lock);
+       if (rc)
+               RETURN(rc);
+
        /* Record whether this qsd instance is managing quota enforcement for a
         * MDT (i.e. inode quota) or OST (block quota) */
-       if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev))
+       if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev)) {
                qsd->qsd_is_md = true;
+               qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_MD);
+       } else {
+               qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_DT);
+       }
 
        /* look-up on-disk directory for the quota slave */
        qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL,
@@ -432,10 +580,96 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
                        RETURN(rc);
        }
 
+       /* pools successfully set up, mark the qsd instance as prepared */
+       cfs_write_lock(&qsd->qsd_lock);
+       qsd->qsd_prepared = true;
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       /* start reintegration thread for each type, if required */
+       for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+               struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
+
+               rc = qsd_start_reint_thread(qqi);
+               if (rc) {
+                       CERROR("%s: failed to start reint thread for type %s "
+                              "(%d)\n", qsd->qsd_svname, QTYPE_NAME(qtype),
+                              rc);
+                       RETURN(rc);
+               }
+       }
+
+       /* start writeback thread */
+       rc = qsd_start_upd_thread(qsd);
+       if (rc) {
+               CERROR("%s: failed to start writeback thread (%d)\n",
+                      qsd->qsd_svname, rc);
+               RETURN(rc);
+       }
+
+       /* generate osp name */
+       rc = tgt_name2ospname((char *)qsd->qsd_svname, qti->qti_buf);
+       if (rc) {
+               CERROR("%s: failed to generate ospname (%d)\n",
+                      qsd->qsd_svname, rc);
+               RETURN(rc);
+       }
+
+       /* the connection callback will start the reintegration
+        * procedure if quota is enabled */
+       rc = lustre_register_osp_item(qti->qti_buf, &qsd->qsd_exp,
+                                     qsd_conn_callback, (void *)qsd);
+       if (rc) {
+               CERROR("%s: fail to get connection to master (%d)\n",
+                      qsd->qsd_svname, rc);
+               RETURN(rc);
+       }
+
        RETURN(0);
 }
 EXPORT_SYMBOL(qsd_prepare);
 
+/*
+ * Start a qsd instance. This will complete the last step of the reintegration
+ * procedure as soon as possible (provided that the master is reachable).
+ * This should be called when recovery has been completed and quota should now
+ * be enforced on every operation.
+ *
+ * \param env - the environment passed by the caller
+ * \param qsd - is the qsd instance associated with the osd device to start
+ */
+int qsd_start(const struct lu_env *env, struct qsd_instance *qsd)
+{
+       int     type, rc = 0;
+       ENTRY;
+
+       cfs_write_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_prepared) {
+               CERROR("%s: can't start qsd instance since it was properly "
+                      "initialized\n", qsd->qsd_svname);
+               rc = -EFAULT;
+       } else if (qsd->qsd_started) {
+               CERROR("%s: qsd instance already started\n", qsd->qsd_svname);
+               rc = -EALREADY;
+       } else {
+               /* notify that the qsd_instance is now started */
+               qsd->qsd_started = true;
+       }
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       if (rc)
+               RETURN(rc);
+
+       /* Trigger the last step of reintegration (usage reconciliation):
+        * if usage > granted, acquire up to usage; if usage < granted,
+        * release down to usage. */
+       for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+               struct qsd_qtype_info   *qqi = qsd->qsd_type_array[type];
+               cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq);
+       }
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_start);
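+
+/*
+ * Illustrative lifecycle sketch for OSD users of the quota slave API (only
+ * the qsd_* calls below are real, the surrounding code is hypothetical):
+ *
+ *     qsd = qsd_init(env, svname, ...);    // at device setup
+ *     rc  = qsd_prepare(env, qsd);         // when the device is prepared
+ *     rc  = qsd_start(env, qsd);           // once recovery is completed
+ *     ...
+ *     qsd_fini(env, qsd);                  // at device shutdown
+ */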
+
 void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg));
 
 /*
@@ -443,9 +677,16 @@ void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg));
  */
 int qsd_glb_init(void)
 {
+       int     rc;
+
+       rc = lu_kmem_init(qsd_caches);
+       if (rc)
+               return rc;
+
        qsd_key_init_generic(&qsd_thread_key, NULL);
        lu_context_key_register(&qsd_thread_key);
        lustre_register_quota_process_config(qsd_process_config);
+
        return 0;
 }
 
@@ -455,5 +696,6 @@ int qsd_glb_init(void)
 void qsd_glb_fini(void)
 {
        lustre_register_quota_process_config(NULL);
+       lu_kmem_fini(qsd_caches);
        lu_context_key_degister(&qsd_thread_key);
 }
index 7c17a62..ae916c7 100644 (file)
@@ -227,10 +227,10 @@ static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       LCONSOLE_INFO("%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
-                     " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
-                     desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
-                     desc->gl_softlimit);
+       CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
+              " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+              desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
+              desc->gl_softlimit);
 
        if (desc->gl_ver == 0) {
                CERROR("%s: invalid global index version "LPU64"\n",
@@ -380,8 +380,8 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
                /* valid race */
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
 
-       LQUOTA_CONSOLE(lqe, "glimpse on quota locks, new qunit:"LPU64,
-                      desc->gl_qunit);
+       LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
+                    desc->gl_qunit);
 
        qsd = lqe2qqi(lqe)->qqi_qsd;
 
@@ -400,14 +400,14 @@ static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
 
                if (space > 0) {
                        if (lqe->lqe_pending_req > 0) {
-                               LQUOTA_ERROR(lqe, "request in flight, postpone "
+                               LQUOTA_DEBUG(lqe, "request in flight, postpone "
                                             "release of "LPD64, space);
                                lvb->lvb_id_may_rel = space;
                        } else {
                                lqe->lqe_pending_req++;
 
                                /* release quota space in glimpse reply */
-                               LQUOTA_ERROR(lqe, "releasing "LPD64, space);
+                               LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
                                lqe->lqe_granted -= space;
                                lvb->lvb_id_rel   = space;
 
index fdbfdc8..040b885 100644 (file)
 
 #include "qsd_internal.h"
 
+/*
+ * Completion function invoked when the global quota lock enqueue has completed
+ */
+static void qsd_reint_completion(const struct lu_env *env,
+                                struct qsd_qtype_info *qqi,
+                                struct quota_body *req_qbody,
+                                struct quota_body *rep_qbody,
+                                struct lustre_handle *lockh,
+                                union ldlm_wire_lvb *lvb,
+                                void *arg, int rc)
+{
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       __u64                   *slv_ver = (__u64 *)arg;
+       ENTRY;
+
+       if (rc) {
+               CERROR("%s: failed to enqueue global quota lock, glb "
+                      "fid:"DFID", rc:%d\n", qsd->qsd_svname,
+                      PFID(&req_qbody->qb_fid), rc);
+               RETURN_EXIT;
+       }
+
+       CDEBUG(D_QUOTA, "%s: global quota lock successfully acquired, glb "
+              "fid:"DFID", glb ver:"LPU64", slv fid:"DFID", slv ver:"LPU64"\n",
+              qsd->qsd_svname, PFID(&req_qbody->qb_fid),
+              lvb->l_lquota.lvb_glb_ver, PFID(&rep_qbody->qb_slv_fid),
+              rep_qbody->qb_slv_ver);
+
+       *slv_ver = rep_qbody->qb_slv_ver;
+       memcpy(&qqi->qqi_slv_fid, &rep_qbody->qb_slv_fid,
+              sizeof(struct lu_fid));
+       lustre_handle_copy(&qqi->qqi_lockh, lockh);
+       EXIT;
+}
+
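+/*
+ * Update the in-memory lquota entry and the on-disk index copy for a single
+ * ID with the record fetched from the master.
+ */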
+static int qsd_reint_qid(const struct lu_env *env, struct qsd_qtype_info *qqi,
+                        bool global, union lquota_id *qid, void *rec)
+{
+       struct lquota_entry     *lqe;
+       int                      rc;
+       ENTRY;
+
+       lqe = lqe_locate(env, qqi->qqi_site, qid);
+       if (IS_ERR(lqe))
+               RETURN(PTR_ERR(lqe));
+
+       rc = qsd_update_lqe(env, lqe, global, rec);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = qsd_update_index(env, qqi, qid, global, 0, rec);
+out:
+       lqe_putref(lqe);
+       RETURN(rc);
+}
+
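+/*
+ * Walk the bulk pages returned by the master and update the local copy for
+ * each ID found there, swabbing keys & records when the master runs a
+ * different endianness.
+ */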
+static int qsd_reint_entries(const struct lu_env *env,
+                            struct qsd_qtype_info *qqi,
+                            struct idx_info *ii, bool global,
+                            cfs_page_t **pages,
+                            unsigned int npages, bool need_swab)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       union lquota_id         *qid = &qti->qti_id;
+       int                      i, j, k, size;
+       int                      rc = 0;
+       ENTRY;
+
+       /* sanity check on the record size */
+       if ((global && ii->ii_recsize != sizeof(struct lquota_glb_rec)) ||
+           (!global && ii->ii_recsize != sizeof(struct lquota_slv_rec))) {
+               CERROR("Invalid record size:%d, global:%s\n",
+                      ii->ii_recsize, global ? "true" : "false");
+               RETURN(-EINVAL);
+       }
+
+       size = ii->ii_recsize + ii->ii_keysize + sizeof(__u64);
+
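+       /* each entry in a lu_page is packed as a 64-bit key immediately
+        * followed by the record; entries are spaced 'size' bytes apart */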
+       for (i = 0; i < npages; i++) {
+               union lu_page   *lip = cfs_kmap(pages[i]);
+
+               for (j = 0; j < LU_PAGE_COUNT; j++) {
+                       for (k = 0; k < lip->lp_idx.lip_nr; k++) {
+                               char *entry;
+
+                               entry = lip->lp_idx.lip_entries + k * size;
+                               memcpy(qid, entry, ii->ii_keysize); /* key */
+                               entry += ii->ii_keysize;            /* value */
+
+                               if (need_swab) {
+                                       int offset = 0;
+
+                                       /* swab key */
+                                       __swab64s(&qid->qid_uid);
+                                       /* quota records only include 64-bit
+                                        * fields */
+                                       while (offset < ii->ii_recsize) {
+                                               __swab64s((__u64 *)
+                                                             (entry + offset));
+                                               offset += sizeof(__u64);
+                                       }
+                               }
+
+                               rc = qsd_reint_qid(env, qqi, global, qid,
+                                                  (void *)entry);
+                               if (rc)
+                                       GOTO(out, rc);
+                       }
+                       lip++;
+               }
+out:
+               cfs_kunmap(pages[i]);
+               if (rc)
+                       break;
+       }
+       RETURN(rc);
+}
+
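+/*
+ * Fetch the global or slave index from the master in 1MB bulk transfers and
+ * re-populate the local copy, repeating until the end of the index is
+ * reached. The index version is written to disk once the whole transfer
+ * completed successfully.
+ */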
+static int qsd_reint_index(const struct lu_env *env, struct qsd_qtype_info *qqi,
+                          bool global)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       struct idx_info         *ii = &qti->qti_ii;
+       struct lu_fid           *fid;
+       cfs_page_t              **pages = NULL;
+       unsigned int             npages, pg_cnt;
+       __u64                    start_hash = 0, ver = 0;
+       bool                     need_swab = false;
+       int                      i, rc;
+       ENTRY;
+
+       fid = global ? &qqi->qqi_fid : &qqi->qqi_slv_fid;
+
+       /* cap the bulk transfer to 1MB (or PTLRPC_MAX_BRW_SIZE if smaller)
+        * and convert to a number of pages */
+       npages = min_t(unsigned int, PTLRPC_MAX_BRW_SIZE, 1 << 20);
+       npages /= CFS_PAGE_SIZE;
+
+       /* allocate pages for bulk index read */
+       OBD_ALLOC(pages, npages * sizeof(*pages));
+       if (pages == NULL)
+               GOTO(out, rc = -ENOMEM);
+       for (i = 0; i < npages; i++) {
+               pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
+               if (pages[i] == NULL)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
+repeat:
+       /* initialize index_info request with the FID of the index to fetch */
+       memset(ii, 0, sizeof(*ii));
+       memcpy(&ii->ii_fid, fid, sizeof(*fid));
+       ii->ii_magic = IDX_INFO_MAGIC;
+       ii->ii_flags = II_FL_NOHASH;
+       ii->ii_count = npages * LU_PAGE_COUNT;
+       ii->ii_hash_start = start_hash;
+
+       /* send bulk request to the quota master to read the index */
+       rc = qsd_fetch_index(env, qsd->qsd_exp, ii, npages, pages, &need_swab);
+       if (rc) {
+               CWARN("%s: failed to fetch index for "DFID". %d\n",
+                     qsd->qsd_svname, PFID(fid), rc);
+               GOTO(out, rc);
+       }
+
+       /* various sanity checks */
+       if (ii->ii_magic != IDX_INFO_MAGIC) {
+               CERROR("%s: invalid magic in index transfer %x != %x\n",
+                      qsd->qsd_svname, ii->ii_magic, IDX_INFO_MAGIC);
+               GOTO(out, rc = -EPROTO);
+       }
+       if ((ii->ii_flags & II_FL_VARKEY) != 0)
+               CWARN("%s: II_FL_VARKEY is set on index transfer for fid "DFID
+                     ", it shouldn't be\n", qsd->qsd_svname, PFID(fid));
+       if ((ii->ii_flags & II_FL_NONUNQ) != 0)
+               CWARN("%s: II_FL_NONUNQ is set on index transfer for fid "DFID
+                     ", it shouldn't be\n", qsd->qsd_svname, PFID(fid));
+       if (ii->ii_keysize != sizeof(__u64)) {
+               CERROR("%s: invalid key size reported on index transfer for "
+                      "fid "DFID", %u != %u\n", qsd->qsd_svname, PFID(fid),
+                      ii->ii_keysize, (int)sizeof(__u64));
+               GOTO(out, rc = -EPROTO);
+       }
+       if (ii->ii_version == 0 && ii->ii_count != 0)
+               CWARN("%s: index version for fid "DFID" is 0, but index isn't "
+                     "empty (%d)\n", qsd->qsd_svname, PFID(fid), ii->ii_count);
+
+       CDEBUG(D_QUOTA, "%s: reintegration process for fid "DFID" successfully "
+              "fetched %s index, count = %d\n", qsd->qsd_svname,
+              PFID(fid), global ? "global" : "slave", ii->ii_count);
+
+       if (start_hash == 0)
+               /* record version associated with the first bulk transfer */
+               ver = ii->ii_version;
+
+       pg_cnt = (ii->ii_count + (LU_PAGE_COUNT) - 1);
+       pg_cnt >>= CFS_PAGE_SHIFT - LU_PAGE_SHIFT;
+
+       if (pg_cnt > npages) {
+               CERROR("%s: master returned more pages than expected, %u > %u"
+                      "\n", qsd->qsd_svname, pg_cnt, npages);
+               pg_cnt = npages;
+       }
+
+       rc = qsd_reint_entries(env, qqi, ii, global, pages, pg_cnt, need_swab);
+       if (rc)
+               GOTO(out, rc);
+
+       if (ii->ii_hash_end != II_END_OFF) {
+               start_hash = ii->ii_hash_end;
+               goto repeat;
+       }
+out:
+       if (pages != NULL) {
+               for (i = 0; i < npages; i++)
+                       if (pages[i] != NULL)
+                               cfs_free_page(pages[i]);
+               OBD_FREE(pages, npages * sizeof(*pages));
+       }
+
+       /* Update index version */
+       if (rc == 0) {
+               rc = qsd_write_version(env, qqi, ver, global);
+               if (rc)
+                       CERROR("%s: write version "LPU64" to "DFID" failed. "
+                              "%d\n", qsd->qsd_svname, ver, PFID(fid), rc);
+       }
+
+       RETURN(rc);
+}
+
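+/*
+ * Last step of the reintegration procedure: iterate over the local copy of
+ * the global index and, for each enforced ID, refresh the local usage and
+ * report it to the master (QSD_REP) so that granted space can be reconciled
+ * with actual usage. The root ID is skipped.
+ */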
+static int qsd_reconciliation(const struct lu_env *env,
+                             struct qsd_qtype_info *qqi)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       const struct dt_it_ops  *iops;
+       struct dt_it            *it;
+       struct dt_key           *key;
+       struct lquota_entry     *lqe;
+       union lquota_id         *qid = &qti->qti_id;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(qqi->qqi_glb_obj != NULL);
+       iops = &qqi->qqi_glb_obj->do_index_ops->dio_it;
+
+       it = iops->init(env, qqi->qqi_glb_obj, 0, BYPASS_CAPA);
+       if (IS_ERR(it)) {
+               CWARN("%s: Initialize it for "DFID" failed. %ld\n",
+                     qsd->qsd_svname, PFID(&qqi->qqi_fid), PTR_ERR(it));
+               RETURN(PTR_ERR(it));
+       }
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0) {
+               CWARN("%s: Load first entry for "DFID" failed. %d\n",
+                     qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
+               GOTO(out, rc);
+       } else if (rc == 0) {
+               rc = iops->next(env, it);
+               if (rc != 0)
+                       GOTO(out, rc = (rc < 0) ? rc : 0);
+       }
+
+       do {
+               key = iops->key(env, it);
+               if (IS_ERR(key)) {
+                       CWARN("%s: Error key for "DFID". %ld\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_fid),
+                             PTR_ERR(key));
+                       GOTO(out, rc = PTR_ERR(key));
+               }
+
+               /* skip the root user/group */
+               if (*((__u64 *)key) == 0)
+                       goto next;
+
+               qid->qid_uid = *((__u64 *)key);
+
+               lqe = lqe_locate(env, qqi->qqi_site, qid);
+               if (IS_ERR(lqe)) {
+                       CWARN("%s: Fail to locate lqe. "DFID", %ld\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_fid),
+                             PTR_ERR(lqe));
+                       GOTO(out, rc = PTR_ERR(lqe));
+               }
+
+               if (!lqe->lqe_enforced) {
+                       lqe_putref(lqe);
+                       goto next;
+               }
+
+               rc = qsd_refresh_usage(env, lqe);
+               if (rc) {
+                       CWARN("%s: Fail to get usage. "DFID", %d\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
+                       lqe_putref(lqe);
+                       GOTO(out, rc);
+               }
+
+               rc = qsd_dqacq(env, lqe, QSD_REP);
+               lqe_putref(lqe);
+
+               if (rc) {
+                       CWARN("%s: Fail to report quota. "DFID", %d\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
+                       GOTO(out, rc);
+               }
+next:
+               rc = iops->next(env, it);
+               if (rc < 0)
+                       CWARN("%s: Error next "DFID". %d\n", qsd->qsd_svname,
+                             PFID(&qqi->qqi_fid), rc);
+       } while (rc == 0);
+
+       /* reached the end of the iteration */
+       if (rc > 0)
+               rc = 0;
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+       RETURN(rc);
+}
+
+static int qsd_connected(struct qsd_instance *qsd)
+{
+       int     connected;
+
+       cfs_read_lock(&qsd->qsd_lock);
+       connected = qsd->qsd_exp_valid ? 1 : 0;
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       return connected;
+}
+
+static int qsd_started(struct qsd_instance *qsd)
+{
+       int     started;
+
+       cfs_read_lock(&qsd->qsd_lock);
+       started = qsd->qsd_started ? 1 : 0;
+       cfs_read_unlock(&qsd->qsd_lock);
+
+       return started;
+}
+
+/*
+ * Main routine of the reintegration thread. Once the connection to the master
+ * is established, it enqueues the global index lock (step 1), transfers the
+ * global and slave index copies if stale (steps 2 & 3) and finally, once the
+ * qsd instance is started, reconciles space usage for all enforced IDs
+ * (step 4).
+ */
+static int qsd_reint_main(void *args)
+{
+       struct lu_env           *env;
+       struct qsd_thread_info  *qti;
+       struct qsd_qtype_info   *qqi = (struct qsd_qtype_info *)args;
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       struct ptlrpc_thread    *thread = &qqi->qqi_reint_thread;
+       struct l_wait_info       lwi = { 0 };
+       int                      rc;
+       ENTRY;
+
+       cfs_daemonize("qsd_reint");
+
+       CDEBUG(D_QUOTA, "%s: Starting reintegration thread for "DFID"\n",
+              qsd->qsd_svname, PFID(&qqi->qqi_fid));
+
+       qqi_getref(qqi);
+       lu_ref_add(&qqi->qqi_reference, "reint_thread", thread);
+
+       thread_set_flags(thread, SVC_RUNNING);
+       cfs_waitq_signal(&thread->t_ctl_waitq);
+
+       OBD_ALLOC_PTR(env);
+       if (env == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       /* initialize environment */
+       rc = lu_env_init(env, LCT_DT_THREAD);
+       if (rc)
+               GOTO(out_env, rc);
+       qti = qsd_info(env);
+
+       /* wait for the connection to the master to be established */
+       l_wait_event(thread->t_ctl_waitq,
+                    qsd_connected(qsd) || !thread_is_running(thread), &lwi);
+
+       /* Step 1: enqueue global index lock */
+       if (!thread_is_running(thread))
+               GOTO(out_env_init, rc = 0);
+
+       LASSERT(qsd->qsd_exp != NULL);
+       LASSERT(qqi->qqi_glb_uptodate == 0 || qqi->qqi_slv_uptodate == 0);
+
+       memset(&qti->qti_lvb, 0, sizeof(qti->qti_lvb));
+
+       cfs_read_lock(&qsd->qsd_lock);
+       /* check whether we already own a global quota lock for this type */
+       if (lustre_handle_is_used(&qqi->qqi_lockh) &&
+           ldlm_lock_addref_try(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode) == 0) {
+               cfs_read_unlock(&qsd->qsd_lock);
+               /* force refresh of global & slave index copy */
+               qti->qti_lvb.l_lquota.lvb_glb_ver = ~0ULL;
+               qti->qti_slv_ver = ~0ULL;
+       } else {
+               /* no valid lock found, let's enqueue a new one */
+               cfs_read_unlock(&qsd->qsd_lock);
+
+               memset(&qti->qti_body, 0, sizeof(qti->qti_body));
+               memcpy(&qti->qti_body.qb_fid, &qqi->qqi_fid,
+                      sizeof(qqi->qqi_fid));
+
+               rc = qsd_intent_lock(env, qsd->qsd_exp, &qti->qti_body, true,
+                                    IT_QUOTA_CONN, qsd_reint_completion, qqi,
+                                    &qti->qti_lvb, (void *)&qti->qti_slv_ver);
+               if (rc)
+                       GOTO(out_env_init, rc);
+
+               CDEBUG(D_QUOTA, "%s: glb_ver:"LPU64"/"LPU64",slv_ver:"LPU64"/"
+                      LPU64"\n", qsd->qsd_svname,
+                      qti->qti_lvb.l_lquota.lvb_glb_ver, qqi->qqi_glb_ver,
+                      qti->qti_slv_ver, qqi->qqi_slv_ver);
+       }
+
+       /* Step 2: reintegrate global index */
+       if (!thread_is_running(thread))
+               GOTO(out_lock, rc = 0);
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REINT, 10);
+
+       if (qqi->qqi_glb_ver != qti->qti_lvb.l_lquota.lvb_glb_ver) {
+               rc = qsd_reint_index(env, qqi, true);
+               if (rc) {
+                       CWARN("%s: reint global for "DFID" failed. %d\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_fid), rc);
+                       GOTO(out_lock, rc);
+               }
+       } else {
+               qsd_bump_version(qqi, qqi->qqi_glb_ver, true);
+       }
+
+       /* Step 3: reintegrate slave index */
+       if (!thread_is_running(thread))
+               GOTO(out_lock, rc = 0);
+
+       if (qqi->qqi_slv_ver != qti->qti_slv_ver) {
+               rc = qsd_reint_index(env, qqi, false);
+               if (rc) {
+                       CWARN("%s: Reint slave for "DFID" failed. %d\n",
+                             qsd->qsd_svname, PFID(&qqi->qqi_slv_fid), rc);
+                       GOTO(out_lock, rc);
+               }
+       } else {
+               qsd_bump_version(qqi, qqi->qqi_slv_ver, false);
+       }
+
+       /* wait for the qsd instance to be started (i.e. qsd_start() called
+        * once recovery is completed) */
+       l_wait_event(thread->t_ctl_waitq,
+                    qsd_started(qsd) || !thread_is_running(thread), &lwi);
+
+       if (!thread_is_running(thread))
+               GOTO(out_lock, rc = 0);
+
+       /* Step 4: start reconciliation for each enforced ID */
+       rc = qsd_reconciliation(env, qqi);
+       if (rc)
+               CWARN("%s: reconciliation failed. "DFID", %d\n",
+                     qsd->qsd_svname, PFID(&qti->qti_fid), rc);
+
+       EXIT;
+out_lock:
+       ldlm_lock_decref(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode);
+out_env_init:
+       lu_env_fini(env);
+out_env:
+       OBD_FREE_PTR(env);
+out:
+       cfs_write_lock(&qsd->qsd_lock);
+       qqi->qqi_reint = 0;
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       qqi_putref(qqi);
+       lu_ref_del(&qqi->qqi_reference, "reint_thread", thread);
+
+       thread_set_flags(thread, SVC_STOPPED);
+       cfs_waitq_signal(&thread->t_ctl_waitq);
+       return rc;
+}
+
 void qsd_stop_reint_thread(struct qsd_qtype_info *qqi)
 {
+       struct ptlrpc_thread    *thread = &qqi->qqi_reint_thread;
+       struct l_wait_info       lwi = { 0 };
+
+       if (!thread_is_stopped(thread)) {
+               thread_set_flags(thread, SVC_STOPPING);
+               cfs_waitq_signal(&thread->t_ctl_waitq);
+
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_stopped(thread), &lwi);
+       }
 }
 
 int qsd_start_reint_thread(struct qsd_qtype_info *qqi)
 {
+       struct ptlrpc_thread    *thread = &qqi->qqi_reint_thread;
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       struct l_wait_info      lwi = { 0 };
+       int                     rc;
+       ENTRY;
+
+       /* don't bother to do reintegration when quota isn't enabled */
+       if (!qsd_type_enabled(qqi->qqi_qsd, qqi->qqi_qtype))
+               RETURN(0);
+
+       /* check if the reintegration has already started or finished */
+       cfs_write_lock(&qsd->qsd_lock);
+
+       if ((qqi->qqi_glb_uptodate && qqi->qqi_slv_uptodate) ||
+            qqi->qqi_reint || qsd->qsd_stopping) {
+               cfs_write_unlock(&qsd->qsd_lock);
+               RETURN(0);
+       }
+       qqi->qqi_reint = 1;
+
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       rc = cfs_create_thread(qsd_reint_main, (void *)qqi, 0);
+       if (rc < 0) {
+               thread_set_flags(thread, SVC_STOPPED);
+               cfs_write_lock(&qsd->qsd_lock);
+               qqi->qqi_reint = 0;
+               cfs_write_unlock(&qsd->qsd_lock);
+               RETURN(rc);
+       }
+
+       l_wait_event(thread->t_ctl_waitq,
+                    thread_is_running(thread) || thread_is_stopped(thread),
+                    &lwi);
        RETURN(0);
 }
index a0c830c..fa98b56 100644 (file)
@@ -393,6 +393,7 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
                                          req->rq_bulk->bd_nob_transferred);
        if (rc < 0)
                GOTO(out, rc);
+       rc = 0;
 
        req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
        *ii = *req_ii;
index 4ed599b..cafadd6 100644 (file)
 
 #include "qsd_internal.h"
 
+extern cfs_mem_cache_t *upd_kmem;
+
+/*
+ * Allocate and fill a qsd_upd_rec structure to be processed by the writeback
+ * thread.
+ *
+ * \param qqi - is the qsd_qtype_info structure relevant to the update
+ * \param lqe - is the lquota entry subject to the update
+ * \param qid - is the identifier subject to the update
+ * \param rec - is the record storing the new quota settings
+ * \param ver - is the version associated with the update
+ * \param global - is a boolean set to true if this is an update of the global
+ *                 index and false for a slave index.
+ */
+static struct qsd_upd_rec *qsd_upd_alloc(struct qsd_qtype_info *qqi,
+                                        struct lquota_entry *lqe,
+                                        union lquota_id *qid,
+                                        union lquota_rec *rec, __u64 ver,
+                                        bool global)
+{
+       struct qsd_upd_rec      *upd;
+
+       OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, CFS_ALLOC_IO);
+       if (upd == NULL) {
+               CERROR("Failed to allocate upd");
+               return NULL;
+       }
+
+       /* fill it */
+       CFS_INIT_LIST_HEAD(&upd->qur_link);
+       upd->qur_qqi = qqi;
+       upd->qur_lqe = lqe;
+       if (lqe)
+               lqe_getref(lqe);
+       upd->qur_qid    = *qid;
+       upd->qur_rec    = *rec;
+       upd->qur_ver    = ver;
+       upd->qur_global = global;
+
+       return upd;
+}
+
+static void qsd_upd_free(struct qsd_upd_rec *upd)
+{
+       if (upd->qur_lqe)
+               lqe_putref(upd->qur_lqe);
+       OBD_SLAB_FREE_PTR(upd, upd_kmem);
+}
+
+/* must hold the qsd_lock */
+static void qsd_upd_add(struct qsd_instance *qsd, struct qsd_upd_rec *upd)
+{
+       if (!qsd->qsd_stopping) {
+               cfs_list_add_tail(&upd->qur_link, &qsd->qsd_upd_list);
+               /* wake up the upd thread */
+               cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
+       } else {
+               CWARN("%s: discard deferred update.\n", qsd->qsd_svname);
+               if (upd->qur_lqe)
+                       LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
+               qsd_upd_free(upd);
+       }
+}
+
+/* must hold the qsd_lock */
+static void qsd_add_deferred(cfs_list_t *list, struct qsd_upd_rec *upd)
+{
+       struct qsd_upd_rec      *tmp;
+
+       /* Keep the list sorted in ascending version order; walk it backwards
+        * since new updates usually carry the highest version */
+       cfs_list_for_each_entry_reverse(tmp, list, qur_link) {
+
+               LASSERTF(upd->qur_ver != tmp->qur_ver, "ver:"LPU64"\n",
+                        upd->qur_ver);
+
+               if (upd->qur_ver < tmp->qur_ver) {
+                       continue;
+               } else {
+                       cfs_list_add_tail(&upd->qur_link, &tmp->qur_link);
+                       return;
+               }
+       }
+       cfs_list_add(&upd->qur_link, list);
+}
+
+/* must hold the qsd_lock */
+static void qsd_kickoff_deferred(struct qsd_qtype_info *qqi, cfs_list_t *list,
+                                __u64 ver)
+{
+       struct qsd_upd_rec      *upd, *tmp;
+       ENTRY;
+
+       /* Get the first update record in the list, which has the smallest
+        * version, discard all records with versions smaller than the current
+        * one */
+       cfs_list_for_each_entry_safe(upd, tmp, list, qur_link) {
+               if (upd->qur_ver <= ver) {
+                       /* drop this update */
+                       cfs_list_del_init(&upd->qur_link);
+                       CDEBUG(D_QUOTA, "%s: skipping deferred update ver:"
+                              LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
+                              qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
+                              upd->qur_global, upd->qur_qid.qid_uid);
+                       qsd_upd_free(upd);
+               } else {
+                       break;
+               }
+       }
+
+       /* No remaining deferred update */
+       if (cfs_list_empty(list))
+               RETURN_EXIT;
+
+       CDEBUG(D_QUOTA, "%s: found deferred update record. "
+              "version:"LPU64"/"LPU64", global:%d, qid:"LPU64"\n",
+              qqi->qqi_qsd->qsd_svname, upd->qur_ver, ver,
+              upd->qur_global, upd->qur_qid.qid_uid);
+
+       LASSERTF(upd->qur_ver > ver, "qur_ver:"LPU64", cur_ver:"LPU64"\n",
+                upd->qur_ver, ver);
+
+       /* Kick off the deferred update only if it is exactly the next
+        * version; otherwise keep it deferred until the gap is filled */
+       if (upd->qur_ver == ver + 1) {
+               cfs_list_del_init(&upd->qur_link);
+               qsd_upd_add(qqi->qqi_qsd, upd);
+       }
+       EXIT;
+}
+
 /* Bump version of global or slave index copy
  *
  * \param qqi    - qsd_qtype_info
  */
 void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver, bool global)
 {
+       cfs_list_t      *list;
+       __u64           *idx_ver;
+
+       idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
+       list    = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
+
+       cfs_write_lock(&qqi->qqi_qsd->qsd_lock);
+       *idx_ver = ver;
+       if (global)
+               qqi->qqi_glb_uptodate = 1;
+       else
+               qqi->qqi_slv_uptodate = 1;
+       qsd_kickoff_deferred(qqi, list, ver);
+       cfs_write_unlock(&qqi->qqi_qsd->qsd_lock);
 }
 
 /*
@@ -60,4 +203,344 @@ void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
                      union lquota_id *qid, union lquota_rec *rec, __u64 ver,
                      bool global)
 {
+       struct qsd_upd_rec      *upd;
+       struct qsd_instance     *qsd = qqi->qqi_qsd;
+       __u64                    cur_ver;
+       ENTRY;
+
+       CDEBUG(D_QUOTA, "%s: schedule update. global:%s, version:"LPU64"\n",
+              qsd->qsd_svname, global ? "true" : "false", ver);
+
+       upd = qsd_upd_alloc(qqi, lqe, qid, rec, ver, global);
+       if (upd == NULL)
+               RETURN_EXIT;
+
+       /* If the update doesn't carry an index version, there is no need to
+        * sort records in version order, just schedule the update right
+        * away. */
+       if (ver == 0) {
+               cfs_write_lock(&qsd->qsd_lock);
+               qsd_upd_add(qsd, upd);
+               cfs_write_unlock(&qsd->qsd_lock);
+               RETURN_EXIT;
+       }
+
+       cfs_write_lock(&qsd->qsd_lock);
+
+       cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
+
+       if (ver <= cur_ver) {
+               if (global)
+                       /* legitimate race between glimpse AST and
+                        * reintegration */
+                       CDEBUG(D_QUOTA, "%s: discarding glb update from glimpse"
+                              " ver:"LPU64" local ver:"LPU64"\n",
+                              qsd->qsd_svname, ver, cur_ver);
+               else
+                       CERROR("%s: discard slv update, ver:"LPU64" local ver:"
+                              LPU64"\n", qsd->qsd_svname, ver, cur_ver);
+               qsd_upd_free(upd);
+       } else if ((ver == cur_ver + 1) && qqi->qqi_glb_uptodate &&
+                  qqi->qqi_slv_uptodate) {
+               /* in-order update and reintegration is over, schedule it */
+               qsd_upd_add(qsd, upd);
+       } else {
+               /* Out of order update (the one with smaller version hasn't
+                * reached slave or hasn't been flushed to disk yet), or
+                * the reintegration is in progress. Defer the update. */
+               cfs_list_t *list = global ? &qqi->qqi_deferred_glb :
+                                           &qqi->qqi_deferred_slv;
+               qsd_add_deferred(list, upd);
+       }
+
+       cfs_write_unlock(&qsd->qsd_lock);
+
+       EXIT;
+}
+
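+/*
+ * Process a scheduled update: locate the lquota entry if it wasn't pinned at
+ * schedule time, apply the new settings to the in-memory entry (for global
+ * index updates only) and write the record to the on-disk index copy. Usage
+ * is also reported asynchronously for enforced IDs when global settings
+ * change.
+ */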
+static int qsd_process_upd(const struct lu_env *env, struct qsd_upd_rec *upd)
+{
+       struct lquota_entry     *lqe = upd->qur_lqe;
+       struct qsd_qtype_info   *qqi = upd->qur_qqi;
+       int                      rc;
+       ENTRY;
+
+       if (lqe == NULL) {
+               lqe = lqe_locate(env, qqi->qqi_site, &upd->qur_qid);
+               if (IS_ERR(lqe))
+                       GOTO(out, rc = PTR_ERR(lqe));
+       }
+
+       /* In-memory lqe updates for the slave index copy aren't deferred,
+        * so the lqe shouldn't be touched here. */
+       if (upd->qur_global) {
+               rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
+               if (rc)
+                       GOTO(out, rc);
+               /* Report usage asynchronously */
+               if (lqe->lqe_enforced &&
+                   !qsd_refresh_usage(env, lqe)) {
+                       rc = qsd_dqacq(env, lqe, QSD_REP);
+                       LQUOTA_DEBUG(lqe, "Report usage. rc:%d", rc);
+               }
+       }
+
+       rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
+                             upd->qur_ver, &upd->qur_rec);
+out:
+       if (lqe && !IS_ERR(lqe)) {
+               lqe_putref(lqe);
+               upd->qur_lqe = NULL;
+       }
+       RETURN(rc);
+}
+
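+/*
+ * Schedule a quota adjustment, or a per-ID lock cancel when 'cancel' is true,
+ * to be processed by the writeback thread. With 'defer' set, processing is
+ * postponed by QSD_WB_INTERVAL seconds and the entry is queued at the tail of
+ * the adjust list; otherwise it is queued at the head for immediate
+ * processing. An adjust time of 0 marks the entry for lock cancel.
+ */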
+void qsd_adjust_schedule(struct lquota_entry *lqe, bool defer, bool cancel)
+{
+       struct qsd_instance     *qsd = lqe2qqi(lqe)->qqi_qsd;
+       bool                     added = false;
+
+       lqe_getref(lqe);
+       cfs_spin_lock(&qsd->qsd_adjust_lock);
+
+       /* if the lqe is already queued for per-ID lock cancel, drop that
+        * request and re-queue the entry for quota adjustment instead */
+       if (!cfs_list_empty(&lqe->lqe_link) &&
+           lqe->lqe_adjust_time == 0) {
+               cfs_list_del_init(&lqe->lqe_link);
+               lqe_putref(lqe);
+       }
+
+       if (cfs_list_empty(&lqe->lqe_link)) {
+               if (cancel)
+                       lqe->lqe_adjust_time = 0;
+               else
+                       lqe->lqe_adjust_time = defer ?
+                               cfs_time_shift_64(QSD_WB_INTERVAL) :
+                               cfs_time_current_64();
+               /* lqe reference transferred to list */
+               if (defer)
+                       cfs_list_add_tail(&lqe->lqe_link,
+                                         &qsd->qsd_adjust_list);
+               else
+                       cfs_list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
+               added = true;
+       }
+       cfs_spin_unlock(&qsd->qsd_adjust_lock);
+
+       if (added)
+               cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
+       else
+               lqe_putref(lqe);
+}
+
+/* return true if there are pending writeback records or pending adjust
+ * requests */
+static bool qsd_job_pending(struct qsd_instance *qsd, cfs_list_t *upd,
+                           bool *uptodate)
+{
+       bool    job_pending = false;
+       int     qtype;
+
+       LASSERT(cfs_list_empty(upd));
+       *uptodate = true;
+
+       cfs_spin_lock(&qsd->qsd_adjust_lock);
+       if (!cfs_list_empty(&qsd->qsd_adjust_list)) {
+               struct lquota_entry *lqe;
+               lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
+                                    struct lquota_entry, lqe_link);
+               if (cfs_time_beforeq_64(lqe->lqe_adjust_time,
+                                       cfs_time_current_64()))
+                       job_pending = true;
+       }
+       cfs_spin_unlock(&qsd->qsd_adjust_lock);
+
+       cfs_write_lock(&qsd->qsd_lock);
+       if (!cfs_list_empty(&qsd->qsd_upd_list)) {
+               cfs_list_splice_init(&qsd->qsd_upd_list, upd);
+               job_pending = true;
+       }
+
+       for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+               struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
+
+               if (!qsd_type_enabled(qsd, qtype))
+                       continue;
+
+               if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate)
+                       *uptodate = false;
+       }
+
+       cfs_write_unlock(&qsd->qsd_lock);
+       return job_pending;
+}
+
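+/*
+ * Main routine of the writeback thread. It flushes pending index updates,
+ * processes due entries from the adjust list (quota adjustment or per-ID
+ * lock cancel) and restarts the reintegration threads whenever the index
+ * copies aren't up-to-date. The thread wakes up at least every
+ * QSD_WB_INTERVAL seconds.
+ */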
+static int qsd_upd_thread(void *arg)
+{
+       struct qsd_instance     *qsd = (struct qsd_instance *)arg;
+       struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
+       struct l_wait_info       lwi;
+       cfs_list_t               queue;
+       struct qsd_upd_rec      *upd, *n;
+       char                     pname[MTI_NAME_MAXLEN];
+       struct lu_env           *env;
+       int                      qtype, rc = 0;
+       bool                     uptodate;
+       struct lquota_entry     *lqe, *tmp;
+       __u64                    cur_time;
+       ENTRY;
+
+       OBD_ALLOC_PTR(env);
+       if (env == NULL)
+               RETURN(-ENOMEM);
+
+       rc = lu_env_init(env, LCT_DT_THREAD);
+       if (rc) {
+               CERROR("%s: Fail to init env.", qsd->qsd_svname);
+               OBD_FREE_PTR(env);
+               RETURN(rc);
+       }
+
+       snprintf(pname, MTI_NAME_MAXLEN, "lquota_wb_%s", qsd->qsd_svname);
+       cfs_daemonize(pname);
+
+       thread_set_flags(thread, SVC_RUNNING);
+       cfs_waitq_signal(&thread->t_ctl_waitq);
+
+       CFS_INIT_LIST_HEAD(&queue);
+       lwi = LWI_TIMEOUT(cfs_time_seconds(QSD_WB_INTERVAL), NULL, NULL);
+       while (1) {
+               l_wait_event(thread->t_ctl_waitq,
+                            qsd_job_pending(qsd, &queue, &uptodate) ||
+                            !thread_is_running(thread), &lwi);
+
+               cfs_list_for_each_entry_safe(upd, n, &queue, qur_link) {
+                       cfs_list_del_init(&upd->qur_link);
+                       qsd_process_upd(env, upd);
+                       qsd_upd_free(upd);
+               }
+
+               cfs_spin_lock(&qsd->qsd_adjust_lock);
+               cur_time = cfs_time_current_64();
+               cfs_list_for_each_entry_safe(lqe, tmp, &qsd->qsd_adjust_list,
+                                            lqe_link) {
+                       /* deferred items are sorted by time */
+                       if (!cfs_time_beforeq_64(lqe->lqe_adjust_time,
+                                                cur_time))
+                               break;
+
+                       cfs_list_del_init(&lqe->lqe_link);
+                       cfs_spin_unlock(&qsd->qsd_adjust_lock);
+
+                       if (thread_is_running(thread) && uptodate) {
+                               qsd_refresh_usage(env, lqe);
+                               if (lqe->lqe_adjust_time == 0)
+                                       qsd_id_lock_cancel(env, lqe);
+                               else
+                                       qsd_dqacq(env, lqe, QSD_ADJ);
+                       }
+
+                       lqe_putref(lqe);
+                       cfs_spin_lock(&qsd->qsd_adjust_lock);
+               }
+               cfs_spin_unlock(&qsd->qsd_adjust_lock);
+
+               if (!thread_is_running(thread))
+                       break;
+
+               if (uptodate)
+                       continue;
+
+               for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++)
+                       qsd_start_reint_thread(qsd->qsd_type_array[qtype]);
+       }
+       lu_env_fini(env);
+       OBD_FREE_PTR(env);
+       thread_set_flags(thread, SVC_STOPPED);
+       cfs_waitq_signal(&thread->t_ctl_waitq);
+       RETURN(rc);
+}
+
+int qsd_start_upd_thread(struct qsd_instance *qsd)
+{
+       struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
+       struct l_wait_info       lwi = { 0 };
+       int                      rc;
+       ENTRY;
+
+       rc = cfs_create_thread(qsd_upd_thread, (void *)qsd, 0);
+       if (rc < 0) {
+               CERROR("Fail to start quota update thread. rc: %d\n", rc);
+               thread_set_flags(thread, SVC_STOPPED);
+               RETURN(rc);
+       }
+
+       l_wait_event(thread->t_ctl_waitq,
+                    thread_is_running(thread) || thread_is_stopped(thread),
+                    &lwi);
+       RETURN(0);
+}
+
+static void qsd_cleanup_deferred(struct qsd_instance *qsd)
+{
+       int     qtype;
+
+       for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
+               struct qsd_upd_rec      *upd, *tmp;
+               struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
+
+               if (qqi == NULL)
+                       continue;
+
+               cfs_write_lock(&qsd->qsd_lock);
+               cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
+                                            qur_link) {
+                       CWARN("%s: Free global deferred upd: ID:"LPU64", "
+                             "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
+                             upd->qur_qid.qid_uid, upd->qur_ver,
+                             qqi->qqi_glb_ver);
+                       cfs_list_del_init(&upd->qur_link);
+                       qsd_upd_free(upd);
+               }
+               cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_slv,
+                                            qur_link) {
+                       CWARN("%s: Free slave deferred upd: ID:"LPU64", "
+                             "ver:"LPU64"/"LPU64"\n", qsd->qsd_svname,
+                             upd->qur_qid.qid_uid, upd->qur_ver,
+                             qqi->qqi_slv_ver);
+                       cfs_list_del_init(&upd->qur_link);
+                       qsd_upd_free(upd);
+               }
+               cfs_write_unlock(&qsd->qsd_lock);
+       }
+}
+
+static void qsd_cleanup_adjust(struct qsd_instance *qsd)
+{
+       struct lquota_entry     *lqe;
+
+       cfs_spin_lock(&qsd->qsd_adjust_lock);
+       while (!cfs_list_empty(&qsd->qsd_adjust_list)) {
+               lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
+                                    struct lquota_entry, lqe_link);
+               cfs_list_del_init(&lqe->lqe_link);
+               lqe_putref(lqe);
+       }
+       cfs_spin_unlock(&qsd->qsd_adjust_lock);
+}
+
+void qsd_stop_upd_thread(struct qsd_instance *qsd)
+{
+       struct ptlrpc_thread    *thread = &qsd->qsd_upd_thread;
+       struct l_wait_info       lwi    = { 0 };
+
+       if (!thread_is_stopped(thread)) {
+               thread_set_flags(thread, SVC_STOPPING);
+               cfs_waitq_signal(&thread->t_ctl_waitq);
+
+               l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
+                            &lwi);
+       }
+       qsd_cleanup_deferred(qsd);
+       qsd_cleanup_adjust(qsd);
 }