X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=42d132dba3a10589befd9fb91c0a9cba0cb3a831;hp=d0fa025d6a23c1ca9caca7a63ac2582b99c3ae0d;hb=8f01f8b51d114b0d2d54a5ab7db3161782e52447;hpb=3e6c20afa18a64c5cb949ecf2ed0f49202ba3e15 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index d0fa025..42d132d 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -49,9 +49,19 @@ #include #include #include +#include +#include #include "osc_internal.h" #include "osc_cl_internal.h" +atomic_t osc_pool_req_count; +unsigned int osc_reqpool_maxreqcount; +struct ptlrpc_request_pool *osc_rq_pool; + +/* max memory used for request pool, unit is MB */ +static unsigned int osc_reqpool_mem_max = 5; +module_param(osc_reqpool_mem_max, uint, 0444); + struct osc_brw_async_args { struct obdo *aa_oa; int aa_requested_nob; @@ -62,7 +72,6 @@ struct osc_brw_async_args { struct client_obd *aa_cli; struct list_head aa_oaps; struct list_head aa_exts; - struct obd_capa *aa_ocapa; struct cl_req *aa_clerq; }; @@ -75,15 +84,16 @@ struct osc_setattr_args { }; struct osc_fsync_args { - struct obd_info *fa_oi; - obd_enqueue_update_f fa_upcall; + struct osc_object *fa_obj; + struct obdo *fa_oa; + obd_enqueue_update_f fa_upcall; void *fa_cookie; }; struct osc_enqueue_args { struct obd_export *oa_exp; - ldlm_type_t oa_type; - ldlm_mode_t oa_mode; + enum ldlm_type oa_type; + enum ldlm_mode oa_mode; __u64 *oa_flags; osc_enqueue_upcall_f oa_upcall; void *oa_cookie; @@ -96,158 +106,98 @@ static void osc_release_ppga(struct brw_page **ppga, size_t count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -static inline void osc_pack_capa(struct ptlrpc_request *req, - struct ost_body *body, void *capa) -{ - struct obd_capa *oc = (struct obd_capa *)capa; - struct lustre_capa *c; - - if (!capa) - return; - - c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); - LASSERT(c); - capa_cpy(c, oc); - body->oa.o_valid |= OBD_MD_FLOSSCAPA; - DEBUG_CAPA(D_SEC, c, "pack"); -} - -void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) +void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa) { struct ost_body *body; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); -} - -void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) -{ - if (oc == NULL) - req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); - else - /* it is already calculated as sizeof struct obd_capa */ - ; -} - -int osc_getattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) -{ - struct ost_body *body; - ENTRY; - - if (rc != 0) - GOTO(out, rc); - - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body) { - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, - aa->aa_oi->oi_oa, &body->oa); - - /* This should really be sent by the OST */ - aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; - aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; - } else { - CDEBUG(D_INFO, "can't unpack ost_body\n"); - rc = -EPROTO; - aa->aa_oi->oi_oa->o_valid = 0; - } -out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); } static 
int osc_getattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); - oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); - oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + oa->o_blksize = cli_brw_size(exp->exp_obd); + oa->o_valid |= OBD_MD_FLBLKSZ; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + EXIT; +out: + ptlrpc_req_finished(req); + + return rc; } static int osc_setattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, struct obd_trans_info *oti) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); + ENTRY; + LASSERT(oa->o_valid & OBD_MD_FLGROUP); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); - EXIT; + EXIT; out: - ptlrpc_req_finished(req); - RETURN(rc); + ptlrpc_req_finished(req); + + RETURN(rc); } static int osc_setattr_interpret(const struct lu_env *env, @@ -271,55 +221,55 @@ out: RETURN(rc); } -int osc_setattr_async(struct obd_export *exp, struct 
obd_info *oinfo, +int osc_setattr_async(struct obd_export *exp, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { - struct ptlrpc_request *req; - struct osc_setattr_args *sa; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct osc_setattr_args *sa; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_pack_req_body(req, oinfo); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - ptlrpc_request_set_replen(req); + osc_pack_req_body(req, oa); - /* do mds to ost setattr asynchronously */ - if (!rqset) { - /* Do not wait for response. */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - } else { - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_setattr_interpret; - - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } + ptlrpc_request_set_replen(req); - RETURN(0); + /* do mds to ost setattr asynchronously */ + if (!rqset) { + /* Do not wait for response. */ + ptlrpcd_add_req(req); + } else { + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_setattr_interpret; + + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; + + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } + + RETURN(0); } static int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct obd_trans_info *oti) + struct obdo *oa) { struct ptlrpc_request *req; struct ost_body *body; @@ -347,14 +297,6 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp, ptlrpc_request_set_replen(req); - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_DELORPHAN) { - DEBUG_REQ(D_HA, req, - "delorphan from OST integration"); - /* Don't resend the delorphan req */ - req->rq_no_resend = req->rq_no_delay = 1; - } - rc = ptlrpc_queue_wait(req); if (rc) GOTO(out_req, rc); @@ -369,24 +311,15 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - if (oti != NULL) { - if (oa->o_valid & OBD_MD_FLCOOKIE) { - if (oti->oti_logcookies == NULL) - oti->oti_logcookies = &oti->oti_onecookie; - - *oti->oti_logcookies = oa->o_lcookie; - } - } - - CDEBUG(D_HA, "transno: "LPD64"\n", - lustre_msg_get_transno(req->rq_repmsg)); + CDEBUG(D_HA, "transno: "LPD64"\n", + lustre_msg_get_transno(req->rq_repmsg)); out_req: - ptlrpc_req_finished(req); + ptlrpc_req_finished(req); out: RETURN(rc); } -int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_punch_base(struct obd_export *exp, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { @@ -400,7 +333,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, 
&RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); if (rc) { ptlrpc_request_free(req); @@ -411,53 +343,68 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); - RETURN(0); + RETURN(0); } static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *arg, int rc) { - struct osc_fsync_args *fa = arg; - struct ost_body *body; - ENTRY; + struct osc_fsync_args *fa = arg; + struct ost_body *body; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + struct cl_object *obj; + ENTRY; - if (rc) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO(out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + CERROR("can't unpack ost_body\n"); + GOTO(out, rc = -EPROTO); + } + + *fa->fa_oa = body->oa; + obj = osc2cl(fa->fa_obj); + + /* Update osc object's blocks attribute */ + cl_object_attr_lock(obj); + if (body->oa.o_valid & OBD_MD_FLBLOCKS) { + attr->cat_blocks = body->oa.o_blocks; + valid |= CAT_BLOCKS; + } + + if (valid != 0) + cl_object_attr_update(env, obj, attr, valid); + cl_object_attr_unlock(obj); - *fa->fa_oi->oi_oa = body->oa; out: rc = fa->fa_upcall(fa->fa_cookie, rc); RETURN(rc); } -int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_sync_base(struct osc_object *obj, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { + struct obd_export *exp = osc_export(obj); struct ptlrpc_request *req; struct ost_body *body; struct osc_fsync_args *fa; @@ -468,7 +415,6 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -478,21 +424,20 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, /* overload the size and blocks fields in the oa with start/end */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = osc_sync_interpret; + 
ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_sync_interpret; CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); fa = ptlrpc_req_async_args(req); - fa->fa_oi = oinfo; + fa->fa_obj = obj; + fa->fa_oa = oa; fa->fa_upcall = upcall; fa->fa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); @@ -504,13 +449,13 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, * locks added to @cancels list. */ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, struct list_head *cancels, - ldlm_mode_t mode, __u64 lock_flags) + enum ldlm_mode mode, __u64 lock_flags) { - struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; - struct ldlm_res_id res_id; - struct ldlm_resource *res; - int count; - ENTRY; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + int count; + ENTRY; /* Return, i.e. cancel nothing, only if ELC is supported (flag in * export) but disabled through procfs (flag in NS). @@ -563,18 +508,8 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -/* Destroy requests can be async always on the client, and we don't even really - * care about the return code since the client cannot do anything at all about - * a destroy failure. - * When the MDS is unlinking a filename, it saves the file objects into a - * recovery llog, and these object records are cancelled when the OST reports - * they were destroyed and sync'd to disk (i.e. transaction committed). - * If the client dies, or the OST is down when the object should be destroyed, - * the records are not cancelled, and when the OST reconnects to the MDS next, - * it will retrieve the llog unlink logs and then sends the log cancellation - * cookies to the MDS after committing destroy transactions. 
*/ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct obd_trans_info *oti) + struct obdo *oa) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -597,7 +532,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); } - osc_set_capa_size(req, &RMF_CAPA1, NULL); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -608,36 +542,27 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oa->o_lcookie = *oti->oti_logcookies; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); ptlrpc_request_set_replen(req); - /* If osc_destory is for destroying the unlink orphan, - * sent from MDT to OST, which should not be blocked here, - * because the process might be triggered by ptlrpcd, and - * it is not good to block ptlrpcd thread (b=16006)*/ - if (!(oa->o_flags & OBD_FL_DELORPHAN)) { - req->rq_interpret_reply = osc_destroy_interpret; - if (!osc_can_send_destroy(cli)) { - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, - NULL); - - /* - * Wait until the number of on-going destroy RPCs drops - * under max_rpc_in_flight - */ - l_wait_event_exclusive(cli->cl_destroy_waitq, - osc_can_send_destroy(cli), &lwi); - } - } + req->rq_interpret_reply = osc_destroy_interpret; + if (!osc_can_send_destroy(cli)) { + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - /* Do not wait for response */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - RETURN(0); + /* + * Wait until the number of on-going destroy RPCs drops + * under max_rpc_in_flight + */ + l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); + } + + /* Do not wait for response */ + ptlrpcd_add_req(req); + RETURN(0); } static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, @@ -658,13 +583,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_undirty = 0; } else if (unlikely(atomic_long_read(&obd_dirty_pages) - atomic_long_read(&obd_dirty_transit_pages) > - (obd_max_dirty_pages + 1))) { + (long)(obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). 
*/ - CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n", - cli->cl_import->imp_obd->obd_name, - atomic_long_read(&obd_dirty_pages), + CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n", + cli_name(cli), atomic_long_read(&obd_dirty_pages), atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; @@ -851,21 +775,19 @@ static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) static int osc_add_shrink_grant(struct client_obd *client) { - int rc; + int rc; - rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, - TIMEOUT_GRANT, - osc_grant_shrink_grant_cb, NULL, - &client->cl_grant_shrink_list); - if (rc) { - CERROR("add grant client %s error %d\n", - client->cl_import->imp_obd->obd_name, rc); - return rc; - } - CDEBUG(D_CACHE, "add grant client %s \n", - client->cl_import->imp_obd->obd_name); - osc_update_next_shrink(client); - return 0; + rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, + TIMEOUT_GRANT, + osc_grant_shrink_grant_cb, NULL, + &client->cl_grant_shrink_list); + if (rc) { + CERROR("add grant client %s error %d\n", cli_name(client), rc); + return rc; + } + CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client)); + osc_update_next_shrink(client); + return 0; } static int osc_del_shrink_grant(struct client_obd *client) @@ -894,7 +816,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) if (cli->cl_avail_grant < 0) { CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", - cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, + cli_name(cli), cli->cl_avail_grant, ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); /* workaround for servers which do not have the patch from * LU-2679 */ @@ -906,8 +828,8 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." 
- "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, - cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); + "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant, + cli->cl_lost_grant, cli->cl_chunkbits); if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && list_empty(&cli->cl_grant_shrink_list)) @@ -931,7 +853,7 @@ static void handle_short_read(int nob_read, size_t page_count, if (pga[i]->count > nob_read) { /* EOF inside this page */ ptr = kmap(pga[i]->pg) + - (pga[i]->off & ~CFS_PAGE_MASK); + (pga[i]->off & ~PAGE_MASK); memset(ptr + nob_read, 0, pga[i]->count - nob_read); kunmap(pga[i]->pg); page_count--; @@ -946,7 +868,7 @@ static void handle_short_read(int nob_read, size_t page_count, /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK); memset(ptr, 0, pga[i]->count); kunmap(pga[i]->pg); i++; @@ -1037,16 +959,16 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count, if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { unsigned char *ptr = kmap(pga[i]->pg); - int off = pga[i]->off & ~CFS_PAGE_MASK; + int off = pga[i]->off & ~PAGE_MASK; memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, - pga[i]->off & ~CFS_PAGE_MASK, + pga[i]->off & ~PAGE_MASK, count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", - (int)(pga[i]->off & ~CFS_PAGE_MASK)); + (int)(pga[i]->off & ~PAGE_MASK)); nob -= pga[i]->count; pg_count--; @@ -1064,12 +986,10 @@ static u32 osc_checksum_bulk(int nob, size_t pg_count, return cksum; } -static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, - struct lov_stripe_md *lsm, u32 page_count, - struct brw_page **pga, - struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve, - int resend) +static int +osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, + u32 page_count, struct brw_page **pga, + struct ptlrpc_request **reqp, int resend) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1087,15 +1007,15 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) RETURN(-EINVAL); /* Fatal */ - if ((cmd & OBD_BRW_WRITE) != 0) { - opc = OST_WRITE; - req = ptlrpc_request_alloc_pool(cli->cl_import, - cli->cl_import->imp_rq_pool, - &RQF_OST_BRW_WRITE); - } else { - opc = OST_READ; - req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); - } + if ((cmd & OBD_BRW_WRITE) != 0) { + opc = OST_WRITE; + req = ptlrpc_request_alloc_pool(cli->cl_import, + osc_rq_pool, + &RQF_OST_BRW_WRITE); + } else { + opc = OST_READ; + req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); + } if (req == NULL) RETURN(-ENOMEM); @@ -1109,7 +1029,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, sizeof(*ioobj)); req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); - osc_set_capa_size(req, &RMF_CAPA1, ocapa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { @@ -1124,8 +1043,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, - opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, - OST_BULK_PORTAL); + (opc == OST_WRITE ? 
PTLRPC_BULK_GET_SOURCE : + PTLRPC_BULK_PUT_SINK) | + PTLRPC_BULK_BUF_KIOV, + OST_BULK_PORTAL, + &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1146,12 +1068,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); - osc_pack_capa(req, body, ocapa); LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; - int poff = pg->off & ~CFS_PAGE_MASK; + int poff = pg->off & ~PAGE_MASK; LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ @@ -1172,7 +1093,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); + desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { @@ -1256,8 +1177,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_ppga = pga; aa->aa_cli = cli; INIT_LIST_HEAD(&aa->aa_oaps); - if (ocapa && reserve) - aa->aa_ocapa = capa_get(ocapa); *reqp = req; niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); @@ -1477,12 +1396,10 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); - rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == - OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, - aa->aa_cli, aa->aa_oa, - NULL /* lsm unused by osc currently */, - aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0, 1); + rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == + OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + aa->aa_cli, aa->aa_oa, aa->aa_page_count, + aa->aa_ppga, &new_req, 1); if (rc) RETURN(rc); @@ -1527,14 +1444,11 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, } } - new_aa->aa_ocapa = aa->aa_ocapa; - aa->aa_ocapa = NULL; - /* XXX: This code will run into problem if we're going to support * to add a series of BRW RPCs into a self-defined ptlrpc_request_set * and wait for all of them to be finished. We should inherit request * set from old request. */ - ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); + ptlrpcd_add_req(new_req); DEBUG_REQ(D_INFO, new_req, "new request"); RETURN(0); @@ -1613,11 +1527,6 @@ static int brw_interpret(const struct lu_env *env, rc = -EIO; } - if (aa->aa_ocapa) { - capa_put(aa->aa_ocapa); - aa->aa_ocapa = NULL; - } - if (rc == 0) { struct obdo *oa = aa->aa_oa; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -1697,7 +1606,7 @@ static int brw_interpret(const struct lu_env *env, osc_wake_cache_waiters(cli); spin_unlock(&cli->cl_loi_list_lock); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(rc); } @@ -1725,7 +1634,7 @@ static void brw_commit(struct ptlrpc_request *req) * Extents in the list must be in OES_RPC state. 
*/ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - struct list_head *ext_list, int cmd, pdl_policy_t pol) + struct list_head *ext_list, int cmd) { struct ptlrpc_request *req = NULL; struct osc_extent *ext; @@ -1822,8 +1731,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, } sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr->cra_capa, 1, 0); + rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, rc); @@ -1892,19 +1800,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); + ptlrpcd_add_req(req); rc = 0; EXIT; @@ -1912,10 +1808,8 @@ out: if (mem_tight != 0) cfs_memory_pressure_restore(mpflag); - if (crattr != NULL) { - capa_put(crattr->cra_capa); + if (crattr != NULL) OBD_FREE(crattr, sizeof(*crattr)); - } if (rc != 0) { LASSERT(req == NULL); @@ -1979,7 +1873,7 @@ static int osc_set_data_with_check(struct lustre_handle *lockh, static int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, void *cookie, - struct lustre_handle *lockh, ldlm_mode_t mode, + struct lustre_handle *lockh, enum ldlm_mode mode, __u64 *flags, int agl, int errcode) { bool intent = *flags & LDLM_FL_HAS_INTENT; @@ -2016,12 +1910,12 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, } static int osc_enqueue_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) + struct ptlrpc_request *req, + struct osc_enqueue_args *aa, int rc) { struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; - ldlm_mode_t mode = aa->oa_mode; + enum ldlm_mode mode = aa->oa_mode; struct ost_lvb *lvb = aa->oa_lvb; __u32 lvb_len = sizeof(*lvb); __u64 flags = 0; @@ -2078,7 +1972,7 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * is evicted from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u64 *flags, ldlm_policy_data_t *policy, + __u64 *flags, union ldlm_policy_data *policy, struct ost_lvb *lvb, int kms_valid, osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, @@ -2089,14 +1983,14 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY; - ldlm_mode_t mode; + enum ldlm_mode mode; int rc; ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. 
*/ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* * kms is not valid when either object is completely fresh (so that no @@ -2163,8 +2057,8 @@ no_match: if (req == NULL) RETURN(-ENOMEM); - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); - if (rc < 0) { + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { ptlrpc_request_free(req); RETURN(rc); } @@ -2202,17 +2096,17 @@ no_match: aa->oa_flags = NULL; } - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_enqueue_interpret; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } else if (intent) { - ptlrpc_req_finished(req); - } - RETURN(rc); - } + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_enqueue_interpret; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } else if (intent) { + ptlrpc_req_finished(req); + } + RETURN(rc); + } rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, flags, agl, rc); @@ -2223,22 +2117,22 @@ no_match: } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u32 type, ldlm_policy_data_t *policy, __u32 mode, - __u64 *flags, void *data, struct lustre_handle *lockh, - int unref) + enum ldlm_type type, union ldlm_policy_data *policy, + enum ldlm_mode mode, __u64 *flags, void *data, + struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; - ldlm_mode_t rc; + enum ldlm_mode rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) - RETURN(-EIO); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + RETURN(-EIO); - /* Filesystem lock extents are extended to page boundaries so that - * dealing with the page cache is a little smoother */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + /* Filesystem lock extents are extended to page boundaries so that + * dealing with the page cache is a little smoother */ + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. 
The @@ -2431,9 +2325,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, err = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); GOTO(out, err); - case OBD_IOC_POLL_QUOTACHECK: - err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); - GOTO(out, err); case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); GOTO(out, err); @@ -2555,15 +2446,16 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, req->rq_interpret_reply = osc_shrink_grant_interpret; } - ptlrpc_request_set_replen(req); - if (!KEY_IS(KEY_GRANT_SHRINK)) { - LASSERT(set != NULL); - ptlrpc_set_add_req(set, req); - ptlrpc_check_set(NULL, set); - } else - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpc_request_set_replen(req); + if (!KEY_IS(KEY_GRANT_SHRINK)) { + LASSERT(set != NULL); + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); + } else { + ptlrpcd_add_req(req); + } - RETURN(0); + RETURN(0); } static int osc_reconnect(const struct lu_env *env, @@ -2621,6 +2513,33 @@ static int osc_disconnect(struct obd_export *exp) return rc; } +static int osc_ldlm_resource_invalidate(struct cfs_hash *hs, + struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg) +{ + struct lu_env *env = arg; + struct ldlm_resource *res = cfs_hash_object(hs, hnode); + struct ldlm_lock *lock; + struct osc_object *osc = NULL; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { + if (lock->l_ast_data != NULL && osc == NULL) { + osc = lock->l_ast_data; + cl_object_get(osc2cl(osc)); + } + lock->l_ast_data = NULL; + } + unlock_res(res); + + if (osc != NULL) { + osc_object_invalidate(env, osc); + cl_object_put(env, osc2cl(osc)); + } + + RETURN(0); +} + static int osc_import_event(struct obd_device *obd, struct obd_import *imp, enum obd_import_event event) @@ -2649,16 +2568,18 @@ static int osc_import_event(struct obd_device *obd, struct lu_env *env; int refcheck; + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + env = cl_env_get(&refcheck); if (!IS_ERR(env)) { - /* Reset grants */ - cli = &obd->u.cli; - /* all pages go to failing rpcs due to the invalid - * import */ - osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); - - ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); - cl_env_put(env, &refcheck); + osc_io_unplug(env, &obd->u.cli, NULL); + + cfs_hash_for_each_nolock(ns->ns_rs_hash, + osc_ldlm_resource_invalidate, + env, 0); + cl_env_put(env, &refcheck); + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); } else rc = PTR_ERR(env); break; @@ -2721,7 +2642,7 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(0); } @@ -2731,6 +2652,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) struct obd_type *type; void *handler; int rc; + int adding; + int added; + int req_count; ENTRY; rc = ptlrpcd_addref(); @@ -2787,15 +2711,20 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_lprocfs_register_obd(obd); } - /* We need to allocate a few requests more, because - * brw_interpret tries to create new requests before freeing - * previous ones, Ideally we want to have 2x max_rpcs_in_flight - * reserved, but I'm afraid that might be too much wasted RAM - * in fact, so 2 is just my guess and still should work. 
*/ - cli->cl_import->imp_rq_pool = - ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, - OST_MAXREQSIZE, - ptlrpc_add_rqs_to_pool); + /* + * We try to control the total number of requests with a upper limit + * osc_reqpool_maxreqcount. There might be some race which will cause + * over-limit allocation, but it is fine. + */ + req_count = atomic_read(&osc_pool_req_count); + if (req_count < osc_reqpool_maxreqcount) { + adding = cli->cl_max_rpcs_in_flight + 2; + if (req_count + adding > osc_reqpool_maxreqcount) + adding = osc_reqpool_maxreqcount - req_count; + + added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding); + atomic_add(added, &osc_pool_req_count); + } INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_weight); @@ -2882,12 +2811,12 @@ int osc_cleanup(struct obd_device *obd) } /* free memory of osc quota cache */ - osc_quota_cleanup(obd); + osc_quota_cleanup(obd); - rc = client_obd_cleanup(obd); + rc = client_obd_cleanup(obd); - ptlrpcd_decref(); - RETURN(rc); + ptlrpcd_decref(); + RETURN(rc); } int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) @@ -2922,14 +2851,16 @@ static struct obd_ops osc_obd_ops = { .o_import_event = osc_import_event, .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, - .o_quotacheck = osc_quotacheck, }; static int __init osc_init(void) { bool enable_proc = true; struct obd_type *type; + unsigned int reqpool_size; + unsigned int reqsize; int rc; + ENTRY; /* print an address of _any_ initialized kernel symbol from this @@ -2947,11 +2878,39 @@ static int __init osc_init(void) rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, LUSTRE_OSC_NAME, &osc_device_type); - if (rc) { - lu_kmem_fini(osc_caches); - RETURN(rc); - } + if (rc) + GOTO(out_kmem, rc); + /* This is obviously too much memory, only prevent overflow here */ + if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) + GOTO(out_type, rc = -EINVAL); + + reqpool_size = osc_reqpool_mem_max << 20; + + reqsize = 1; + while (reqsize < OST_IO_MAXREQSIZE) + reqsize = reqsize << 1; + + /* + * We don't enlarge the request count in OSC pool according to + * cl_max_rpcs_in_flight. The allocation from the pool will only be + * tried after normal allocation failed. So a small OSC pool won't + * cause much performance degression in most of cases. + */ + osc_reqpool_maxreqcount = reqpool_size / reqsize; + + atomic_set(&osc_pool_req_count, 0); + osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + + if (osc_rq_pool != NULL) + GOTO(out, rc); + rc = -ENOMEM; +out_type: + class_unregister_type(LUSTRE_OSC_NAME); +out_kmem: + lu_kmem_fini(osc_caches); +out: RETURN(rc); } @@ -2959,10 +2918,13 @@ static void /*__exit*/ osc_exit(void) { class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); + ptlrpc_free_rq_pool(osc_rq_pool); } MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); +module_init(osc_init); +module_exit(osc_exit);
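
Notes on a few mechanics this diff touches, as standalone user-space sketches for readers without the Lustre tree at hand. First, the osc_announce_cached() hunk adds a (long) cast to the dirty-page ceiling before comparing it against a signed difference of counters, presumably to keep the comparison signed. A minimal sketch of why that cast matters, with plain C variables standing in for the kernel's atomic_long_t counters (all names here are illustrative, not Lustre's):

#include <stdio.h>

int main(void)
{
        long dirty = -1;               /* transiently negative, as the racy updates allow */
        unsigned long max_dirty = 1000;

        /* Without the cast, usual arithmetic conversions promote the
         * signed left-hand side to unsigned long, so -1 becomes
         * ULONG_MAX and the check "trips" spuriously. */
        printf("without cast: %d\n", dirty > max_dirty + 1);
        /* With the cast, as in the patched code, the compare stays signed. */
        printf("with cast:    %d\n", dirty > (long)(max_dirty + 1));
        return 0;
}

The +1 fudge factor mentioned in the quoted comment covers the benign race between the unlocked reads; the cast covers the signedness.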
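
handle_short_read() in the diff skips the fragments a short read did fill, clears the tail of the fragment where EOF landed, and then clears every later fragment. A sketch of the same walk, with plain buffers standing in for kmap()'d pages offset by (off & ~PAGE_MASK); struct frag_sketch and every name here are hypothetical:

#include <stdio.h>
#include <string.h>

struct frag_sketch {
        char *buf;          /* stands in for kmap(pg) + page offset */
        unsigned int count; /* bytes this fragment covers */
};

static void zero_short_read_sketch(unsigned int nob_read,
                                   struct frag_sketch *frags, int nfrags)
{
        int i;

        for (i = 0; i < nfrags; i++) {
                if (frags[i].count > nob_read) {
                        /* EOF inside this fragment: clear its unread tail
                         * (and, once nob_read hits 0, whole fragments). */
                        memset(frags[i].buf + nob_read, 0,
                               frags[i].count - nob_read);
                        nob_read = 0;
                } else {
                        /* Fully-read fragment: consume and move on. */
                        nob_read -= frags[i].count;
                }
        }
}

int main(void)
{
        char a[4] = "aaaa", b[4] = "bbbb";
        struct frag_sketch frags[2] = { { a, 4 }, { b, 4 } };

        zero_short_read_sketch(6, frags, 2); /* only 6 of 8 bytes arrived */
        printf("tail of b zeroed: %d\n", b[2] == 0 && b[3] == 0);
        return 0;
}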
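
Both osc_enqueue_base() and osc_match_base() widen lock extents to page boundaries with the PAGE_MASK arithmetic shown in the diff: round the start down, and round the inclusive end up to the last byte of its page. The bit manipulation in isolation, with a stand-in PAGE_SIZE since the kernel's value is architecture dependent:

#include <stdio.h>
#include <stdint.h>

#define PAGE_SIZE_SKETCH 4096ULL                    /* stand-in page size */
#define PAGE_MASK_SKETCH (~(PAGE_SIZE_SKETCH - 1))

int main(void)
{
        uint64_t start = 5000, end = 12000;

        start -= start & ~PAGE_MASK_SKETCH; /* round down: 5000 -> 4096 */
        end |= ~PAGE_MASK_SKETCH;           /* round up:  12000 -> 12287 */

        printf("extent [%llu, %llu]\n",
               (unsigned long long)start, (unsigned long long)end);
        return 0;
}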
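
In osc_setup(), each client device now tops up the shared osc_rq_pool by cl_max_rpcs_in_flight + 2 requests, clamped to osc_reqpool_maxreqcount; the check and the add are deliberately not one atomic step, and the quoted comment accepts the small overshoot a race can cause. A sketch of that accounting with C11 atomics standing in for the kernel's atomic_t (the allocator stub and every *_sketch name are assumptions):

#include <stdatomic.h>
#include <stdio.h>

static atomic_int pool_req_count_sketch;          /* like osc_pool_req_count */
static unsigned int pool_maxreqcount_sketch = 128;

/* Pretend allocator: reports how many requests it actually added,
 * mirroring ptlrpc_add_rqs_to_pool()'s return value. */
static int add_rqs_to_pool_sketch(int adding)
{
        return adding; /* assume every allocation succeeds */
}

static void setup_one_client_sketch(unsigned int max_rpcs_in_flight)
{
        int req_count = atomic_load(&pool_req_count_sketch);

        /* Two racing setups may both pass this check and overshoot the
         * cap slightly; per the diff's comment, that is fine. */
        if (req_count < (int)pool_maxreqcount_sketch) {
                int adding = max_rpcs_in_flight + 2;

                if (req_count + adding > (int)pool_maxreqcount_sketch)
                        adding = (int)pool_maxreqcount_sketch - req_count;
                atomic_fetch_add(&pool_req_count_sketch,
                                 add_rqs_to_pool_sketch(adding));
        }
}

int main(void)
{
        setup_one_client_sketch(8);
        setup_one_client_sketch(8);
        printf("pool now holds %d requests\n",
               atomic_load(&pool_req_count_sketch));
        return 0;
}

Counting what the allocator actually added, rather than what was requested, keeps the global counter honest even when allocations fail partway.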
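
Finally, osc_init() sizes that pool by rounding the maximum I/O request size up to a power of two and dividing the osc_reqpool_mem_max cap (in MB, rejected unless it is nonzero and below 1 << 12 per the overflow guard) by it. The arithmetic on its own; the OST_IO_MAXREQSIZE stand-in below is a made-up value, the real one comes from Lustre's headers:

#include <stdio.h>

#define OST_IO_MAXREQSIZE_SKETCH 17408u  /* hypothetical stand-in value */

int main(void)
{
        unsigned int osc_reqpool_mem_max = 5; /* MB, the module parameter */
        unsigned int reqpool_size = osc_reqpool_mem_max << 20; /* MB -> bytes */
        unsigned int reqsize = 1;

        /* Round the request size up to the next power of two, exactly as
         * the osc_init() hunk does. */
        while (reqsize < OST_IO_MAXREQSIZE_SKETCH)
                reqsize <<= 1;

        printf("reqsize=%u maxreqcount=%u\n", reqsize,
               reqpool_size / reqsize);
        return 0;
}

With the stand-in value this prints reqsize=32768 maxreqcount=160; the real count depends on OST_IO_MAXREQSIZE, and as the diff's comment notes, the pool is only a fallback when normal allocation fails, so a modest count costs little.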