Whamcloud - gitweb
LU-14462 gss: fix support for namespace in lgss_keyring
[fs/lustre-release.git] / lustre / quota / qsd_request.c
index 2db0433..8f451f8 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <lustre_net.h>
@@ -117,13 +113,13 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
                GOTO(out, rc);
        }
 
+       req->rq_request_portal = MDS_READPAGE_PORTAL;
        req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
        *req_qbody = *qbody;
 
        ptlrpc_request_set_replen(req);
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        aa->aa_exp = exp;
        aa->aa_qqi = qqi;
        aa->aa_arg = (void *)lqe;
@@ -136,7 +132,7 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp,
                ptlrpc_req_finished(req);
        } else {
                req->rq_interpret_reply = qsd_dqacq_interpret;
-               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               ptlrpcd_add_req(req);
        }
 
        RETURN(rc);
@@ -161,27 +157,45 @@ static int qsd_intent_interpret(const struct lu_env *env,
 {
        struct lustre_handle     *lockh;
        struct quota_body        *rep_qbody = NULL, *req_qbody;
-       struct ldlm_intent       *lit;
        struct qsd_async_args    *aa = (struct qsd_async_args *)arg;
+       struct ldlm_reply        *lockrep;
        __u64                     flags = LDLM_FL_HAS_INTENT;
+       struct ldlm_enqueue_info  einfo = {
+               .ei_type = LDLM_PLAIN,
+               .ei_mode = LCK_CR,
+       };
        ENTRY;
 
        LASSERT(aa->aa_exp);
        lockh = &aa->aa_lockh;
        req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
-       lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+       req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 
-       rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
-                                  &flags, (void *)aa->aa_lvb,
-                                  sizeof(struct lquota_lvb), lockh, rc);
-       if (rc < 0)
+       rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, &einfo, 0, &flags,
+                                  aa->aa_lvb, sizeof(*(aa->aa_lvb)),
+                                  lockh, rc);
+       if (rc < 0) {
                /* the lock has been destroyed, forget about the lock handle */
                memset(lockh, 0, sizeof(*lockh));
+               /*
+                * To avoid the server being fullfilled by LDLM locks, server
+                * may reject the locking request by returning -EINPROGRESS,
+                * this is different from the -EINPROGRESS returned by quota
+                * code.
+                */
+               if (rc == -EINPROGRESS)
+                       rc = -EAGAIN;
+               GOTO(out, rc);
+       }
+
+       lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       LASSERT(lockrep != NULL);
+       rc = ptlrpc_status_ntoh(lockrep->lock_policy_res2);
 
        if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
                rep_qbody = req_capsule_server_get(&req->rq_pill,
                                                   &RMF_QUOTA_BODY);
-
+out:
        aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
                          aa->aa_lvb, aa->aa_arg, rc);
        RETURN(rc);
@@ -235,6 +249,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }
+       req->rq_request_portal = MDS_READPAGE_PORTAL;
 
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it_op;
@@ -268,7 +283,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                qti->qti_einfo.ei_cbdata = arg;
                break;
        default:
-               LASSERTF(0, "invalid it_op %d", it_op);
+               LASSERTF(0, "invalid it_op %d\n", it_op);
        }
 
        /* build lock enqueue request */
@@ -291,7 +306,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                lock = ldlm_handle2lock(&qti->qti_lockh);
                if (lock == NULL) {
                        ptlrpc_req_finished(req);
-                       GOTO(out, -ENOLCK);
+                       GOTO(out, rc = -ENOLCK);
                }
                lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
                LDLM_LOCK_PUT(lock);
@@ -310,8 +325,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
                break;
        }
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        aa->aa_exp = exp;
        aa->aa_qqi = qqi;
        aa->aa_arg = arg;
@@ -327,7 +341,7 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
        } else {
                /* queue lock request and return */
                req->rq_interpret_reply = qsd_intent_interpret;
-               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               ptlrpcd_add_req(req);
        }
 
        RETURN(rc);
@@ -376,16 +390,17 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
        ptlrpc_at_set_req_timeout(req);
 
        /* allocate bulk descriptor */
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
-       if (desc == NULL) {
-               ptlrpc_request_free(req);
-               RETURN(-ENOMEM);
-       }
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
 
        /* req now owns desc and will free it when it gets freed */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_SIZE);
 
        /* pack index information in request */
        req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);