* structure for each quota type (i.e. user & group).
*/
struct qsd_qtype_info {
+ /* reference count incremented by each user of this structure */
+ cfs_atomic_t qqi_ref;
+
/* quota type, either USRQUOTA or GRPQUOTA
* immutable after creation. */
int qqi_qtype;
__u64 qqi_glb_ver; /* global index version */
};
+/*
+ * Helper functions & prototypes
+ */
+
+/* qqi_getref/putref is used to track users of a qqi structure */
+static inline void qqi_getref(struct qsd_qtype_info *qqi)
+{
+ cfs_atomic_inc(&qqi->qqi_ref);
+}
+
+static inline void qqi_putref(struct qsd_qtype_info *qqi)
+{
+ LASSERT(cfs_atomic_read(&qqi->qqi_ref) > 0);
+ cfs_atomic_dec(&qqi->qqi_ref);
+}
+
#define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
+
+/* Common data shared by qsd-level handlers. This is allocated per-thread to
+ * reduce stack consumption. */
+struct qsd_thread_info {
+ union lquota_rec qti_rec;
+ union lquota_id qti_id;
+ struct lu_fid qti_fid;
+ struct ldlm_res_id qti_resid;
+ struct ldlm_enqueue_info qti_einfo;
+ struct lustre_handle qti_lockh;
+ __u64 qti_slv_ver;
+ union ldlm_wire_lvb qti_lvb;
+ union {
+ struct quota_body qti_body;
+ struct idx_info qti_ii;
+ };
+ char qti_buf[MTI_NAME_MAXLEN];
+};
+
+extern struct lu_context_key qsd_thread_key;
+
+static inline
+struct qsd_thread_info *qsd_info(const struct lu_env *env)
+{
+ struct qsd_thread_info *info;
+
+ info = lu_context_key_get(&env->le_ctx, &qsd_thread_key);
+ LASSERT(info);
+ return info;
+}
+
+/* qsd_request.c */
+typedef void (*qsd_req_completion_t) (const struct lu_env *,
+ struct qsd_qtype_info *,
+ struct quota_body *, struct quota_body *,
+ struct lustre_handle *,
+ union ldlm_wire_lvb *, void *, int);
+int qsd_send_dqacq(const struct lu_env *, struct obd_export *,
+ struct quota_body *, bool, qsd_req_completion_t,
+ struct qsd_qtype_info *, struct lustre_handle *,
+ struct lquota_entry *);
+int qsd_intent_lock(const struct lu_env *, struct obd_export *,
+ struct quota_body *, bool, int, qsd_req_completion_t,
+ struct qsd_qtype_info *, union ldlm_wire_lvb *, void *);
+int qsd_fetch_index(const struct lu_env *, struct obd_export *,
+ struct idx_info *, unsigned int, cfs_page_t **, bool *);
+
#endif /* _QSD_INTERNAL_H */
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012 Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu Yawei <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <lustre_net.h>
+#include <lustre_import.h>
+#include <lustre_dlm.h>
+#include <obd_class.h>
+
+#include "qsd_internal.h"
+
+struct qsd_async_args {
+ struct obd_export *aa_exp;
+ struct qsd_qtype_info *aa_qqi;
+ void *aa_arg;
+ union ldlm_wire_lvb *aa_lvb;
+ struct lustre_handle aa_lockh;
+ qsd_req_completion_t aa_completion;
+};
+
+/*
+ * non-intent quota request interpret callback.
+ *
+ * \param env - the environment passed by the caller
+ * \param req - the non-intent quota request
+ * \param arg - qsd_async_args
+ * \param rc - request status
+ *
+ * \retval 0 - success
+ * \retval -ve - appropriate errors
+ */
+static int qsd_dqacq_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *arg, int rc)
+{
+ struct quota_body *rep_qbody = NULL, *req_qbody;
+ struct qsd_async_args *aa = (struct qsd_async_args *)arg;
+ ENTRY;
+
+ req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+ if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
+ rep_qbody = req_capsule_server_get(&req->rq_pill,
+ &RMF_QUOTA_BODY);
+ aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, &aa->aa_lockh,
+ NULL, aa->aa_arg, rc);
+ RETURN(rc);
+}
+
+/*
+ * Send non-intent quota request to master.
+ *
+ * \param env - the environment passed by the caller
+ * \param exp - is the export to use to send the acquire RPC
+ * \param qbody - quota body to be packed in request
+ * \param sync - synchronous or asynchronous
+ * \param completion - completion callback
+ * \param qqi - is the qsd_qtype_info structure to pass to the completion
+ * function
+ * \param lqe - is the qid entry to be processed
+ *
+ * \retval 0 - success
+ * \retval -ve - appropriate errors
+ */
+int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
+ struct quota_body *qbody, bool sync,
+ qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
+ struct lustre_handle *lockh, struct lquota_entry *lqe)
+{
+ struct ptlrpc_request *req;
+ struct quota_body *req_qbody;
+ struct qsd_async_args *aa;
+ int rc;
+ ENTRY;
+
+ LASSERT(exp);
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_QUOTA_DQACQ);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ req->rq_no_resend = req->rq_no_delay = 1;
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
+ if (rc) {
+ ptlrpc_request_free(req);
+ GOTO(out, rc);
+ }
+
+ req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+ *req_qbody = *qbody;
+
+ ptlrpc_request_set_replen(req);
+
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ aa->aa_exp = exp;
+ aa->aa_qqi = qqi;
+ aa->aa_arg = (void *)lqe;
+ aa->aa_completion = completion;
+ lustre_handle_copy(&aa->aa_lockh, lockh);
+
+ if (sync) {
+ rc = ptlrpc_queue_wait(req);
+ rc = qsd_dqacq_interpret(env, req, aa, rc);
+ ptlrpc_req_finished(req);
+ } else {
+ req->rq_interpret_reply = qsd_dqacq_interpret;
+ ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ }
+
+ RETURN(rc);
+out:
+ completion(env, qqi, qbody, NULL, lockh, NULL, lqe, rc);
+ return rc;
+}
+
+/*
+ * intent quota request interpret callback.
+ *
+ * \param env - the environment passed by the caller
+ * \param req - the intent quota request
+ * \param arg - qsd_async_args
+ * \param rc - request status
+ *
+ * \retval 0 - success
+ * \retval -ve - appropriate errors
+ */
+static int qsd_intent_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req, void *arg, int rc)
+{
+ struct lustre_handle *lockh;
+ struct quota_body *rep_qbody = NULL, *req_qbody;
+ struct ldlm_intent *lit;
+ struct qsd_async_args *aa = (struct qsd_async_args *)arg;
+ int flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ LASSERT(aa->aa_exp);
+ lockh = &aa->aa_lockh;
+ req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+
+ rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
+ &flags, (void *)aa->aa_lvb,
+ sizeof(union ldlm_wire_lvb), lockh, rc);
+ if (rc < 0)
+ /* the lock has been destroyed, forget about the lock handle */
+ memset(lockh, 0, sizeof(*lockh));
+
+ if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
+ rep_qbody = req_capsule_server_get(&req->rq_pill,
+ &RMF_QUOTA_BODY);
+
+ aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
+ aa->aa_lvb, aa->aa_arg, rc);
+ RETURN(rc);
+}
+
+/* XXX to be removed when the qsd lock landed */
+struct ldlm_enqueue_info qsd_glb_einfo;
+struct ldlm_enqueue_info qsd_id_einfo;
+
+/*
+ * Get intent per-ID lock or global-index lock from master.
+ *
+ * \param env - the environment passed by the caller
+ * \param exp - is the export to use to send the intent RPC
+ * \param qbody - quota body to be packed in request
+ * \param sync - synchronous or asynchronous (pre-acquire)
+ * \param it_op - IT_QUOTA_DQACQ or IT_QUOTA_CONN
+ * \param completion - completion callback
+ * \param qqi - is the qsd_qtype_info structure to pass to the completion
+ * function
+ * \param lvb - is the lvb associated with the lock and returned by the
+ * server
+ * \param arg - is an opaq argument passed to the completion callback
+ *
+ * \retval 0 - success
+ * \retval -ve - appropriate errors
+ */
+int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
+ struct quota_body *qbody, bool sync, int it_op,
+ qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
+ union ldlm_wire_lvb *lvb, void *arg)
+{
+ struct qsd_thread_info *qti = qsd_info(env);
+ struct ptlrpc_request *req;
+ struct qsd_async_args *aa = NULL;
+ struct ldlm_intent *lit;
+ struct quota_body *req_qbody;
+ int rc, flags = LDLM_FL_HAS_INTENT;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+ LASSERT(!lustre_handle_is_used(&qbody->qb_lockh));
+
+ memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh));
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+ &RQF_LDLM_INTENT_QUOTA);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ req->rq_no_resend = req->rq_no_delay = 1;
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+ if (rc) {
+ ptlrpc_request_free(req);
+ GOTO(out, rc);
+ }
+
+ lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ lit->opc = (__u64)it_op;
+
+ req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
+ *req_qbody = *qbody;
+
+ ptlrpc_request_set_replen(req);
+
+ switch(it_op) {
+ case IT_QUOTA_CONN:
+ /* build resource name associated with global index */
+ fid_build_reg_res_name(&qbody->qb_fid, &qti->qti_resid);
+
+ /* copy einfo template and fill ei_cbdata with qqi pointer */
+ memcpy(&qti->qti_einfo, &qsd_glb_einfo, sizeof(qti->qti_einfo));
+ qti->qti_einfo.ei_cbdata = qqi;
+
+ /* don't cancel global lock on memory pressure */
+ flags |= LDLM_FL_NO_LRU;
+ break;
+ case IT_QUOTA_DQACQ:
+ /* build resource name associated for per-ID quota lock */
+ fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
+ &qti->qti_resid);
+
+ /* copy einfo template and fill ei_cbdata with lqe pointer */
+ memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
+ qti->qti_einfo.ei_cbdata = arg;
+ break;
+ default:
+ LASSERTF(0, "invalid it_op %d", it_op);
+ }
+
+ /* build lock enqueue request */
+ rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
+ &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
+ 1);
+ if (rc < 0) {
+ ptlrpc_req_finished(req);
+ GOTO(out, rc);
+ }
+
+ /* grab reference on backend structure for the new lock */
+ switch(it_op) {
+ case IT_QUOTA_CONN:
+ /* grab reference on qqi for new lock */
+#ifdef USE_LU_REF
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(&qti->qti_lockh);
+ if (lock == NULL) {
+ ptlrpc_req_finished(req);
+ GOTO(out, -ENOLCK);
+ }
+ lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
+ LDLM_LOCK_PUT(lock);
+#endif
+ qqi_getref(qqi);
+ break;
+ case IT_QUOTA_DQACQ:
+ /* grab reference on lqe for new lock */
+ lqe_getref((struct lquota_entry *)arg);
+ break;
+ default:
+ break;
+ }
+
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ aa->aa_exp = exp;
+ aa->aa_qqi = qqi;
+ aa->aa_arg = arg;
+ aa->aa_lvb = lvb;
+ aa->aa_completion = completion;
+ lustre_handle_copy(&aa->aa_lockh, &qti->qti_lockh);
+
+ if (sync) {
+ /* send lock enqueue request and wait for completion */
+ rc = ptlrpc_queue_wait(req);
+ rc = qsd_intent_interpret(env, req, aa, rc);
+ ptlrpc_req_finished(req);
+ } else {
+ /* queue lock request and return */
+ req->rq_interpret_reply = qsd_intent_interpret;
+ ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ }
+
+ RETURN(rc);
+out:
+ completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
+ return rc;
+}
+
+/*
+ * Fetch a global or slave index from the QMT.
+ *
+ * \param env - the environment passed by the caller
+ * \param exp - is the export to use to issue the OBD_IDX_READ RPC
+ * \param ii - is the index information to be packed in the request
+ * on success, the index information returned by the server
+ * is copied there.
+ * \param npages - is the number of pages in the pages array
+ * \param pages - is an array of @npages pages
+ *
+ * \retval 0 - success
+ * \retval -ve - appropriate errors
+ */
+int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
+ struct idx_info *ii, unsigned int npages,
+ cfs_page_t **pages, bool *need_swab)
+{
+ struct ptlrpc_request *req;
+ struct idx_info *req_ii;
+ struct ptlrpc_bulk_desc *desc;
+ int rc, i;
+ ENTRY;
+
+ LASSERT(exp);
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OBD_IDX_READ);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, OBD_IDX_READ);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ req->rq_request_portal = MDS_READPAGE_PORTAL;
+ ptlrpc_at_set_req_timeout(req);
+
+ /* allocate bulk descriptor */
+ desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
+ MDS_BULK_PORTAL);
+ if (desc == NULL) {
+ ptlrpc_request_free(req);
+ RETURN(-ENOMEM);
+ }
+
+ /* req now owns desc and will free it when it gets freed */
+ for (i = 0; i < npages; i++)
+ ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+
+ /* pack index information in request */
+ req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
+ *req_ii = *ii;
+
+ ptlrpc_request_set_replen(req);
+
+ /* send request to master and wait for RPC to complete */
+ rc = ptlrpc_queue_wait(req);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
+ req->rq_bulk->bd_nob_transferred);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
+ *ii = *req_ii;
+
+ *need_swab = ptlrpc_rep_need_swab(req);
+
+ EXIT;
+out:
+ ptlrpc_req_finished(req);
+ return rc;
+}