Whamcloud - gitweb
LU-1842 quota: setup/shutdown qmt device
authorJohann Lombardi <johann.lombardi@intel.com>
Fri, 28 Sep 2012 11:40:49 +0000 (13:40 +0200)
committerOleg Drokin <green@whamcloud.com>
Tue, 2 Oct 2012 17:19:36 +0000 (13:19 -0400)
The quota master target (QMT) is in charge of the cluster wide quota
settings. The QMT device is setup/shutdown by the MDT which then
forwards all quota requests to the QMT via a list of request/lock
handlers exported by the QMT.

Signed-off-by: Johann Lombardi <johann.lombardi@intel.com>
Change-Id: I0bd800d2996f5c3b9d861d4c6fbd70dabc3b2860
Reviewed-on: http://review.whamcloud.com/4126
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
15 files changed:
lustre/include/lquota.h
lustre/include/lustre_req_layout.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lvb.c
lustre/ptlrpc/layout.c
lustre/quota/Makefile.in
lustre/quota/lquota_internal.h
lustre/quota/lquota_lib.c
lustre/quota/qmt_dev.c [new file with mode: 0644]
lustre/quota/qmt_handler.c [new file with mode: 0644]
lustre/quota/qmt_internal.h [new file with mode: 0644]
lustre/quota/qmt_lock.c [new file with mode: 0644]

index e6bf161..689bfc1 100644 (file)
@@ -32,6 +32,9 @@
 #ifndef _LUSTRE_LQUOTA_H
 #define _LUSTRE_LQUOTA_H
 
+struct lquota_id_info;
+struct lquota_trans;
+
 /* Gather all quota record type in an union that can be used to read any records
  * from disk. All fields of these records must be 64-bit aligned, otherwise the
  * OSD layer may swab them incorrectly. */
@@ -56,48 +59,47 @@ extern struct dt_index_features dt_quota_bgrp_features;
 #define QUOTA_DATAPOOL_NAME   "ost="
 
 /*
- * Quota information attached to a transaction
+ * Quota Master Target support
  */
 
-struct lquota_entry;
+/* Request handlers for quota master operations.
+ * This is used by the MDT to pass quota/lock requests to the quota master
+ * target. This won't be needed any more once the QMT is a real target and
+ * does not rely any more on the MDT service threads and namespace. */
+struct qmt_handlers {
+       /* Handle quotactl request from client. */
+       int (*qmth_quotactl)(const struct lu_env *, struct lu_device *,
+                            struct obd_quotactl *);
 
-struct lquota_id_info {
-       /* quota identifier */
-       union lquota_id          lqi_id;
+       /* Handle dqacq/dqrel request from slave. */
+       int (*qmth_dqacq)(const struct lu_env *, struct lu_device *,
+                         struct ptlrpc_request *);
 
-       /* USRQUOTA or GRPQUOTA for now, could be expanded for
-        * directory quota or other types later.  */
-       int                      lqi_type;
+       /* LDLM intent policy associated with quota locks */
+       int (*qmth_intent_policy)(const struct lu_env *, struct lu_device *,
+                                 struct ptlrpc_request *, struct ldlm_lock **,
+                                 int);
 
-       /* inodes or kbytes to be consumed or released, it could
-        * be negative when releasing space.  */
-       long long                lqi_space;
+       /* Initialize LVB of ldlm resource associated with quota objects */
+       int (*qmth_lvbo_init)(struct lu_device *, struct ldlm_resource *);
 
-       /* quota slave entry structure associated with this ID */
-       struct lquota_entry     *lqi_qentry;
+       /* Update LVB of ldlm resource associated with quota objects */
+       int (*qmth_lvbo_update)(struct lu_device *, struct ldlm_resource *,
+                               struct ptlrpc_request *, int);
 
-       /* whether we are reporting blocks or inodes */
-       bool                     lqi_is_blk;
-};
+       /* Return size of LVB to be packed in ldlm message */
+       int (*qmth_lvbo_size)(struct lu_device *, struct ldlm_lock *);
 
-/* Since we enforce only inode quota in meta pool (MDTs), and block quota in
- * data pool (OSTs), there are at most 4 quota ids being enforced in a single
- * transaction, which is chown transaction:
- * original uid and gid, new uid and gid.
- *
- * This value might need to be revised when directory quota is added.  */
-#define QUOTA_MAX_TRANSIDS    4
+       /* Fill request buffer with lvb */
+       int (*qmth_lvbo_fill)(struct lu_device *, struct ldlm_lock *, void *,
+                             int);
 
-/* all qids involved in a single transaction */
-struct lquota_trans {
-        unsigned short         lqt_id_cnt;
-        struct lquota_id_info  lqt_ids[QUOTA_MAX_TRANSIDS];
+       /* Free lvb associated with ldlm resource */
+       int (*qmth_lvbo_free)(struct lu_device *, struct ldlm_resource *);
 };
 
-/* flags for quota local enforcement */
-#define QUOTA_FL_OVER_USRQUOTA  0x01
-#define QUOTA_FL_OVER_GRPQUOTA  0x02
-#define QUOTA_FL_SYNC           0x04
+/* actual handlers are defined in lustre/quota/qmt_handler.c */
+extern struct qmt_handlers qmt_hdls;
 
 /*
  * Quota enforcement support on slaves
@@ -126,11 +128,6 @@ struct qsd_instance *qsd_init(const struct lu_env *, char *, struct dt_device *,
 
 void qsd_fini(const struct lu_env *, struct qsd_instance *);
 
-/* helper function used by MDT & OFD to retrieve quota accounting information
- * on slave */
-int lquotactl_slv(const struct lu_env *, struct dt_device *,
-                 struct obd_quotactl *);
-
 /* XXX: dummy qsd_op_begin() & qsd_op_end(), will be replaced with the real
  *      one once all the enforcement code landed. */
 static inline int qsd_op_begin(const struct lu_env *env,
@@ -147,4 +144,57 @@ static inline void qsd_op_end(const struct lu_env *env,
                              struct lquota_trans *trans)
 {
 }
+
+/*
+ * Quota information attached to a transaction
+ */
+
+struct lquota_entry;
+
+struct lquota_id_info {
+       /* quota identifier */
+       union lquota_id          lqi_id;
+
+       /* USRQUOTA or GRPQUOTA for now, could be expanded for
+        * directory quota or other types later.  */
+       int                      lqi_type;
+
+       /* inodes or kbytes to be consumed or released, it could
+        * be negative when releasing space.  */
+       long long                lqi_space;
+
+       /* quota slave entry structure associated with this ID */
+       struct lquota_entry     *lqi_qentry;
+
+       /* whether we are reporting blocks or inodes */
+       bool                     lqi_is_blk;
+};
+
+/* Since we enforce only inode quota in meta pool (MDTs), and block quota in
+ * data pool (OSTs), there are at most 4 quota ids being enforced in a single
+ * transaction, which is chown transaction:
+ * original uid and gid, new uid and gid.
+ *
+ * This value might need to be revised when directory quota is added.  */
+#define QUOTA_MAX_TRANSIDS    4
+
+/* all qids involved in a single transaction */
+struct lquota_trans {
+       unsigned short          lqt_id_cnt;
+       struct lquota_id_info   lqt_ids[QUOTA_MAX_TRANSIDS];
+};
+
+/* flags for quota local enforcement */
+#define QUOTA_FL_OVER_USRQUOTA  0x01
+#define QUOTA_FL_OVER_GRPQUOTA  0x02
+#define QUOTA_FL_SYNC           0x04
+
+#define IS_LQUOTA_RES(res)                                             \
+       (res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] == FID_SEQ_QUOTA ||   \
+        res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] == FID_SEQ_QUOTA_GLB)
+
+/* helper function used by MDT & OFD to retrieve quota accounting information
+ * on slave */
+int lquotactl_slv(const struct lu_env *, struct dt_device *,
+                 struct obd_quotactl *);
 #endif /* _LUSTRE_LQUOTA_H */
index 01f1e12..eeb3ea8 100644 (file)
@@ -209,6 +209,7 @@ extern struct req_format RQF_LDLM_ENQUEUE;
 extern struct req_format RQF_LDLM_ENQUEUE_LVB;
 extern struct req_format RQF_LDLM_CONVERT;
 extern struct req_format RQF_LDLM_INTENT;
+extern struct req_format RQF_LDLM_INTENT_BASIC;
 extern struct req_format RQF_LDLM_INTENT_GETATTR;
 extern struct req_format RQF_LDLM_INTENT_OPEN;
 extern struct req_format RQF_LDLM_INTENT_CREATE;
index 48ec59e..1db2234 100644 (file)
@@ -846,6 +846,7 @@ struct niobuf_local {
 #define LUSTRE_CACHEOBD_NAME    "cobd"
 #define LUSTRE_ECHO_NAME        "obdecho"
 #define LUSTRE_ECHO_CLIENT_NAME "echo_client"
+#define LUSTRE_QMT_NAME         "qmt"
 
 /* Constant obd names (post-rename) */
 #define LUSTRE_MDS_OBDNAME "MDS"
index cecb687..d34c464 100644 (file)
@@ -404,6 +404,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MGS_PAUSE_REQ           0x904
 #define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
 
+#define OBD_FAIL_QUOTA_DQACQ             0xA01
 #define OBD_FAIL_QUOTA_RET_QDATA         0xA02
 #define OBD_FAIL_QUOTA_DELAY_REL         0xA03
 #define OBD_FAIL_QUOTA_DELAY_SD          0xA04
index 2afda58..d9f7188 100644 (file)
@@ -2049,6 +2049,7 @@ static int mdt_quotactl(struct mdt_thread_info *info)
        struct req_capsule      *pill = info->mti_pill;
        struct obd_quotactl     *oqctl, *repoqc;
        int                      id, rc;
+       struct lu_device        *qmt = info->mti_mdt->mdt_qmt_dev;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
@@ -2073,6 +2074,8 @@ static int mdt_quotactl(struct mdt_thread_info *info)
        case Q_SETINFO:
        case Q_SETQUOTA:
        case Q_GETQUOTA:
+               if (qmt == NULL)
+                       RETURN(-EOPNOTSUPP);
                /* slave quotactl */
        case Q_GETOINFO:
        case Q_GETOQUOTA:
@@ -2117,25 +2120,17 @@ static int mdt_quotactl(struct mdt_thread_info *info)
 
        switch (oqctl->qc_cmd) {
 
-               /* master quotactl */
        case Q_GETINFO:
        case Q_SETINFO:
        case Q_SETQUOTA:
-               /* XXX: not implemented yet. Will be when qmt support is
-                * added */
-               CERROR("quotactl operation %d not implemented yet\n",
-                                       oqctl->qc_cmd);
-               RETURN(-EOPNOTSUPP);
        case Q_GETQUOTA:
-               /* XXX: return no limit for now, just for testing purpose */
-               memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk));
-               oqctl->qc_dqblk.dqb_valid = QIF_LIMITS;
-               rc = 0;
+               /* forward quotactl request to QMT */
+               rc = qmt_hdls.qmth_quotactl(info->mti_env, qmt, oqctl);
                break;
 
-               /* slave quotactl */
        case Q_GETOINFO:
        case Q_GETOQUOTA:
+               /* slave quotactl */
                rc = lquotactl_slv(info->mti_env, info->mti_mdt->mdt_bottom,
                                   oqctl);
                break;
@@ -2431,6 +2426,22 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
         return rc;
 }
 
+/*
+ * quota request handlers
+ */
+static int mdt_quota_dqacq(struct mdt_thread_info *info)
+{
+       struct lu_device        *qmt = info->mti_mdt->mdt_qmt_dev;
+       int                      rc;
+       ENTRY;
+
+       if (qmt == NULL)
+               RETURN(err_serious(-EOPNOTSUPP));
+
+       rc = qmt_hdls.qmth_dqacq(info->mti_env, qmt, mdt_info_req(info));
+       RETURN(rc);
+}
+
 static struct mdt_object *mdt_obj(struct lu_object *o)
 {
         LASSERT(lu_device_is_mdt(o->lo_dev));
@@ -3468,6 +3479,7 @@ enum mdt_it_code {
         MDT_IT_TRUNC,
         MDT_IT_GETXATTR,
         MDT_IT_LAYOUT,
+       MDT_IT_QUOTA,
         MDT_IT_NR
 };
 
@@ -3892,6 +3904,10 @@ static int mdt_intent_code(long itcode)
         case IT_LAYOUT:
                 rc = MDT_IT_LAYOUT;
                 break;
+       case IT_QUOTA_DQACQ:
+       case IT_QUOTA_CONN:
+               rc = MDT_IT_QUOTA;
+               break;
         default:
                 CERROR("Unknown intent opcode: %ld\n", itcode);
                 rc = -EINVAL;
@@ -3914,8 +3930,21 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
                 RETURN(-EINVAL);
 
         pill = info->mti_pill;
-        flv  = &mdt_it_flavor[opc];
 
+       if (opc == MDT_IT_QUOTA) {
+               struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev;
+
+               if (qmt == NULL)
+                       RETURN(-EOPNOTSUPP);
+
+               /* pass the request to quota master */
+               rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
+                                                mdt_info_req(info), lockp,
+                                                flags);
+               RETURN(rc);
+       }
+
+       flv  = &mdt_it_flavor[opc];
         if (flv->it_fmt != NULL)
                 req_capsule_extend(pill, flv->it_fmt);
 
@@ -3956,7 +3985,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
         LASSERT(pill->rc_req == req);
 
         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
-                req_capsule_extend(pill, &RQF_LDLM_INTENT);
+               req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
                 if (it != NULL) {
                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
@@ -4846,6 +4875,154 @@ cleanup_mem:
        RETURN(rc);
 }
 
+/* setup quota master target on MDT0 */
+static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
+                         struct lustre_cfg *cfg)
+{
+       struct obd_device       *obd;
+       char                    *dev = lustre_cfg_string(cfg, 0);
+       char                    *qmtname, *uuid, *p;
+       struct lustre_cfg_bufs  *bufs;
+       struct lustre_cfg       *lcfg;
+       struct lustre_profile   *lprof;
+       struct obd_connect_data *data;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(mdt->mdt_qmt_exp == NULL);
+       LASSERT(mdt->mdt_qmt_dev == NULL);
+
+       /* quota master is on MDT0 only for now */
+       if (mdt->mdt_mite.ms_node_id != 0)
+               RETURN(0);
+
+       /* MGS generates config commands which look as follows:
+        *   #01 (160)setup   0:lustre-MDT0000  1:lustre-MDT0000_UUID  2:0
+        *                    3:lustre-MDT0000-mdtlov  4:f
+        *
+        * We generate the QMT name from the MDT one, just replacing MD with QM
+        * after all the preparations, the logical equivalent will be:
+        *   #01 (160)setup   0:lustre-QMT0000  1:lustre-QMT0000_UUID  2:0
+        *                    3:lustre-MDT0000-osd  4:f */
+       OBD_ALLOC(qmtname, MAX_OBD_NAME);
+       OBD_ALLOC(uuid, UUID_MAX);
+       OBD_ALLOC_PTR(bufs);
+       OBD_ALLOC_PTR(data);
+       if (qmtname == NULL || uuid == NULL || bufs == NULL || data == NULL)
+               GOTO(cleanup_mem, rc = -ENOMEM);
+
+       strcpy(qmtname, dev);
+       p = strstr(qmtname, "-MDT");
+       if (p == NULL)
+               GOTO(cleanup_mem, rc = -ENOMEM);
+       /* replace MD with QM */
+       p[1] = 'Q';
+       p[2] = 'M';
+
+       snprintf(uuid, UUID_MAX, "%s_UUID", qmtname);
+
+       lprof = class_get_profile(lustre_cfg_string(cfg, 0));
+       if (lprof == NULL || lprof->lp_dt == NULL) {
+               CERROR("can't find profile for %s\n",
+                      lustre_cfg_string(cfg, 0));
+               GOTO(cleanup_mem, rc = -EINVAL);
+       }
+
+       lustre_cfg_bufs_reset(bufs, qmtname);
+       lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_QMT_NAME);
+       lustre_cfg_bufs_set_string(bufs, 2, uuid);
+       lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
+
+       lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
+       if (!lcfg)
+               GOTO(cleanup_mem, rc = -ENOMEM);
+
+       rc = class_attach(lcfg);
+       if (rc)
+               GOTO(lcfg_cleanup, rc);
+
+       obd = class_name2obd(qmtname);
+       if (!obd) {
+               CERROR("Can not find obd %s (%s in config)\n", qmtname,
+                      lustre_cfg_string(cfg, 0));
+               GOTO(class_detach, rc = -EINVAL);
+       }
+
+       lustre_cfg_free(lcfg);
+
+       lustre_cfg_bufs_reset(bufs, qmtname);
+       lustre_cfg_bufs_set_string(bufs, 1, uuid);
+       lustre_cfg_bufs_set_string(bufs, 2, dev);
+
+       /* for quota, the next device should be the OSD device */
+       lustre_cfg_bufs_set_string(bufs, 3,
+                                  mdt->mdt_bottom->dd_lu_dev.ld_obd->obd_name);
+
+       lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
+
+       rc = class_setup(obd, lcfg);
+       if (rc)
+               GOTO(class_detach, rc);
+
+       mdt->mdt_qmt_dev = obd->obd_lu_dev;
+
+       /* configure local quota objects */
+       rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
+                                                  &mdt->mdt_md_dev.md_lu_dev,
+                                                  mdt->mdt_qmt_dev);
+       if (rc)
+               GOTO(class_cleanup, rc);
+
+       /* connect to quota master target */
+       data->ocd_connect_flags = OBD_CONNECT_VERSION;
+       data->ocd_version = LUSTRE_VERSION_CODE;
+       rc = obd_connect(NULL, &mdt->mdt_qmt_exp, obd, &obd->obd_uuid,
+                        data, NULL);
+       if (rc) {
+               CERROR("cannot connect to quota master device %s (%d)\n",
+                      qmtname, rc);
+               GOTO(class_cleanup, rc);
+       }
+
+       EXIT;
+class_cleanup:
+       if (rc) {
+               class_manual_cleanup(obd);
+               mdt->mdt_qmt_dev = NULL;
+       }
+class_detach:
+       if (rc)
+               class_detach(obd, lcfg);
+lcfg_cleanup:
+       lustre_cfg_free(lcfg);
+cleanup_mem:
+       if (bufs)
+               OBD_FREE_PTR(bufs);
+       if (qmtname)
+               OBD_FREE(qmtname, MAX_OBD_NAME);
+       if (uuid)
+               OBD_FREE(uuid, UUID_MAX);
+       if (data)
+               OBD_FREE_PTR(data);
+       return rc;
+}
+
+/* Shutdown quota master target associated with mdt */
+static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt)
+{
+       ENTRY;
+
+       if (mdt->mdt_qmt_exp == NULL)
+               RETURN_EXIT;
+       LASSERT(mdt->mdt_qmt_dev != NULL);
+
+       /* the qmt automatically shuts down when the mdt disconnects */
+       obd_disconnect(mdt->mdt_qmt_exp);
+       mdt->mdt_qmt_exp = NULL;
+       mdt->mdt_qmt_dev = NULL;
+       EXIT;
+}
+
 /**
  * setup CONFIG_ORIG context, used to access local config log.
  * this may need to be rewrite as part of llog rewrite for lu-api.
@@ -4919,6 +5096,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         mdt_procfs_fini(m);
 
+       mdt_quota_fini(env, m);
+
         lut_fini(env, &m->mdt_lut);
         mdt_fs_cleanup(env, m);
         upcall_cache_cleanup(m->mdt_identity_cache);
@@ -5169,9 +5348,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_recovery, rc);
         }
 
+       rc = mdt_quota_init(env, m, cfg);
+       if (rc)
+               GOTO(err_procfs, rc);
+
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
-                GOTO(err_procfs, rc);
+               GOTO(err_quota, rc);
 
         ping_evictor_start();
 
@@ -5191,6 +5374,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
+err_quota:
+       mdt_quota_fini(env, m);
 err_procfs:
         mdt_procfs_fini(m);
 err_recovery:
@@ -6413,6 +6598,13 @@ static struct mdt_handler mdt_sec_ctx_ops[] = {
         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
 };
 
+#define DEF_QUOTA_HNDLF(flags, name, fn)                   \
+       DEF_HNDL(QUOTA, DQACQ, , flags, name, fn, &RQF_QUOTA_ ## name)
+
+static struct mdt_handler mdt_quota_ops[] = {
+       DEF_QUOTA_HNDLF(HABEO_REFERO, DQACQ, mdt_quota_dqacq),
+};
+
 static struct mdt_opc_slice mdt_regular_handlers[] = {
         {
                 .mos_opc_start = MDS_GETATTR,
@@ -6439,6 +6631,11 @@ static struct mdt_opc_slice mdt_regular_handlers[] = {
                 .mos_opc_end   = SEC_LAST_OPC,
                 .mos_hs        = mdt_sec_ctx_ops
         },
+       {
+               .mos_opc_start = QUOTA_DQACQ,
+               .mos_opc_end   = QUOTA_LAST_OPC,
+               .mos_hs        = mdt_quota_ops
+       },
         {
                 .mos_hs        = NULL
         }
index 93a5e75..f83d4e0 100644 (file)
@@ -71,6 +71,7 @@
 #include <lustre_idmap.h>
 #include <lustre_eacl.h>
 #include <lustre_fsfilt.h>
+#include <lquota.h>
 
 /* check if request's xid is equal to last one or not*/
 static inline int req_xid_is_last(struct ptlrpc_request *req)
@@ -179,6 +180,11 @@ struct mdt_device {
         int                        mdt_sec_level;
         struct rename_stats        mdt_rename_stats;
        struct lu_fid              mdt_md_root_fid;
+
+       /* connection to quota master */
+       struct obd_export         *mdt_qmt_exp;
+       /* quota master device associated with this MDT */
+       struct lu_device          *mdt_qmt_dev;
 };
 
 #define MDT_SERVICE_WATCHDOG_FACTOR     (2)
index a61d46a..892c0ea 100644 (file)
 /* Called with res->lr_lvb_sem held */
 static int mdt_lvbo_init(struct ldlm_resource *res)
 {
+       if (IS_LQUOTA_RES(res)) {
+               struct mdt_device       *mdt;
+
+               mdt = ldlm_res_to_ns(res)->ns_lvbp;
+               if (mdt->mdt_qmt_dev == NULL)
+                       return 0;
+
+               /* call lvbo init function of quota master */
+               return qmt_hdls.qmth_lvbo_init(mdt->mdt_qmt_dev, res);
+       }
+
+       return 0;
+}
+
+static int mdt_lvbo_update(struct ldlm_resource *res,
+                          struct ptlrpc_request *req,
+                          int increase_only)
+{
+       if (IS_LQUOTA_RES(res)) {
+               struct mdt_device       *mdt;
+
+               mdt = ldlm_res_to_ns(res)->ns_lvbp;
+               if (mdt->mdt_qmt_dev == NULL)
+                       return 0;
+
+               /* call lvbo update function of quota master */
+               return qmt_hdls.qmth_lvbo_update(mdt->mdt_qmt_dev, res, req,
+                                                increase_only);
+       }
+
        return 0;
 }
 
+
 static int mdt_lvbo_size(struct ldlm_lock *lock)
 {
+       if (IS_LQUOTA_RES(lock->l_resource)) {
+               struct mdt_device       *mdt;
+
+               mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+               if (mdt->mdt_qmt_dev == NULL)
+                       return 0;
+
+               /* call lvbo size function of quota master */
+               return qmt_hdls.qmth_lvbo_size(mdt->mdt_qmt_dev, lock);
+       }
+
        return 0;
 }
 
 static int mdt_lvbo_fill(struct ldlm_lock *lock, void *lvb, int lvblen)
 {
+       if (IS_LQUOTA_RES(lock->l_resource)) {
+               struct mdt_device       *mdt;
+
+               mdt = ldlm_res_to_ns(lock->l_resource)->ns_lvbp;
+               if (mdt->mdt_qmt_dev == NULL)
+                       return 0;
+
+               /* call lvbo fill function of quota master */
+               return qmt_hdls.qmth_lvbo_fill(mdt->mdt_qmt_dev, lock, lvb,
+                                              lvblen);
+       }
+
+       return 0;
+}
+
+static int mdt_lvbo_free(struct ldlm_resource *res)
+{
+       if (IS_LQUOTA_RES(res)) {
+               struct mdt_device       *mdt;
+
+               mdt = ldlm_res_to_ns(res)->ns_lvbp;
+               if (mdt->mdt_qmt_dev == NULL)
+                       return 0;
+
+               /* call lvbo free function of quota master */
+               return qmt_hdls.qmth_lvbo_free(mdt->mdt_qmt_dev, res);
+       }
+
        return 0;
 }
 
 struct ldlm_valblock_ops mdt_lvbo = {
        lvbo_init:      mdt_lvbo_init,
-       lvbo_size:      mdt_lvbo_size,
-       lvbo_fill:      mdt_lvbo_fill
+       lvbo_update:    mdt_lvbo_update,
+       lvbo_size:      mdt_lvbo_size,
+       lvbo_fill:      mdt_lvbo_fill,
+       lvbo_free:      mdt_lvbo_free
 };
index 02e434a..c6e3169 100644 (file)
@@ -371,6 +371,12 @@ static const struct req_msg_field *ldlm_gl_callback_server[] = {
         &RMF_DLM_LVB
 };
 
+static const struct req_msg_field *ldlm_intent_basic_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_DLM_REQ,
+       &RMF_LDLM_INTENT,
+};
+
 static const struct req_msg_field *ldlm_intent_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_DLM_REQ,
@@ -646,6 +652,7 @@ static struct req_format *req_formats[] = {
         &RQF_LDLM_GL_CALLBACK,
        &RQF_LDLM_GL_DESC_CALLBACK,
         &RQF_LDLM_INTENT,
+       &RQF_LDLM_INTENT_BASIC,
         &RQF_LDLM_INTENT_GETATTR,
         &RQF_LDLM_INTENT_OPEN,
         &RQF_LDLM_INTENT_CREATE,
@@ -1243,6 +1250,11 @@ struct req_format RQF_LDLM_GL_DESC_CALLBACK =
                        ldlm_gl_callback_server);
 EXPORT_SYMBOL(RQF_LDLM_GL_DESC_CALLBACK);
 
+struct req_format RQF_LDLM_INTENT_BASIC =
+       DEFINE_REQ_FMT0("LDLM_INTENT_BASIC",
+                       ldlm_intent_basic_client, ldlm_enqueue_lvb_server);
+EXPORT_SYMBOL(RQF_LDLM_INTENT_BASIC);
+
 struct req_format RQF_LDLM_INTENT =
         DEFINE_REQ_FMT0("LDLM_INTENT",
                         ldlm_intent_client, ldlm_intent_server);
index 398d01b..b8745cf 100644 (file)
@@ -1,5 +1,7 @@
 MODULES := lquota
 
+qmt-objs := qmt_dev.o qmt_handler.o qmt_lock.o
+
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
 qsd-objs := qsd_lib.o qsd_request.o qsd_entry.o qsd_lock.o
@@ -7,6 +9,9 @@ qsd-objs += qsd_reint.o qsd_writeback.o
 
 lquota-objs := $(quota-objs) $(qsd-objs)
 
+lquota-objs += $(qmt-objs)
+
 EXTRA_DIST := $(lquota-objs:%.o=%.c) lquota_internal.h qsd_internal.h
+EXTRA_DIST += qmt_internal.h
 
 @INCLUDE_RULES@
index fd88fe9..b75e5ce 100644 (file)
@@ -377,6 +377,10 @@ int lquota_disk_write(const struct lu_env *, struct thandle *,
 int lquota_disk_update_ver(const struct lu_env *, struct dt_device *,
                           struct dt_object *, __u64);
 
+/* qmt_dev.c */
+int qmt_glb_init(void);
+void qmt_glb_fini(void);
+
 /* lproc_quota.c */
 extern struct file_operations lprocfs_quota_seq_fops;
 
index cb9aa28..6ab1f79 100644 (file)
@@ -325,10 +325,15 @@ const struct dt_index_features *glb_idx_feature(struct lu_fid *fid)
 
 static int __init init_lquota(void)
 {
-       /* new quota initialization */
+       int     rc;
+
        lquota_key_init_generic(&lquota_thread_key, NULL);
        lu_context_key_register(&lquota_thread_key);
 
+       rc = qmt_glb_init();
+       if (rc)
+               return rc;
+
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
        dt_quota_iusr_features = dt_quota_busr_features = dt_quota_glb_features;
        dt_quota_igrp_features = dt_quota_bgrp_features = dt_quota_glb_features;
@@ -341,6 +346,7 @@ static int __init init_lquota(void)
 
 static void exit_lquota(void)
 {
+       qmt_glb_fini();
        lu_context_key_degister(&lquota_thread_key);
 }
 
diff --git a/lustre/quota/qmt_dev.c b/lustre/quota/qmt_dev.c
new file mode 100644 (file)
index 0000000..e165ae7
--- /dev/null
@@ -0,0 +1,402 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+/*
+ * Management of the device associated with a Quota Master Target (QMT).
+ *
+ * The QMT holds the cluster wide quota limits. It stores the quota settings
+ * ({hard,soft} limit & grace time) in a global index file and is in charge
+ * of allocating quota space to slaves while guaranteeing that the overall
+ * limits aren't exceeded. The QMT also maintains one index per slave (in fact,
+ * one per slave per quota type) used to track how much space is allocated
+ * to a given slave. Now that the QMT is aware of the quota space distribution
+ * among slaves, it can afford to rebalance efficiently quota space from one
+ * slave to another. Slaves are asked to release quota space via glimpse
+ * callbacks sent on DLM locks which are granted to slaves when those latters
+ * acquire quota space.
+ *
+ * The QMT device is currently set up by the MDT and should probably be moved
+ * to a separate target in the future. Meanwhile, the MDT forwards all quota
+ * requests to the QMT via a list of request handlers (see struct qmt_handlers
+ * in lquota.h). The QMT also borrows the LDLM namespace from the MDT.
+ *
+ * To bring up a QMT device, the following steps must be completed:
+ *
+ * - call ->ldto_device_alloc to allocate the QMT device and perform basic
+ *   initialization like connecting to the backend OSD device or setting up the
+ *   default pools and the QMT procfs directory.
+ *
+ * - the MDT can then connect to the QMT instance via legacy obd_connect path.
+ *
+ * - once the MDT stack has been fully configured, ->ldto_prepare must be called
+ *   to configure on-disk objects associated with this master target.
+ *
+ * To shutdown a QMT device, the MDT just has to disconnect from the QMT.
+ *
+ * The qmt_device_type structure is registered when the lquota module is
+ * loaded and all the steps described above are automatically done when the MDT
+ * set up the Quota Master Target via calls to class_attach/class_setup, see
+ * mdt_quota_init() for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <obd_class.h>
+#include <lprocfs_status.h>
+#include <lustre_disk.h>
+#include "qmt_internal.h"
+
+static const struct lu_device_operations qmt_lu_ops;
+
+/*
+ * Release quota master target and all data structure associated with this
+ * target.
+ * Called on MDT0 cleanup.
+ *
+ * \param env - is the environment passed by the caller
+ * \param ld  - is the lu_device associated with the qmt device to be released
+ *
+ * \retval - NULL on success (backend OSD device is managed by the main stack),
+ *           appropriate error on failure
+ */
+static struct lu_device *qmt_device_fini(const struct lu_env *env,
+                                        struct lu_device *ld)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(ld);
+       ENTRY;
+
+       LASSERT(qmt != NULL);
+
+       CDEBUG(D_QUOTA, "%s: initiating QMT shutdown\n", qmt->qmt_svname);
+
+       /* disconnect from OSD */
+       if (qmt->qmt_child_exp != NULL) {
+               obd_disconnect(qmt->qmt_child_exp);
+               qmt->qmt_child_exp = NULL;
+               qmt->qmt_child     = NULL;
+       }
+
+       RETURN(NULL);
+}
+
+/*
+ * Connect a quota master to the backend OSD device.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target to be connected
+ * \param cfg - is the configuration log record from which we need to extract
+ *              the service name of the backend OSD device to connect to.
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int qmt_connect_to_osd(const struct lu_env *env, struct qmt_device *qmt,
+                             struct lustre_cfg *cfg)
+{
+       struct obd_connect_data *data = NULL;
+       struct obd_device       *obd;
+       struct lu_device        *ld = qmt2lu_dev(qmt);
+       int                      rc;
+       ENTRY;
+
+       LASSERT(qmt->qmt_child_exp == NULL);
+
+       OBD_ALLOC_PTR(data);
+       if (data == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       /* look-up OBD device associated with the backend OSD device.
+        * The MDT is kind enough to pass the OBD name in QMT configuration */
+       obd = class_name2obd(lustre_cfg_string(cfg, 3));
+       if (obd == NULL) {
+               CERROR("%s: can't locate backend osd device: %s\n",
+                      qmt->qmt_svname, lustre_cfg_string(cfg, 3));
+               GOTO(out, rc = -ENOTCONN);
+       }
+
+       data->ocd_connect_flags = OBD_CONNECT_VERSION;
+       data->ocd_version = LUSTRE_VERSION_CODE;
+
+       /* connect to OSD device */
+       rc = obd_connect(NULL, &qmt->qmt_child_exp, obd, &obd->obd_uuid, data,
+                        NULL);
+       if (rc) {
+               CERROR("%s: cannot connect to osd dev %s (%d)\n",
+                      qmt->qmt_svname, obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
+       /* initialize site (although it isn't used anywhere) and lu_device
+        * pointer to next device */
+       qmt->qmt_child = lu2dt_dev(qmt->qmt_child_exp->exp_obd->obd_lu_dev);
+       ld->ld_site    = qmt->qmt_child_exp->exp_obd->obd_lu_dev->ld_site;
+       EXIT;
+out:
+       if (data)
+               OBD_FREE_PTR(data);
+       return rc;
+}
+
+/*
+ * Initialize quota master target device. This includers connecting to
+ * the backend OSD device, initializing the pool configuration and creating the
+ * root procfs directory dedicated to this quota target.
+ * The rest of the initialization is done when the stack is fully configured
+ * (i.e. when ->ldo_start is called across the stack).
+ *
+ * This function is called on MDT0 setup.
+ *
+ * \param env - is the environment passed by the caller
+ * \param qmt - is the quota master target to be initialized
+ * \param ldt - is the device type structure associated with the qmt device
+ * \param cfg - is the configuration record used to configure the qmt device
+ *
+ * \retval - 0 on success, appropriate error on failure
+ */
+static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
+                           struct lu_device_type *ldt, struct lustre_cfg *cfg)
+{
+       struct lu_device        *ld = qmt2lu_dev(qmt);
+       struct obd_device       *obd;
+       int                      rc;
+       ENTRY;
+
+       /* record who i am, it might be useful ... */
+       strncpy(qmt->qmt_svname, lustre_cfg_string(cfg, 0),
+               sizeof(qmt->qmt_svname) - 1);
+
+       /* look-up the obd_device associated with the qmt */
+       obd = class_name2obd(qmt->qmt_svname);
+       if (obd == NULL)
+               RETURN(-ENOENT);
+
+       /* reference each other */
+       obd->obd_lu_dev = ld;
+       ld->ld_obd      = obd;
+
+       /* connect to backend osd device */
+       rc = qmt_connect_to_osd(env, qmt, cfg);
+       if (rc)
+               GOTO(out, rc);
+
+       EXIT;
+out:
+       if (rc)
+               qmt_device_fini(env, ld);
+       return rc;
+}
+
+/*
+ * Free quota master target device. Companion of qmt_device_alloc()
+ *
+ * \param env - is the environment passed by the caller
+ * \param ld  - is the lu_device associated with the qmt dev to be freed
+ *
+ * \retval - NULL on success (backend OSD device is managed by the main stack),
+ *           appropriate error on failure
+ */
+static struct lu_device *qmt_device_free(const struct lu_env *env,
+                                        struct lu_device *ld)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(ld);
+       ENTRY;
+
+       LASSERT(qmt != NULL);
+
+       lu_device_fini(ld);
+       OBD_FREE_PTR(qmt);
+       RETURN(NULL);
+}
+
+/*
+ * Allocate quota master target and initialize it.
+ *
+ * \param env - is the environment passed by the caller
+ * \param ldt - is the device type structure associated with the qmt
+ * \param cfg - is the configuration record used to configure the qmt
+ *
+ * \retval - lu_device structure associated with the qmt on success,
+ *           appropriate error on failure
+ */
+static struct lu_device *qmt_device_alloc(const struct lu_env *env,
+                                         struct lu_device_type *ldt,
+                                         struct lustre_cfg *cfg)
+{
+       struct qmt_device       *qmt;
+       struct lu_device        *ld;
+       int                      rc;
+       ENTRY;
+
+       /* allocate qmt device */
+       OBD_ALLOC_PTR(qmt);
+       if (qmt == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       /* configure lu/dt_device */
+       ld = qmt2lu_dev(qmt);
+       dt_device_init(&qmt->qmt_dt_dev, ldt);
+       ld->ld_ops = &qmt_lu_ops;
+
+       /* initialize qmt device */
+       rc = qmt_device_init0(env, qmt, ldt, cfg);
+       if (rc != 0) {
+               qmt_device_free(env, ld);
+               RETURN(ERR_PTR(rc));
+       }
+
+       RETURN(ld);
+}
+
+LU_KEY_INIT_FINI(qmt, struct qmt_thread_info);
+LU_TYPE_INIT_FINI(qmt, &qmt_thread_key);
+LU_CONTEXT_KEY_DEFINE(qmt, LCT_MD_THREAD);
+
+/*
+ * lu device type operations associated with the master target.
+ */
+static struct lu_device_type_operations qmt_device_type_ops = {
+       .ldto_init              = qmt_type_init,
+       .ldto_fini              = qmt_type_fini,
+
+       .ldto_start             = qmt_type_start,
+       .ldto_stop              = qmt_type_stop,
+
+       .ldto_device_alloc      = qmt_device_alloc,
+       .ldto_device_free       = qmt_device_free,
+
+       .ldto_device_fini       = qmt_device_fini,
+};
+
+/*
+ * lu device type structure associated with the master target.
+ * MDT0 uses this structure to configure the qmt.
+ */
+static struct lu_device_type qmt_device_type = {
+       .ldt_tags       = LU_DEVICE_DT,
+       .ldt_name       = LUSTRE_QMT_NAME,
+       .ldt_ops        = &qmt_device_type_ops,
+       .ldt_ctx_tags   = LCT_MD_THREAD,
+};
+
+/*
+ * obd_connect handler used by the MDT to connect to the master target.
+ */
+static int qmt_device_obd_connect(const struct lu_env *env,
+                                 struct obd_export **exp,
+                                 struct obd_device *obd,
+                                 struct obd_uuid *cluuid,
+                                 struct obd_connect_data *data,
+                                 void *localdata)
+{
+       struct lustre_handle    conn;
+       int                     rc;
+       ENTRY;
+
+       rc = class_connect(&conn, obd, cluuid);
+       if (rc)
+               RETURN(rc);
+
+       *exp = class_conn2export(&conn);
+       RETURN(0);
+}
+
+/*
+ * obd_disconnect handler used by the MDT to disconnect from the master target.
+ * We trigger cleanup on disconnect since it means that the MDT is about to
+ * shutdown.
+ */
+static int qmt_device_obd_disconnect(struct obd_export *exp)
+{
+       struct obd_device       *obd = exp->exp_obd;
+       int                      rc;
+       ENTRY;
+
+       rc = class_disconnect(exp);
+       if (rc)
+               RETURN(rc);
+
+       rc = class_manual_cleanup(obd);
+       RETURN(0);
+}
+
+/*
+ * obd device operations associated with the master target.
+ */
+struct obd_ops qmt_obd_ops = {
+       .o_owner        = THIS_MODULE,
+       .o_connect      = qmt_device_obd_connect,
+       .o_disconnect   = qmt_device_obd_disconnect,
+};
+
+/*
+ * Called when the MDS is fully configured. We use it to set up local objects
+ * associated with the quota master target.
+ *
+ * \param env - is the environment passed by the caller
+ * \param parent - is the lu_device of the parent, that's to say the mdt
+ * \param ld  - is the lu_device associated with the master target
+ *
+ * \retval    - 0 on success, appropriate error on failure
+ */
+static int qmt_device_prepare(const struct lu_env *env,
+                             struct lu_device *parent,
+                             struct lu_device *ld)
+{
+       return 0;
+}
+
+/*
+ * lu device operations for the quota master target
+ */
+static const struct lu_device_operations qmt_lu_ops = {
+       .ldo_prepare            = qmt_device_prepare,
+       .ldo_process_config     = NULL, /* to be defined for dynamic pool
+                                        * configuration */
+};
+
+/* global variable initialization called when the lquota module is loaded */
+int qmt_glb_init(void)
+{
+       int rc;
+       ENTRY;
+
+       rc = class_register_type(&qmt_obd_ops, NULL, NULL, LUSTRE_QMT_NAME,
+                                &qmt_device_type);
+       RETURN(rc);
+}
+
+/* called when the lquota module is about to be unloaded */
+void qmt_glb_fini(void)
+{
+       class_unregister_type(LUSTRE_QMT_NAME);
+}
diff --git a/lustre/quota/qmt_handler.c b/lustre/quota/qmt_handler.c
new file mode 100644 (file)
index 0000000..c077867
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <obd_class.h>
+#include "qmt_internal.h"
+
+/*
+ * Handle quotactl request.
+ *
+ * \param env   - is the environment passed by the caller
+ * \param ld    - is the lu device associated with the qmt
+ * \param oqctl - is the quotactl request
+ */
+static int qmt_quotactl(const struct lu_env *env, struct lu_device *ld,
+                       struct obd_quotactl *oqctl)
+{
+       struct qmt_device       *qmt = lu2qmt_dev(ld);
+       int                      rc = 0;
+       ENTRY;
+
+       LASSERT(qmt != NULL);
+
+       if (oqctl->qc_type >= MAXQUOTAS)
+               /* invalid quota type */
+               RETURN(-EINVAL);
+
+       switch (oqctl->qc_cmd) {
+
+       case Q_GETINFO:
+       case Q_SETINFO:
+       case Q_SETQUOTA:
+               /* XXX: not implemented yet. */
+               CERROR("quotactl operation %d not implemented yet\n",
+                      oqctl->qc_cmd);
+               RETURN(-EOPNOTSUPP);
+
+       case Q_GETQUOTA:
+               /* XXX: return no limit for now, just for testing purpose */
+               memset(&oqctl->qc_dqblk, 0, sizeof(struct obd_dqblk));
+               oqctl->qc_dqblk.dqb_valid = QIF_LIMITS;
+               rc = 0;
+               break;
+
+       default:
+               CERROR("%s: unsupported quotactl command: %d\n",
+                      qmt->qmt_svname, oqctl->qc_cmd);
+               RETURN(-EFAULT);
+       }
+
+       RETURN(rc);
+}
+
+/*
+ * Handle quota request from slave.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param ld   - is the lu device associated with the qmt
+ * \param req  - is the quota acquire request
+ */
+static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
+                    struct ptlrpc_request *req)
+{
+       struct quota_body       *qbody, *repbody;
+       ENTRY;
+
+       qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       if (qbody == NULL)
+               RETURN(err_serious(-EPROTO));
+
+       repbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       if (repbody == NULL)
+               RETURN(err_serious(-EFAULT));
+
+       RETURN(0);
+}
+
+/* Vector of quota request handlers. This vector is used by the MDT to forward
+ * requests to the quota master. */
+struct qmt_handlers qmt_hdls = {
+       /* quota request handlers */
+       .qmth_quotactl          = qmt_quotactl,
+       .qmth_dqacq             = qmt_dqacq,
+
+       /* ldlm handlers */
+       .qmth_intent_policy     = qmt_intent_policy,
+       .qmth_lvbo_init         = qmt_lvbo_init,
+       .qmth_lvbo_update       = qmt_lvbo_update,
+       .qmth_lvbo_size         = qmt_lvbo_size,
+       .qmth_lvbo_fill         = qmt_lvbo_fill,
+       .qmth_lvbo_free         = qmt_lvbo_free,
+};
+EXPORT_SYMBOL(qmt_hdls);
diff --git a/lustre/quota/qmt_internal.h b/lustre/quota/qmt_internal.h
new file mode 100644 (file)
index 0000000..abe782d
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ */
+
+#include "lquota_internal.h"
+
+#ifndef _QMT_INTERNAL_H
+#define _QMT_INTERNAL_H
+
+/*
+ * The Quota Master Target Device.
+ * The qmt is responsible for:
+ * - all interactions with MDT0 (provide request handlers, share ldlm namespace,
+ *   manage ldlm lvbo, ...)
+ * - all quota lock management (i.e. global quota locks as well as per-ID locks)
+ * - manage the quota pool configuration
+ *
+ * That's the structure MDT0 connects to in mdt_quota_init().
+ */
+struct qmt_device {
+       /* Super-class. dt_device/lu_device for this master target */
+       struct dt_device        qmt_dt_dev;
+
+       /* service name of this qmt */
+       char                    qmt_svname[MAX_OBD_NAME];
+
+       /* Reference to the next device in the side stack
+        * The child device is actually the OSD device where we store the quota
+        * index files */
+       struct obd_export       *qmt_child_exp;
+       struct dt_device        *qmt_child;
+};
+
+/* Common data shared by qmt handlers */
+struct qmt_thread_info {
+       union lquota_rec        qti_rec;
+       union lquota_id         qti_id;
+};
+
+extern struct lu_context_key qmt_thread_key;
+
+/* helper function to extract qmt_thread_info from current environment */
+static inline
+struct qmt_thread_info *qmt_info(const struct lu_env *env)
+{
+       struct qmt_thread_info  *info;
+
+       info = lu_context_key_get(&env->le_ctx, &qmt_thread_key);
+       if (info == NULL) {
+               lu_env_refill((struct lu_env *)env);
+               info = lu_context_key_get(&env->le_ctx, &qmt_thread_key);
+       }
+       LASSERT(info);
+       return info;
+}
+
+/* helper routine to convert a lu_device into a qmt_device */
+static inline struct qmt_device *lu2qmt_dev(struct lu_device *ld)
+{
+       return container_of0(lu2dt_dev(ld), struct qmt_device, qmt_dt_dev);
+}
+
+/* helper routine to convert a qmt_device into lu_device */
+static inline struct lu_device *qmt2lu_dev(struct qmt_device *qmt)
+{
+       return &qmt->qmt_dt_dev.dd_lu_dev;
+}
+
+/* qmt_lock.c */
+int qmt_intent_policy(const struct lu_env *, struct lu_device *,
+                     struct ptlrpc_request *, struct ldlm_lock **, int);
+int qmt_lvbo_init(struct lu_device *, struct ldlm_resource *);
+int qmt_lvbo_update(struct lu_device *, struct ldlm_resource *,
+                   struct ptlrpc_request *, int);
+int qmt_lvbo_size(struct lu_device *, struct ldlm_lock *);
+int qmt_lvbo_fill(struct lu_device *, struct ldlm_lock *, void *, int);
+int qmt_lvbo_free(struct lu_device *, struct ldlm_resource *);
+#endif /* _QMT_INTERNAL_H */
diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c
new file mode 100644 (file)
index 0000000..18730cd
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu    Yawei    <yawei.niu@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <lustre_dlm.h>
+#include "qmt_internal.h"
+
+/* intent policy function called from mdt_intent_opc() when the intent is of
+ * quota type */
+int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
+                     struct ptlrpc_request *req, struct ldlm_lock **lockp,
+                     int flags)
+{
+       ENTRY;
+
+       req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
+       RETURN(ELDLM_LOCK_ABORTED);
+}
+
+/*
+ * Initialize quota LVB associated with quota indexes.
+ * Called with res->lr_lvb_sem held
+ */
+int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
+{
+       return 0;
+}
+
+/*
+ * Update LVB associated with the global quota index.
+ * This function is called from the DLM itself after a glimpse callback, in this
+ * case valid ptlrpc request is passed. It is also called directly from the
+ * quota master in order to refresh the global index version after a quota limit
+ * change.
+ */
+int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
+                   struct ptlrpc_request *req, int increase_only)
+{
+       return 0;
+}
+
+/*
+ * Report size of lvb to ldlm layer in order to allocate lvb buffer
+ */
+int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
+{
+       return 0;
+}
+
+/*
+ * Fill request buffer with lvb
+ */
+int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
+                 int lvblen)
+{
+       return 0;
+}
+
+/*
+ * Free lvb associated with a given ldlm resource
+ */
+int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
+{
+       return 0;
+}