Whamcloud - gitweb
LU-1842 quota: qsd request
authorNiu Yawei <niu@whamcloud.com>
Thu, 27 Sep 2012 12:44:55 +0000 (08:44 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 2 Oct 2012 16:15:19 +0000 (12:15 -0400)
qsd request handling APIs.

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Id88e0a49482e7d56af39f9aa1d3105316b6cb3dd
Reviewed-on: http://review.whamcloud.com/4109
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/ptlrpc/wiretest.c
lustre/quota/Makefile.in
lustre/quota/lquota_internal.h
lustre/quota/qsd_internal.h
lustre/quota/qsd_lib.c
lustre/quota/qsd_request.c [new file with mode: 0644]
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 3bb6f37..67a414e 100644 (file)
@@ -1682,6 +1682,8 @@ enum {
         LUSTRE_RES_ID_SEQ_OFF = 0,
         LUSTRE_RES_ID_VER_OID_OFF = 1,
         LUSTRE_RES_ID_WAS_VER_OFF = 2, /* see note above */
+       LUSTRE_RES_ID_QUOTA_SEQ_OFF = 2,
+       LUSTRE_RES_ID_QUOTA_VER_OID_OFF = 3,
         LUSTRE_RES_ID_HSH_OFF = 3
 };
 
index 96ac595..b5c702a 100644 (file)
@@ -333,6 +333,36 @@ fid_build_reg_res_name(const struct lu_fid *f,
 }
 
 /*
+ * Build (DLM) resource identifier from global quota FID and quota ID.
+ */
+static inline struct ldlm_res_id *
+fid_build_quota_resid(const struct lu_fid *glb_fid, union lquota_id *qid,
+                     struct ldlm_res_id *res)
+{
+       fid_build_reg_res_name(glb_fid, res);
+       res->name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] = fid_seq(&qid->qid_fid);
+       res->name[LUSTRE_RES_ID_QUOTA_VER_OID_OFF] = fid_ver_oid(&qid->qid_fid);
+       return res;
+}
+
+/*
+ * Extract global FID and quota ID from resource name
+ */
+static inline void fid_extract_quota_resid(struct ldlm_res_id *res,
+                                          struct lu_fid *glb_fid,
+                                          union lquota_id *qid)
+{
+       glb_fid->f_seq = res->name[LUSTRE_RES_ID_SEQ_OFF];
+       glb_fid->f_oid = (__u32)res->name[LUSTRE_RES_ID_VER_OID_OFF];
+       glb_fid->f_ver = (__u32)(res->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32);
+
+       qid->qid_fid.f_seq = res->name[LUSTRE_RES_ID_QUOTA_SEQ_OFF];
+       qid->qid_fid.f_oid = (__u32)res->name[LUSTRE_RES_ID_QUOTA_VER_OID_OFF];
+       qid->qid_fid.f_ver =
+               (__u32)(res->name[LUSTRE_RES_ID_QUOTA_VER_OID_OFF] >> 32);
+}
+
+/*
  * Return true if resource is for object identified by fid.
  */
 static inline int fid_res_name_eq(const struct lu_fid *f,
index b0fe417..0fff13f 100644 (file)
@@ -335,6 +335,8 @@ void lustre_assert_wire_constants(void)
         CLASSERT(LDLM_MAX_TYPE == 14);
         CLASSERT(LUSTRE_RES_ID_SEQ_OFF == 0);
         CLASSERT(LUSTRE_RES_ID_VER_OID_OFF == 1);
+       CLASSERT(LUSTRE_RES_ID_QUOTA_SEQ_OFF == 2);
+       CLASSERT(LUSTRE_RES_ID_QUOTA_VER_OID_OFF == 3);
         CLASSERT(LUSTRE_RES_ID_HSH_OFF == 3);
        CLASSERT(LQUOTA_TYPE_USR == 0);
        CLASSERT(LQUOTA_TYPE_GRP == 1);
index be4a908..140a1d6 100644 (file)
@@ -2,7 +2,7 @@ MODULES := lquota
 
 quota-objs := lproc_quota.o lquota_lib.o lquota_disk.o lquota_entry.o
 
-qsd-objs := qsd_lib.o
+qsd-objs := qsd_lib.o qsd_request.o
 
 lquota-objs := $(quota-objs) $(qsd-objs)
 
index 9f74c9a..fd88fe9 100644 (file)
@@ -379,4 +379,8 @@ int lquota_disk_update_ver(const struct lu_env *, struct dt_device *,
 
 /* lproc_quota.c */
 extern struct file_operations lprocfs_quota_seq_fops;
+
+/* qsd_lib.c */
+int qsd_glb_init(void);
+void qsd_glb_fini(void);
 #endif /* _LQUOTA_INTERNAL_H */
index 2ffeb3f..2af0307 100644 (file)
@@ -75,6 +75,9 @@ struct qsd_instance {
  * structure for each quota type (i.e. user & group).
  */
 struct qsd_qtype_info {
+       /* reference count incremented by each user of this structure */
+       cfs_atomic_t             qqi_ref;
+
        /* quota type, either USRQUOTA or GRPQUOTA
         * immutable after creation. */
        int                      qqi_qtype;
@@ -96,5 +99,68 @@ struct qsd_qtype_info {
        __u64                    qqi_glb_ver; /* global index version */
 };
 
+/*
+ * Helper functions & prototypes
+ */
+
+/* qqi_getref/putref is used to track users of a qqi structure  */
+static inline void qqi_getref(struct qsd_qtype_info *qqi)
+{
+       cfs_atomic_inc(&qqi->qqi_ref);
+}
+
+static inline void qqi_putref(struct qsd_qtype_info *qqi)
+{
+       LASSERT(cfs_atomic_read(&qqi->qqi_ref) > 0);
+       cfs_atomic_dec(&qqi->qqi_ref);
+}
+
 #define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
+
+/* Common data shared by qsd-level handlers. This is allocated per-thread to
+ * reduce stack consumption.  */
+struct qsd_thread_info {
+       union lquota_rec                qti_rec;
+       union lquota_id                 qti_id;
+       struct lu_fid                   qti_fid;
+       struct ldlm_res_id              qti_resid;
+       struct ldlm_enqueue_info        qti_einfo;
+       struct lustre_handle            qti_lockh;
+       __u64                           qti_slv_ver;
+       union ldlm_wire_lvb             qti_lvb;
+       union {
+               struct quota_body       qti_body;
+               struct idx_info         qti_ii;
+       };
+       char                            qti_buf[MTI_NAME_MAXLEN];
+};
+
+extern struct lu_context_key qsd_thread_key;
+
+static inline
+struct qsd_thread_info *qsd_info(const struct lu_env *env)
+{
+       struct qsd_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &qsd_thread_key);
+       LASSERT(info);
+       return info;
+}
+
+/* qsd_request.c */
+typedef void (*qsd_req_completion_t) (const struct lu_env *,
+                                     struct qsd_qtype_info *,
+                                     struct quota_body *, struct quota_body *,
+                                     struct lustre_handle *,
+                                     union ldlm_wire_lvb *, void *, int);
+int qsd_send_dqacq(const struct lu_env *, struct obd_export *,
+                  struct quota_body *, bool, qsd_req_completion_t,
+                  struct qsd_qtype_info *, struct lustre_handle *,
+                  struct lquota_entry *);
+int qsd_intent_lock(const struct lu_env *, struct obd_export *,
+                   struct quota_body *, bool, int, qsd_req_completion_t,
+                   struct qsd_qtype_info *, union ldlm_wire_lvb *, void *);
+int qsd_fetch_index(const struct lu_env *, struct obd_export *,
+                   struct idx_info *, unsigned int, cfs_page_t **, bool *);
+
 #endif /* _QSD_INTERNAL_H */
index 09efa51..1430410 100644 (file)
 
 #include "qsd_internal.h"
 
+/* define qsd thread key */
+LU_KEY_INIT_FINI(qsd, struct qsd_thread_info);
+LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL);
+LU_KEY_INIT_GENERIC(qsd);
+
 /* some procfs helpers */
 static int lprocfs_qsd_rd_state(char *page, char **start, off_t off,
                                int count, int *eof, void *data)
@@ -319,3 +324,21 @@ out:
        RETURN(qsd);
 }
 EXPORT_SYMBOL(qsd_init);
+
+/*
+ * Global initialization performed at module load time
+ */
+int qsd_glb_init(void)
+{
+       qsd_key_init_generic(&qsd_thread_key, NULL);
+       lu_context_key_register(&qsd_thread_key);
+       return 0;
+}
+
+/*
+ * Companion of qsd_glb_init() called at module unload time
+ */
+void qsd_glb_fini(void)
+{
+       lu_context_key_degister(&qsd_thread_key);
+}
diff --git a/lustre/quota/qsd_request.c b/lustre/quota/qsd_request.c
new file mode 100644 (file)
index 0000000..0bad3de
--- /dev/null
@@ -0,0 +1,410 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu    Yawei    <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <lustre_net.h>
+#include <lustre_import.h>
+#include <lustre_dlm.h>
+#include <obd_class.h>
+
+#include "qsd_internal.h"
+
+struct qsd_async_args {
+       struct obd_export     *aa_exp;
+       struct qsd_qtype_info *aa_qqi;
+       void                  *aa_arg;
+       union ldlm_wire_lvb   *aa_lvb;
+       struct lustre_handle   aa_lockh;
+       qsd_req_completion_t   aa_completion;
+};
+
+/*
+ * non-intent quota request interpret callback.
+ *
+ * \param env    - the environment passed by the caller
+ * \param req    - the non-intent quota request
+ * \param arg    - qsd_async_args
+ * \param rc     - request status
+ *
+ * \retval 0     - success
+ * \retval -ve   - appropriate errors
+ */
+static int qsd_dqacq_interpret(const struct lu_env *env,
+                              struct ptlrpc_request *req, void *arg, int rc)
+{
+       struct quota_body     *rep_qbody = NULL, *req_qbody;
+       struct qsd_async_args *aa = (struct qsd_async_args *)arg;
+       ENTRY;
+
+       req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
+               rep_qbody = req_capsule_server_get(&req->rq_pill,
+                                                  &RMF_QUOTA_BODY);
+       aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
+                         NULL, aa->aa_arg, rc);
+       RETURN(rc);
+}
+
+/*
+ * Send non-intent quota request to master.
+ *
+ * \param env    - the environment passed by the caller
+ * \param exp    - is the export to use to send the acquire RPC
+ * \param qbody  - quota body to be packed in request
+ * \param sync   - synchronous or asynchronous
+ * \param completion - completion callback
+ * \param qqi    - is the qsd_qtype_info structure to pass to the completion
+ *                 function
+ * \param lqe    - is the qid entry to be processed
+ *
+ * \retval 0     - success
+ * \retval -ve   - appropriate errors
+ */
+int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
+                  struct quota_body *qbody, bool sync,
+                  qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
+                  struct lustre_handle *lockh, struct lquota_entry *lqe)
+{
+       struct ptlrpc_request   *req;
+       struct quota_body       *req_qbody;
+       struct qsd_async_args   *aa;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(exp);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       req->rq_no_resend = req->rq_no_delay = 1;
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
+       if (rc) {
+               ptlrpc_request_free(req);
+               GOTO(out, rc);
+       }
+
+       req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       *req_qbody = *qbody;
+
+       ptlrpc_request_set_replen(req);
+
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_exp = exp;
+       aa->aa_qqi = qqi;
+       aa->aa_arg = (void *)lqe;
+       aa->aa_completion = completion;
+       lustre_handle_copy(&aa->aa_lockh, lockh);
+
+       if (sync) {
+               rc = ptlrpc_queue_wait(req);
+               rc = qsd_dqacq_interpret(env, req, aa, rc);
+               ptlrpc_req_finished(req);
+       } else {
+               req->rq_interpret_reply = qsd_dqacq_interpret;
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+       }
+
+       RETURN(rc);
+out:
+       completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
+       return rc;
+}
+
+/*
+ * intent quota request interpret callback.
+ *
+ * \param env    - the environment passed by the caller
+ * \param req    - the intent quota request
+ * \param arg    - qsd_async_args
+ * \param rc     - request status
+ *
+ * \retval 0     - success
+ * \retval -ve   - appropriate errors
+ */
+static int qsd_intent_interpret(const struct lu_env *env,
+                               struct ptlrpc_request *req, void *arg, int rc)
+{
+       struct lustre_handle     *lockh;
+       struct quota_body        *rep_qbody = NULL, *req_qbody;
+       struct ldlm_intent       *lit;
+       struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
+       int                       flags = LDLM_FL_HAS_INTENT;
+       ENTRY;
+
+       LASSERT(aa->aa_exp);
+       lockh = &aa->aa_lockh;
+       req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+
+       rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
+                                  &flags, (void *)aa->aa_lvb,
+                                  sizeof(union ldlm_wire_lvb), lockh, rc);
+       if (rc < 0)
+               /* the lock has been destroyed, forget about the lock handle */
+               memset(lockh, 0, sizeof(*lockh));
+
+       if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
+               rep_qbody = req_capsule_server_get(&req->rq_pill,
+                                                  &RMF_QUOTA_BODY);
+
+       aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
+                         aa->aa_lvb, aa->aa_arg, rc);
+       RETURN(rc);
+}
+
+/* XXX to be removed when the qsd lock landed */
+struct ldlm_enqueue_info qsd_glb_einfo;
+struct ldlm_enqueue_info qsd_id_einfo;
+
+/*
+ * Get intent per-ID lock or global-index lock from master.
+ *
+ * \param env    - the environment passed by the caller
+ * \param exp    - is the export to use to send the intent RPC
+ * \param qbody  - quota body to be packed in request
+ * \param sync   - synchronous or asynchronous (pre-acquire)
+ * \param it_op  - IT_QUOTA_DQACQ or IT_QUOTA_CONN
+ * \param completion - completion callback
+ * \param qqi    - is the qsd_qtype_info structure to pass to the completion
+ *                 function
+ * \param lvb    - is the lvb associated with the lock and returned by the
+ *                 server
+ * \param arg    - is an opaq argument passed to the completion callback
+ *
+ * \retval 0     - success
+ * \retval -ve   - appropriate errors
+ */
+int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
+                   struct quota_body *qbody, bool sync, int it_op,
+                   qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
+                   union ldlm_wire_lvb *lvb, void *arg)
+{
+       struct qsd_thread_info  *qti = qsd_info(env);
+       struct ptlrpc_request   *req;
+       struct qsd_async_args   *aa = NULL;
+       struct ldlm_intent      *lit;
+       struct quota_body       *req_qbody;
+       int                      rc, flags = LDLM_FL_HAS_INTENT;
+       ENTRY;
+
+       LASSERT(exp != NULL);
+       LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
+
+       memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_LDLM_INTENT_QUOTA);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       req->rq_no_resend = req->rq_no_delay = 1;
+       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       if (rc) {
+               ptlrpc_request_free(req);
+               GOTO(out, rc);
+       }
+
+       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+       lit->opc = (__u64)it_op;
+
+       req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+       *req_qbody = *qbody;
+
+       ptlrpc_request_set_replen(req);
+
+       switch(it_op) {
+       case IT_QUOTA_CONN:
+               /* build resource name associated with global index */
+               fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
+
+               /* copy einfo template and fill ei_cbdata with qqi pointer */
+               memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
+               qti->qti_einfo.ei_cbdata = qqi;
+
+               /* don't cancel global lock on memory pressure */
+               flags |= LDLM_FL_NO_LRU;
+               break;
+       case IT_QUOTA_DQACQ:
+               /* build resource name associated for per-ID quota lock */
+               fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
+                                     &qti->qti_resid);
+
+               /* copy einfo template and fill ei_cbdata with lqe pointer */
+               memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
+               qti->qti_einfo.ei_cbdata = arg;
+               break;
+       default:
+               LASSERTF(0, "invalid it_op %d", it_op);
+       }
+
+       /* build lock enqueue request */
+       rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
+                             &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
+                             1);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               GOTO(out, rc);
+       }
+
+       /* grab reference on backend structure for the new lock */
+       switch(it_op) {
+       case IT_QUOTA_CONN:
+               /* grab reference on qqi for new lock */
+#ifdef USE_LU_REF
+               struct ldlm_lock        *lock;
+
+               lock = ldlm_handle2lock(&qti->qti_lockh);
+               if (lock == NULL) {
+                       ptlrpc_req_finished(req);
+                       GOTO(out, -ENOLCK);
+               }
+               lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
+               LDLM_LOCK_PUT(lock);
+#endif
+               qqi_getref(qqi);
+               break;
+       case IT_QUOTA_DQACQ:
+               /* grab reference on lqe for new lock */
+               lqe_getref((struct lquota_entry *)arg);
+               break;
+       default:
+               break;
+       }
+
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_exp = exp;
+       aa->aa_qqi = qqi;
+       aa->aa_arg = arg;
+       aa->aa_lvb = lvb;
+       aa->aa_completion = completion;
+       lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
+
+       if (sync) {
+               /* send lock enqueue request and wait for completion */
+               rc = ptlrpc_queue_wait(req);
+               rc = qsd_intent_interpret(env, req, aa, rc);
+               ptlrpc_req_finished(req);
+       } else {
+               /* queue lock request and return */
+               req->rq_interpret_reply = qsd_intent_interpret;
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+       }
+
+       RETURN(rc);
+out:
+       completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
+       return rc;
+}
+
+/*
+ * Fetch a global or slave index from the QMT.
+ *
+ * \param env    - the environment passed by the caller
+ * \param exp    - is the export to use to issue the OBD_IDX_READ RPC
+ * \param ii     - is the index information to be packed in the request
+ *                 on success, the index information returned by the server
+ *                 is copied there.
+ * \param npages - is the number of pages in the pages array
+ * \param pages  - is an array of @npages pages
+ *
+ * \retval 0     - success
+ * \retval -ve   - appropriate errors
+ */
+int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
+                   struct idx_info *ii, unsigned int npages,
+                   cfs_page_t **pages, bool *need_swab)
+{
+       struct ptlrpc_request   *req;
+       struct idx_info         *req_ii;
+       struct ptlrpc_bulk_desc *desc;
+       int                      rc, i;
+       ENTRY;
+
+       LASSERT(exp);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       req->rq_request_portal = MDS_READPAGE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
+
+       /* allocate bulk descriptor */
+       desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
+                                   MDS_BULK_PORTAL);
+       if (desc == NULL) {
+               ptlrpc_request_free(req);
+               RETURN(-ENOMEM);
+       }
+
+       /* req now owns desc and will free it when it gets freed */
+       for (i = 0; i < npages; i++)
+               ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+
+       /* pack index information in request */
+       req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
+       *req_ii = *ii;
+
+       ptlrpc_request_set_replen(req);
+
+       /* send request to master and wait for RPC to complete */
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
+                                         req->rq_bulk->bd_nob_transferred);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
+       *ii = *req_ii;
+
+       *need_swab = ptlrpc_rep_need_swab(req);
+
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
index 111cfbc..5984497 100644 (file)
@@ -2097,6 +2097,8 @@ main(int argc, char **argv)
         CHECK_CVALUE(LUSTRE_RES_ID_SEQ_OFF);
         CHECK_CVALUE(LUSTRE_RES_ID_VER_OID_OFF);
         /* CHECK_CVALUE(LUSTRE_RES_ID_WAS_VER_OFF); packed with OID */
+       CHECK_CVALUE(LUSTRE_RES_ID_QUOTA_SEQ_OFF);
+       CHECK_CVALUE(LUSTRE_RES_ID_QUOTA_VER_OID_OFF);
         CHECK_CVALUE(LUSTRE_RES_ID_HSH_OFF);
 
        CHECK_CVALUE(LQUOTA_TYPE_USR);
index b724534..c389afb 100644 (file)
@@ -343,6 +343,8 @@ void lustre_assert_wire_constants(void)
         CLASSERT(LDLM_MAX_TYPE == 14);
         CLASSERT(LUSTRE_RES_ID_SEQ_OFF == 0);
         CLASSERT(LUSTRE_RES_ID_VER_OID_OFF == 1);
+       CLASSERT(LUSTRE_RES_ID_QUOTA_SEQ_OFF == 2);
+       CLASSERT(LUSTRE_RES_ID_QUOTA_VER_OID_OFF == 3);
         CLASSERT(LUSTRE_RES_ID_HSH_OFF == 3);
        CLASSERT(LQUOTA_TYPE_USR == 0);
        CLASSERT(LQUOTA_TYPE_GRP == 1);