X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=f636360dfda9288076eead9f3cd98fd9c2b2fd7e;hp=6575d79627e4ff8feee54f86b3ff0d731dc76134;hb=713174908cb8e5e3ceadd3ca1cb42a88b200e576;hpb=df497dc560062a0a0c7178498cba8853843d39f1 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 6575d79..f636360 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -38,49 +38,46 @@ #include -#ifndef __KERNEL__ -# include -#endif - -#include -#include #include -#include -#ifdef __CYGWIN__ -# include -#endif - -#include #include -#include -#include #include -#include +#include #include -#include "osc_internal.h" +#include +#include +#include +#include +#include +#include +#include +#include + #include "osc_cl_internal.h" +#include "osc_internal.h" + +atomic_t osc_pool_req_count; +unsigned int osc_reqpool_maxreqcount; +struct ptlrpc_request_pool *osc_rq_pool; + +/* max memory used for request pool, unit is MB */ +static unsigned int osc_reqpool_mem_max = 5; +module_param(osc_reqpool_mem_max, uint, 0444); struct osc_brw_async_args { struct obdo *aa_oa; int aa_requested_nob; int aa_nio_count; - obd_count aa_page_count; + u32 aa_page_count; int aa_resends; struct brw_page **aa_ppga; struct client_obd *aa_cli; struct list_head aa_oaps; struct list_head aa_exts; - struct obd_capa *aa_ocapa; - struct cl_req *aa_clerq; }; #define osc_grant_args osc_brw_async_args -struct osc_async_args { - struct obd_info *aa_oi; -}; - struct osc_setattr_args { struct obdo *sa_oa; obd_enqueue_update_f sa_upcall; @@ -88,273 +85,120 @@ struct osc_setattr_args { }; struct osc_fsync_args { - struct obd_info *fa_oi; - obd_enqueue_update_f fa_upcall; + struct osc_object *fa_obj; + struct obdo *fa_oa; + obd_enqueue_update_f fa_upcall; void *fa_cookie; }; struct osc_enqueue_args { - struct obd_export *oa_exp; - __u64 *oa_flags; - obd_enqueue_update_f oa_upcall; - void *oa_cookie; - struct ost_lvb *oa_lvb; - struct lustre_handle *oa_lockh; - struct ldlm_enqueue_info *oa_ei; - unsigned int oa_agl:1; + struct obd_export *oa_exp; + enum ldlm_type oa_type; + enum ldlm_mode oa_mode; + __u64 *oa_flags; + osc_enqueue_upcall_f oa_upcall; + void *oa_cookie; + struct ost_lvb *oa_lvb; + struct lustre_handle oa_lockh; + unsigned int oa_agl:1; }; -static void osc_release_ppga(struct brw_page **ppga, obd_count count); +static void osc_release_ppga(struct brw_page **ppga, size_t count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -/* Unpack OSC object metadata from disk storage (LE byte order). */ -static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_bytes) -{ - int lsm_size; - struct obd_import *imp = class_exp2cliimp(exp); - ENTRY; - - if (lmm != NULL) { - if (lmm_bytes < sizeof(*lmm)) { - CERROR("%s: lov_mds_md too small: %d, need %d\n", - exp->exp_obd->obd_name, lmm_bytes, - (int)sizeof(*lmm)); - RETURN(-EINVAL); - } - /* XXX LOV_MAGIC etc check? */ - - if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { - CERROR("%s: zero lmm_object_id: rc = %d\n", - exp->exp_obd->obd_name, -EINVAL); - RETURN(-EINVAL); - } - } - - lsm_size = lov_stripe_md_size(1); - if (lsmp == NULL) - RETURN(lsm_size); - - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - OBD_FREE(*lsmp, lsm_size); - *lsmp = NULL; - RETURN(0); - } - - if (*lsmp == NULL) { - OBD_ALLOC(*lsmp, lsm_size); - if (unlikely(*lsmp == NULL)) - RETURN(-ENOMEM); - OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { - OBD_FREE(*lsmp, lsm_size); - RETURN(-ENOMEM); - } - loi_init((*lsmp)->lsm_oinfo[0]); - } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { - RETURN(-EBADF); - } - - if (lmm != NULL) - /* XXX zero *lsmp? */ - ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); - - if (imp != NULL && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) - (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; - else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - - RETURN(lsm_size); -} - -static inline void osc_pack_capa(struct ptlrpc_request *req, - struct ost_body *body, void *capa) -{ - struct obd_capa *oc = (struct obd_capa *)capa; - struct lustre_capa *c; - - if (!capa) - return; - - c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); - LASSERT(c); - capa_cpy(c, oc); - body->oa.o_valid |= OBD_MD_FLOSSCAPA; - DEBUG_CAPA(D_SEC, c, "pack"); -} - -static inline void osc_pack_req_body(struct ptlrpc_request *req, - struct obd_info *oinfo) +void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa) { struct ost_body *body; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); -} - -static inline void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) -{ - if (oc == NULL) - req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); - else - /* it is already calculated as sizeof struct obd_capa */ - ; -} - -static int osc_getattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) -{ - struct ost_body *body; - ENTRY; - - if (rc != 0) - GOTO(out, rc); - - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body) { - CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, - aa->aa_oi->oi_oa, &body->oa); - - /* This should really be sent by the OST */ - aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; - aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; - } else { - CDEBUG(D_INFO, "can't unpack ost_body\n"); - rc = -EPROTO; - aa->aa_oi->oi_oa->o_valid = 0; - } -out: - rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); - RETURN(rc); -} - -static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct ptlrpc_request_set *set) -{ - struct ptlrpc_request *req; - struct osc_async_args *aa; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); - - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - osc_pack_req_body(req, oinfo); - - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; - - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oi = oinfo; - - ptlrpc_set_add_req(set, req); - RETURN(0); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); } static int osc_getattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); - oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); - oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; + oa->o_blksize = cli_brw_size(exp->exp_obd); + oa->o_valid |= OBD_MD_FLBLKSZ; - EXIT; - out: - ptlrpc_req_finished(req); - return rc; + EXIT; +out: + ptlrpc_req_finished(req); + + return rc; } static int osc_setattr(const struct lu_env *env, struct obd_export *exp, - struct obd_info *oinfo, struct obd_trans_info *oti) + struct obdo *oa) { - struct ptlrpc_request *req; - struct ost_body *body; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct ost_body *body; + int rc; - LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); + ENTRY; + LASSERT(oa->o_valid & OBD_MD_FLGROUP); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - GOTO(out, rc = -EPROTO); + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, - &body->oa); + lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); - EXIT; + EXIT; out: - ptlrpc_req_finished(req); - RETURN(rc); + ptlrpc_req_finished(req); + + RETURN(rc); } static int osc_setattr_interpret(const struct lu_env *env, @@ -378,83 +222,64 @@ out: RETURN(rc); } -int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - obd_enqueue_update_f upcall, void *cookie, - struct ptlrpc_request_set *rqset) +int osc_setattr_async(struct obd_export *exp, struct obdo *oa, + obd_enqueue_update_f upcall, void *cookie, + struct ptlrpc_request_set *rqset) { - struct ptlrpc_request *req; - struct osc_setattr_args *sa; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct osc_setattr_args *sa; + int rc; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); - if (req == NULL) - RETURN(-ENOMEM); + ENTRY; - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); + if (req == NULL) + RETURN(-ENOMEM); - if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) - oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - osc_pack_req_body(req, oinfo); + osc_pack_req_body(req, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - /* do mds to ost setattr asynchronously */ - if (!rqset) { - /* Do not wait for response. */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - } else { - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_setattr_interpret; - - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } + /* do mds to ost setattr asynchronously */ + if (!rqset) { + /* Do not wait for response. */ + ptlrpcd_add_req(req); + } else { + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_setattr_interpret; - RETURN(0); -} + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; -static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - struct ptlrpc_request_set *rqset) -{ - return osc_setattr_async_base(exp, oinfo, oti, - oinfo->oi_cb_up, oinfo, rqset); + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } + + RETURN(0); } -int osc_real_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +static int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa) { struct ptlrpc_request *req; struct ost_body *body; - struct lov_stripe_md *lsm; int rc; ENTRY; - LASSERT(oa); - LASSERT(ea); - - lsm = *ea; - if (!lsm) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } + LASSERT(oa != NULL); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi))); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); if (req == NULL) @@ -473,14 +298,6 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, ptlrpc_request_set_replen(req); - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_DELORPHAN) { - DEBUG_REQ(D_HA, req, - "delorphan from OST integration"); - /* Don't resend the delorphan req */ - req->rq_no_resend = req->rq_no_delay = 1; - } - rc = ptlrpc_queue_wait(req); if (rc) GOTO(out_req, rc); @@ -495,34 +312,15 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - /* XXX LOV STACKING: the lsm that is passed to us from LOV does not - * have valid lsm_oinfo data structs, so don't go touching that. - * This needs to be fixed in a big way. - */ - lsm->lsm_oi = oa->o_oi; - *ea = lsm; - - if (oti != NULL) { - oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); - - if (oa->o_valid & OBD_MD_FLCOOKIE) { - if (!oti->oti_logcookies) - oti_alloc_cookies(oti, 1); - *oti->oti_logcookies = oa->o_lcookie; - } - } - - CDEBUG(D_HA, "transno: "LPD64"\n", - lustre_msg_get_transno(req->rq_repmsg)); + CDEBUG(D_HA, "transno: "LPD64"\n", + lustre_msg_get_transno(req->rq_repmsg)); out_req: - ptlrpc_req_finished(req); + ptlrpc_req_finished(req); out: - if (rc && !*ea) - obd_free_memmd(exp, &lsm); - RETURN(rc); + RETURN(rc); } -int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_punch_base(struct obd_export *exp, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { @@ -536,7 +334,6 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); if (rc) { ptlrpc_request_free(req); @@ -547,53 +344,68 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); + ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; - CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); - sa = ptlrpc_req_async_args(req); - sa->sa_oa = oinfo->oi_oa; - sa->sa_upcall = upcall; - sa->sa_cookie = cookie; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; + CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args)); + sa = ptlrpc_req_async_args(req); + sa->sa_oa = oa; + sa->sa_upcall = upcall; + sa->sa_cookie = cookie; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); - RETURN(0); + RETURN(0); } static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *arg, int rc) { - struct osc_fsync_args *fa = arg; - struct ost_body *body; - ENTRY; + struct osc_fsync_args *fa = arg; + struct ost_body *body; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + unsigned long valid = 0; + struct cl_object *obj; + ENTRY; - if (rc) - GOTO(out, rc); + if (rc != 0) + GOTO(out, rc); - body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO(out, rc = -EPROTO); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) { + CERROR("can't unpack ost_body\n"); + GOTO(out, rc = -EPROTO); + } + + *fa->fa_oa = body->oa; + obj = osc2cl(fa->fa_obj); + + /* Update osc object's blocks attribute */ + cl_object_attr_lock(obj); + if (body->oa.o_valid & OBD_MD_FLBLOCKS) { + attr->cat_blocks = body->oa.o_blocks; + valid |= CAT_BLOCKS; + } + + if (valid != 0) + cl_object_attr_update(env, obj, attr, valid); + cl_object_attr_unlock(obj); - *fa->fa_oi->oi_oa = body->oa; out: rc = fa->fa_upcall(fa->fa_cookie, rc); RETURN(rc); } -int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, +int osc_sync_base(struct osc_object *obj, struct obdo *oa, obd_enqueue_update_f upcall, void *cookie, struct ptlrpc_request_set *rqset) { + struct obd_export *exp = osc_export(obj); struct ptlrpc_request *req; struct ost_body *body; struct osc_fsync_args *fa; @@ -604,7 +416,6 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, if (req == NULL) RETURN(-ENOMEM); - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); if (rc) { ptlrpc_request_free(req); @@ -614,21 +425,20 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, /* overload the size and blocks fields in the oa with start/end */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); - lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, - oinfo->oi_oa); - osc_pack_capa(req, body, oinfo->oi_capa); + lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = osc_sync_interpret; + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = osc_sync_interpret; CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); fa = ptlrpc_req_async_args(req); - fa->fa_oi = oinfo; + fa->fa_obj = obj; + fa->fa_oa = oa; fa->fa_upcall = upcall; fa->fa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); else ptlrpc_set_add_req(rqset, req); @@ -640,13 +450,13 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, * locks added to @cancels list. */ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, struct list_head *cancels, - ldlm_mode_t mode, __u64 lock_flags) + enum ldlm_mode mode, __u64 lock_flags) { - struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; - struct ldlm_res_id res_id; - struct ldlm_resource *res; - int count; - ENTRY; + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_res_id res_id; + struct ldlm_resource *res; + int count; + ENTRY; /* Return, i.e. cancel nothing, only if ELC is supported (flag in * export) but disabled through procfs (flag in NS). @@ -659,7 +469,7 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, ostid_build_res_name(&oa->o_oi, &res_id); res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (res == NULL) + if (IS_ERR(res)) RETURN(0); LDLM_RESOURCE_ADDREF(res); @@ -699,45 +509,8 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - int rc = 0; - ENTRY; - - LASSERT(oa); - LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - RETURN(osc_real_create(exp, oa, ea, oti)); - } - - if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) - RETURN(osc_real_create(exp, oa, ea, oti)); - - /* we should not get here anymore */ - LBUG(); - - RETURN(rc); -} - -/* Destroy requests can be async always on the client, and we don't even really - * care about the return code since the client cannot do anything at all about - * a destroy failure. - * When the MDS is unlinking a filename, it saves the file objects into a - * recovery llog, and these object records are cancelled when the OST reports - * they were destroyed and sync'd to disk (i.e. transaction committed). - * If the client dies, or the OST is down when the object should be destroyed, - * the records are not cancelled, and when the OST reconnects to the MDS next, - * it will retrieve the llog unlink logs and then sends the log cancellation - * cookies to the MDS after committing destroy transactions. */ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md *ea, - struct obd_trans_info *oti, struct obd_export *md_export, - void *capa) + struct obdo *oa) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -760,7 +533,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); } - osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -771,85 +543,88 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); - if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) - oa->o_lcookie = *oti->oti_logcookies; body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); - /* If osc_destory is for destroying the unlink orphan, - * sent from MDT to OST, which should not be blocked here, - * because the process might be triggered by ptlrpcd, and - * it is not good to block ptlrpcd thread (b=16006)*/ - if (!(oa->o_flags & OBD_FL_DELORPHAN)) { - req->rq_interpret_reply = osc_destroy_interpret; - if (!osc_can_send_destroy(cli)) { - struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, - NULL); - - /* - * Wait until the number of on-going destroy RPCs drops - * under max_rpc_in_flight - */ - l_wait_event_exclusive(cli->cl_destroy_waitq, - osc_can_send_destroy(cli), &lwi); - } - } + req->rq_interpret_reply = osc_destroy_interpret; + if (!osc_can_send_destroy(cli)) { + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); - /* Do not wait for response */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - RETURN(0); + /* + * Wait until the number of on-going destroy RPCs drops + * under max_rpc_in_flight + */ + l_wait_event_exclusive(cli->cl_destroy_waitq, + osc_can_send_destroy(cli), &lwi); + } + + /* Do not wait for response */ + ptlrpcd_add_req(req); + RETURN(0); } static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT; - LASSERT(!(oa->o_valid & bits)); + LASSERT(!(oa->o_valid & bits)); - oa->o_valid |= bits; - client_obd_list_lock(&cli->cl_loi_list_lock); - oa->o_dirty = cli->cl_dirty; - if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > - cli->cl_dirty_max)) { + oa->o_valid |= bits; + spin_lock(&cli->cl_loi_list_lock); + if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM)) + oa->o_dirty = cli->cl_dirty_grant; + else + oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > + cli->cl_dirty_max_pages)) { CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_transit, + cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_read(&obd_unstable_pages) + - atomic_read(&obd_dirty_pages) - - atomic_read(&obd_dirty_transit_pages) > + } else if (unlikely(atomic_long_read(&obd_dirty_pages) - + atomic_long_read(&obd_dirty_transit_pages) > (long)(obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", - cli->cl_import->imp_obd->obd_name, - atomic_read(&obd_unstable_pages), - atomic_read(&obd_dirty_pages), - atomic_read(&obd_dirty_transit_pages), + CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n", + cli_name(cli), atomic_long_read(&obd_dirty_pages), + atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; - } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { + } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > + 0x7fffffff)) { CERROR("dirty %lu - dirty_max %lu too big???\n", - cli->cl_dirty, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; } else { - long max_in_flight = (cli->cl_max_pages_per_rpc << - PAGE_CACHE_SHIFT) * - (cli->cl_max_rpcs_in_flight + 1); - oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); + unsigned long nrpages; + + nrpages = cli->cl_max_pages_per_rpc; + nrpages *= cli->cl_max_rpcs_in_flight + 1; + nrpages = max(nrpages, cli->cl_dirty_max_pages); + oa->o_undirty = nrpages << PAGE_CACHE_SHIFT; + if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, + GRANT_PARAM)) { + int nrextents; + + /* take extent tax into account when asking for more + * grant space */ + nrextents = (nrpages + cli->cl_max_extent_pages - 1) / + cli->cl_max_extent_pages; + oa->o_undirty += nrextents * cli->cl_grant_extent_tax; + } } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); - } void osc_update_next_shrink(struct client_obd *cli) @@ -860,11 +635,11 @@ void osc_update_next_shrink(struct client_obd *cli) cli->cl_next_shrink_grant); } -static void __osc_update_grant(struct client_obd *cli, obd_size grant) +static void __osc_update_grant(struct client_obd *cli, u64 grant) { - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant += grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant += grant; + spin_unlock(&cli->cl_loi_list_lock); } static void osc_update_grant(struct client_obd *cli, struct ost_body *body) @@ -876,8 +651,9 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) } static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set); + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set); static int osc_shrink_grant_interpret(const struct lu_env *env, struct ptlrpc_request *req, @@ -902,10 +678,10 @@ out: static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) { - client_obd_list_lock(&cli->cl_loi_list_lock); - oa->o_grant = cli->cl_avail_grant / 4; - cli->cl_avail_grant -= oa->o_grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + oa->o_grant = cli->cl_avail_grant / 4; + cli->cl_avail_grant -= oa->o_grant; + spin_unlock(&cli->cl_loi_list_lock); if (!(oa->o_valid & OBD_MD_FLFLAGS)) { oa->o_valid |= OBD_MD_FLFLAGS; oa->o_flags = 0; @@ -923,10 +699,10 @@ static int osc_shrink_grant(struct client_obd *cli) __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); if (cli->cl_avail_grant <= target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); return osc_shrink_grant_to_target(cli, target_bytes); } @@ -937,7 +713,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) struct ost_body *body; ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* Don't shrink if we are already above or below the desired limit * We don't want to shrink below a single RPC, as that will negatively * impact block allocation and long-term performance. */ @@ -945,10 +721,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; if (target_bytes >= cli->cl_avail_grant) { - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); RETURN(0); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); OBD_ALLOC_PTR(body); if (!body) @@ -956,10 +732,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) osc_announce_cached(cli, &body->oa, 0); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); body->oa.o_grant = cli->cl_avail_grant - target_bytes; cli->cl_avail_grant = target_bytes; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { body->oa.o_valid |= OBD_MD_FLFLAGS; body->oa.o_flags = 0; @@ -1013,21 +789,19 @@ static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) static int osc_add_shrink_grant(struct client_obd *client) { - int rc; + int rc; - rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, - TIMEOUT_GRANT, - osc_grant_shrink_grant_cb, NULL, - &client->cl_grant_shrink_list); - if (rc) { - CERROR("add grant client %s error %d\n", - client->cl_import->imp_obd->obd_name, rc); - return rc; - } - CDEBUG(D_CACHE, "add grant client %s \n", - client->cl_import->imp_obd->obd_name); - osc_update_next_shrink(client); - return 0; + rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, + TIMEOUT_GRANT, + osc_grant_shrink_grant_cb, NULL, + &client->cl_grant_shrink_list); + if (rc) { + CERROR("add grant client %s error %d\n", cli_name(client), rc); + return rc; + } + CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client)); + osc_update_next_shrink(client); + return 0; } static int osc_del_shrink_grant(struct client_obd *client) @@ -1038,36 +812,60 @@ static int osc_del_shrink_grant(struct client_obd *client) static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) { - /* - * ocd_grant is the total grant amount we're expect to hold: if we've - * been evicted, it's the new avail_grant amount, cl_dirty will drop - * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty. - * - * race is tolerable here: if we're evicted, but imp_state already - * left EVICTED state, then cl_dirty must be 0 already. - */ - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) - cli->cl_avail_grant = ocd->ocd_grant; - else - cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; + /* + * ocd_grant is the total grant amount we're expect to hold: if we've + * been evicted, it's the new avail_grant amount, cl_dirty_pages will + * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant + + * dirty. + * + * race is tolerable here: if we're evicted, but imp_state already + * left EVICTED state, then cl_dirty_pages must be 0 already. + */ + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = ocd->ocd_grant; + if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) { + cli->cl_avail_grant -= cli->cl_reserved_grant; + if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) + cli->cl_avail_grant -= cli->cl_dirty_grant; + else + cli->cl_avail_grant -= + cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + } if (cli->cl_avail_grant < 0) { CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", - cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, - ocd->ocd_grant, cli->cl_dirty); + cli_name(cli), cli->cl_avail_grant, + ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); /* workaround for servers which do not have the patch from * LU-2679 */ cli->cl_avail_grant = ocd->ocd_grant; } - /* determine the appropriate chunk size used by osc_extent. */ - cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); - client_obd_list_unlock(&cli->cl_loi_list_lock); + if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) { + u64 size; + + /* overhead for each extent insertion */ + cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10; + /* determine the appropriate chunk size used by osc_extent. */ + cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, + ocd->ocd_grant_blkbits); + /* determine maximum extent size, in #pages */ + size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits; + cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT; + if (cli->cl_max_extent_pages == 0) + cli->cl_max_extent_pages = 1; + } else { + cli->cl_grant_extent_tax = 0; + cli->cl_chunkbits = PAGE_CACHE_SHIFT; + cli->cl_max_extent_pages = DT_MAX_BRW_PAGES; + } + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." - "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, - cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); + "chunk bits: %d cl_max_extent_pages: %d\n", + cli_name(cli), + cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits, + cli->cl_max_extent_pages); if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && list_empty(&cli->cl_grant_shrink_list)) @@ -1078,7 +876,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) * beyond the end of a stripe file; i.e. lustre is reading a sparse file * via the LOV, and it _knows_ it's reading inside the file, it's just that * this stripe never got written at or beyond this stripe offset yet. */ -static void handle_short_read(int nob_read, obd_count page_count, +static void handle_short_read(int nob_read, size_t page_count, struct brw_page **pga) { char *ptr; @@ -1091,7 +889,7 @@ static void handle_short_read(int nob_read, obd_count page_count, if (pga[i]->count > nob_read) { /* EOF inside this page */ ptr = kmap(pga[i]->pg) + - (pga[i]->off & ~CFS_PAGE_MASK); + (pga[i]->off & ~PAGE_MASK); memset(ptr + nob_read, 0, pga[i]->count - nob_read); kunmap(pga[i]->pg); page_count--; @@ -1106,7 +904,7 @@ static void handle_short_read(int nob_read, obd_count page_count, /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK); memset(ptr, 0, pga[i]->count); kunmap(pga[i]->pg); i++; @@ -1114,8 +912,8 @@ static void handle_short_read(int nob_read, obd_count page_count, } static int check_write_rcs(struct ptlrpc_request *req, - int requested_nob, int niocount, - obd_count page_count, struct brw_page **pga) + int requested_nob, int niocount, + size_t page_count, struct brw_page **pga) { int i; __u32 *remote_rcs; @@ -1160,7 +958,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) * safe to combine */ if (unlikely((p1->flag & mask) != (p2->flag & mask))) { CWARN("Saw flags 0x%x and 0x%x in the same brw, please " - "report this at http://bugs.whamcloud.com/\n", + "report this at https://jira.hpdd.intel.com/\n", p1->flag, p2->flag); } return 0; @@ -1169,11 +967,11 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } -static obd_count osc_checksum_bulk(int nob, obd_count pg_count, - struct brw_page **pga, int opc, - cksum_type_t cksum_type) +static u32 osc_checksum_bulk(int nob, size_t pg_count, + struct brw_page **pga, int opc, + cksum_type_t cksum_type) { - __u32 cksum; + u32 cksum; int i = 0; struct cfs_crypto_hash_desc *hdesc; unsigned int bufsize; @@ -1190,34 +988,32 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, } while (nob > 0 && pg_count > 0) { - int count = pga[i]->count > nob ? nob : pga[i]->count; + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { unsigned char *ptr = kmap(pga[i]->pg); - int off = pga[i]->off & ~CFS_PAGE_MASK; - memcpy(ptr + off, "bad1", min(4, nob)); + int off = pga[i]->off & ~PAGE_MASK; + + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, - pga[i]->off & ~CFS_PAGE_MASK, - count); + pga[i]->off & ~PAGE_MASK, + count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", - (int)(pga[i]->off & ~CFS_PAGE_MASK)); + (int)(pga[i]->off & ~PAGE_MASK)); nob -= pga[i]->count; pg_count--; i++; } - bufsize = 4; + bufsize = sizeof(cksum); err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize); - if (err) - cfs_crypto_hash_final(hdesc, NULL, NULL); - /* For sending we only compute the wrong checksum instead * of corrupting the data so it is still correct on a redo */ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) @@ -1226,12 +1022,10 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, return cksum; } -static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page **pga, - struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve, - int resend) +static int +osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, + u32 page_count, struct brw_page **pga, + struct ptlrpc_request **reqp, int resend) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1249,15 +1043,15 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) RETURN(-EINVAL); /* Fatal */ - if ((cmd & OBD_BRW_WRITE) != 0) { - opc = OST_WRITE; - req = ptlrpc_request_alloc_pool(cli->cl_import, - cli->cl_import->imp_rq_pool, - &RQF_OST_BRW_WRITE); - } else { - opc = OST_READ; - req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); - } + if ((cmd & OBD_BRW_WRITE) != 0) { + opc = OST_WRITE; + req = ptlrpc_request_alloc_pool(cli->cl_import, + osc_rq_pool, + &RQF_OST_BRW_WRITE); + } else { + opc = OST_READ; + req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); + } if (req == NULL) RETURN(-ENOMEM); @@ -1271,7 +1065,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, sizeof(*ioobj)); req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); - osc_set_capa_size(req, &RMF_CAPA1, ocapa); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { @@ -1286,8 +1079,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, - opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, - OST_BULK_PORTAL); + (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE : + PTLRPC_BULK_PUT_SINK) | + PTLRPC_BULK_BUF_KIOV, + OST_BULK_PORTAL, + &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) GOTO(out, rc = -ENOMEM); @@ -1308,12 +1104,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); - osc_pack_capa(req, body, ocapa); LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; - int poff = pg->off & ~CFS_PAGE_MASK; + int poff = pg->off & ~PAGE_MASK; LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ @@ -1324,7 +1119,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, ergo(i == page_count - 1, poff == 0)), "i: %d/%d pg: %p off: "LPU64", count: %u\n", i, page_count, pg, pg->off, pg->count); -#ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n", @@ -1332,23 +1126,19 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg->pg, page_private(pg->pg), pg->pg->index, pg->off, pg_prev->pg, page_private(pg_prev->pg), pg_prev->pg->index, pg_prev->off); -#else - LASSERTF(i == 0 || pg->off > pg_prev->off, - "i %d p_c %u\n", i, page_count); -#endif LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); + desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; - niobuf->len += pg->count; - } else { - niobuf->offset = pg->off; - niobuf->len = pg->count; - niobuf->flags = pg->flag; + niobuf->rnb_len += pg->count; + } else { + niobuf->rnb_offset = pg->off; + niobuf->rnb_len = pg->count; + niobuf->rnb_flags = pg->flag; } pg_prev = pg; } @@ -1423,10 +1213,12 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_ppga = pga; aa->aa_cli = cli; INIT_LIST_HEAD(&aa->aa_oaps); - if (ocapa && reserve) - aa->aa_ocapa = capa_get(ocapa); - *reqp = req; + *reqp = req; + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n", + req, POSTID(&oa->o_oi), niobuf[0].rnb_offset, + niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len); RETURN(0); out: @@ -1435,9 +1227,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, - __u32 client_cksum, __u32 server_cksum, int nob, - obd_count page_count, struct brw_page **pga, - cksum_type_t client_cksum_type) + __u32 client_cksum, __u32 server_cksum, int nob, + size_t page_count, struct brw_page **pga, + cksum_type_t client_cksum_type) { __u32 new_cksum; char *msg; @@ -1487,7 +1279,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) &req->rq_import->imp_connection->c_peer; struct client_obd *cli = aa->aa_cli; struct ost_body *body; - __u32 client_cksum = 0; + u32 client_cksum = 0; ENTRY; if (rc < 0 && rc != -EDQUOT) { @@ -1567,9 +1359,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; - __u32 server_cksum = body->oa.o_cksum; - char *via; - char *router; + u32 server_cksum = body->oa.o_cksum; + char *via = ""; + char *router = ""; cksum_type_t cksum_type; cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? @@ -1578,19 +1370,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga, OST_READ, cksum_type); - if (peer->nid == req->rq_bulk->bd_sender) { - via = router = ""; - } else { - via = " via "; - router = libcfs_nid2str(req->rq_bulk->bd_sender); - } + if (peer->nid != req->rq_bulk->bd_sender) { + via = " via "; + router = libcfs_nid2str(req->rq_bulk->bd_sender); + } - if (server_cksum == ~0 && rc > 0) { - CERROR("Protocol error: server %s set the 'checksum' " - "bit, but didn't send a checksum. Not fatal, " - "but please notify on http://bugs.whamcloud.com/\n", - libcfs_nid2str(peer->nid)); - } else if (server_cksum != client_cksum) { + if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " "%s%s%s inode "DFID" object "DOSTID " extent ["LPU64"-"LPU64"]\n", @@ -1647,12 +1432,10 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); - rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == - OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, - aa->aa_cli, aa->aa_oa, - NULL /* lsm unused by osc currently */, - aa->aa_page_count, aa->aa_ppga, - &new_req, aa->aa_ocapa, 0, 1); + rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == + OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + aa->aa_cli, aa->aa_oa, aa->aa_page_count, + aa->aa_ppga, &new_req, 1); if (rc) RETURN(rc); @@ -1697,14 +1480,11 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, } } - new_aa->aa_ocapa = aa->aa_ocapa; - aa->aa_ocapa = NULL; - /* XXX: This code will run into problem if we're going to support * to add a series of BRW RPCs into a self-defined ptlrpc_request_set * and wait for all of them to be finished. We should inherit request * set from old request. */ - ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); + ptlrpcd_add_req(new_req); DEBUG_REQ(D_INFO, new_req, "new request"); RETURN(0); @@ -1741,7 +1521,7 @@ static void sort_brw_pages(struct brw_page **array, int num) } while (stride > 1); } -static void osc_release_ppga(struct brw_page **ppga, obd_count count) +static void osc_release_ppga(struct brw_page **ppga, size_t count) { LASSERT(ppga != NULL); OBD_FREE(ppga, sizeof(*ppga) * count); @@ -1783,11 +1563,6 @@ static int brw_interpret(const struct lu_env *env, rc = -EIO; } - if (aa->aa_ocapa) { - capa_put(aa->aa_ocapa); - aa->aa_ocapa = NULL; - } - if (rc == 0) { struct obdo *oa = aa->aa_oa; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -1818,7 +1593,8 @@ static int brw_interpret(const struct lu_env *env, if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; - loff_t last_off = last->oap_count + last->oap_obj_off; + loff_t last_off = last->oap_count + last->oap_obj_off + + last->oap_page_off; /* Change file size if this is an out of quota or * direct IO write and it extends the file size */ @@ -1835,11 +1611,14 @@ static int brw_interpret(const struct lu_env *env, } if (valid != 0) - cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } OBDO_FREE(aa->aa_oa); + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) + osc_inc_unstable_pages(req); + list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 1, rc); @@ -1847,12 +1626,10 @@ static int brw_interpret(const struct lu_env *env, LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); - cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : - req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more * RPCs to complete */ @@ -1861,9 +1638,9 @@ static int brw_interpret(const struct lu_env *env, else cli->cl_r_in_flight--; osc_wake_cache_waiters(cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(rc); } @@ -1891,7 +1668,7 @@ static void brw_commit(struct ptlrpc_request *req) * Extents in the list must be in OES_RPC state. */ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, - struct list_head *ext_list, int cmd, pdl_policy_t pol) + struct list_head *ext_list, int cmd) { struct ptlrpc_request *req = NULL; struct osc_extent *ext; @@ -1899,21 +1676,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct osc_brw_async_args *aa = NULL; struct obdo *oa = NULL; struct osc_async_page *oap; - struct osc_async_page *tmp; - struct cl_req *clerq = NULL; - enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : - CRT_READ; - struct ldlm_lock *lock = NULL; + struct osc_object *obj = NULL; struct cl_req_attr *crattr = NULL; - obd_off starting_offset = OBD_OBJECT_EOF; - obd_off ending_offset = 0; + loff_t starting_offset = OBD_OBJECT_EOF; + loff_t ending_offset = 0; int mpflag = 0; int mem_tight = 0; int page_count = 0; + bool soft_sync = false; + bool interrupted = false; int i; + int grant = 0; int rc; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); - + struct ost_body *body; ENTRY; LASSERT(!list_empty(ext_list)); @@ -1921,29 +1697,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, list_for_each_entry(ext, ext_list, oe_link) { LASSERT(ext->oe_state == OES_RPC); mem_tight |= ext->oe_memalloc; - list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { - ++page_count; - list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (starting_offset > oap->oap_obj_off) - starting_offset = oap->oap_obj_off; - else - LASSERT(oap->oap_page_off == 0); - if (ending_offset < oap->oap_obj_off + oap->oap_count) - ending_offset = oap->oap_obj_off + - oap->oap_count; - else - LASSERT(oap->oap_page_off + oap->oap_count == - PAGE_CACHE_SIZE); - } + grant += ext->oe_grants; + page_count += ext->oe_nr_pages; + if (obj == NULL) + obj = ext->oe_obj; } + soft_sync = osc_over_unstable_soft_limit(cli); if (mem_tight) mpflag = cfs_memory_pressure_get_and_set(); - OBD_ALLOC(crattr, sizeof(*crattr)); - if (crattr == NULL) - GOTO(out, rc = -ENOMEM); - OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) GOTO(out, rc = -ENOMEM); @@ -1953,44 +1716,49 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, GOTO(out, rc = -ENOMEM); i = 0; - list_for_each_entry(oap, &rpc_list, oap_rpc_item) { - struct cl_page *page = oap2cl_page(oap); - if (clerq == NULL) { - clerq = cl_req_alloc(env, page, crt, - 1 /* only 1-object rpcs for now */); - if (IS_ERR(clerq)) - GOTO(out, rc = PTR_ERR(clerq)); - lock = oap->oap_ldlm_lock; + list_for_each_entry(ext, ext_list, oe_link) { + list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { + if (mem_tight) + oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + if (soft_sync) + oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; + pga[i] = &oap->oap_brw_page; + pga[i]->off = oap->oap_obj_off + oap->oap_page_off; + i++; + + list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (starting_offset == OBD_OBJECT_EOF || + starting_offset > oap->oap_obj_off) + starting_offset = oap->oap_obj_off; + else + LASSERT(oap->oap_page_off == 0); + if (ending_offset < oap->oap_obj_off + oap->oap_count) + ending_offset = oap->oap_obj_off + + oap->oap_count; + else + LASSERT(oap->oap_page_off + oap->oap_count == + PAGE_CACHE_SIZE); + if (oap->oap_interrupted) + interrupted = true; } - if (mem_tight) - oap->oap_brw_flags |= OBD_BRW_MEMALLOC; - pga[i] = &oap->oap_brw_page; - pga[i]->off = oap->oap_obj_off + oap->oap_page_off; - CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", - pga[i]->pg, page_index(oap->oap_page), oap, - pga[i]->flag); - i++; - cl_req_page_add(env, clerq, page); } - /* always get the data for the obdo for the rpc */ - LASSERT(clerq != NULL); + /* first page in the list */ + oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item); + + crattr = &osc_env_info(env)->oti_req_attr; + memset(crattr, 0, sizeof(*crattr)); + crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; + crattr->cra_flags = ~0ULL; + crattr->cra_page = oap2cl_page(oap); crattr->cra_oa = oa; - cl_req_attr_set(env, clerq, crattr, ~0ULL); - if (lock) { - oa->o_handle = lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - } + cl_req_attr_set(env, osc2cl(obj), crattr); - rc = cl_req_prep(env, clerq); - if (rc != 0) { - CERROR("cl_req_prep failed: %d\n", rc); - GOTO(out, rc); - } + if (cmd == OBD_BRW_WRITE) + oa->o_grant_used = grant; sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, - pga, &req, crattr->cra_capa, 1, 0); + rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, rc); @@ -1998,18 +1766,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, req->rq_commit_cb = brw_commit; req->rq_interpret_reply = brw_interpret; - - if (mem_tight != 0) - req->rq_memalloc = 1; + req->rq_memalloc = mem_tight != 0; + oap->oap_request = ptlrpc_request_addref(req); + if (interrupted && !req->rq_intr) + ptlrpc_mark_interrupted(req); /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious * way to do this in a single call. bug 10150 */ - cl_req_attr_set(env, clerq, crattr, - OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); - + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + crattr->cra_oa = &body->oa; + crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME; + cl_req_attr_set(env, osc2cl(obj), crattr); lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); @@ -2018,25 +1788,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, list_splice_init(&rpc_list, &aa->aa_oaps); INIT_LIST_HEAD(&aa->aa_exts); list_splice_init(ext_list, &aa->aa_exts); - aa->aa_clerq = clerq; - /* queued sync pages can be torn down while the pages - * were between the pending list and the rpc */ - tmp = NULL; - list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - /* only one oap gets a request reference */ - if (tmp == NULL) - tmp = oap; - if (oap->oap_interrupted && !req->rq_intr) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", - oap, req); - ptlrpc_mark_interrupted(req); - } - } - if (tmp != NULL) - tmp->oap_request = ptlrpc_request_addref(req); - - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); starting_offset >>= PAGE_CACHE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; @@ -2051,25 +1804,13 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, starting_offset + 1); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); - DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", + DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight", page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); - /* XXX: Maybe the caller can check the RPC bulk descriptor to - * see which CPU/NUMA node the majority of pages were allocated - * on, and try to assign the async RPC to the CPU core - * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. - * - * But on the other hand, we expect that multiple ptlrpcd - * threads and the initial write sponsor can run in parallel, - * especially when data checksum is enabled, which is CPU-bound - * operation and single ptlrpcd thread cannot process in time. - * So more ptlrpcd threads sharing BRW load - * (with PDL_POLICY_ROUND) seems better. - */ - ptlrpcd_add_req(req, pol, -1); + ptlrpcd_add_req(req); rc = 0; EXIT; @@ -2077,11 +1818,6 @@ out: if (mem_tight != 0) cfs_memory_pressure_restore(mpflag); - if (crattr != NULL) { - capa_put(crattr->cra_capa); - OBD_FREE(crattr, sizeof(*crattr)); - } - if (rc != 0) { LASSERT(req == NULL); @@ -2097,8 +1833,6 @@ out: list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 0, rc); } - if (clerq && !IS_ERR(clerq)) - cl_req_completion(env, clerq, rc); } RETURN(rc); } @@ -2116,14 +1850,12 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock, LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); lock_res_and_lock(lock); - spin_lock(&osc_ast_guard); if (lock->l_ast_data == NULL) lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; - spin_unlock(&osc_ast_guard); unlock_res_and_lock(lock); return set; @@ -2144,134 +1876,95 @@ static int osc_set_data_with_check(struct lustre_handle *lockh, return set; } -static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) +static int osc_enqueue_fini(struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, void *cookie, + struct lustre_handle *lockh, enum ldlm_mode mode, + __u64 *flags, int agl, int errcode) { - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); + bool intent = *flags & LDLM_FL_HAS_INTENT; + int rc; + ENTRY; - ostid_build_res_name(&lsm->lsm_oi, &res_id); - ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - return 0; -} + /* The request was created before ldlm_cli_enqueue call. */ + if (intent && errcode == ELDLM_LOCK_ABORTED) { + struct ldlm_reply *rep; + + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(rep != NULL); + + rep->lock_policy_res1 = + ptlrpc_status_ntoh(rep->lock_policy_res1); + if (rep->lock_policy_res1) + errcode = rep->lock_policy_res1; + if (!agl) + *flags |= LDLM_FL_LVB_READY; + } else if (errcode == ELDLM_OK) { + *flags |= LDLM_FL_LVB_READY; + } -/* find any ldlm lock of the inode in osc - * return 0 not find - * 1 find one - * < 0 error */ -static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) -{ - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - int rc = 0; + /* Call the update callback. */ + rc = (*upcall)(cookie, lockh, errcode); - ostid_build_res_name(&lsm->lsm_oi, &res_id); - rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - if (rc == LDLM_ITER_STOP) - return(1); - if (rc == LDLM_ITER_CONTINUE) - return(0); - return(rc); + /* release the reference taken in ldlm_cli_enqueue() */ + if (errcode == ELDLM_LOCK_MATCHED) + errcode = ELDLM_OK; + if (errcode == ELDLM_OK && lustre_handle_is_used(lockh)) + ldlm_lock_decref(lockh, mode); + + RETURN(rc); } -static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, - obd_enqueue_update_f upcall, void *cookie, - __u64 *flags, int agl, int rc) +static int osc_enqueue_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_enqueue_args *aa, int rc) { - int intent = *flags & LDLM_FL_HAS_INTENT; - ENTRY; + struct ldlm_lock *lock; + struct lustre_handle *lockh = &aa->oa_lockh; + enum ldlm_mode mode = aa->oa_mode; + struct ost_lvb *lvb = aa->oa_lvb; + __u32 lvb_len = sizeof(*lvb); + __u64 flags = 0; - if (intent) { - /* The request was created before ldlm_cli_enqueue call. */ - if (rc == ELDLM_LOCK_ABORTED) { - struct ldlm_reply *rep; - rep = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_REP); - - LASSERT(rep != NULL); - rep->lock_policy_res1 = - ptlrpc_status_ntoh(rep->lock_policy_res1); - if (rep->lock_policy_res1) - rc = rep->lock_policy_res1; - } - } - - if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || - (rc == 0)) { - *flags |= LDLM_FL_LVB_READY; - CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", - lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); - } + ENTRY; - /* Call the update callback. */ - rc = (*upcall)(cookie, rc); - RETURN(rc); -} + /* ldlm_cli_enqueue is holding a reference on the lock, so it must + * be valid. */ + lock = ldlm_handle2lock(lockh); + LASSERTF(lock != NULL, + "lockh "LPX64", req %p, aa %p - client evicted?\n", + lockh->cookie, req, aa); -static int osc_enqueue_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) -{ - struct ldlm_lock *lock; - struct lustre_handle handle; - __u32 mode; - struct ost_lvb *lvb; - __u32 lvb_len; - __u64 *flags = aa->oa_flags; - - /* Make a local copy of a lock handle and a mode, because aa->oa_* - * might be freed anytime after lock upcall has been called. */ - lustre_handle_copy(&handle, aa->oa_lockh); - mode = aa->oa_ei->ei_mode; - - /* ldlm_cli_enqueue is holding a reference on the lock, so it must - * be valid. */ - lock = ldlm_handle2lock(&handle); - - /* Take an additional reference so that a blocking AST that - * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed - * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ - ldlm_lock_addref(&handle, mode); + /* Take an additional reference so that a blocking AST that + * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed + * to arrive after an upcall has been executed by + * osc_enqueue_fini(). */ + ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); - /* Let CP AST to grant the lock first. */ - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + /* Let CP AST to grant the lock first. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); - if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { - lvb = NULL; - lvb_len = 0; - } else { - lvb = aa->oa_lvb; - lvb_len = sizeof(*aa->oa_lvb); - } + if (aa->oa_agl) { + LASSERT(aa->oa_lvb == NULL); + LASSERT(aa->oa_flags == NULL); + aa->oa_flags = &flags; + } - /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, flags, lvb, lvb_len, &handle, rc); - /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, - flags, aa->oa_agl, rc); + /* Complete obtaining the lock procedure. */ + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, + aa->oa_mode, aa->oa_flags, lvb, lvb_len, + lockh, rc); + /* Complete osc stuff. */ + rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, + aa->oa_flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); - /* Release the lock for async request. */ - if (lustre_handle_is_used(&handle) && rc == ELDLM_OK) - /* - * Releases a reference taken by ldlm_cli_enqueue(), if it is - * not already released by - * ldlm_cli_enqueue_fini()->failed_lock_cleanup() - */ - ldlm_lock_decref(&handle, mode); - - LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n", - aa->oa_lockh, req, aa); - ldlm_lock_decref(&handle, mode); - LDLM_LOCK_PUT(lock); - return rc; + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(lock); + RETURN(rc); } struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; @@ -2281,28 +1974,28 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * other synchronous requests, however keeping some locks and trying to obtain * others may take a considerable amount of time in a case of ost failure; and * when other sync requests do not get released lock from a client, the client - * is excluded from the cluster -- such scenarious make the life difficult, so + * is evicted from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u64 *flags, ldlm_policy_data_t *policy, + __u64 *flags, union ldlm_policy_data *policy, struct ost_lvb *lvb, int kms_valid, - obd_enqueue_update_f upcall, void *cookie, + osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, - struct lustre_handle *lockh, struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; + struct lustre_handle lockh = { 0 }; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; - __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); - ldlm_mode_t mode; + __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY; + enum ldlm_mode mode; int rc; ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* * kms is not valid when either object is completely fresh (so that no @@ -2329,57 +2022,48 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, - einfo->ei_type, policy, mode, lockh, 0); - if (mode) { - struct ldlm_lock *matched = ldlm_handle2lock(lockh); - - if ((agl != 0) && !ldlm_is_lvb_ready(matched)) { - /* For AGL, if enqueue RPC is sent but the lock is not - * granted, then skip to process this strpe. - * Return -ECANCELED to tell the caller. */ - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - RETURN(-ECANCELED); - } else if (osc_set_lock_data_with_check(matched, einfo)) { - *flags |= LDLM_FL_LVB_READY; - /* addref the lock only if not async requests and PW - * lock is matched whereas we asked for PR. */ - if (!rqset && einfo->ei_mode != mode) - ldlm_lock_addref(lockh, LCK_PR); - if (intent) { - /* I would like to be able to ASSERT here that - * rss <= kms, but I can't, for reasons which - * are explained in lov_enqueue() */ - } + einfo->ei_type, policy, mode, &lockh, 0); + if (mode) { + struct ldlm_lock *matched; + + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(ELDLM_OK); + + matched = ldlm_handle2lock(&lockh); + if (agl) { + /* AGL enqueues DLM locks speculatively. Therefore if + * it already exists a DLM lock, it wll just inform the + * caller to cancel the AGL process for this stripe. */ + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; + + /* We already have a lock, and it's referenced. */ + (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED); + + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(ELDLM_OK); + } else { + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + } + } - /* We already have a lock, and it's referenced. - * - * At this point, the cl_lock::cll_state is CLS_QUEUING, - * AGL upcall may change it to CLS_HELD directly. */ - (*upcall)(cookie, ELDLM_OK); - - if (einfo->ei_mode != mode) - ldlm_lock_decref(lockh, LCK_PW); - else if (rqset) - /* For async requests, decref the lock. */ - ldlm_lock_decref(lockh, einfo->ei_mode); - LDLM_LOCK_PUT(matched); - RETURN(ELDLM_OK); - } else { - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - } - } +no_match: + if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) + RETURN(-ENOLCK); - no_match: - if (intent) { + if (intent) { req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) RETURN(-ENOMEM); - rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE); - if (rc < 0) { + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { ptlrpc_request_free(req); RETURN(rc); } @@ -2393,57 +2077,67 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), LVB_T_OST, lockh, async); - if (rqset) { - if (!rc) { - struct osc_enqueue_args *aa; - CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->oa_ei = einfo; - aa->oa_exp = exp; - aa->oa_flags = flags; - aa->oa_upcall = upcall; - aa->oa_cookie = cookie; - aa->oa_lvb = lvb; - aa->oa_lockh = lockh; - aa->oa_agl = !!agl; - - req->rq_interpret_reply = - (ptlrpc_interpterer_t)osc_enqueue_interpret; - if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); - else - ptlrpc_set_add_req(rqset, req); - } else if (intent) { - ptlrpc_req_finished(req); - } - RETURN(rc); - } + sizeof(*lvb), LVB_T_OST, &lockh, async); + if (async) { + if (!rc) { + struct osc_enqueue_args *aa; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->oa_exp = exp; + aa->oa_mode = einfo->ei_mode; + aa->oa_type = einfo->ei_type; + lustre_handle_copy(&aa->oa_lockh, &lockh); + aa->oa_upcall = upcall; + aa->oa_cookie = cookie; + aa->oa_agl = !!agl; + if (!agl) { + aa->oa_flags = flags; + aa->oa_lvb = lvb; + } else { + /* AGL is essentially to enqueue an DLM lock + * in advance, so we don't care about the + * result of AGL enqueue. */ + aa->oa_lvb = NULL; + aa->oa_flags = NULL; + } + + req->rq_interpret_reply = + (ptlrpc_interpterer_t)osc_enqueue_interpret; + if (rqset == PTLRPCD_SET) + ptlrpcd_add_req(req); + else + ptlrpc_set_add_req(rqset, req); + } else if (intent) { + ptlrpc_req_finished(req); + } + RETURN(rc); + } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); - if (intent) - ptlrpc_req_finished(req); + rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + flags, agl, rc); + if (intent) + ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc); } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, - __u32 type, ldlm_policy_data_t *policy, __u32 mode, - __u64 *flags, void *data, struct lustre_handle *lockh, - int unref) + enum ldlm_type type, union ldlm_policy_data *policy, + enum ldlm_mode mode, __u64 *flags, void *data, + struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; - ldlm_mode_t rc; + enum ldlm_mode rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) - RETURN(-EIO); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + RETURN(-EIO); - /* Filesystem lock extents are extended to page boundaries so that - * dealing with the page cache is a little smoother */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + /* Filesystem lock extents are extended to page boundaries so that + * dealing with the page cache is a little smoother */ + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The @@ -2471,18 +2165,6 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } -int osc_cancel_base(struct lustre_handle *lockh, __u32 mode) -{ - ENTRY; - - if (unlikely(mode == LCK_GROUP)) - ldlm_lock_decref_and_cancel(lockh, mode); - else - ldlm_lock_decref(lockh, mode); - - RETURN(0); -} - static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) @@ -2625,7 +2307,7 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, } static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, - void *karg, void *uarg) + void *karg, void __user *uarg) { struct obd_device *obd = exp->exp_obd; struct obd_ioctl_data *data = karg; @@ -2633,7 +2315,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, ENTRY; if (!try_module_get(THIS_MODULE)) { - CERROR("Can't get module. Is it alive?"); + CERROR("%s: cannot get module '%s'\n", obd->obd_name, + module_name(THIS_MODULE)); return -EINVAL; } switch (cmd) { @@ -2647,9 +2330,6 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, err = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); GOTO(out, err); - case OBD_IOC_POLL_QUOTACHECK: - err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); - GOTO(out, err); case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); GOTO(out, err); @@ -2663,145 +2343,10 @@ out: return err; } -static int osc_get_info(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) -{ - ENTRY; - if (!vallen || !val) - RETURN(-EFAULT); - - if (KEY_IS(KEY_LOCK_TO_STRIPE)) { - __u32 *stripe = val; - *vallen = sizeof(*stripe); - *stripe = 0; - RETURN(0); - } else if (KEY_IS(KEY_LAST_ID)) { - struct ptlrpc_request *req; - obd_id *reply; - char *tmp; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_LAST_ID); - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT, keylen); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - memcpy(tmp, key, keylen); - - req->rq_no_delay = req->rq_no_resend = 1; - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); - if (reply == NULL) - GOTO(out, rc = -EPROTO); - - *((obd_id *)val) = *reply; - out: - ptlrpc_req_finished(req); - RETURN(rc); - } else if (KEY_IS(KEY_FIEMAP)) { - struct ll_fiemap_info_key *fm_key = - (struct ll_fiemap_info_key *)key; - struct ldlm_res_id res_id; - ldlm_policy_data_t policy; - struct lustre_handle lockh; - ldlm_mode_t mode = 0; - struct ptlrpc_request *req; - struct ll_user_fiemap *reply; - char *tmp; - int rc; - - if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC)) - goto skip_locking; - - policy.l_extent.start = fm_key->fiemap.fm_start & - CFS_PAGE_MASK; - - if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <= - fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1) - policy.l_extent.end = OBD_OBJECT_EOF; - else - policy.l_extent.end = (fm_key->fiemap.fm_start + - fm_key->fiemap.fm_length + - PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK; - - ostid_build_res_name(&fm_key->oa.o_oi, &res_id); - mode = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED | - LDLM_FL_LVB_READY, - &res_id, LDLM_EXTENT, &policy, - LCK_PR | LCK_PW, &lockh, 0); - if (mode) { /* lock is cached on client */ - if (mode != LCK_PR) { - ldlm_lock_addref(&lockh, LCK_PR); - ldlm_lock_decref(&lockh, LCK_PW); - } - } else { /* no cached lock, needs acquire lock on server side */ - fm_key->oa.o_valid |= OBD_MD_FLFLAGS; - fm_key->oa.o_flags |= OBD_FL_SRVLOCK; - } - -skip_locking: - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_FIEMAP); - if (req == NULL) - GOTO(drop_lock, rc = -ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, - RCL_CLIENT, keylen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_CLIENT, *vallen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_SERVER, *vallen); - - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - GOTO(drop_lock, rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); - memcpy(tmp, key, keylen); - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); - memcpy(tmp, val, *vallen); - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(fini_req, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); - if (reply == NULL) - GOTO(fini_req, rc = -EPROTO); - - memcpy(val, reply, *vallen); -fini_req: - ptlrpc_req_finished(req); -drop_lock: - if (mode) - ldlm_lock_decref(&lockh, LCK_PR); - RETURN(rc); - } - - RETURN(-EINVAL); -} - static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set) + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct obd_device *obd = exp->exp_obd; @@ -2834,7 +2379,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(cli->cl_cache == NULL); /* only once */ cli->cl_cache = (struct cl_client_cache *)val; - atomic_inc(&cli->cl_cache->ccc_users); + cl_cache_incref(cli->cl_cache); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; /* add this osc into entity list */ @@ -2848,11 +2393,11 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; - int nr = atomic_read(&cli->cl_lru_in_list) >> 1; - int target = *(int *)val; + long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1; + long target = *(long *)val; nr = osc_lru_shrink(env, cli, min(nr, target), true); - *(int *)val -= nr; + *(long *)val -= nr; RETURN(0); } @@ -2906,15 +2451,16 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, req->rq_interpret_reply = osc_shrink_grant_interpret; } - ptlrpc_request_set_replen(req); - if (!KEY_IS(KEY_GRANT_SHRINK)) { - LASSERT(set != NULL); - ptlrpc_set_add_req(set, req); - ptlrpc_check_set(NULL, set); - } else - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpc_request_set_replen(req); + if (!KEY_IS(KEY_GRANT_SHRINK)) { + LASSERT(set != NULL); + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); + } else { + ptlrpcd_add_req(req); + } - RETURN(0); + RETURN(0); } static int osc_reconnect(const struct lu_env *env, @@ -2927,13 +2473,18 @@ static int osc_reconnect(const struct lu_env *env, if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { long lost_grant; + long grant; - client_obd_list_lock(&cli->cl_loi_list_lock); - data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: - 2 * cli_brw_size(obd); - lost_grant = cli->cl_lost_grant; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + grant = cli->cl_avail_grant + cli->cl_reserved_grant; + if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) + grant += cli->cl_dirty_grant; + else + grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + data->ocd_grant = grant ? : 2 * cli_brw_size(obd); + lost_grant = cli->cl_lost_grant; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, @@ -2945,22 +2496,8 @@ static int osc_reconnect(const struct lu_env *env, static int osc_disconnect(struct obd_export *exp) { - struct obd_device *obd = class_exp2obd(exp); - struct llog_ctxt *ctxt; - int rc; - - ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); - if (ctxt) { - if (obd->u.cli.cl_conn_count == 1) { - /* Flush any remaining cancel messages out to the - * target */ - llog_sync(ctxt, exp, 0); - } - llog_ctxt_put(ctxt); - } else { - CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", - obd); - } + struct obd_device *obd = class_exp2obd(exp); + int rc; rc = client_disconnect_export(exp); /** @@ -2985,6 +2522,37 @@ static int osc_disconnect(struct obd_export *exp) return rc; } +static int osc_ldlm_resource_invalidate(struct cfs_hash *hs, + struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg) +{ + struct lu_env *env = arg; + struct ldlm_resource *res = cfs_hash_object(hs, hnode); + struct ldlm_lock *lock; + struct osc_object *osc = NULL; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { + if (lock->l_ast_data != NULL && osc == NULL) { + osc = lock->l_ast_data; + cl_object_get(osc2cl(osc)); + } + + /* clear LDLM_FL_CLEANED flag to make sure it will be canceled + * by the 2nd round of ldlm_namespace_clean() call in + * osc_import_event(). */ + ldlm_clear_cleaned(lock); + } + unlock_res(res); + + if (osc != NULL) { + osc_object_invalidate(env, osc); + cl_object_put(env, osc2cl(osc)); + } + + RETURN(0); +} + static int osc_import_event(struct obd_device *obd, struct obd_import *imp, enum obd_import_event event) @@ -2998,10 +2566,10 @@ static int osc_import_event(struct obd_device *obd, switch (event) { case IMP_EVENT_DISCON: { cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant = 0; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); break; } case IMP_EVENT_INACTIVE: { @@ -3011,18 +2579,20 @@ static int osc_import_event(struct obd_device *obd, case IMP_EVENT_INVALIDATE: { struct ldlm_namespace *ns = obd->obd_namespace; struct lu_env *env; - int refcheck; + __u16 refcheck; + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); env = cl_env_get(&refcheck); if (!IS_ERR(env)) { - /* Reset grants */ - cli = &obd->u.cli; - /* all pages go to failing rpcs due to the invalid - * import */ - osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); - - ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); - cl_env_put(env, &refcheck); + osc_io_unplug(env, &obd->u.cli, NULL); + + cfs_hash_for_each_nolock(ns->ns_rs_hash, + osc_ldlm_resource_invalidate, + env, 0); + cl_env_put(env, &refcheck); + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); } else rc = PTR_ERR(env); break; @@ -3085,7 +2655,7 @@ static int brw_queue_work(const struct lu_env *env, void *data) CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); - osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); + osc_io_unplug(env, cli, NULL); RETURN(0); } @@ -3095,6 +2665,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) struct obd_type *type; void *handler; int rc; + int adding; + int added; + int req_count; ENTRY; rc = ptlrpcd_addref(); @@ -3121,7 +2694,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS obd->obd_vars = lprocfs_osc_obd_vars; #endif /* If this is true then both client (osc) and server (osp) are on the @@ -3130,9 +2703,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */ type = class_search_type(LUSTRE_OSP_NAME); if (type && type->typ_procsym) { - obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name, - type->typ_procsym, - obd->obd_vars, obd); + obd->obd_proc_entry = lprocfs_register(obd->obd_name, + type->typ_procsym, + obd->obd_vars, obd); if (IS_ERR(obd->obd_proc_entry)) { rc = PTR_ERR(obd->obd_proc_entry); CERROR("error %d setting up lprocfs for %s\n", rc, @@ -3140,7 +2713,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_proc_entry = NULL; } } else { - rc = lprocfs_seq_obd_setup(obd); + rc = lprocfs_obd_setup(obd); } /* If the basic OSC proc tree construction succeeded then @@ -3151,18 +2724,28 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_lprocfs_register_obd(obd); } - /* We need to allocate a few requests more, because - * brw_interpret tries to create new requests before freeing - * previous ones, Ideally we want to have 2x max_rpcs_in_flight - * reserved, but I'm afraid that might be too much wasted RAM - * in fact, so 2 is just my guess and still should work. */ - cli->cl_import->imp_rq_pool = - ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, - OST_MAXREQSIZE, - ptlrpc_add_rqs_to_pool); + /* + * We try to control the total number of requests with a upper limit + * osc_reqpool_maxreqcount. There might be some race which will cause + * over-limit allocation, but it is fine. + */ + req_count = atomic_read(&osc_pool_req_count); + if (req_count < osc_reqpool_maxreqcount) { + adding = cli->cl_max_rpcs_in_flight + 2; + if (req_count + adding > osc_reqpool_maxreqcount) + adding = osc_reqpool_maxreqcount - req_count; + + added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding); + atomic_add(added, &osc_pool_req_count); + } INIT_LIST_HEAD(&cli->cl_grant_shrink_list); ns_register_cancel(obd->obd_namespace, osc_cancel_weight); + + spin_lock(&osc_shrink_lock); + list_add_tail(&cli->cl_shrink_list, &osc_shrink_list); + spin_unlock(&osc_shrink_lock); + RETURN(0); out_ptlrpcd_work: @@ -3181,53 +2764,35 @@ out_ptlrpcd: RETURN(rc); } -static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) +static int osc_precleanup(struct obd_device *obd) { - int rc = 0; - ENTRY; + struct client_obd *cli = &obd->u.cli; + ENTRY; - switch (stage) { - case OBD_CLEANUP_EARLY: { - struct obd_import *imp; - imp = obd->u.cli.cl_import; - CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); - /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ - ptlrpc_deactivate_import(imp); - spin_lock(&imp->imp_lock); - imp->imp_pingable = 0; - spin_unlock(&imp->imp_lock); - break; - } - case OBD_CLEANUP_EXPORTS: { - struct client_obd *cli = &obd->u.cli; - /* LU-464 - * for echo client, export may be on zombie list, wait for - * zombie thread to cull it, because cli.cl_import will be - * cleared in client_disconnect_export(): - * class_export_destroy() -> obd_cleanup() -> - * echo_device_free() -> echo_client_cleanup() -> - * obd_disconnect() -> osc_disconnect() -> - * client_disconnect_export() - */ - obd_zombie_barrier(); - if (cli->cl_writeback_work) { - ptlrpcd_destroy_work(cli->cl_writeback_work); - cli->cl_writeback_work = NULL; - } - if (cli->cl_lru_work) { - ptlrpcd_destroy_work(cli->cl_lru_work); - cli->cl_lru_work = NULL; - } - obd_cleanup_client_import(obd); - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); - rc = obd_llog_finish(obd, 0); - if (rc != 0) - CERROR("failed to cleanup llogging subsystems\n"); - break; - } - } - RETURN(rc); + /* LU-464 + * for echo client, export may be on zombie list, wait for + * zombie thread to cull it, because cli.cl_import will be + * cleared in client_disconnect_export(): + * class_export_destroy() -> obd_cleanup() -> + * echo_device_free() -> echo_client_cleanup() -> + * obd_disconnect() -> osc_disconnect() -> + * client_disconnect_export() + */ + obd_zombie_barrier(); + if (cli->cl_writeback_work) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; + } + + if (cli->cl_lru_work) { + ptlrpcd_destroy_work(cli->cl_lru_work); + cli->cl_lru_work = NULL; + } + + obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); + RETURN(0); } int osc_cleanup(struct obd_device *obd) @@ -3237,6 +2802,10 @@ int osc_cleanup(struct obd_device *obd) ENTRY; + spin_lock(&osc_shrink_lock); + list_del(&cli->cl_shrink_list); + spin_unlock(&osc_shrink_lock); + /* lru cleanup */ if (cli->cl_cache != NULL) { LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); @@ -3244,32 +2813,31 @@ int osc_cleanup(struct obd_device *obd) list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; - atomic_dec(&cli->cl_cache->ccc_users); + cl_cache_decref(cli->cl_cache); cli->cl_cache = NULL; } - /* free memory of osc quota cache */ - osc_quota_cleanup(obd); + /* free memory of osc quota cache */ + osc_quota_cleanup(obd); - rc = client_obd_cleanup(obd); + rc = client_obd_cleanup(obd); - ptlrpcd_decref(); - RETURN(rc); + ptlrpcd_decref(); + RETURN(rc); } int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) { - int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars, - lcfg, obd); + int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd); return rc > 0 ? 0: rc; } -static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) +static int osc_process_config(struct obd_device *obd, size_t len, void *buf) { return osc_process_config_base(obd, buf); } -struct obd_ops osc_obd_ops = { +static struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = osc_setup, .o_precleanup = osc_precleanup, @@ -3281,41 +2849,55 @@ struct obd_ops osc_obd_ops = { .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, .o_statfs_async = osc_statfs_async, - .o_unpackmd = osc_unpackmd, .o_create = osc_create, .o_destroy = osc_destroy, .o_getattr = osc_getattr, - .o_getattr_async = osc_getattr_async, .o_setattr = osc_setattr, - .o_setattr_async = osc_setattr_async, - .o_change_cbdata = osc_change_cbdata, - .o_find_cbdata = osc_find_cbdata, .o_iocontrol = osc_iocontrol, - .o_get_info = osc_get_info, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, .o_process_config = osc_process_config, .o_quotactl = osc_quotactl, - .o_quotacheck = osc_quotacheck, }; -extern struct lu_kmem_descr osc_caches[]; -extern spinlock_t osc_ast_guard; -extern struct lock_class_key osc_ast_guard_class; +static struct shrinker *osc_cache_shrinker; +struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list); +DEFINE_SPINLOCK(osc_shrink_lock); -int __init osc_init(void) +#ifndef HAVE_SHRINKER_COUNT +static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) +{ + struct shrink_control scv = { + .nr_to_scan = shrink_param(sc, nr_to_scan), + .gfp_mask = shrink_param(sc, gfp_mask) + }; +#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL) + struct shrinker *shrinker = NULL; +#endif + + (void)osc_cache_shrink_scan(shrinker, &scv); + + return osc_cache_shrink_count(shrinker, &scv); +} +#endif + +static int __init osc_init(void) { bool enable_proc = true; struct obd_type *type; + unsigned int reqpool_size; + unsigned int reqsize; int rc; + DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink, + osc_cache_shrink_count, osc_cache_shrink_scan); ENTRY; - /* print an address of _any_ initialized kernel symbol from this - * module, to allow debugging with gdb that doesn't support data - * symbols from modules.*/ - CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); + /* print an address of _any_ initialized kernel symbol from this + * module, to allow debugging with gdb that doesn't support data + * symbols from modules.*/ + CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); - rc = lu_kmem_init(osc_caches); + rc = lu_kmem_init(osc_caches); if (rc) RETURN(rc); @@ -3324,31 +2906,57 @@ int __init osc_init(void) enable_proc = false; rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, -#ifndef HAVE_ONLY_PROCFS_SEQ - NULL, -#endif LUSTRE_OSC_NAME, &osc_device_type); - if (rc) { - lu_kmem_fini(osc_caches); - RETURN(rc); - } + if (rc) + GOTO(out_kmem, rc); + + osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar); + + /* This is obviously too much memory, only prevent overflow here */ + if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) + GOTO(out_type, rc = -EINVAL); + + reqpool_size = osc_reqpool_mem_max << 20; + + reqsize = 1; + while (reqsize < OST_IO_MAXREQSIZE) + reqsize = reqsize << 1; - spin_lock_init(&osc_ast_guard); - lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); + /* + * We don't enlarge the request count in OSC pool according to + * cl_max_rpcs_in_flight. The allocation from the pool will only be + * tried after normal allocation failed. So a small OSC pool won't + * cause much performance degression in most of cases. + */ + osc_reqpool_maxreqcount = reqpool_size / reqsize; + atomic_set(&osc_pool_req_count, 0); + osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE, + ptlrpc_add_rqs_to_pool); + + if (osc_rq_pool != NULL) + GOTO(out, rc); + rc = -ENOMEM; +out_type: + class_unregister_type(LUSTRE_OSC_NAME); +out_kmem: + lu_kmem_fini(osc_caches); +out: RETURN(rc); } -#ifdef __KERNEL__ -static void /*__exit*/ osc_exit(void) +static void __exit osc_exit(void) { + remove_shrinker(osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); + ptlrpc_free_rq_pool(osc_rq_pool); } -MODULE_AUTHOR("Sun Microsystems, Inc. "); +MODULE_AUTHOR("OpenSFS, Inc. "); MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); -cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); -#endif +module_init(osc_init); +module_exit(osc_exit);