X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_request.c;h=b4243ff1d50d2b9ab1a352071d0fee62d927f9f1;hb=e1bd38e27a810bad7a25813ebc1ca0535c9d7228;hp=57574a18aec513b801e042384fe8839f41946c9d;hpb=318fd8d197ff607a032dac6ed9cb15922e44a53f;p=fs%2Flustre-release.git diff --git a/lustre/quota/qsd_request.c b/lustre/quota/qsd_request.c index 57574a1..b4243ff 100644 --- a/lustre/quota/qsd_request.c +++ b/lustre/quota/qsd_request.c @@ -21,17 +21,13 @@ * GPL HEADER END */ /* - * Copyright (c) 2012 Intel, Inc. + * Copyright (c) 2012, 2016, Intel Corporation. * Use is subject to license terms. * - * Author: Johann Lombardi - * Author: Niu Yawei + * Author: Johann Lombardi + * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA #include @@ -45,7 +41,7 @@ struct qsd_async_args { struct obd_export *aa_exp; struct qsd_qtype_info *aa_qqi; void *aa_arg; - union ldlm_wire_lvb *aa_lvb; + struct lquota_lvb *aa_lvb; struct lustre_handle aa_lockh; qsd_req_completion_t aa_completion; }; @@ -110,19 +106,20 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp, GOTO(out, rc = -ENOMEM); req->rq_no_resend = req->rq_no_delay = 1; + req->rq_no_retry_einprogress = 1; rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, QUOTA_DQACQ); if (rc) { ptlrpc_request_free(req); GOTO(out, rc); } + req->rq_request_portal = MDS_READPAGE_PORTAL; req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); *req_qbody = *qbody; ptlrpc_request_set_replen(req); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->aa_exp = exp; aa->aa_qqi = qqi; aa->aa_arg = (void *)lqe; @@ -135,7 +132,7 @@ int qsd_send_dqacq(const struct lu_env *env, struct obd_export *exp, ptlrpc_req_finished(req); } else { req->rq_interpret_reply = qsd_dqacq_interpret; - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); } RETURN(rc); @@ -160,27 +157,41 @@ static int qsd_intent_interpret(const struct lu_env *env, { struct lustre_handle *lockh; struct quota_body *rep_qbody = NULL, *req_qbody; - struct ldlm_intent *lit; struct qsd_async_args *aa = (struct qsd_async_args *)arg; - int flags = LDLM_FL_HAS_INTENT; + struct ldlm_reply *lockrep; + __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; LASSERT(aa->aa_exp); lockh = &aa->aa_lockh; req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR, &flags, (void *)aa->aa_lvb, - sizeof(union ldlm_wire_lvb), lockh, rc); - if (rc < 0) + sizeof(struct lquota_lvb), lockh, rc); + if (rc < 0) { /* the lock has been destroyed, forget about the lock handle */ memset(lockh, 0, sizeof(*lockh)); + /* + * To avoid the server being fullfilled by LDLM locks, server + * may reject the locking request by returning -EINPROGRESS, + * this is different from the -EINPROGRESS returned by quota + * code. + */ + if (rc == -EINPROGRESS) + rc = -EAGAIN; + GOTO(out, rc); + } + + lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(lockrep != NULL); + rc = ptlrpc_status_ntoh(lockrep->lock_policy_res2); if (rc == 0 || rc == -EDQUOT || rc == -EINPROGRESS) rep_qbody = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_BODY); - +out: aa->aa_completion(env, aa->aa_qqi, req_qbody, rep_qbody, lockh, aa->aa_lvb, aa->aa_arg, rc); RETURN(rc); @@ -207,14 +218,15 @@ static int qsd_intent_interpret(const struct lu_env *env, int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, struct quota_body *qbody, bool sync, int it_op, qsd_req_completion_t completion, struct qsd_qtype_info *qqi, - union ldlm_wire_lvb *lvb, void *arg) + struct lquota_lvb *lvb, void *arg) { struct qsd_thread_info *qti = qsd_info(env); struct ptlrpc_request *req; struct qsd_async_args *aa = NULL; struct ldlm_intent *lit; struct quota_body *req_qbody; - int rc, flags = LDLM_FL_HAS_INTENT; + __u64 flags = LDLM_FL_HAS_INTENT; + int rc; ENTRY; LASSERT(exp != NULL); @@ -227,12 +239,13 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, if (req == NULL) GOTO(out, rc = -ENOMEM); - req->rq_no_resend = req->rq_no_delay = 1; + req->rq_no_retry_einprogress = 1; rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); if (rc) { ptlrpc_request_free(req); GOTO(out, rc); } + req->rq_request_portal = MDS_READPAGE_PORTAL; lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); lit->opc = (__u64)it_op; @@ -240,6 +253,8 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); *req_qbody = *qbody; + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(*lvb)); ptlrpc_request_set_replen(req); switch(it_op) { @@ -256,21 +271,21 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, break; case IT_QUOTA_DQACQ: /* build resource name associated for per-ID quota lock */ - fid_build_quota_resid(&qbody->qb_fid, &qbody->qb_id, - &qti->qti_resid); + fid_build_quota_res_name(&qbody->qb_fid, &qbody->qb_id, + &qti->qti_resid); /* copy einfo template and fill ei_cbdata with lqe pointer */ memcpy(&qti->qti_einfo, &qsd_id_einfo, sizeof(qti->qti_einfo)); qti->qti_einfo.ei_cbdata = arg; break; default: - LASSERTF(0, "invalid it_op %d", it_op); + LASSERTF(0, "invalid it_op %d\n", it_op); } /* build lock enqueue request */ rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL, - &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh, - 1); + &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA, + &qti->qti_lockh, 1); if (rc < 0) { ptlrpc_req_finished(req); GOTO(out, rc); @@ -281,28 +296,32 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, case IT_QUOTA_CONN: /* grab reference on qqi for new lock */ #ifdef USE_LU_REF + { struct ldlm_lock *lock; lock = ldlm_handle2lock(&qti->qti_lockh); if (lock == NULL) { ptlrpc_req_finished(req); - GOTO(out, -ENOLCK); + GOTO(out, rc = -ENOLCK); } lu_ref_add(&qqi->qqi_reference, "glb_lock", lock); LDLM_LOCK_PUT(lock); + } #endif qqi_getref(qqi); break; case IT_QUOTA_DQACQ: /* grab reference on lqe for new lock */ lqe_getref((struct lquota_entry *)arg); + /* all acquire/release request are sent with no_resend and + * no_delay flag */ + req->rq_no_resend = req->rq_no_delay = 1; break; default: break; } - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->aa_exp = exp; aa->aa_qqi = qqi; aa->aa_arg = arg; @@ -318,12 +337,12 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, } else { /* queue lock request and return */ req->rq_interpret_reply = qsd_intent_interpret; - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); } RETURN(rc); out: - completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, arg, rc); + completion(env, qqi, qbody, NULL, &qti->qti_lockh, lvb, arg, rc); return rc; } @@ -343,7 +362,7 @@ out: */ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp, struct idx_info *ii, unsigned int npages, - cfs_page_t **pages, bool *need_swab) + struct page **pages, bool *need_swab) { struct ptlrpc_request *req; struct idx_info *req_ii; @@ -367,16 +386,17 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp, ptlrpc_at_set_req_timeout(req); /* allocate bulk descriptor */ - desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, - MDS_BULK_PORTAL); - if (desc == NULL) { - ptlrpc_request_free(req); - RETURN(-ENOMEM); - } + desc = ptlrpc_prep_bulk_imp(req, npages, 1, + PTLRPC_BULK_PUT_SINK, + MDS_BULK_PORTAL, + &ptlrpc_bulk_kiov_pin_ops); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); /* req now owns desc and will free it when it gets freed */ for (i = 0; i < npages; i++) - ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE); + desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0, + PAGE_SIZE); /* pack index information in request */ req_ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO); @@ -393,6 +413,10 @@ int qsd_fetch_index(const struct lu_env *env, struct obd_export *exp, req->rq_bulk->bd_nob_transferred); if (rc < 0) GOTO(out, rc); + else + /* sptlrpc_cli_unwrap_bulk_read() returns the number of bytes + * transferred*/ + rc = 0; req_ii = req_capsule_server_get(&req->rq_pill, &RMF_IDX_INFO); *ii = *req_ii;