Whamcloud - gitweb
LU-6158 mdt: always shrink_capsule in getxattr_all
[fs/lustre-release.git] / lustre / quota / qsd_request.c
index a0c830c..8c13ff5 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_net.h>
@@ -45,7 +41,7 @@ struct qsd_async_args {
        struct obd_export     *aa_exp;
        struct qsd_qtype_info *aa_qqi;
        void                  *aa_arg;
-       union ldlm_wire_lvb   *aa_lvb;
+       struct lquota_lvb     *aa_lvb;
        struct lustre_handle   aa_lockh;
        qsd_req_completion_t   aa_completion;
 };
@@ -110,12 +106,14 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
                GOTO(out, rc = -ENOMEM);
 
        req->rq_no_resend = req->rq_no_delay = 1;
+       req->rq_no_retry_einprogress = 1;
        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }
 
+       req->rq_request_portal = MDS_READPAGE_PORTAL;
        req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
        *req_qbody = *qbody;
 
@@ -135,7 +133,7 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
                ptlrpc_req_finished(req);
        } else {
                req->rq_interpret_reply = qsd_dqacq_interpret;
-               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               ptlrpcd_add_req(req);
        }
 
        RETURN(rc);
@@ -162,7 +160,7 @@ static int qsd_intent_interpret(const struct lu_env *env,
        struct quota_body        *rep_qbody = NULL, *req_qbody;
        struct ldlm_intent       *lit;
        struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
-       int                       flags = LDLM_FL_HAS_INTENT;
+       __u64                     flags = LDLM_FL_HAS_INTENT;
        ENTRY;
 
        LASSERT(aa->aa_exp);
@@ -172,7 +170,7 @@ static int qsd_intent_interpret(const struct lu_env *env,
 
        rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
                                   &flags, (void *)aa->aa_lvb,
-                                  sizeof(union ldlm_wire_lvb), lockh, rc);
+                                  sizeof(struct lquota_lvb), lockh, rc);
        if (rc < 0)
                /* the lock has been destroyed, forget about the lock handle */
                memset(lockh, 0, sizeof(*lockh));
@@ -207,14 +205,15 @@ static int qsd_intent_interpret(const struct lu_env *env,
 int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                    struct quota_body *qbody, bool sync, int it_op,
                    qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
-                   union ldlm_wire_lvb *lvb, void *arg)
+                   struct lquota_lvb *lvb, void *arg)
 {
        struct qsd_thread_info  *qti = qsd_info(env);
        struct ptlrpc_request   *req;
        struct qsd_async_args   *aa = NULL;
        struct ldlm_intent      *lit;
        struct quota_body       *req_qbody;
-       int                      rc, flags = LDLM_FL_HAS_INTENT;
+       __u64                    flags = LDLM_FL_HAS_INTENT;
+       int                      rc;
        ENTRY;
 
        LASSERT(exp != NULL);
@@ -227,12 +226,13 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       req->rq_no_resend = req->rq_no_delay = 1;
+       req->rq_no_retry_einprogress = 1;
        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }
+       req->rq_request_portal = MDS_READPAGE_PORTAL;
 
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it_op;
@@ -240,6 +240,8 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
        req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
        *req_qbody = *qbody;
 
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                            sizeof(*lvb));
        ptlrpc_request_set_replen(req);
 
        switch(it_op) {
@@ -256,21 +258,21 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                break;
        case IT_QUOTA_DQACQ:
                /* build resource name associated for per-ID quota lock */
-               fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
-                                     &qti->qti_resid);
+               fid_build_quota_res_name(&qbody->qb_fid, &qbody->qb_id,
+                                        &qti->qti_resid);
 
                /* copy einfo template and fill ei_cbdata with lqe pointer */
                memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
                qti->qti_einfo.ei_cbdata = arg;
                break;
        default:
-               LASSERTF(0, "invalid it_op %d", it_op);
+               LASSERTF(0, "invalid it_op %d\n", it_op);
        }
 
        /* build lock enqueue request */
        rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
-                             &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
-                             1);
+                             &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA,
+                             &qti->qti_lockh, 1);
        if (rc < 0) {
                ptlrpc_req_finished(req);
                GOTO(out, rc);
@@ -281,21 +283,26 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
        case IT_QUOTA_CONN:
                /* grab reference on qqi for new lock */
 #ifdef USE_LU_REF
+       {
                struct ldlm_lock        *lock;
 
                lock = ldlm_handle2lock(&qti->qti_lockh);
                if (lock == NULL) {
                        ptlrpc_req_finished(req);
-                       GOTO(out, -ENOLCK);
+                       GOTO(out, rc = -ENOLCK);
                }
                lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
                LDLM_LOCK_PUT(lock);
+       }
 #endif
                qqi_getref(qqi);
                break;
        case IT_QUOTA_DQACQ:
                /* grab reference on lqe for new lock */
                lqe_getref((struct lquota_entry *)arg);
+               /* all acquire/release request are sent with no_resend and
+                * no_delay flag */
+               req->rq_no_resend = req->rq_no_delay = 1;
                break;
        default:
                break;
@@ -318,12 +325,12 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
        } else {
                /* queue lock request and return */
                req->rq_interpret_reply = qsd_intent_interpret;
-               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               ptlrpcd_add_req(req);
        }
 
        RETURN(rc);
 out:
-       completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
+       completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
        return rc;
 }
 
@@ -343,7 +350,7 @@ out:
  */
 int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
                    struct idx_info *ii, unsigned int npages,
-                   cfs_page_t **pages, bool *need_swab)
+                   struct page **pages, bool *need_swab)
 {
        struct ptlrpc_request   *req;
        struct idx_info         *req_ii;
@@ -367,8 +374,10 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
        ptlrpc_at_set_req_timeout(req);
 
        /* allocate bulk descriptor */
-       desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
@@ -376,7 +385,8 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
 
        /* req now owns desc and will free it when it gets freed */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
        /* pack index information in request */
        req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
@@ -393,6 +403,10 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
                                          req->rq_bulk->bd_nob_transferred);
        if (rc < 0)
                GOTO(out, rc);
+       else
+               /* sptlrpc_cli_unwrap_bulk_read() returns the number of bytes
+                * transferred*/
+               rc = 0;
 
        req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
        *ii = *req_ii;