X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=f636360dfda9288076eead9f3cd98fd9c2b2fd7e;hp=6b90effc33c323f7f3650e2dbe98af970cf62e32;hb=713174908cb8e5e3ceadd3ca1cb42a88b200e576;hpb=2b294992edce5af7b79d4300ed3aa1ea6a8db850 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 6b90eff..f636360 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2014, Intel Corporation. + * Copyright (c) 2011, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -38,40 +38,46 @@ #include -#include -#include #include -#include -#include + #include -#include #include -#include +#include #include +#include +#include +#include +#include +#include +#include +#include #include -#include "osc_internal.h" + #include "osc_cl_internal.h" +#include "osc_internal.h" + +atomic_t osc_pool_req_count; +unsigned int osc_reqpool_maxreqcount; +struct ptlrpc_request_pool *osc_rq_pool; + +/* max memory used for request pool, unit is MB */ +static unsigned int osc_reqpool_mem_max = 5; +module_param(osc_reqpool_mem_max, uint, 0444); struct osc_brw_async_args { struct obdo *aa_oa; int aa_requested_nob; int aa_nio_count; - obd_count aa_page_count; + u32 aa_page_count; int aa_resends; struct brw_page **aa_ppga; struct client_obd *aa_cli; struct list_head aa_oaps; struct list_head aa_exts; - struct obd_capa *aa_ocapa; - struct cl_req *aa_clerq; }; #define osc_grant_args osc_brw_async_args -struct osc_async_args { - struct obd_info *aa_oi; -}; - struct osc_setattr_args { struct obdo *sa_oa; obd_enqueue_update_f sa_upcall; @@ -79,15 +85,16 @@ struct osc_setattr_args { }; struct osc_fsync_args { - struct obd_info *fa_oi; - obd_enqueue_update_f fa_upcall; + struct osc_object *fa_obj; + struct obdo *fa_oa; + obd_enqueue_update_f fa_upcall; void *fa_cookie; }; struct osc_enqueue_args { struct obd_export *oa_exp; - ldlm_type_t oa_type; - ldlm_mode_t oa_mode; + enum ldlm_type oa_type; + enum ldlm_mode oa_mode; __u64 *oa_flags; osc_enqueue_upcall_f oa_upcall; void *oa_cookie; @@ -96,195 +103,102 @@ struct osc_enqueue_args { unsigned int oa_agl:1; }; -static void osc_release_ppga(struct brw_page **ppga, obd_count count); +static void osc_release_ppga(struct brw_page **ppga, size_t count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -static inline void osc_pack_capa(struct ptlrpc_request *req, - struct ost_body *body, void *capa) -{ - struct obd_capa *oc = (struct obd_capa *)capa; - struct lustre_capa *c; - - if (!capa) - return; - - c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); - LASSERT(c); - capa_cpy(c, oc); - body->oa.o_valid |= OBD_MD_FLOSSCAPA; - DEBUG_CAPA(D_SEC, c, "pack"); -} - -static inline void osc_pack_req_body(struct ptlrpc_request *req, - struct obd_info *oinfo) +void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa) { struct ost_body *body; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); -} - -static inline void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) -{ - if (oc == NULL) - req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); - else - /* it is already calculated as sizeof struct obd_capa */ - ; -} - -static int osc_getattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) -{ - struct ost_body *body; - ENTRY; - - if (rc != 0) - GOTO(out, rc); - - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body) { - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, - aa->aa_oi->oi_oa, &body->oa); - - /* This should really be sent by the OST */ - aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; - aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; - } else { - CDEBUG(D_INFO, "can't unpack ost_body\n"); - rc = -EPROTO; - aa->aa_oi->oi_oa->o_valid = 0; - } -out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); -} - -static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct ptlrpc_request_set *set) -{ - struct ptlrpc_request *req; - struct osc_async_args *aa; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); - - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - osc_pack_req_body(req, oinfo); - - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; - - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oi = oinfo; - - ptlrpc_set_add_req(set, req); - RETURN(0); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); } static int osc_getattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); + + oa->o_blksize = cli_brw_size(exp->exp_obd); + oa->o_valid |= OBD_MD_FLBLKSZ; - oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); - oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + EXIT; +out: + ptlrpc_req_finished(req); - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + return rc; } static int osc_setattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, struct obd_trans_info *oti) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); + ENTRY; + LASSERT(oa->o_valid & OBD_MD_FLGROUP); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); - EXIT; + EXIT; out: - ptlrpc_req_finished(req); - RETURN(rc); + ptlrpc_req_finished(req); + + RETURN(rc); } static int osc_setattr_interpret(const struct lu_env *env, @@ -308,67 +222,55 @@ out: RETURN(rc); } -int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - obd_enqueue_update_f upcall, void *cookie, - struct ptlrpc_request_set *rqset) +int osc_setattr_async(struct obd_export *exp, struct obdo *oa, + obd_enqueue_update_f upcall, void *cookie, + struct ptlrpc_request_set *rqset) { - struct ptlrpc_request *req; - struct osc_setattr_args *sa; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct osc_setattr_args *sa; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) - oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - /* do mds to ost setattr asynchronously */ - if (!rqset) { - /* Do not wait for response. */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - } else { - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_setattr_interpret; - - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } + /* do mds to ost setattr asynchronously */ + if (!rqset) { + /* Do not wait for response. */ + ptlrpcd_add_req(req); + } else { + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_setattr_interpret; - RETURN(0); -} + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; -static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - struct ptlrpc_request_set *rqset) -{ - return osc_setattr_async_base(exp, oinfo, oti, - oinfo->oi_cb_up, oinfo, rqset); + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } + + RETURN(0); } static int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct obd_trans_info *oti) + struct obdo *oa) { struct ptlrpc_request *req; struct ost_body *body; @@ -396,14 +298,6 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp, ptlrpc_request_set_replen(req); - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_DELORPHAN) { - DEBUG_REQ(D_HA, req, - "delorphan from OST integration"); - /* Don't resend the delorphan req */ - req->rq_no_resend = req->rq_no_delay = 1; - } - rc = ptlrpc_queue_wait(req); if (rc) GOTO(out_req, rc); @@ -418,24 +312,15 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - if (oti != NULL) { - if (oa->o_valid & OBD_MD_FLCOOKIE) { - if (oti->oti_logcookies == NULL) - oti->oti_logcookies = &oti->oti_onecookie; - - *oti->oti_logcookies = oa->o_lcookie; - } - } - - CDEBUG(D_HA, "transno: "LPD64"\n", - lustre_msg_get_transno(req->rq_repmsg)); + CDEBUG(D_HA, "transno: "LPD64"\n", + lustre_msg_get_transno(req->rq_repmsg)); out_req: - ptlrpc_req_finished(req); + ptlrpc_req_finished(req); out: RETURN(rc); } -int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_punch_base(struct obd_export *exp, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { @@ -449,7 +334,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); if (rc) { ptlrpc_request_free(req); @@ -460,53 +344,68 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); - RETURN(0); + RETURN(0); } static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *arg, int rc) { - struct osc_fsync_args *fa = arg; - struct ost_body *body; - ENTRY; + struct osc_fsync_args *fa = arg; + struct ost_body *body; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + struct cl_object *obj; + ENTRY; - if (rc) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO(out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + CERROR("can't unpack ost_body\n"); + GOTO(out, rc = -EPROTO); + } + + *fa->fa_oa = body->oa; + obj = osc2cl(fa->fa_obj); + + /* Update osc object's blocks attribute */ + cl_object_attr_lock(obj); + if (body->oa.o_valid & OBD_MD_FLBLOCKS) { + attr->cat_blocks = body->oa.o_blocks; + valid |= CAT_BLOCKS; + } + + if (valid != 0) + cl_object_attr_update(env, obj, attr, valid); + cl_object_attr_unlock(obj); - *fa->fa_oi->oi_oa = body->oa; out: rc = fa->fa_upcall(fa->fa_cookie, rc); RETURN(rc); } -int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_sync_base(struct osc_object *obj, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { + struct obd_export *exp = osc_export(obj); struct ptlrpc_request *req; struct ost_body *body; struct osc_fsync_args *fa; @@ -517,7 +416,6 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -527,21 +425,20 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, /* overload the size and blocks fields in the oa with start/end */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = osc_sync_interpret; + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_sync_interpret; CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); fa = ptlrpc_req_async_args(req); - fa->fa_oi = oinfo; + fa->fa_obj = obj; + fa->fa_oa = oa; fa->fa_upcall = upcall; fa->fa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); @@ -553,13 +450,13 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, * locks added to @cancels list. */ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, struct list_head *cancels, - ldlm_mode_t mode, __u64 lock_flags) + enum ldlm_mode mode, __u64 lock_flags) { - struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; - struct ldlm_res_id res_id; - struct ldlm_resource *res; - int count; - ENTRY; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + int count; + ENTRY; /* Return, i.e. cancel nothing, only if ELC is supported (flag in * export) but disabled through procfs (flag in NS). @@ -612,18 +509,8 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -/* Destroy requests can be async always on the client, and we don't even really - * care about the return code since the client cannot do anything at all about - * a destroy failure. - * When the MDS is unlinking a filename, it saves the file objects into a - * recovery llog, and these object records are cancelled when the OST reports - * they were destroyed and sync'd to disk (i.e. transaction committed). - * If the client dies, or the OST is down when the object should be destroyed, - * the records are not cancelled, and when the OST reconnects to the MDS next, - * it will retrieve the llog unlink logs and then sends the log cancellation - * cookies to the MDS after committing destroy transactions. */ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct obd_trans_info *oti) + struct obdo *oa) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -646,7 +533,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); } - osc_set_capa_size(req, &RMF_CAPA1, NULL); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -657,48 +543,42 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oa->o_lcookie = *oti->oti_logcookies; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); ptlrpc_request_set_replen(req); - /* If osc_destory is for destroying the unlink orphan, - * sent from MDT to OST, which should not be blocked here, - * because the process might be triggered by ptlrpcd, and - * it is not good to block ptlrpcd thread (b=16006)*/ - if (!(oa->o_flags & OBD_FL_DELORPHAN)) { - req->rq_interpret_reply = osc_destroy_interpret; - if (!osc_can_send_destroy(cli)) { - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, - NULL); - - /* - * Wait until the number of on-going destroy RPCs drops - * under max_rpc_in_flight - */ - l_wait_event_exclusive(cli->cl_destroy_waitq, - osc_can_send_destroy(cli), &lwi); - } - } + req->rq_interpret_reply = osc_destroy_interpret; + if (!osc_can_send_destroy(cli)) { + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - /* Do not wait for response */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - RETURN(0); + /* + * Wait until the number of on-going destroy RPCs drops + * under max_rpc_in_flight + */ + l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); + } + + /* Do not wait for response */ + ptlrpcd_add_req(req); + RETURN(0); } static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT; - LASSERT(!(oa->o_valid & bits)); + LASSERT(!(oa->o_valid & bits)); oa->o_valid |= bits; spin_lock(&cli->cl_loi_list_lock); - oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM)) + oa->o_dirty = cli->cl_dirty_grant; + else + oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT; if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > cli->cl_dirty_max_pages)) { CERROR("dirty %lu - %lu > dirty_max %lu\n", @@ -707,13 +587,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_undirty = 0; } else if (unlikely(atomic_long_read(&obd_dirty_pages) - atomic_long_read(&obd_dirty_transit_pages) > - (obd_max_dirty_pages + 1))) { + (long)(obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n", - cli->cl_import->imp_obd->obd_name, - atomic_long_read(&obd_dirty_pages), + CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n", + cli_name(cli), atomic_long_read(&obd_dirty_pages), atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; @@ -723,11 +602,22 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; } else { - unsigned long max_in_flight = (cli->cl_max_pages_per_rpc << - PAGE_CACHE_SHIFT) * - (cli->cl_max_rpcs_in_flight + 1); - oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT, - max_in_flight); + unsigned long nrpages; + + nrpages = cli->cl_max_pages_per_rpc; + nrpages *= cli->cl_max_rpcs_in_flight + 1; + nrpages = max(nrpages, cli->cl_dirty_max_pages); + oa->o_undirty = nrpages << PAGE_CACHE_SHIFT; + if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, + GRANT_PARAM)) { + int nrextents; + + /* take extent tax into account when asking for more + * grant space */ + nrextents = (nrpages + cli->cl_max_extent_pages - 1) / + cli->cl_max_extent_pages; + oa->o_undirty += nrextents * cli->cl_grant_extent_tax; + } } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; @@ -735,7 +625,6 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); - } void osc_update_next_shrink(struct client_obd *cli) @@ -746,7 +635,7 @@ void osc_update_next_shrink(struct client_obd *cli) cli->cl_next_shrink_grant); } -static void __osc_update_grant(struct client_obd *cli, obd_size grant) +static void __osc_update_grant(struct client_obd *cli, u64 grant) { spin_lock(&cli->cl_loi_list_lock); cli->cl_avail_grant += grant; @@ -762,8 +651,9 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) } static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set); + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set); static int osc_shrink_grant_interpret(const struct lu_env *env, struct ptlrpc_request *req, @@ -899,21 +789,19 @@ static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) static int osc_add_shrink_grant(struct client_obd *client) { - int rc; + int rc; - rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, - TIMEOUT_GRANT, - osc_grant_shrink_grant_cb, NULL, - &client->cl_grant_shrink_list); - if (rc) { - CERROR("add grant client %s error %d\n", - client->cl_import->imp_obd->obd_name, rc); - return rc; - } - CDEBUG(D_CACHE, "add grant client %s \n", - client->cl_import->imp_obd->obd_name); - osc_update_next_shrink(client); - return 0; + rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, + TIMEOUT_GRANT, + osc_grant_shrink_grant_cb, NULL, + &client->cl_grant_shrink_list); + if (rc) { + CERROR("add grant client %s error %d\n", cli_name(client), rc); + return rc; + } + CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client)); + osc_update_next_shrink(client); + return 0; } static int osc_del_shrink_grant(struct client_obd *client) @@ -934,28 +822,50 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) * left EVICTED state, then cl_dirty_pages must be 0 already. */ spin_lock(&cli->cl_loi_list_lock); - if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) - cli->cl_avail_grant = ocd->ocd_grant; - else - cli->cl_avail_grant = ocd->ocd_grant - - (cli->cl_dirty_pages << PAGE_CACHE_SHIFT); + cli->cl_avail_grant = ocd->ocd_grant; + if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) { + cli->cl_avail_grant -= cli->cl_reserved_grant; + if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) + cli->cl_avail_grant -= cli->cl_dirty_grant; + else + cli->cl_avail_grant -= + cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + } if (cli->cl_avail_grant < 0) { CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", - cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, + cli_name(cli), cli->cl_avail_grant, ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); /* workaround for servers which do not have the patch from * LU-2679 */ cli->cl_avail_grant = ocd->ocd_grant; } - /* determine the appropriate chunk size used by osc_extent. */ - cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); + if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) { + u64 size; + + /* overhead for each extent insertion */ + cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10; + /* determine the appropriate chunk size used by osc_extent. */ + cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, + ocd->ocd_grant_blkbits); + /* determine maximum extent size, in #pages */ + size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits; + cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT; + if (cli->cl_max_extent_pages == 0) + cli->cl_max_extent_pages = 1; + } else { + cli->cl_grant_extent_tax = 0; + cli->cl_chunkbits = PAGE_CACHE_SHIFT; + cli->cl_max_extent_pages = DT_MAX_BRW_PAGES; + } spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." - "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, - cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); + "chunk bits: %d cl_max_extent_pages: %d\n", + cli_name(cli), + cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits, + cli->cl_max_extent_pages); if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && list_empty(&cli->cl_grant_shrink_list)) @@ -966,7 +876,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) * beyond the end of a stripe file; i.e. lustre is reading a sparse file * via the LOV, and it _knows_ it's reading inside the file, it's just that * this stripe never got written at or beyond this stripe offset yet. */ -static void handle_short_read(int nob_read, obd_count page_count, +static void handle_short_read(int nob_read, size_t page_count, struct brw_page **pga) { char *ptr; @@ -979,7 +889,7 @@ static void handle_short_read(int nob_read, obd_count page_count, if (pga[i]->count > nob_read) { /* EOF inside this page */ ptr = kmap(pga[i]->pg) + - (pga[i]->off & ~CFS_PAGE_MASK); + (pga[i]->off & ~PAGE_MASK); memset(ptr + nob_read, 0, pga[i]->count - nob_read); kunmap(pga[i]->pg); page_count--; @@ -994,7 +904,7 @@ static void handle_short_read(int nob_read, obd_count page_count, /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK); memset(ptr, 0, pga[i]->count); kunmap(pga[i]->pg); i++; @@ -1002,8 +912,8 @@ static void handle_short_read(int nob_read, obd_count page_count, } static int check_write_rcs(struct ptlrpc_request *req, - int requested_nob, int niocount, - obd_count page_count, struct brw_page **pga) + int requested_nob, int niocount, + size_t page_count, struct brw_page **pga) { int i; __u32 *remote_rcs; @@ -1057,11 +967,11 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } -static obd_count osc_checksum_bulk(int nob, obd_count pg_count, - struct brw_page **pga, int opc, - cksum_type_t cksum_type) +static u32 osc_checksum_bulk(int nob, size_t pg_count, + struct brw_page **pga, int opc, + cksum_type_t cksum_type) { - __u32 cksum; + u32 cksum; int i = 0; struct cfs_crypto_hash_desc *hdesc; unsigned int bufsize; @@ -1078,23 +988,23 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, } while (nob > 0 && pg_count > 0) { - int count = pga[i]->count > nob ? nob : pga[i]->count; + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { unsigned char *ptr = kmap(pga[i]->pg); - int off = pga[i]->off & ~CFS_PAGE_MASK; + int off = pga[i]->off & ~PAGE_MASK; - memcpy(ptr + off, "bad1", min(4, nob)); + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, - pga[i]->off & ~CFS_PAGE_MASK, + pga[i]->off & ~PAGE_MASK, count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", - (int)(pga[i]->off & ~CFS_PAGE_MASK)); + (int)(pga[i]->off & ~PAGE_MASK)); nob -= pga[i]->count; pg_count--; @@ -1112,12 +1022,10 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, return cksum; } -static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page **pga, - struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve, - int resend) +static int +osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, + u32 page_count, struct brw_page **pga, + struct ptlrpc_request **reqp, int resend) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1135,15 +1043,15 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) RETURN(-EINVAL); /* Fatal */ - if ((cmd & OBD_BRW_WRITE) != 0) { - opc = OST_WRITE; - req = ptlrpc_request_alloc_pool(cli->cl_import, - cli->cl_import->imp_rq_pool, - &RQF_OST_BRW_WRITE); - } else { - opc = OST_READ; - req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); - } + if ((cmd & OBD_BRW_WRITE) != 0) { + opc = OST_WRITE; + req = ptlrpc_request_alloc_pool(cli->cl_import, + osc_rq_pool, + &RQF_OST_BRW_WRITE); + } else { + opc = OST_READ; + req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); + } if (req == NULL) RETURN(-ENOMEM); @@ -1157,7 +1065,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, sizeof(*ioobj)); req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); - osc_set_capa_size(req, &RMF_CAPA1, ocapa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { @@ -1172,8 +1079,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, - opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, - OST_BULK_PORTAL); + (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE : + PTLRPC_BULK_PUT_SINK) | + PTLRPC_BULK_BUF_KIOV, + OST_BULK_PORTAL, + &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1194,12 +1104,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); - osc_pack_capa(req, body, ocapa); LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; - int poff = pg->off & ~CFS_PAGE_MASK; + int poff = pg->off & ~PAGE_MASK; LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ @@ -1220,7 +1129,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); + desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { @@ -1304,8 +1213,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_ppga = pga; aa->aa_cli = cli; INIT_LIST_HEAD(&aa->aa_oaps); - if (ocapa && reserve) - aa->aa_ocapa = capa_get(ocapa); *reqp = req; niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); @@ -1320,9 +1227,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, - __u32 client_cksum, __u32 server_cksum, int nob, - obd_count page_count, struct brw_page **pga, - cksum_type_t client_cksum_type) + __u32 client_cksum, __u32 server_cksum, int nob, + size_t page_count, struct brw_page **pga, + cksum_type_t client_cksum_type) { __u32 new_cksum; char *msg; @@ -1372,7 +1279,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) &req->rq_import->imp_connection->c_peer; struct client_obd *cli = aa->aa_cli; struct ost_body *body; - __u32 client_cksum = 0; + u32 client_cksum = 0; ENTRY; if (rc < 0 && rc != -EDQUOT) { @@ -1452,9 +1359,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; - __u32 server_cksum = body->oa.o_cksum; - char *via; - char *router; + u32 server_cksum = body->oa.o_cksum; + char *via = ""; + char *router = ""; cksum_type_t cksum_type; cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? @@ -1463,12 +1370,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga, OST_READ, cksum_type); - if (peer->nid == req->rq_bulk->bd_sender) { - via = router = ""; - } else { - via = " via "; - router = libcfs_nid2str(req->rq_bulk->bd_sender); - } + if (peer->nid != req->rq_bulk->bd_sender) { + via = " via "; + router = libcfs_nid2str(req->rq_bulk->bd_sender); + } if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " @@ -1527,12 +1432,10 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); - rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == - OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, - aa->aa_cli, aa->aa_oa, - NULL /* lsm unused by osc currently */, - aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0, 1); + rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == + OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + aa->aa_cli, aa->aa_oa, aa->aa_page_count, + aa->aa_ppga, &new_req, 1); if (rc) RETURN(rc); @@ -1577,14 +1480,11 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, } } - new_aa->aa_ocapa = aa->aa_ocapa; - aa->aa_ocapa = NULL; - /* XXX: This code will run into problem if we're going to support * to add a series of BRW RPCs into a self-defined ptlrpc_request_set * and wait for all of them to be finished. We should inherit request * set from old request. */ - ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); + ptlrpcd_add_req(new_req); DEBUG_REQ(D_INFO, new_req, "new request"); RETURN(0); @@ -1621,7 +1521,7 @@ static void sort_brw_pages(struct brw_page **array, int num) } while (stride > 1); } -static void osc_release_ppga(struct brw_page **ppga, obd_count count) +static void osc_release_ppga(struct brw_page **ppga, size_t count) { LASSERT(ppga != NULL); OBD_FREE(ppga, sizeof(*ppga) * count); @@ -1663,11 +1563,6 @@ static int brw_interpret(const struct lu_env *env, rc = -EIO; } - if (aa->aa_ocapa) { - capa_put(aa->aa_ocapa); - aa->aa_ocapa = NULL; - } - if (rc == 0) { struct obdo *oa = aa->aa_oa; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -1716,7 +1611,7 @@ static int brw_interpret(const struct lu_env *env, } if (valid != 0) - cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } OBDO_FREE(aa->aa_oa); @@ -1731,8 +1626,6 @@ static int brw_interpret(const struct lu_env *env, LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); - cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : - req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); @@ -1747,7 +1640,7 @@ static int brw_interpret(const struct lu_env *env, osc_wake_cache_waiters(cli); spin_unlock(&cli->cl_loi_list_lock); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(rc); } @@ -1775,7 +1668,7 @@ static void brw_commit(struct ptlrpc_request *req) * Extents in the list must be in OES_RPC state. */ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - struct list_head *ext_list, int cmd, pdl_policy_t pol) + struct list_head *ext_list, int cmd) { struct ptlrpc_request *req = NULL; struct osc_extent *ext; @@ -1783,18 +1676,17 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_brw_async_args *aa = NULL; struct obdo *oa = NULL; struct osc_async_page *oap; - struct osc_async_page *tmp; - struct cl_req *clerq = NULL; - enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : - CRT_READ; + struct osc_object *obj = NULL; struct cl_req_attr *crattr = NULL; - obd_off starting_offset = OBD_OBJECT_EOF; - obd_off ending_offset = 0; + loff_t starting_offset = OBD_OBJECT_EOF; + loff_t ending_offset = 0; int mpflag = 0; int mem_tight = 0; int page_count = 0; bool soft_sync = false; + bool interrupted = false; int i; + int grant = 0; int rc; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); struct ost_body *body; @@ -1805,30 +1697,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, list_for_each_entry(ext, ext_list, oe_link) { LASSERT(ext->oe_state == OES_RPC); mem_tight |= ext->oe_memalloc; - list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { - ++page_count; - list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (starting_offset > oap->oap_obj_off) - starting_offset = oap->oap_obj_off; - else - LASSERT(oap->oap_page_off == 0); - if (ending_offset < oap->oap_obj_off + oap->oap_count) - ending_offset = oap->oap_obj_off + - oap->oap_count; - else - LASSERT(oap->oap_page_off + oap->oap_count == - PAGE_CACHE_SIZE); - } + grant += ext->oe_grants; + page_count += ext->oe_nr_pages; + if (obj == NULL) + obj = ext->oe_obj; } soft_sync = osc_over_unstable_soft_limit(cli); if (mem_tight) mpflag = cfs_memory_pressure_get_and_set(); - OBD_ALLOC(crattr, sizeof(*crattr)); - if (crattr == NULL) - GOTO(out, rc = -ENOMEM); - OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) GOTO(out, rc = -ENOMEM); @@ -1838,41 +1716,49 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc = -ENOMEM); i = 0; - list_for_each_entry(oap, &rpc_list, oap_rpc_item) { - struct cl_page *page = oap2cl_page(oap); - if (clerq == NULL) { - clerq = cl_req_alloc(env, page, crt, - 1 /* only 1-object rpcs for now */); - if (IS_ERR(clerq)) - GOTO(out, rc = PTR_ERR(clerq)); + list_for_each_entry(ext, ext_list, oe_link) { + list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + if (mem_tight) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + if (soft_sync) + oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; + pga[i] = &oap->oap_brw_page; + pga[i]->off = oap->oap_obj_off + oap->oap_page_off; + i++; + + list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (starting_offset == OBD_OBJECT_EOF || + starting_offset > oap->oap_obj_off) + starting_offset = oap->oap_obj_off; + else + LASSERT(oap->oap_page_off == 0); + if (ending_offset < oap->oap_obj_off + oap->oap_count) + ending_offset = oap->oap_obj_off + + oap->oap_count; + else + LASSERT(oap->oap_page_off + oap->oap_count == + PAGE_CACHE_SIZE); + if (oap->oap_interrupted) + interrupted = true; } - if (mem_tight) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; - if (soft_sync) - oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; - pga[i] = &oap->oap_brw_page; - pga[i]->off = oap->oap_obj_off + oap->oap_page_off; - CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", - pga[i]->pg, page_index(oap->oap_page), oap, - pga[i]->flag); - i++; - cl_req_page_add(env, clerq, page); } - /* always get the data for the obdo for the rpc */ - LASSERT(clerq != NULL); + /* first page in the list */ + oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item); + + crattr = &osc_env_info(env)->oti_req_attr; + memset(crattr, 0, sizeof(*crattr)); + crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; + crattr->cra_flags = ~0ULL; + crattr->cra_page = oap2cl_page(oap); crattr->cra_oa = oa; - cl_req_attr_set(env, clerq, crattr, ~0ULL); + cl_req_attr_set(env, osc2cl(obj), crattr); - rc = cl_req_prep(env, clerq); - if (rc != 0) { - CERROR("cl_req_prep failed: %d\n", rc); - GOTO(out, rc); - } + if (cmd == OBD_BRW_WRITE) + oa->o_grant_used = grant; sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr->cra_capa, 1, 0); + rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, rc); @@ -1880,9 +1766,10 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, req->rq_commit_cb = brw_commit; req->rq_interpret_reply = brw_interpret; - - if (mem_tight != 0) - req->rq_memalloc = 1; + req->rq_memalloc = mem_tight != 0; + oap->oap_request = ptlrpc_request_addref(req); + if (interrupted && !req->rq_intr) + ptlrpc_mark_interrupted(req); /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets @@ -1891,9 +1778,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, * way to do this in a single call. bug 10150 */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); crattr->cra_oa = &body->oa; - cl_req_attr_set(env, clerq, crattr, - OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); - + crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME; + cl_req_attr_set(env, osc2cl(obj), crattr); lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); @@ -1902,23 +1788,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, list_splice_init(&rpc_list, &aa->aa_oaps); INIT_LIST_HEAD(&aa->aa_exts); list_splice_init(ext_list, &aa->aa_exts); - aa->aa_clerq = clerq; - - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - tmp = NULL; - list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - /* only one oap gets a request reference */ - if (tmp == NULL) - tmp = oap; - if (oap->oap_interrupted && !req->rq_intr) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, req); - ptlrpc_mark_interrupted(req); - } - } - if (tmp != NULL) - tmp->oap_request = ptlrpc_request_addref(req); spin_lock(&cli->cl_loi_list_lock); starting_offset >>= PAGE_CACHE_SHIFT; @@ -1941,19 +1810,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); + ptlrpcd_add_req(req); rc = 0; EXIT; @@ -1961,11 +1818,6 @@ out: if (mem_tight != 0) cfs_memory_pressure_restore(mpflag); - if (crattr != NULL) { - capa_put(crattr->cra_capa); - OBD_FREE(crattr, sizeof(*crattr)); - } - if (rc != 0) { LASSERT(req == NULL); @@ -1981,8 +1833,6 @@ out: list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 0, rc); } - if (clerq && !IS_ERR(clerq)) - cl_req_completion(env, clerq, rc); } RETURN(rc); } @@ -2026,40 +1876,9 @@ static int osc_set_data_with_check(struct lustre_handle *lockh, return set; } -static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) -{ - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - - ostid_build_res_name(&lsm->lsm_oi, &res_id); - ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - return 0; -} - -/* find any ldlm lock of the inode in osc - * return 0 not find - * 1 find one - * < 0 error */ -static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) -{ - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - int rc = 0; - - ostid_build_res_name(&lsm->lsm_oi, &res_id); - rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - if (rc == LDLM_ITER_STOP) - return(1); - if (rc == LDLM_ITER_CONTINUE) - return(0); - return(rc); -} - static int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, void *cookie, - struct lustre_handle *lockh, ldlm_mode_t mode, + struct lustre_handle *lockh, enum ldlm_mode mode, __u64 *flags, int agl, int errcode) { bool intent = *flags & LDLM_FL_HAS_INTENT; @@ -2096,12 +1915,12 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, } static int osc_enqueue_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) + struct ptlrpc_request *req, + struct osc_enqueue_args *aa, int rc) { struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; - ldlm_mode_t mode = aa->oa_mode; + enum ldlm_mode mode = aa->oa_mode; struct ost_lvb *lvb = aa->oa_lvb; __u32 lvb_len = sizeof(*lvb); __u64 flags = 0; @@ -2158,7 +1977,7 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * is evicted from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u64 *flags, ldlm_policy_data_t *policy, + __u64 *flags, union ldlm_policy_data *policy, struct ost_lvb *lvb, int kms_valid, osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, @@ -2169,14 +1988,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY; - ldlm_mode_t mode; + enum ldlm_mode mode; int rc; ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* * kms is not valid when either object is completely fresh (so that no @@ -2234,7 +2053,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, } no_match: - if (*flags & LDLM_FL_TEST_LOCK) + if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); if (intent) { @@ -2243,8 +2062,8 @@ no_match: if (req == NULL) RETURN(-ENOMEM); - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); - if (rc < 0) { + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { ptlrpc_request_free(req); RETURN(rc); } @@ -2282,17 +2101,17 @@ no_match: aa->oa_flags = NULL; } - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_enqueue_interpret; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } else if (intent) { - ptlrpc_req_finished(req); - } - RETURN(rc); - } + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_enqueue_interpret; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } else if (intent) { + ptlrpc_req_finished(req); + } + RETURN(rc); + } rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, flags, agl, rc); @@ -2303,22 +2122,22 @@ no_match: } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u32 type, ldlm_policy_data_t *policy, __u32 mode, - __u64 *flags, void *data, struct lustre_handle *lockh, - int unref) + enum ldlm_type type, union ldlm_policy_data *policy, + enum ldlm_mode mode, __u64 *flags, void *data, + struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; - ldlm_mode_t rc; + enum ldlm_mode rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) - RETURN(-EIO); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + RETURN(-EIO); - /* Filesystem lock extents are extended to page boundaries so that - * dealing with the page cache is a little smoother */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + /* Filesystem lock extents are extended to page boundaries so that + * dealing with the page cache is a little smoother */ + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The @@ -2346,18 +2165,6 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } -int osc_cancel_base(struct lustre_handle *lockh, __u32 mode) -{ - ENTRY; - - if (unlikely(mode == LCK_GROUP)) - ldlm_lock_decref_and_cancel(lockh, mode); - else - ldlm_lock_decref(lockh, mode); - - RETURN(0); -} - static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) @@ -2500,7 +2307,7 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, } static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, - void *karg, void *uarg) + void *karg, void __user *uarg) { struct obd_device *obd = exp->exp_obd; struct obd_ioctl_data *data = karg; @@ -2523,9 +2330,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, err = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); GOTO(out, err); - case OBD_IOC_POLL_QUOTACHECK: - err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); - GOTO(out, err); case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); GOTO(out, err); @@ -2539,104 +2343,10 @@ out: return err; } -static int osc_get_info(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) -{ - ENTRY; - if (!vallen || !val) - RETURN(-EFAULT); - - if (KEY_IS(KEY_FIEMAP)) { - struct ll_fiemap_info_key *fm_key = - (struct ll_fiemap_info_key *)key; - struct ldlm_res_id res_id; - ldlm_policy_data_t policy; - struct lustre_handle lockh; - ldlm_mode_t mode = 0; - struct ptlrpc_request *req; - struct ll_user_fiemap *reply; - char *tmp; - int rc; - - if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC)) - goto skip_locking; - - policy.l_extent.start = fm_key->fiemap.fm_start & - CFS_PAGE_MASK; - - if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <= - fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1) - policy.l_extent.end = OBD_OBJECT_EOF; - else - policy.l_extent.end = (fm_key->fiemap.fm_start + - fm_key->fiemap.fm_length + - PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK; - - ostid_build_res_name(&fm_key->oa.o_oi, &res_id); - mode = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED | - LDLM_FL_LVB_READY, - &res_id, LDLM_EXTENT, &policy, - LCK_PR | LCK_PW, &lockh, 0); - if (mode) { /* lock is cached on client */ - if (mode != LCK_PR) { - ldlm_lock_addref(&lockh, LCK_PR); - ldlm_lock_decref(&lockh, LCK_PW); - } - } else { /* no cached lock, needs acquire lock on server side */ - fm_key->oa.o_valid |= OBD_MD_FLFLAGS; - fm_key->oa.o_flags |= OBD_FL_SRVLOCK; - } - -skip_locking: - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_FIEMAP); - if (req == NULL) - GOTO(drop_lock, rc = -ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, - RCL_CLIENT, keylen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_CLIENT, *vallen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_SERVER, *vallen); - - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - GOTO(drop_lock, rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); - memcpy(tmp, key, keylen); - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); - memcpy(tmp, val, *vallen); - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(fini_req, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); - if (reply == NULL) - GOTO(fini_req, rc = -EPROTO); - - memcpy(val, reply, *vallen); -fini_req: - ptlrpc_req_finished(req); -drop_lock: - if (mode) - ldlm_lock_decref(&lockh, LCK_PR); - RETURN(rc); - } - - RETURN(-EINVAL); -} - static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set) + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct obd_device *obd = exp->exp_obd; @@ -2669,7 +2379,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(cli->cl_cache == NULL); /* only once */ cli->cl_cache = (struct cl_client_cache *)val; - atomic_inc(&cli->cl_cache->ccc_users); + cl_cache_incref(cli->cl_cache); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; /* add this osc into entity list */ @@ -2741,15 +2451,16 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, req->rq_interpret_reply = osc_shrink_grant_interpret; } - ptlrpc_request_set_replen(req); - if (!KEY_IS(KEY_GRANT_SHRINK)) { - LASSERT(set != NULL); - ptlrpc_set_add_req(set, req); - ptlrpc_check_set(NULL, set); - } else - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpc_request_set_replen(req); + if (!KEY_IS(KEY_GRANT_SHRINK)) { + LASSERT(set != NULL); + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); + } else { + ptlrpcd_add_req(req); + } - RETURN(0); + RETURN(0); } static int osc_reconnect(const struct lu_env *env, @@ -2762,11 +2473,15 @@ static int osc_reconnect(const struct lu_env *env, if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { long lost_grant; + long grant; spin_lock(&cli->cl_loi_list_lock); - data->ocd_grant = (cli->cl_avail_grant + - (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?: - 2 * cli_brw_size(obd); + grant = cli->cl_avail_grant + cli->cl_reserved_grant; + if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) + grant += cli->cl_dirty_grant; + else + grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + data->ocd_grant = grant ? : 2 * cli_brw_size(obd); lost_grant = cli->cl_lost_grant; cli->cl_lost_grant = 0; spin_unlock(&cli->cl_loi_list_lock); @@ -2807,6 +2522,37 @@ static int osc_disconnect(struct obd_export *exp) return rc; } +static int osc_ldlm_resource_invalidate(struct cfs_hash *hs, + struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg) +{ + struct lu_env *env = arg; + struct ldlm_resource *res = cfs_hash_object(hs, hnode); + struct ldlm_lock *lock; + struct osc_object *osc = NULL; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { + if (lock->l_ast_data != NULL && osc == NULL) { + osc = lock->l_ast_data; + cl_object_get(osc2cl(osc)); + } + + /* clear LDLM_FL_CLEANED flag to make sure it will be canceled + * by the 2nd round of ldlm_namespace_clean() call in + * osc_import_event(). */ + ldlm_clear_cleaned(lock); + } + unlock_res(res); + + if (osc != NULL) { + osc_object_invalidate(env, osc); + cl_object_put(env, osc2cl(osc)); + } + + RETURN(0); +} + static int osc_import_event(struct obd_device *obd, struct obd_import *imp, enum obd_import_event event) @@ -2833,18 +2579,20 @@ static int osc_import_event(struct obd_device *obd, case IMP_EVENT_INVALIDATE: { struct ldlm_namespace *ns = obd->obd_namespace; struct lu_env *env; - int refcheck; + __u16 refcheck; + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); env = cl_env_get(&refcheck); if (!IS_ERR(env)) { - /* Reset grants */ - cli = &obd->u.cli; - /* all pages go to failing rpcs due to the invalid - * import */ - osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); - - ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); - cl_env_put(env, &refcheck); + osc_io_unplug(env, &obd->u.cli, NULL); + + cfs_hash_for_each_nolock(ns->ns_rs_hash, + osc_ldlm_resource_invalidate, + env, 0); + cl_env_put(env, &refcheck); + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); } else rc = PTR_ERR(env); break; @@ -2907,7 +2655,7 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(0); } @@ -2917,6 +2665,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) struct obd_type *type; void *handler; int rc; + int adding; + int added; + int req_count; ENTRY; rc = ptlrpcd_addref(); @@ -2943,7 +2694,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS obd->obd_vars = lprocfs_osc_obd_vars; #endif /* If this is true then both client (osc) and server (osp) are on the @@ -2952,9 +2703,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */ type = class_search_type(LUSTRE_OSP_NAME); if (type && type->typ_procsym) { - obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name, - type->typ_procsym, - obd->obd_vars, obd); + obd->obd_proc_entry = lprocfs_register(obd->obd_name, + type->typ_procsym, + obd->obd_vars, obd); if (IS_ERR(obd->obd_proc_entry)) { rc = PTR_ERR(obd->obd_proc_entry); CERROR("error %d setting up lprocfs for %s\n", rc, @@ -2973,18 +2724,28 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_lprocfs_register_obd(obd); } - /* We need to allocate a few requests more, because - * brw_interpret tries to create new requests before freeing - * previous ones, Ideally we want to have 2x max_rpcs_in_flight - * reserved, but I'm afraid that might be too much wasted RAM - * in fact, so 2 is just my guess and still should work. */ - cli->cl_import->imp_rq_pool = - ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, - OST_MAXREQSIZE, - ptlrpc_add_rqs_to_pool); + /* + * We try to control the total number of requests with a upper limit + * osc_reqpool_maxreqcount. There might be some race which will cause + * over-limit allocation, but it is fine. + */ + req_count = atomic_read(&osc_pool_req_count); + if (req_count < osc_reqpool_maxreqcount) { + adding = cli->cl_max_rpcs_in_flight + 2; + if (req_count + adding > osc_reqpool_maxreqcount) + adding = osc_reqpool_maxreqcount - req_count; + + added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding); + atomic_add(added, &osc_pool_req_count); + } INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_weight); + + spin_lock(&osc_shrink_lock); + list_add_tail(&cli->cl_shrink_list, &osc_shrink_list); + spin_unlock(&osc_shrink_lock); + RETURN(0); out_ptlrpcd_work: @@ -3003,50 +2764,35 @@ out_ptlrpcd: RETURN(rc); } -static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) +static int osc_precleanup(struct obd_device *obd) { - int rc = 0; - ENTRY; + struct client_obd *cli = &obd->u.cli; + ENTRY; - switch (stage) { - case OBD_CLEANUP_EARLY: { - struct obd_import *imp; - imp = obd->u.cli.cl_import; - CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); - /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ - ptlrpc_deactivate_import(imp); - spin_lock(&imp->imp_lock); - imp->imp_pingable = 0; - spin_unlock(&imp->imp_lock); - break; - } - case OBD_CLEANUP_EXPORTS: { - struct client_obd *cli = &obd->u.cli; - /* LU-464 - * for echo client, export may be on zombie list, wait for - * zombie thread to cull it, because cli.cl_import will be - * cleared in client_disconnect_export(): - * class_export_destroy() -> obd_cleanup() -> - * echo_device_free() -> echo_client_cleanup() -> - * obd_disconnect() -> osc_disconnect() -> - * client_disconnect_export() - */ - obd_zombie_barrier(); - if (cli->cl_writeback_work) { - ptlrpcd_destroy_work(cli->cl_writeback_work); - cli->cl_writeback_work = NULL; - } - if (cli->cl_lru_work) { - ptlrpcd_destroy_work(cli->cl_lru_work); - cli->cl_lru_work = NULL; - } - obd_cleanup_client_import(obd); - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); - break; - } - } - RETURN(rc); + /* LU-464 + * for echo client, export may be on zombie list, wait for + * zombie thread to cull it, because cli.cl_import will be + * cleared in client_disconnect_export(): + * class_export_destroy() -> obd_cleanup() -> + * echo_device_free() -> echo_client_cleanup() -> + * obd_disconnect() -> osc_disconnect() -> + * client_disconnect_export() + */ + obd_zombie_barrier(); + if (cli->cl_writeback_work) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; + } + + if (cli->cl_lru_work) { + ptlrpcd_destroy_work(cli->cl_lru_work); + cli->cl_lru_work = NULL; + } + + obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); + RETURN(0); } int osc_cleanup(struct obd_device *obd) @@ -3056,6 +2802,10 @@ int osc_cleanup(struct obd_device *obd) ENTRY; + spin_lock(&osc_shrink_lock); + list_del(&cli->cl_shrink_list); + spin_unlock(&osc_shrink_lock); + /* lru cleanup */ if (cli->cl_cache != NULL) { LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); @@ -3063,17 +2813,17 @@ int osc_cleanup(struct obd_device *obd) list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; - atomic_dec(&cli->cl_cache->ccc_users); + cl_cache_decref(cli->cl_cache); cli->cl_cache = NULL; } - /* free memory of osc quota cache */ - osc_quota_cleanup(obd); + /* free memory of osc quota cache */ + osc_quota_cleanup(obd); - rc = client_obd_cleanup(obd); + rc = client_obd_cleanup(obd); - ptlrpcd_decref(); - RETURN(rc); + ptlrpcd_decref(); + RETURN(rc); } int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) @@ -3102,33 +2852,52 @@ static struct obd_ops osc_obd_ops = { .o_create = osc_create, .o_destroy = osc_destroy, .o_getattr = osc_getattr, - .o_getattr_async = osc_getattr_async, .o_setattr = osc_setattr, - .o_setattr_async = osc_setattr_async, - .o_change_cbdata = osc_change_cbdata, - .o_find_cbdata = osc_find_cbdata, .o_iocontrol = osc_iocontrol, - .o_get_info = osc_get_info, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, - .o_quotacheck = osc_quotacheck, }; +static struct shrinker *osc_cache_shrinker; +struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list); +DEFINE_SPINLOCK(osc_shrink_lock); + +#ifndef HAVE_SHRINKER_COUNT +static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) +{ + struct shrink_control scv = { + .nr_to_scan = shrink_param(sc, nr_to_scan), + .gfp_mask = shrink_param(sc, gfp_mask) + }; +#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) + struct shrinker *shrinker = NULL; +#endif + + (void)osc_cache_shrink_scan(shrinker, &scv); + + return osc_cache_shrink_count(shrinker, &scv); +} +#endif + static int __init osc_init(void) { bool enable_proc = true; struct obd_type *type; + unsigned int reqpool_size; + unsigned int reqsize; int rc; + DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink, + osc_cache_shrink_count, osc_cache_shrink_scan); ENTRY; - /* print an address of _any_ initialized kernel symbol from this - * module, to allow debugging with gdb that doesn't support data - * symbols from modules.*/ - CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); + /* print an address of _any_ initialized kernel symbol from this + * module, to allow debugging with gdb that doesn't support data + * symbols from modules.*/ + CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); - rc = lu_kmem_init(osc_caches); + rc = lu_kmem_init(osc_caches); if (rc) RETURN(rc); @@ -3138,22 +2907,56 @@ static int __init osc_init(void) rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, LUSTRE_OSC_NAME, &osc_device_type); - if (rc) { - lu_kmem_fini(osc_caches); - RETURN(rc); - } + if (rc) + GOTO(out_kmem, rc); + osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar); + + /* This is obviously too much memory, only prevent overflow here */ + if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) + GOTO(out_type, rc = -EINVAL); + + reqpool_size = osc_reqpool_mem_max << 20; + + reqsize = 1; + while (reqsize < OST_IO_MAXREQSIZE) + reqsize = reqsize << 1; + + /* + * We don't enlarge the request count in OSC pool according to + * cl_max_rpcs_in_flight. The allocation from the pool will only be + * tried after normal allocation failed. So a small OSC pool won't + * cause much performance degression in most of cases. + */ + osc_reqpool_maxreqcount = reqpool_size / reqsize; + + atomic_set(&osc_pool_req_count, 0); + osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + + if (osc_rq_pool != NULL) + GOTO(out, rc); + rc = -ENOMEM; +out_type: + class_unregister_type(LUSTRE_OSC_NAME); +out_kmem: + lu_kmem_fini(osc_caches); +out: RETURN(rc); } -static void /*__exit*/ osc_exit(void) +static void __exit osc_exit(void) { + remove_shrinker(osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); + ptlrpc_free_rq_pool(osc_rq_pool); } -MODULE_AUTHOR("Sun Microsystems, Inc. "); +MODULE_AUTHOR("OpenSFS, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); +module_init(osc_init); +module_exit(osc_exit);