* GPL HEADER END
*/
/*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2016, Intel Corporation.
* Use is subject to license terms.
*
- * Author: Johann Lombardi <johann@whamcloud.com>
- * Author: Niu Yawei <niu@whamcloud.com>
+ * Author: Johann Lombardi <johann.lombardi@intel.com>
+ * Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
#include <lustre_net.h>
struct obd_export *aa_exp;
struct qsd_qtype_info *aa_qqi;
void *aa_arg;
- union ldlm_wire_lvb *aa_lvb;
+ struct lquota_lvb *aa_lvb;
struct lustre_handle aa_lockh;
qsd_req_completion_t aa_completion;
};
GOTO(out, rc = -ENOMEM);
req->rq_no_resend = req->rq_no_delay = 1;
+ req->rq_no_retry_einprogress = 1;
rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ);
if (rc) {
ptlrpc_request_free(req);
GOTO(out, rc);
}
+ req->rq_request_portal = MDS_READPAGE_PORTAL;
req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
*req_qbody = *qbody;
ptlrpc_req_finished(req);
} else {
req->rq_interpret_reply = qsd_dqacq_interpret;
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
}
RETURN(rc);
{
struct lustre_handle *lockh;
struct quota_body *rep_qbody = NULL, *req_qbody;
- struct ldlm_intent *lit;
struct qsd_async_args *aa = (struct qsd_async_args *)arg;
- int flags = LDLM_FL_HAS_INTENT;
+ struct ldlm_reply *lockrep;
+ __u64 flags = LDLM_FL_HAS_INTENT;
ENTRY;
LASSERT(aa->aa_exp);
lockh = &aa->aa_lockh;
req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
- lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
+ req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR,
&flags, (void *)aa->aa_lvb,
- sizeof(union ldlm_wire_lvb), lockh, rc);
- if (rc < 0)
+ sizeof(struct lquota_lvb), lockh, rc);
+ if (rc < 0) {
/* the lock has been destroyed, forget about the lock handle */
memset(lockh, 0, sizeof(*lockh));
+ /*
+ * To avoid the server being overwhelmed by LDLM locks, the server
+ * may reject the locking request by returning -EINPROGRESS;
+ * this is different from the -EINPROGRESS returned by the quota
+ * code.
+ */
+ if (rc == -EINPROGRESS)
+ rc = -EAGAIN;
+ GOTO(out, rc);
+ }
+
+ lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ LASSERT(lockrep != NULL);
+ rc = ptlrpc_status_ntoh(lockrep->lock_policy_res2);
if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS)
rep_qbody = req_capsule_server_get(&req->rq_pill,
&RMF_QUOTA_BODY);
-
+out:
aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh,
aa->aa_lvb, aa->aa_arg, rc);
RETURN(rc);
int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp,
struct quota_body *qbody, bool sync, int it_op,
qsd_req_completion_t completion, struct qsd_qtype_info *qqi,
- union ldlm_wire_lvb *lvb, void *arg)
+ struct lquota_lvb *lvb, void *arg)
{
struct qsd_thread_info *qti = qsd_info(env);
struct ptlrpc_request *req;
struct qsd_async_args *aa = NULL;
struct ldlm_intent *lit;
struct quota_body *req_qbody;
- int rc, flags = LDLM_FL_HAS_INTENT;
+ __u64 flags = LDLM_FL_HAS_INTENT;
+ int rc;
ENTRY;
LASSERT(exp != NULL);
if (req == NULL)
GOTO(out, rc = -ENOMEM);
- req->rq_no_resend = req->rq_no_delay = 1;
+ req->rq_no_retry_einprogress = 1;
rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
if (rc) {
ptlrpc_request_free(req);
GOTO(out, rc);
}
+ req->rq_request_portal = MDS_READPAGE_PORTAL;
lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
lit->opc = (__u64)it_op;
req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY);
*req_qbody = *qbody;
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ sizeof(*lvb));
ptlrpc_request_set_replen(req);
switch(it_op) {
break;
case IT_QUOTA_DQACQ:
/* build resource name associated for per-ID quota lock */
- fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id,
- &qti->qti_resid);
+ fid_build_quota_res_name(&qbody->qb_fid, &qbody->qb_id,
+ &qti->qti_resid);
/* copy einfo template and fill ei_cbdata with lqe pointer */
memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo));
qti->qti_einfo.ei_cbdata = arg;
break;
default:
- LASSERTF(0, "invalid it_op %d", it_op);
+ LASSERTF(0, "invalid it_op %d\n", it_op);
}
/* build lock enqueue request */
rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL,
- &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh,
- 1);
+ &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA,
+ &qti->qti_lockh, 1);
if (rc < 0) {
ptlrpc_req_finished(req);
GOTO(out, rc);
case IT_QUOTA_CONN:
/* grab reference on qqi for new lock */
#ifdef USE_LU_REF
+ {
struct ldlm_lock *lock;
lock = ldlm_handle2lock(&qti->qti_lockh);
if (lock == NULL) {
ptlrpc_req_finished(req);
- GOTO(out, -ENOLCK);
+ GOTO(out, rc = -ENOLCK);
}
lu_ref_add(&qqi->qqi_reference, "glb_lock", lock);
LDLM_LOCK_PUT(lock);
+ }
#endif
qqi_getref(qqi);
break;
case IT_QUOTA_DQACQ:
/* grab reference on lqe for new lock */
lqe_getref((struct lquota_entry *)arg);
+ /* all acquire/release request are sent with no_resend and
+ * no_delay flag */
+ req->rq_no_resend = req->rq_no_delay = 1;
break;
default:
break;
} else {
/* queue lock request and return */
req->rq_interpret_reply = qsd_intent_interpret;
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
}
RETURN(rc);
out:
- completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc);
+ completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc);
return rc;
}
*/
int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp,
struct idx_info *ii, unsigned int npages,
- cfs_page_t **pages, bool *need_swab)
+ struct page **pages, bool *need_swab)
{
struct ptlrpc_request *req;
struct idx_info *req_ii;
ptlrpc_at_set_req_timeout(req);
/* allocate bulk descriptor */
- desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
- MDS_BULK_PORTAL);
- if (desc == NULL) {
- ptlrpc_request_free(req);
- RETURN(-ENOMEM);
- }
+ desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+ PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+ MDS_BULK_PORTAL,
+ &ptlrpc_bulk_kiov_pin_ops);
+ if (desc == NULL)
+ GOTO(out, rc = -ENOMEM);
/* req now owns desc and will free it when it gets freed */
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
+ desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+ PAGE_SIZE);
/* pack index information in request */
req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
req->rq_bulk->bd_nob_transferred);
if (rc < 0)
GOTO(out, rc);
+ else
+ /* sptlrpc_cli_unwrap_bulk_read() returns the number of bytes
+ * transferred */
+ rc = 0;
req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO);
*ii = *req_ii;