diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 4bebfaa..522d663 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_OSC
@@ -36,20 +35,20 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
+#include
 #include
 #include
 #include
-#include
-#include
 
 #include "osc_internal.h"
+#include
 
 atomic_t osc_pool_req_count;
 unsigned int osc_reqpool_maxreqcount;
@@ -87,7 +86,7 @@ static void osc_release_ppga(struct brw_page **ppga, size_t count);
 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                          void *data, int rc);
 
-void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
+static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
 {
         struct ost_body *body;
 
@@ -100,9 +99,9 @@ void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                        struct obdo *oa)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        int                    rc;
+        struct ptlrpc_request *req;
+        struct ost_body *body;
+        int rc;
         ENTRY;
 
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
@@ -135,7 +134,7 @@ static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
 
         EXIT;
 out:
-        ptlrpc_req_finished(req);
+        ptlrpc_req_put(req);
         return rc;
 }
 
@@ -143,9 +142,9 @@ out:
 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                        struct obdo *oa)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        int                    rc;
+        struct ptlrpc_request *req;
+        struct ost_body *body;
+        int rc;
         ENTRY;
 
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
@@ -176,7 +175,7 @@ static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
 
         EXIT;
 out:
-        ptlrpc_req_finished(req);
+        ptlrpc_req_put(req);
         RETURN(rc);
 }
 
@@ -184,7 +183,7 @@ out:
 static int osc_setattr_interpret(const struct lu_env *env,
                                  struct ptlrpc_request *req, void *args, int rc)
 {
-        struct osc_setattr_args *sa = args;
+        struct osc_setattr_args *sa = args;
         struct ost_body *body;
         ENTRY;
 
@@ -207,9 +206,9 @@ int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie,
                       struct ptlrpc_request_set *rqset)
 {
-        struct ptlrpc_request   *req;
-        struct osc_setattr_args *sa;
-        int                      rc;
+        struct ptlrpc_request *req;
+        struct osc_setattr_args *sa;
+        int rc;
 
         ENTRY;
 
@@ -251,8 +250,8 @@ static int osc_ladvise_interpret(const struct lu_env *env,
 {
         struct osc_ladvise_args *la = arg;
         struct ost_body *body;
-        ENTRY;
 
+        ENTRY;
         if (rc != 0)
                 GOTO(out, rc);
 
@@ -275,16 +274,16 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
 {
-        struct ptlrpc_request *req;
-        struct ost_body *body;
-        struct osc_ladvise_args *la;
-        int rc;
-        struct lu_ladvise *req_ladvise;
-        struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
-        int num_advise = ladvise_hdr->lah_count;
-        struct ladvise_hdr *req_ladvise_hdr;
-        ENTRY;
+        struct ptlrpc_request *req;
+        struct ost_body *body;
+        struct osc_ladvise_args *la;
+        struct lu_ladvise *req_ladvise;
+        struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
+        int num_advise = ladvise_hdr->lah_count;
+        struct ladvise_hdr *req_ladvise_hdr;
+        int rc;
+        ENTRY;
 
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -332,39 +331,39 @@ int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
 static int osc_create(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        int                    rc;
-        ENTRY;
+        struct ptlrpc_request *req;
+        struct ost_body *body;
+        int rc;
+        ENTRY;
 
         LASSERT(oa != NULL);
         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
 
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
-        if (req == NULL)
-                GOTO(out, rc = -ENOMEM);
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
+        if (req == NULL)
+                GOTO(out, rc = -ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
-        if (rc) {
-                ptlrpc_request_free(req);
-                GOTO(out, rc);
-        }
+        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
+        if (rc) {
+                ptlrpc_request_free(req);
+                GOTO(out, rc);
+        }
 
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body);
+        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+        LASSERT(body);
 
         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
-        ptlrpc_request_set_replen(req);
+        ptlrpc_request_set_replen(req);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out_req, rc);
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out_req, rc);
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                GOTO(out_req, rc = -EPROTO);
+        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+        if (body == NULL)
+                GOTO(out_req, rc = -EPROTO);
 
         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
@@ -375,7 +374,7 @@ static int osc_create(const struct lu_env *env, struct obd_export *exp,
         CDEBUG(D_HA, "transno: %lld\n",
                lustre_msg_get_transno(req->rq_repmsg));
 out_req:
-        ptlrpc_req_finished(req);
+        ptlrpc_req_put(req);
 out:
         RETURN(rc);
 }
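A change that recurs throughout this patch is the rename of ptlrpc_req_finished() to ptlrpc_req_put(), matching the kernel's usual get/put reference-counting naming. The sketch below is a minimal, standalone C model of that convention, not the ptlrpc implementation; every name in it is invented for illustration:

    #include <stdatomic.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Toy model of the get/put convention behind ptlrpc_req_put():
     * the last put frees the object.  All names here are hypothetical. */
    struct toy_req {
            atomic_int refcount;
    };

    static struct toy_req *toy_req_alloc(void)
    {
            struct toy_req *req = malloc(sizeof(*req));

            if (req)
                    atomic_init(&req->refcount, 1);
            return req;
    }

    static void toy_req_get(struct toy_req *req)
    {
            atomic_fetch_add(&req->refcount, 1);
    }

    static void toy_req_put(struct toy_req *req)
    {
            /* free only when the last reference is dropped */
            if (atomic_fetch_sub(&req->refcount, 1) == 1)
                    free(req);
    }

    int main(void)
    {
            struct toy_req *req = toy_req_alloc();

            toy_req_get(req);       /* e.g. a reply handler takes a ref */
            toy_req_put(req);       /* handler done */
            toy_req_put(req);       /* caller's put actually frees */
            printf("refcount model done\n");
            return 0;
    }

The point of the naming is that the caller's put may or may not be the final free, which is why the diff can swap the call without touching the surrounding error paths.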
@@ -448,16 +447,9 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
         struct ost_body *body;
         struct obd_import *imp = class_exp2cliimp(exp);
         int rc;
-        ENTRY;
 
-        /*
-         * Only mode == 0 (which is standard prealloc) is supported now.
-         * Punch is not supported yet.
-         */
-        if (mode & ~FALLOC_FL_KEEP_SIZE)
-                RETURN(-EOPNOTSUPP);
+        ENTRY;
 
         oa->o_falloc_mode = mode;
-
         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                    &RQF_OST_FALLOCATE);
         if (req == NULL)
@@ -476,7 +468,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
 
         ptlrpc_request_set_replen(req);
 
-        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
+        req->rq_interpret_reply = osc_setattr_interpret;
         BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
         sa = ptlrpc_req_async_args(sa, req);
         sa->sa_oa = oa;
@@ -487,24 +479,28 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
 
         RETURN(0);
 }
+EXPORT_SYMBOL(osc_fallocate_base);
 
 static int osc_sync_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req, void *args, int rc)
 {
+        const char *obd_name = req->rq_import->imp_obd->obd_name;
         struct osc_fsync_args *fa = args;
         struct ost_body *body;
         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
         unsigned long valid = 0;
         struct cl_object *obj;
-        ENTRY;
 
+        ENTRY;
         if (rc != 0)
                 GOTO(out, rc);
 
         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
         if (body == NULL) {
-                CERROR("can't unpack ost_body\n");
-                GOTO(out, rc = -EPROTO);
+                rc = -EPROTO;
+                CERROR("%s: Failed to unpack ost_body: rc = %d\n", obd_name,
+                       rc);
+                GOTO(out, rc);
         }
 
         *fa->fa_oa = body->oa;
@@ -528,24 +524,24 @@ out:
 
 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
-                  struct ptlrpc_request_set *rqset)
+                  struct ptlrpc_request_set *rqset)
 {
-        struct obd_export     *exp = osc_export(obj);
+        struct obd_export *exp = osc_export(obj);
         struct ptlrpc_request *req;
-        struct ost_body       *body;
+        struct ost_body *body;
         struct osc_fsync_args *fa;
-        int                    rc;
-        ENTRY;
+        int rc;
 
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+        ENTRY;
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
+        if (req == NULL)
+                RETURN(-ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         /* overload the size and blocks fields in the oa with start/end */
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
@@ -563,12 +559,13 @@ int osc_sync_base(struct osc_object *obj, struct obdo *oa,
 
         ptlrpc_set_add_req(rqset, req);
 
-        RETURN (0);
+        RETURN(0);
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by
  * @objid. Found locks are added into @cancel list. Returns the amount of
- * locks added to @cancels list. */
+ * locks added to @cancels list.
+ */
 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                    struct list_head *cancels,
                                    enum ldlm_mode mode, __u64 lock_flags)
@@ -577,28 +574,27 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
         struct ldlm_res_id res_id;
         struct ldlm_resource *res;
         int count;
-        ENTRY;
 
+        ENTRY;
         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
          * export) but disabled through procfs (flag in NS).
          *
         * This distinguishes from a case when ELC is not supported originally,
          * when we still want to cancel locks in advance and just cancel them
-         * locally, without sending any RPC. */
+         * locally, without sending any RPC.
+         */
         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                 RETURN(0);
 
         ostid_build_res_name(&oa->o_oi, &res_id);
-        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+        res = ldlm_resource_get(ns, &res_id, 0, 0);
         if (IS_ERR(res))
                 RETURN(0);
 
-        LDLM_RESOURCE_ADDREF(res);
-        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
-                                           lock_flags, 0, NULL);
-        LDLM_RESOURCE_DELREF(res);
-        ldlm_resource_putref(res);
-        RETURN(count);
+        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
+                                           lock_flags, 0, NULL);
+        ldlm_resource_putref(res);
+        RETURN(count);
 }
 
 static int osc_destroy_interpret(const struct lu_env *env,
@@ -633,42 +629,42 @@ static int osc_can_send_destroy(struct client_obd *cli)
 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                        struct obdo *oa)
 {
-        struct client_obd     *cli = &exp->exp_obd->u.cli;
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
+        struct client_obd *cli = &exp->exp_obd->u.cli;
+        struct ptlrpc_request *req;
+        struct ost_body *body;
         LIST_HEAD(cancels);
-        int rc, count;
-        ENTRY;
+        int rc, count;
 
-        if (!oa) {
-                CDEBUG(D_INFO, "oa NULL\n");
-                RETURN(-EINVAL);
-        }
+        ENTRY;
+        if (!oa) {
+                CDEBUG(D_INFO, "oa NULL\n");
+                RETURN(-EINVAL);
+        }
 
-        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
-                                        LDLM_FL_DISCARD_DATA);
+        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
+                                        LDLM_FL_DISCARD_DATA);
 
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
-        if (req == NULL) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-                RETURN(-ENOMEM);
-        }
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                RETURN(-ENOMEM);
+        }
 
-        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
-                               0, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
+                               0, &cancels, count);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
-        ptlrpc_at_set_req_timeout(req);
+        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+        ptlrpc_at_set_req_timeout(req);
 
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
 
-        ptlrpc_request_set_replen(req);
+        ptlrpc_request_set_replen(req);
 
         req->rq_interpret_reply = osc_destroy_interpret;
         if (!osc_can_send_destroy(cli)) {
@@ -680,7 +676,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                               cli->cl_destroy_waitq,
                               osc_can_send_destroy(cli));
                 if (rc) {
-                        ptlrpc_req_finished(req);
+                        ptlrpc_req_put(req);
                         RETURN(-EINTR);
                 }
         }
@@ -691,7 +687,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
 }
 
 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
-                                long writing_bytes)
+                                long writing_bytes)
 {
         u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
 
@@ -699,12 +695,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
         oa->o_valid |= bits;
         spin_lock(&cli->cl_loi_list_lock);
-        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
+        if (cli->cl_ocd_grant_param)
                 oa->o_dirty = cli->cl_dirty_grant;
         else
                 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
         if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
-                CERROR("dirty %lu > dirty_max %lu\n",
+                CERROR("%s: dirty %lu > dirty_max %lu\n", cli_name(cli),
                        cli->cl_dirty_pages,
                        cli->cl_dirty_max_pages);
                 oa->o_undirty = 0;
         } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
@@ -712,15 +708,17 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                             (long)(obd_max_dirty_pages + 1))) {
                 /* The atomic_read() allowing the atomic_inc() are
                  * not covered by a lock thus they may safely race and trip
-                 * this CERROR() unless we add in a small fudge factor (+1). */
+                 * this CERROR() unless we add in a small fudge factor (+1)
+                 */
                 CERROR("%s: dirty %ld > system dirty_max %ld\n",
                        cli_name(cli), atomic_long_read(&obd_dirty_pages),
                        obd_max_dirty_pages);
                 oa->o_undirty = 0;
         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                             0x7fffffff)) {
-                CERROR("dirty %lu - dirty_max %lu too big???\n",
-                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
+                CERROR("%s: dirty %lu - dirty_max %lu too big???\n",
+                       cli_name(cli), cli->cl_dirty_pages,
+                       cli->cl_dirty_max_pages);
                 oa->o_undirty = 0;
         } else {
                 unsigned long nrpages;
@@ -730,13 +728,13 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                 nrpages *= cli->cl_max_rpcs_in_flight + 1;
                 nrpages = max(nrpages, cli->cl_dirty_max_pages);
                 undirty = nrpages << PAGE_SHIFT;
-                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
-                                 GRANT_PARAM)) {
+                if (cli->cl_ocd_grant_param) {
                         int nrextents;
 
                         /* take extent tax into account when asking for more
-                         * grant space */
-                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
+                         * grant space
+                         */
+                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                     cli->cl_max_extent_pages;
                         undirty += nrextents * cli->cl_grant_extent_tax;
                 }
@@ -745,13 +743,22 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                  */
                 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                     ~(PTLRPC_MAX_BRW_SIZE * 4UL));
-        }
+        }
         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
-        oa->o_dropped = cli->cl_lost_grant;
-        cli->cl_lost_grant = 0;
+        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
+        if (cli->cl_lost_grant > INT_MAX) {
+                CDEBUG(D_CACHE,
+                       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
+                       cli_name(cli), cli->cl_lost_grant);
+                oa->o_dropped = INT_MAX;
+        } else {
+                oa->o_dropped = cli->cl_lost_grant;
+        }
+        cli->cl_lost_grant -= oa->o_dropped;
         spin_unlock(&cli->cl_loi_list_lock);
-        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
-               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
+               cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped,
+               oa->o_grant, cli->cl_lost_grant);
 }
 
 void osc_update_next_shrink(struct client_obd *cli)
@@ -762,6 +769,7 @@ void osc_update_next_shrink(struct client_obd *cli)
         CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
                cli->cl_next_shrink_grant);
 }
+EXPORT_SYMBOL(osc_update_next_shrink);
 
 static void __osc_update_grant(struct client_obd *cli, u64 grant)
 {
@@ -772,10 +780,10 @@ static void __osc_update_grant(struct client_obd *cli, u64 grant)
 
 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
-        if (body->oa.o_valid & OBD_MD_FLGRANT) {
+        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
-                __osc_update_grant(cli, body->oa.o_grant);
-        }
+                __osc_update_grant(cli, body->oa.o_grant);
+        }
 }
 
 /**
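The osc_announce_cached() hunk above fixes a truncation hazard: cl_lost_grant is a 64-bit counter, while the wire field o_dropped (alias o_misc) is only 32 bits, so the new code reports at most INT_MAX per RPC and carries the remainder forward instead of zeroing it. A standalone model of just that arithmetic (hypothetical names, not Lustre code):

    #include <limits.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Model of the o_dropped clamp: send at most INT_MAX of lost grant
     * per RPC and keep the remainder for the next report. */
    static uint32_t report_lost_grant(uint64_t *lost_grant)
    {
            uint32_t dropped;

            if (*lost_grant > INT_MAX)
                    dropped = INT_MAX;
            else
                    dropped = (uint32_t)*lost_grant;

            *lost_grant -= dropped; /* remainder is reported later */
            return dropped;
    }

    int main(void)
    {
            uint64_t lost = (uint64_t)INT_MAX + 1000;

            printf("first RPC reports %u, remainder %llu\n",
                   report_lost_grant(&lost), (unsigned long long)lost);
            printf("second RPC reports %u, remainder %llu\n",
                   report_lost_grant(&lost), (unsigned long long)lost);
            return 0;
    }

The old code did `oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0;`, which silently discarded anything above 32 bits.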
@@ -817,18 +825,19 @@ static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
         oa->o_grant = cli->cl_avail_grant / 4;
         cli->cl_avail_grant -= oa->o_grant;
         spin_unlock(&cli->cl_loi_list_lock);
-        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
-                oa->o_valid |= OBD_MD_FLFLAGS;
-                oa->o_flags = 0;
-        }
-        oa->o_flags |= OBD_FL_SHRINK_GRANT;
-        osc_update_next_shrink(cli);
+        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
+                oa->o_valid |= OBD_MD_FLFLAGS;
+                oa->o_flags = 0;
+        }
+        oa->o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
 }
 
 /* Shrink the current grant, either from some large amount to enough for a
  * full set of in-flight RPCs, or if we have already shrunk to that limit
  * then to enough for a single RPC.  This avoids keeping more grant than
- * needed, and avoids shrinking the grant piecemeal. */
+ * needed, and avoids shrinking the grant piecemeal.
+ */
 static int osc_shrink_grant(struct client_obd *cli)
 {
         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
@@ -844,14 +853,15 @@ static int osc_shrink_grant(struct client_obd *cli)
 
 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
 {
-        int                     rc = 0;
-        struct ost_body        *body;
-        ENTRY;
+        int rc = 0;
+        struct ost_body *body;
+        ENTRY;
 
         spin_lock(&cli->cl_loi_list_lock);
         /* Don't shrink if we are already above or below the desired limit
          * We don't want to shrink below a single RPC, as that will negatively
-         * impact block allocation and long-term performance. */
+         * impact block allocation and long-term performance.
+         */
         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
 
@@ -876,21 +886,21 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
         cli->cl_avail_grant = target_bytes;
         spin_unlock(&cli->cl_loi_list_lock);
-        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
-                body->oa.o_valid |= OBD_MD_FLFLAGS;
-                body->oa.o_flags = 0;
-        }
-        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
-        osc_update_next_shrink(cli);
-
-        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
-                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
-                                sizeof(*body), body, NULL);
-        if (rc != 0)
-                __osc_update_grant(cli, body->oa.o_grant);
+        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
+                body->oa.o_valid |= OBD_MD_FLFLAGS;
+                body->oa.o_flags = 0;
+        }
+        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+        osc_update_next_shrink(cli);
+
+        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
+                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+                                sizeof(*body), body, NULL);
+        if (rc != 0)
+                __osc_update_grant(cli, body->oa.o_grant);
 out_free:
-        OBD_FREE_PTR(body);
-        RETURN(rc);
+        OBD_FREE_PTR(body);
+        RETURN(rc);
 }
 
 static int osc_should_shrink_grant(struct client_obd *client)
@@ -909,16 +919,16 @@ static int osc_should_shrink_grant(struct client_obd *client)
         if (ktime_get_seconds() >= next_shrink - 5) {
                 /* Get the current RPC size directly, instead of going via:
                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
-                 * Keep comment here so that it can be found by searching. */
+                 * Keep comment here so that it can be found by searching.
+                 */
                 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
 
                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                     client->cl_avail_grant > brw_size)
                         return 1;
-                else
-                        osc_update_next_shrink(client);
+                osc_update_next_shrink(client);
         }
-        return 0;
+        return 0;
 }
 
 #define GRANT_SHRINK_RPC_BATCH 100
@@ -970,6 +980,7 @@ void osc_schedule_grant_work(void)
         cancel_delayed_work_sync(&work);
         schedule_work(&work.work);
 }
+EXPORT_SYMBOL(osc_schedule_grant_work);
 
 /**
  * Start grant thread for returing grant to server for idle clients.
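The grant-shrink hunks above keep an idle client from hoarding grant: osc_should_shrink_grant() only fires once the next-shrink deadline is (almost) due, the import is in the FULL state, and the spare grant exceeds what one full-size RPC would need. A compact standalone model of that decision, with invented names:

    #include <stdbool.h>
    #include <stdio.h>

    /* Model of osc_should_shrink_grant(): shrink only when the deadline
     * is (almost) due and the client holds more grant than one RPC. */
    static bool should_shrink(long long now, long long next_shrink,
                              unsigned long avail_grant,
                              unsigned long brw_size, bool import_full)
    {
            if (now < next_shrink - 5)
                    return false;   /* not due yet */
            return import_full && avail_grant > brw_size;
    }

    int main(void)
    {
            /* e.g. 16 MiB of spare grant vs. a 4 MiB max RPC */
            printf("%d\n", should_shrink(100, 90, 16 << 20, 4 << 20, true));
            printf("%d\n", should_shrink(100, 200, 16 << 20, 4 << 20, true));
            return 0;
    }

Note the cleanup in the hunk itself: when the conditions do not hold, the deadline is simply pushed out again via osc_update_next_shrink().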
@@ -1053,10 +1064,10 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                                     ~chunk_mask) & chunk_mask;
                 /* determine maximum extent size, in #pages */
                 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
-                cli->cl_max_extent_pages = size >> PAGE_SHIFT;
-                if (cli->cl_max_extent_pages == 0)
-                        cli->cl_max_extent_pages = 1;
+                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
+                cli->cl_ocd_grant_param = 1;
         } else {
+                cli->cl_ocd_grant_param = 0;
                 cli->cl_grant_extent_tax = 0;
                 cli->cl_chunkbits = PAGE_SHIFT;
                 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
@@ -1077,38 +1088,39 @@ EXPORT_SYMBOL(osc_init_grant);
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
  * via the LOV, and it _knows_ it's reading inside the file, it's just that
- * this stripe never got written at or beyond this stripe offset yet. */
+ * this stripe never got written at or beyond this stripe offset yet.
+ */
 static void handle_short_read(int nob_read, size_t page_count,
-                              struct brw_page **pga)
+                              struct brw_page **pga)
 {
-        char *ptr;
-        int i = 0;
+        char *ptr;
+        int i = 0;
 
-        /* skip bytes read OK */
-        while (nob_read > 0) {
-                LASSERT (page_count > 0);
+        /* skip bytes read OK */
+        while (nob_read > 0) {
+                LASSERT(page_count > 0);
 
-                if (pga[i]->count > nob_read) {
+                if (pga[i]->bp_count > nob_read) {
                         /* EOF inside this page */
-                        ptr = kmap(pga[i]->pg) +
-                                (pga[i]->off & ~PAGE_MASK);
-                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
-                        kunmap(pga[i]->pg);
+                        ptr = kmap(pga[i]->bp_page) +
+                                (pga[i]->bp_off & ~PAGE_MASK);
+                        memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
+                        kunmap(pga[i]->bp_page);
                         page_count--;
                         i++;
                         break;
                 }
 
-                nob_read -= pga[i]->count;
-                page_count--;
-                i++;
-        }
+                nob_read -= pga[i]->bp_count;
+                page_count--;
+                i++;
+        }
 
         /* zero remaining pages */
         while (page_count-- > 0) {
-                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
-                memset(ptr, 0, pga[i]->count);
-                kunmap(pga[i]->pg);
+                ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
+                memset(ptr, 0, pga[i]->bp_count);
+                kunmap(pga[i]->bp_page);
                 i++;
         }
 }
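handle_short_read(), reworked above for the bp_-prefixed brw_page fields, copes with reads that stop short at a stripe's EOF: the bytes actually returned are kept, and everything after them is zero-filled page by page. A small userspace model of the same walk (toy page size, invented names):

    #include <stdio.h>
    #include <string.h>

    #define PAGE_SZ 8       /* tiny "pages" keep the demo readable */

    /* Model of handle_short_read(): keep the bytes that arrived, then
     * zero the tail of the partial page and all remaining pages. */
    static void short_read_fixup(char pages[][PAGE_SZ], int page_count,
                                 int nob_read)
    {
            int i = 0;

            while (nob_read > 0 && i < page_count) {
                    if (nob_read < PAGE_SZ) {
                            /* EOF inside this page: zero the tail */
                            memset(pages[i] + nob_read, 0,
                                   PAGE_SZ - nob_read);
                            i++;
                            break;
                    }
                    nob_read -= PAGE_SZ;
                    i++;
            }
            for (; i < page_count; i++)     /* zero untouched pages */
                    memset(pages[i], 0, PAGE_SZ);
    }

    int main(void)
    {
            char pages[3][PAGE_SZ];

            memset(pages, 'x', sizeof(pages));
            short_read_fixup(pages, 3, 11); /* only 11 of 24 bytes arrived */
            printf("page1 tail: %d, page2: %d\n", pages[1][4], pages[2][0]);
            return 0;
    }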
@@ -1117,59 +1129,60 @@ static int check_write_rcs(struct ptlrpc_request *req,
                            int requested_nob, int niocount,
                            size_t page_count, struct brw_page **pga)
 {
-        int     i;
-        __u32   *remote_rcs;
+        const char *obd_name = req->rq_import->imp_obd->obd_name;
+        __u32 *remote_rcs;
+        int i;
 
-        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
-                                                  sizeof(*remote_rcs) *
-                                                  niocount);
-        if (remote_rcs == NULL) {
-                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
-                return(-EPROTO);
-        }
+        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                   sizeof(*remote_rcs) *
+                                                  niocount);
+        if (remote_rcs == NULL) {
+                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
+                return(-EPROTO);
+        }
 
-        /* return error if any niobuf was in error */
-        for (i = 0; i < niocount; i++) {
+        /* return error if any niobuf was in error */
+        for (i = 0; i < niocount; i++) {
                 if ((int)remote_rcs[i] < 0) {
                         CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                                i, remote_rcs[i], req);
                         return remote_rcs[i];
                 }
 
-                if (remote_rcs[i] != 0) {
-                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
-                                i, remote_rcs[i], req);
-                        return(-EPROTO);
-                }
-        }
+                if (remote_rcs[i] != 0) {
+                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
+                               i, remote_rcs[i], req);
+                        return(-EPROTO);
+                }
+        }
         if (req->rq_bulk != NULL &&
             req->rq_bulk->bd_nob_transferred != requested_nob) {
-                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
-                       req->rq_bulk->bd_nob_transferred, requested_nob);
-                return(-EPROTO);
-        }
+                CERROR("%s: Unexpected # bytes transferred: %d (requested %d)\n",
+                       obd_name, req->rq_bulk->bd_nob_transferred,
+                       requested_nob);
+                return(-EPROTO);
+        }
 
-        return (0);
+        return (0);
 }
 
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
-        if (p1->flag != p2->flag) {
-                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
-                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
-                                  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
-
-                /* warn if we try to combine flags that we don't know to be
-                 * safe to combine */
-                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
-                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
-                              "report this at https://jira.whamcloud.com/\n",
-                              p1->flag, p2->flag);
-                }
-                return 0;
-        }
+        if (p1->bp_flag != p2->bp_flag) {
+                unsigned int mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
+                                      OBD_BRW_SYNC | OBD_BRW_ASYNC |
+                                      OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC |
+                                      OBD_BRW_SYS_RESOURCE);
+
+                /* warn if combine flags that we don't know to be safe */
+                if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
+                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
+                              p1->bp_flag, p2->bp_flag);
+                }
+                return 0;
+        }
 
-        return (p1->off + p1->count == p2->off);
+        return (p1->bp_off + p1->bp_count == p2->bp_off);
 }
 
 #if IS_ENABLED(CONFIG_CRC_T10DIF)
@@ -1177,20 +1190,20 @@ static int
 osc_checksum_bulk_t10pi(const char *obd_name, int nob, size_t pg_count,
                         struct brw_page **pga, int opc,
                         obd_dif_csum_fn *fn, int sector_size,
-                        u32 *check_sum)
+                        u32 *check_sum, bool resend)
 {
         struct ahash_request *req;
         /* Used Adler as the default checksum type on top of DIF tags */
         unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
         struct page *__page;
         unsigned char *buffer;
-        __u16 *guard_start;
-        unsigned int bufsize;
+        __be16 *guard_start;
         int guard_number;
         int used_number = 0;
         int used;
         u32 cksum;
-        int rc = 0;
+        unsigned int bufsize = sizeof(cksum);
+        int rc = 0, rc2;
         int i = 0;
 
         LASSERT(pg_count > 0);
@@ -1208,64 +1221,83 @@ osc_checksum_bulk_t10pi(const char *obd_name, int nob, size_t pg_count,
         }
 
         buffer = kmap(__page);
-        guard_start = (__u16 *)buffer;
+        guard_start = (__be16 *)buffer;
         guard_number = PAGE_SIZE / sizeof(*guard_start);
+        CDEBUG(D_PAGE | (resend ? D_HA : 0),
+               "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
+               guard_number, resend, nob, pg_count);
+
         while (nob > 0 && pg_count > 0) {
-                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+                int off = pga[i]->bp_off & ~PAGE_MASK;
+                unsigned int count =
+                        pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
+                int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
+                                    (off / sector_size);
+
+                if (guards_needed > guard_number - used_number) {
+                        cfs_crypto_hash_update_page(req, __page, 0,
+                                used_number * sizeof(*guard_start));
+                        used_number = 0;
+                }
 
                 /* corrupt the data before we compute the checksum, to
-                 * simulate an OST->client data error */
+                 * simulate an OST->client data error
+                 */
                 if (unlikely(i == 0 && opc == OST_READ &&
-                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
-                        unsigned char *ptr = kmap(pga[i]->pg);
-                        int off = pga[i]->off & ~PAGE_MASK;
+                             CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
+                        unsigned char *ptr = kmap(pga[i]->bp_page);
 
                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
-                        kunmap(pga[i]->pg);
+                        kunmap(pga[i]->bp_page);
                 }
 
                 /*
                  * The left guard number should be able to hold checksums of a
                  * whole page
                  */
-                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
-                                                  pga[i]->off & ~PAGE_MASK,
+                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
+                                                  pga[i]->bp_off & ~PAGE_MASK,
                                                   count,
                                                   guard_start + used_number,
                                                   guard_number - used_number,
                                                   &used, sector_size, fn);
+                if (unlikely(resend))
+                        CDEBUG(D_PAGE | D_HA,
+                               "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
+                               i, used, pga[i]->bp_off & ~PAGE_MASK, count,
+                               (int)(used * sizeof(*guard_start)),
+                               guard_start + used_number);
                 if (rc)
                         break;
 
                 used_number += used;
-                if (used_number == guard_number) {
-                        cfs_crypto_hash_update_page(req, __page, 0,
-                                used_number * sizeof(*guard_start));
-                        used_number = 0;
-                }
-
-                nob -= pga[i]->count;
+                nob -= pga[i]->bp_count;
                 pg_count--;
                 i++;
         }
 
         kunmap(__page);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_hash, rc);
 
         if (used_number != 0)
                 cfs_crypto_hash_update_page(req, __page, 0,
                         used_number * sizeof(*guard_start));
 
-        bufsize = sizeof(cksum);
-        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
-
-        /* For sending we only compute the wrong checksum instead
-         * of corrupting the data so it is still correct on a redo */
-        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
-                cksum++;
+out_hash:
+        rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
+        if (!rc)
+                rc = rc2;
+        if (rc == 0) {
+                /* For sending we only compute the wrong checksum instead
+                 * of corrupting the data so it is still correct on a redo
+                 */
+                if (opc == OST_WRITE &&
+                    CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+                        cksum++;
 
-        *check_sum = cksum;
+                *check_sum = cksum;
+        }
 out:
         __free_page(__page);
         return rc;
@@ -1273,7 +1305,7 @@ out:
 #else /* !CONFIG_CRC_T10DIF */
 #define obd_dif_ip_fn NULL
 #define obd_dif_crc_fn NULL
-#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
+#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
         -EOPNOTSUPP
 #endif /* CONFIG_CRC_T10DIF */
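The T10-PI hunk above changes when the staged guard tags are folded into the running hash: instead of flushing only when the staging page is exactly full, the new code flushes before generating tags that might overflow it (the guards_needed check). A standalone model of that batching, using a toy hash and invented names:

    #include <stdio.h>

    #define GUARD_SLOTS 8   /* stand-in for PAGE_SIZE / sizeof(__be16) */

    /* Model of the guard-tag batching after the patch: before producing
     * tags for a page, flush the staging buffer if they might not fit. */
    static void hash_flush(const unsigned short *buf, int n, unsigned *digest)
    {
            for (int i = 0; i < n; i++)     /* toy "hash update" */
                    *digest = *digest * 31 + buf[i];
    }

    int main(void)
    {
            unsigned short staging[GUARD_SLOTS];
            unsigned digest = 0;
            int used = 0;

            for (int page = 0; page < 5; page++) {
                    int needed = 3; /* tags for this page's sectors */

                    if (needed > GUARD_SLOTS - used) {
                            hash_flush(staging, used, &digest);
                            used = 0;
                    }
                    for (int s = 0; s < needed; s++)
                            staging[used++] = (unsigned short)(page * 10 + s);
            }
            hash_flush(staging, used, &digest);     /* final partial batch */
            printf("digest %u\n", digest);
            return 0;
    }

The hunk also routes the final hash through the out_hash label so a hash-finalization error is no longer silently ignored.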
 
@@ -1282,10 +1314,10 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
                              enum cksum_types cksum_type,
                              u32 *cksum)
 {
-        int                             i = 0;
-        struct ahash_request           *req;
-        unsigned int                    bufsize;
-        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
+        int i = 0;
+        struct ahash_request *req;
+        unsigned int bufsize;
+        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
 
         LASSERT(pg_count > 0);
 
@@ -1297,25 +1329,27 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
         }
 
         while (nob > 0 && pg_count > 0) {
-                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
+                unsigned int count =
+                        pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
 
                 /* corrupt the data before we compute the checksum, to
-                 * simulate an OST->client data error */
+                 * simulate an OST->client data error
+                 */
                 if (i == 0 && opc == OST_READ &&
-                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
-                        unsigned char *ptr = kmap(pga[i]->pg);
-                        int off = pga[i]->off & ~PAGE_MASK;
+                    CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+                        unsigned char *ptr = kmap(pga[i]->bp_page);
+                        int off = pga[i]->bp_off & ~PAGE_MASK;
 
                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
-                        kunmap(pga[i]->pg);
+                        kunmap(pga[i]->bp_page);
                 }
-                cfs_crypto_hash_update_page(req, pga[i]->pg,
-                                            pga[i]->off & ~PAGE_MASK,
+                cfs_crypto_hash_update_page(req, pga[i]->bp_page,
+                                            pga[i]->bp_off & ~PAGE_MASK,
                                             count);
-                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
-                               (int)(pga[i]->off & ~PAGE_MASK));
+                LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
+                               (int)(pga[i]->bp_off & ~PAGE_MASK));
 
-                nob -= pga[i]->count;
+                nob -= pga[i]->bp_count;
                 pg_count--;
                 i++;
         }
@@ -1324,8 +1358,9 @@ static int osc_checksum_bulk(int nob, size_t pg_count,
         cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
 
         /* For sending we only compute the wrong checksum instead
-         * of corrupting the data so it is still correct on a redo */
-        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+         * of corrupting the data so it is still correct on a redo
+         */
+        if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                 (*cksum)++;
 
         return 0;
@@ -1335,7 +1370,7 @@ static int osc_checksum_bulk_rw(const char *obd_name,
                                 enum cksum_types cksum_type,
                                 int nob, size_t pg_count,
                                 struct brw_page **pga, int opc,
-                                u32 *check_sum)
+                                u32 *check_sum, bool resend)
 {
         obd_dif_csum_fn *fn = NULL;
         int sector_size = 0;
@@ -1346,7 +1381,8 @@ static int osc_checksum_bulk_rw(const char *obd_name,
 
         if (fn)
                 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
-                                             opc, fn, sector_size, check_sum);
+                                             opc, fn, sector_size, check_sum,
+                                             resend);
         else
                 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                        check_sum);
@@ -1354,22 +1390,158 @@ static int osc_checksum_bulk_rw(const char *obd_name,
         RETURN(rc);
 }
 
+#ifdef CONFIG_LL_ENCRYPTION
+/**
+ * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
+ * @srcpage: The locked pagecache page containing the block(s) to encrypt
+ * @dstpage: The page to put encryption result
+ * @len: Total size of the block(s) to encrypt. Must be a nonzero
+ *       multiple of the filesystem's block size.
+ * @offs: Byte offset within @page of the first block to encrypt. Must be
+ *        a multiple of the filesystem's block size.
+ * @gfp_flags: Memory allocation flags
+ *
+ * This overlay function is necessary to be able to provide our own bounce page.
+ */
+static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
+                                                 struct page *dstpage,
+                                                 unsigned int len,
+                                                 unsigned int offs,
+                                                 gfp_t gfp_flags)
+
+{
+        const struct inode *inode = srcpage->mapping->host;
+        const unsigned int blockbits = inode->i_blkbits;
+        const unsigned int blocksize = 1 << blockbits;
+        u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
+                       (offs >> blockbits);
+        unsigned int i;
+        int err;
+
+        if (unlikely(!dstpage))
+                return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
+                                                        gfp_flags);
+
+        if (WARN_ON_ONCE(!PageLocked(srcpage)))
+                return ERR_PTR(-EINVAL);
+
+        if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
+                return ERR_PTR(-EINVAL);
+
+        /* Set PagePrivate2 for disambiguation in
+         * osc_finalize_bounce_page().
+         * It means cipher page was not allocated by llcrypt.
+         */
+        SetPagePrivate2(dstpage);
+
+        for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
+                err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
+                                            i, lblk_num, gfp_flags);
+                if (err)
+                        return ERR_PTR(err);
+        }
+        SetPagePrivate(dstpage);
+        set_page_private(dstpage, (unsigned long)srcpage);
+        return dstpage;
+}
+
+/**
+ * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
+ *
+ * This overlay function is necessary to handle bounce pages
+ * allocated by ourselves.
+ */
+static inline void osc_finalize_bounce_page(struct page **pagep)
+{
+        struct page *page = *pagep;
+
+        ClearPageChecked(page);
+        /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
+         * to indicate the cipher page was allocated by ourselves.
+         * So we must not free it via llcrypt.
+         */
+        if (unlikely(!page || !PagePrivate2(page)))
+                return llcrypt_finalize_bounce_page(pagep);
+
+        if (llcrypt_is_bounce_page(page)) {
+                *pagep = llcrypt_pagecache_page(page);
+                ClearPagePrivate2(page);
+                set_page_private(page, (unsigned long)NULL);
+                ClearPagePrivate(page);
+        }
+}
+#else /* !CONFIG_LL_ENCRYPTION */
+#define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
+        llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
+#define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
+#endif
+
 static inline void osc_release_bounce_pages(struct brw_page **pga,
                                             u32 page_count)
 {
 #ifdef HAVE_LUSTRE_CRYPTO
-        int i;
+        struct page **pa = NULL;
+        int i, j = 0;
+
+        if (!pga[0])
+                return;
+
+#ifdef CONFIG_LL_ENCRYPTION
+        if (PageChecked(pga[0]->bp_page)) {
+                OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+                if (!pa)
+                        return;
+        }
+#endif
 
         for (i = 0; i < page_count; i++) {
-                if (!pga[i]->pg->mapping)
-                        /* bounce pages are unmapped */
-                        llcrypt_finalize_bounce_page(&pga[i]->pg);
-                pga[i]->count -= pga[i]->bp_count_diff;
-                pga[i]->off += pga[i]->bp_off_diff;
+                /* Bounce pages used by osc_encrypt_pagecache_blocks()
+                 * called from osc_brw_prep_request()
+                 * are identified thanks to the PageChecked flag.
+                 */
+                if (PageChecked(pga[i]->bp_page)) {
+                        if (pa)
+                                pa[j++] = pga[i]->bp_page;
+                        osc_finalize_bounce_page(&pga[i]->bp_page);
+                }
+                pga[i]->bp_count -= pga[i]->bp_count_diff;
+                pga[i]->bp_off += pga[i]->bp_off_diff;
+        }
+
+        if (pa) {
+                obd_pool_put_pages_array(pa, j);
+                OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
         }
 #endif
 }
 
+static inline bool is_interop_required(u64 foffset, u32 off0, u32 npgs,
+                                       struct brw_page **pga)
+{
+        struct brw_page *pg0 = pga[0];
+        struct brw_page *pgN = pga[npgs - 1];
+        const u32 nob = ((npgs - 2) << PAGE_SHIFT) + pg0->bp_count +
+                        pgN->bp_count;
+
+        return ((nob + off0) >= LNET_MTU &&
+                cl_io_nob_aligned(foffset, nob, MD_MAX_INTEROP_PAGE_SIZE) !=
+                cl_io_nob_aligned(foffset, nob, MD_MIN_INTEROP_PAGE_SIZE));
+}
+
+static inline u32 interop_pages(u64 foffset, u32 npgs, struct brw_page **pga)
+{
+        u32 off0;
+
+        if (foffset == 0 || npgs < 15)
+                return 0;
+
+        off0 = (foffset & (MD_MAX_INTEROP_PAGE_SIZE - 1));
+        if (is_interop_required(foffset, off0, npgs, pga))
+                return off0 >> MD_MIN_INTEROP_PAGE_SHIFT;
+
+        return 0;
+}
+
 static int
 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                      u32 page_count, struct brw_page **pga,
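interop_pages()/is_interop_required() above are new: for large transfers (at or above LNET_MTU) whose start offset is not aligned to the largest supported page size, they derive a bulk MD offset expressed in smallest-page units, so peers with different PAGE_SIZE split the bulk identically. The sketch below models only the offset arithmetic; the constant values and the cl_io_nob_aligned() comparison are assumptions taken from the hunk, not verified API:

    #include <stdint.h>
    #include <stdio.h>

    #define MAX_INTEROP_PAGE 65536ULL       /* assumed largest PAGE_SIZE */
    #define MIN_INTEROP_SHIFT 12            /* assumed smallest PAGE_SHIFT */

    /* Model of interop_pages(): express the transfer's misalignment
     * against the largest page size in smallest-page units. */
    static uint32_t interop_md_offset(uint64_t file_offset)
    {
            uint64_t off0 = file_offset & (MAX_INTEROP_PAGE - 1);

            return (uint32_t)(off0 >> MIN_INTEROP_SHIFT);
    }

    int main(void)
    {
            printf("offset 0x14000 -> %u small pages\n",
                   interop_md_offset(0x14000));    /* 80 KiB into the file */
            printf("offset 0x0 -> %u\n", interop_md_offset(0));
            return 0;
    }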
@@ -1386,13 +1558,29 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
         struct brw_page *pg_prev;
         void *short_io_buf;
         const char *obd_name = cli->cl_import->imp_obd->obd_name;
-        struct inode *inode;
+        struct inode *inode = NULL;
+        bool directio = false;
+        bool gpu = 0;
+        bool enable_checksum = true;
+        struct cl_page *clpage;
+        u64 foffset = 0;
 
         ENTRY;
-        inode = page2inode(pga[0]->pg);
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
+        if (pga[0]->bp_page) {
+                clpage = oap2cl_page(brw_page2oap(pga[0]));
+                inode = clpage->cp_inode;
+                if (clpage->cp_type == CPT_TRANSIENT) {
+                        directio = true;
+                        /* When page size interop logic is not supported by the
+                         * remote server use the old logic.
+                         */
+                        if (imp_connect_unaligned_dio(cli->cl_import))
+                                foffset = pga[0]->bp_off;
+                }
+        }
+        if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                 RETURN(-ENOMEM); /* Recoverable */
-        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
+        if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                 RETURN(-EINVAL); /* Fatal */
 
         if ((cmd & OBD_BRW_WRITE) != 0) {
                 opc = OST_WRITE;
                 req = ptlrpc_request_alloc_pool(cli->cl_import,
@@ -1404,21 +1592,41 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                 opc = OST_READ;
                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
         }
-        if (req == NULL)
-                RETURN(-ENOMEM);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
+            llcrypt_has_encryption_key(inode)) {
+                struct page **pa = NULL;
+
+#ifdef CONFIG_LL_ENCRYPTION
+                OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
+                if (pa == NULL) {
+                        ptlrpc_request_free(req);
+                        RETURN(-ENOMEM);
+                }
+
+                rc = obd_pool_get_pages_array(pa, page_count);
+                if (rc) {
+                        CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
+                               rc);
+                        ptlrpc_request_free(req);
+                        RETURN(rc);
+                }
+#endif
 
-        if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
                 for (i = 0; i < page_count; i++) {
-                        struct brw_page *pg = pga[i];
+                        struct brw_page *brwpg = pga[i];
                         struct page *data_page = NULL;
                         bool retried = false;
                         bool lockedbymyself;
-                        u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+                        u32 nunits =
+                                (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
+                        struct address_space *map_orig = NULL;
+                        pgoff_t index_orig;
 
 retry_encrypt:
-                        if (nunits & ~LUSTRE_ENCRYPTION_MASK)
-                                nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
-                                        LUSTRE_ENCRYPTION_UNIT_SIZE;
+                        nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
                         /* The page can already be locked when we arrive here.
                          * This is possible when cl_page_assume/vvp_page_assume
                          * is stuck on wait_on_page_writeback with page lock
@@ -1428,13 +1636,25 @@ retry_encrypt:
                          * end in vvp_page_completion_write/cl_page_completion,
                          * which means only once the page is fully processed.
                          */
-                        lockedbymyself = trylock_page(pg->pg);
+                        lockedbymyself = trylock_page(brwpg->bp_page);
+                        if (directio) {
+                                map_orig = brwpg->bp_page->mapping;
+                                brwpg->bp_page->mapping = inode->i_mapping;
+                                index_orig = brwpg->bp_page->index;
+                                clpage = oap2cl_page(brw_page2oap(brwpg));
+                                brwpg->bp_page->index = clpage->cp_page_index;
+                        }
                         data_page =
-                                llcrypt_encrypt_pagecache_blocks(pg->pg,
-                                                                 nunits, 0,
-                                                                 GFP_NOFS);
+                                osc_encrypt_pagecache_blocks(brwpg->bp_page,
+                                                             pa ? pa[i] : NULL,
+                                                             nunits, 0,
+                                                             GFP_NOFS);
+                        if (directio) {
+                                brwpg->bp_page->mapping = map_orig;
+                                brwpg->bp_page->index = index_orig;
+                        }
                         if (lockedbymyself)
-                                unlock_page(pg->pg);
+                                unlock_page(brwpg->bp_page);
                         if (IS_ERR(data_page)) {
                                 rc = PTR_ERR(data_page);
                                 if (rc == -ENOMEM && !retried) {
@@ -1442,13 +1662,24 @@ retry_encrypt:
                                         rc = 0;
                                         goto retry_encrypt;
                                 }
+                                if (pa) {
+                                        obd_pool_put_pages_array(pa + i,
+                                                                 page_count - i);
+                                        OBD_FREE_PTR_ARRAY_LARGE(pa,
+                                                                 page_count);
+                                }
                                 ptlrpc_request_free(req);
                                 RETURN(rc);
                         }
-                        pg->pg = data_page;
+                        /* Set PageChecked flag on bounce page for
+                         * disambiguation in osc_release_bounce_pages().
+                         */
+                        SetPageChecked(data_page);
+                        brwpg->bp_page = data_page;
                         /* there should be no gap in the middle of page array */
                         if (i == page_count - 1) {
-                                struct osc_async_page *oap = brw_page2oap(pg);
+                                struct osc_async_page *oap =
+                                        brw_page2oap(brwpg);
 
                                 oa->o_size = oap->oap_count +
                                         oap->oap_obj_off + oap->oap_page_off;
@@ -1456,71 +1687,111 @@ retry_encrypt:
                         /* len is forced to nunits, and relative offset to 0
                          * so store the old, clear text info
                          */
-                        pg->bp_count_diff = nunits - pg->count;
-                        pg->count = nunits;
-                        pg->bp_off_diff = pg->off & ~PAGE_MASK;
-                        pg->off = pg->off & PAGE_MASK;
+                        brwpg->bp_count_diff = nunits - brwpg->bp_count;
+                        brwpg->bp_count = nunits;
+                        brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
+                        brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
                 }
+
+                if (pa)
+                        OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
+        } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
+                struct osc_async_page *oap = brw_page2oap(pga[0]);
+                struct cl_page *clpage = oap2cl_page(oap);
+                struct cl_object *clobj = clpage->cp_obj;
+                struct cl_attr attr = { 0 };
+                struct lu_env *env;
+                __u16 refcheck;
+
+                env = cl_env_get(&refcheck);
+                if (IS_ERR(env)) {
+                        rc = PTR_ERR(env);
+                        ptlrpc_request_free(req);
+                        RETURN(rc);
+                }
+
+                cl_object_attr_lock(clobj);
+                rc = cl_object_attr_get(env, clobj, &attr);
+                cl_object_attr_unlock(clobj);
+                cl_env_put(env, &refcheck);
+                if (rc != 0) {
+                        ptlrpc_request_free(req);
+                        RETURN(rc);
+                }
-        } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
+                if (attr.cat_size)
+                        oa->o_size = attr.cat_size;
+        } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
+                   llcrypt_has_encryption_key(inode)) {
                 for (i = 0; i < page_count; i++) {
                         struct brw_page *pg = pga[i];
-                        u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
+                        u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
 
-                        if (nunits & ~LUSTRE_ENCRYPTION_MASK)
-                                nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
-                                        LUSTRE_ENCRYPTION_UNIT_SIZE;
+                        nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
                         /* count/off are forced to cover the whole encryption
                          * unit size so that all encrypted data is stored on the
                          * OST, so adjust bp_{count,off}_diff for the size of
                          * the clear text.
                          */
-                        pg->bp_count_diff = nunits - pg->count;
-                        pg->count = nunits;
-                        pg->bp_off_diff = pg->off & ~PAGE_MASK;
-                        pg->off = pg->off & PAGE_MASK;
+                        pg->bp_count_diff = nunits - pg->bp_count;
+                        pg->bp_count = nunits;
+                        pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
+                        pg->bp_off = pg->bp_off & PAGE_MASK;
                 }
         }
 
-        for (niocount = i = 1; i < page_count; i++) {
-                if (!can_merge_pages(pga[i - 1], pga[i]))
-                        niocount++;
-        }
+        for (niocount = i = 1; i < page_count; i++) {
+                if (!can_merge_pages(pga[i - 1], pga[i]))
+                        niocount++;
+        }
 
-        pill = &req->rq_pill;
-        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
-                             sizeof(*ioobj));
-        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
-                             niocount * sizeof(*niobuf));
+        pill = &req->rq_pill;
+        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT, sizeof(*ioobj));
+        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
+                             niocount * sizeof(*niobuf));
 
         for (i = 0; i < page_count; i++) {
-                short_io_size += pga[i]->count;
-                if (!inode || !IS_ENCRYPTED(inode)) {
+                short_io_size += pga[i]->bp_count;
+                if (!inode || !IS_ENCRYPTED(inode) ||
+                    !llcrypt_has_encryption_key(inode)) {
                         pga[i]->bp_count_diff = 0;
                         pga[i]->bp_off_diff = 0;
                 }
         }
 
+        if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
+                enable_checksum = false;
+                short_io_size = 0;
+                gpu = 1;
+        }
+
         /* Check if read/write is small enough to be a short io.
          */
         if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
             !imp_connect_shortio(cli->cl_import))
                 short_io_size = 0;
 
+        /* If this is an empty RPC to old server, just ignore it */
+        if (!short_io_size && !pga[0]->bp_page) {
+                ptlrpc_request_free(req);
+                RETURN(-ENODATA);
+        }
+
         req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                              opc == OST_READ ? 0 : short_io_size);
         if (opc == OST_READ)
                 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                      short_io_size);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
         osc_set_io_portal(req);
 
         ptlrpc_at_set_req_timeout(req);
         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
-         * retry logic */
+         * retry logic
+         */
         req->rq_no_retry_einprogress = 1;
 
         if (short_io_size != 0) {
@@ -1536,14 +1807,18 @@ retry_encrypt:
                                         OST_BULK_PORTAL,
                                         &ptlrpc_bulk_kiov_pin_ops);
 
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
-        /* NB request now owns desc and will free it when it gets freed */
+        if (desc == NULL)
+                GOTO(out, rc = -ENOMEM);
+        /* NB request now owns desc and will free it when it gets freed */
+        desc->bd_is_rdma = gpu;
+        if (directio && foffset)
+                desc->bd_md_offset = interop_pages(foffset, page_count, pga);
+
 no_bulk:
-        body = req_capsule_client_get(pill, &RMF_OST_BODY);
-        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
-        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
+        body = req_capsule_client_get(pill, &RMF_OST_BODY);
+        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
+        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
+        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
 
         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
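The short-I/O logic above inlines small transfers into the RPC body instead of setting up a bulk descriptor, and the new !pga[0]->bp_page check turns an empty RPC to an old (non-short-io) server into -ENODATA. A one-function model of the inline-or-bulk decision, with invented names:

    #include <stdbool.h>
    #include <stdio.h>

    /* Model of the short-I/O decision: inline the data in the request
     * only when the whole transfer fits under the negotiated limit, is a
     * single contiguous niobuf, and the server advertises support. */
    static bool use_short_io(unsigned int total_bytes, int niocount,
                             unsigned int max_short_io, bool server_shortio)
    {
            return total_bytes <= max_short_io && niocount == 1 &&
                   server_shortio;
    }

    int main(void)
    {
            printf("%d\n", use_short_io(4096, 1, 16384, true));     /* inline */
            printf("%d\n", use_short_io(1 << 20, 1, 16384, true));  /* bulk */
            return 0;
    }

Skipping the bulk descriptor for tiny I/Os saves an RDMA registration round trip; the RDMA-only (gpu) case above goes the other way and forces bulk with checksums disabled.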
 
@@ -1552,21 +1827,20 @@ no_bulk:
          * oa contains valid o_uid and o_gid in these two operations.
          * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
          * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
-         * other process logic */
+         * other process logic
+         */
         body->oa.o_uid = oa->o_uid;
         body->oa.o_gid = oa->o_gid;
 
-        obdo_to_ioobj(oa, ioobj);
-        ioobj->ioo_bufcnt = niocount;
-        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
-         * that might be send for this request.  The actual number is decided
-         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
-         * "max - 1" for old client compatibility sending "0", and also so the
-         * the actual maximum is a power-of-two number, not one less. LU-1431 */
-        if (desc != NULL)
-                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
-        else /* short io */
-                ioobj_max_brw_set(ioobj, 0);
+        if (inode && IS_ENCRYPTED(inode) &&
+            llcrypt_has_encryption_key(inode) &&
+            !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
+                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                        body->oa.o_valid |= OBD_MD_FLFLAGS;
+                        body->oa.o_flags = 0;
+                }
+                body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
+        }
 
         if (short_io_size != 0) {
                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
@@ -1585,124 +1859,150 @@ no_bulk:
         LASSERT(page_count > 0);
         pg_prev = pga[0];
-        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
-                struct brw_page *pg = pga[i];
-                int poff = pg->off & ~PAGE_MASK;
+        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
+                struct brw_page *pg = pga[i];
+                int poff = pg->bp_off & ~PAGE_MASK;
 
-                LASSERT(pg->count > 0);
-                /* make sure there is no gap in the middle of page array */
+                LASSERT(pg->bp_count > 0);
+                /* make sure there is no gap in the middle of page array */
                 LASSERTF(page_count == 1 ||
-                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
+                         (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
                           ergo(i > 0 && i < page_count - 1,
-                               poff == 0 && pg->count == PAGE_SIZE) &&
+                               poff == 0 && pg->bp_count == PAGE_SIZE) &&
                           ergo(i == page_count - 1, poff == 0)),
-                         "i: %d/%d pg: %p off: %llu, count: %u\n",
-                         i, page_count, pg, pg->off, pg->count);
-                LASSERTF(i == 0 || pg->off > pg_prev->off,
-                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
-                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
-                         i, page_count,
-                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
-                         pg_prev->pg, page_private(pg_prev->pg),
-                         pg_prev->pg->index, pg_prev->off);
-                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
-                        (pg->flag & OBD_BRW_SRVLOCK));
+                         "i: %d/%d pg: %px off: %llu, count: %u\n",
+                         i, page_count, pg, pg->bp_off, pg->bp_count);
+                LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
+                         "i %d p_c %u pg %px [pri %lu ind %lu] off %llu prev_pg %px [pri %lu ind %lu] off %llu\n",
+                         i, page_count,
+                         pg->bp_page, page_private(pg->bp_page),
+                         pg->bp_page->index, pg->bp_off,
+                         pg_prev->bp_page, page_private(pg_prev->bp_page),
+                         pg_prev->bp_page->index, pg_prev->bp_off);
+                LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
+                        (pg->bp_flag & OBD_BRW_SRVLOCK));
                 if (short_io_size != 0 && opc == OST_WRITE) {
-                        unsigned char *ptr = kmap_atomic(pg->pg);
+                        unsigned char *ptr = kmap_atomic(pg->bp_page);
 
-                        LASSERT(short_io_size >= requested_nob + pg->count);
+                        LASSERT(short_io_size >= requested_nob + pg->bp_count);
                         memcpy(short_io_buf + requested_nob,
                                ptr + poff,
-                               pg->count);
+                               pg->bp_count);
                         kunmap_atomic(ptr);
                 } else if (short_io_size == 0) {
-                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
-                                                         pg->count);
+                        desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page,
+                                                         poff, pg->bp_count);
                 }
-                requested_nob += pg->count;
+                requested_nob += pg->bp_count;
 
-                if (i > 0 && can_merge_pages(pg_prev, pg)) {
-                        niobuf--;
-                        niobuf->rnb_len += pg->count;
+                if (i > 0 && can_merge_pages(pg_prev, pg)) {
+                        niobuf--;
+                        niobuf->rnb_len += pg->bp_count;
                 } else {
-                        niobuf->rnb_offset = pg->off;
-                        niobuf->rnb_len = pg->count;
-                        niobuf->rnb_flags = pg->flag;
-                }
-                pg_prev = pg;
-        }
-
-        LASSERTF((void *)(niobuf - niocount) ==
-                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
-                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
-                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
-
-        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
-        if (resend) {
-                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
-                        body->oa.o_valid |= OBD_MD_FLFLAGS;
-                        body->oa.o_flags = 0;
-                }
-                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
-        }
-
-        if (osc_should_shrink_grant(cli))
-                osc_shrink_grant_local(cli, &body->oa);
-
-        /* size[REQ_REC_OFF] still sizeof (*body) */
-        if (opc == OST_WRITE) {
-                if (cli->cl_checksum &&
-                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
-                        /* store cl_cksum_type in a local variable since
-                         * it can be changed via lprocfs */
+                        niobuf->rnb_offset = pg->bp_off;
+                        niobuf->rnb_len = pg->bp_count;
+                        niobuf->rnb_flags = pg->bp_flag;
+                }
+                pg_prev = pg;
+                if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MARK_COMPRESSED))
+                        niobuf->rnb_flags |= OBD_BRW_COMPRESSED;
+        }
+
+        obdo_to_ioobj(oa, ioobj);
+        ioobj->ioo_bufcnt = niocount;
+
+        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
+         * that might be send for this request. The actual number is decided
+         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
+         * "max - 1" for old client compatibility sending "0", and also so the
+         * actual maximum is a power-of-two number, not one less. LU-1431
+         *
+         * The low bits are reserved for md flags used for interopability, Ex:
+         * - OBD_IOOBJ_INTEROP_PAGE_ALIGNMENT
+         */
+        if (desc)
+                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw,
+                                  desc->bd_md_offset);
+        else
+                ioobj_max_brw_set(ioobj, 0, 0); /* short io */
+
+        LASSERTF((void *)(niobuf - niocount) ==
+                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+                 "want %px - real %px\n",
+                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+                 (void *)(niobuf - niocount));
+
+        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+        if (resend) {
+                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                        body->oa.o_valid |= OBD_MD_FLFLAGS;
+                        body->oa.o_flags = 0;
+                }
+                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+        }
+
+        if (osc_should_shrink_grant(cli))
+                osc_shrink_grant_local(cli, &body->oa);
+
+        if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
+                enable_checksum = false;
+
+        /* size[REQ_REC_OFF] still sizeof (*body) */
+        if (opc == OST_WRITE) {
+                if (enable_checksum) {
+                        /* store cl_cksum_type in a local variable since
+                         * it can be changed via lprocfs
+                         */
                         enum cksum_types cksum_type = cli->cl_cksum_type;
 
-                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
-                                body->oa.o_flags = 0;
+                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
+                                body->oa.o_flags = 0;
                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                 cksum_type);
-                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
 
                         rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                   requested_nob, page_count,
                                                   pga, OST_WRITE,
-                                                  &body->oa.o_cksum);
+                                                  &body->oa.o_cksum, resend);
                         if (rc < 0) {
-                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
+                                CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
                                        rc);
                                 GOTO(out, rc);
                         }
-                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
-                               body->oa.o_cksum);
+                        CDEBUG(D_PAGE | (resend ? D_HA : 0),
+                               "checksum at write origin: %x (%x)\n",
+                               body->oa.o_cksum, cksum_type);
 
-                        /* save this in 'oa', too, for later checking */
-                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                        /* save this in 'oa', too, for later checking */
+                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                         oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                            cksum_type);
-                } else {
-                        /* clear out the checksum flag, in case this is a
-                         * resend but cl_checksum is no longer set. b=11238 */
-                        oa->o_valid &= ~OBD_MD_FLCKSUM;
-                }
-                oa->o_cksum = body->oa.o_cksum;
-                /* 1 RC per niobuf */
-                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
-                                     sizeof(__u32) * niocount);
-        } else {
-                if (cli->cl_checksum &&
-                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
-                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
-                                body->oa.o_flags = 0;
+                } else {
+                        /* clear out the checksum flag, in case this is a
+                         * resend but cl_checksum is no longer set. b=11238
+                         */
+                        oa->o_valid &= ~OBD_MD_FLCKSUM;
+                }
+                oa->o_cksum = body->oa.o_cksum;
+                /* 1 RC per niobuf */
+                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
+                                     sizeof(__u32) * niocount);
+        } else {
+                if (enable_checksum) {
+                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
+                                body->oa.o_flags = 0;
                         body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                 cli->cl_cksum_type);
-                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                 }
 
                 /* Client cksum has been already copied to wire obdo in previous
                  * lustre_set_wire_obdo(), and in the case a bulk-read is being
                  * resent due to cksum error, this will allow Server to
-                 * check+dump pages on its side */
+                 * check+dump pages on its side
+                 */
         }
         ptlrpc_request_set_replen(req);
 
@@ -1721,11 +2021,11 @@ no_bulk:
         CDEBUG(D_RPCTRACE,
                "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
-        RETURN(0);
+        RETURN(0);
 
- out:
-        ptlrpc_req_finished(req);
-        RETURN(rc);
+out:
+        ptlrpc_req_put(req);
+        RETURN(rc);
 }
 
 char dbgcksum_file_name[PATH_MAX];
@@ -1735,63 +2035,64 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                 __u32 client_cksum)
 {
         struct file *filp;
-        int rc, i;
         unsigned int len;
+        int rc, i;
         char *buf;
 
         /* will only keep dump of pages on first error for the same range in
-         * file/fid, not during the resends/retries. */
+         * file/fid, not during the resends/retries.
+         */
         snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                  "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
-                 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
-                  libcfs_debug_file_path_arr :
-                  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
+                 (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
+                  libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                  oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
-                 pga[0]->off,
-                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
+                 pga[0]->bp_off,
+                 pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
                  client_cksum, server_cksum);
+        CWARN("%s: dumping checksum data\n", dbgcksum_file_name);
         filp = filp_open(dbgcksum_file_name,
                          O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
         if (IS_ERR(filp)) {
                 rc = PTR_ERR(filp);
                 if (rc == -EEXIST)
-                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
-                               "checksum error: rc = %d\n", dbgcksum_file_name,
-                               rc);
+                        CDEBUG(D_INFO,
+                               "%s: can't open to dump pages with checksum error: rc = %d\n",
+                               dbgcksum_file_name, rc);
                 else
-                        CERROR("%s: can't open to dump pages with checksum "
-                               "error: rc = %d\n", dbgcksum_file_name, rc);
+                        CERROR("%s: can't open to dump pages with checksum error: rc = %d\n",
+                               dbgcksum_file_name, rc);
                 return;
         }
 
         for (i = 0; i < page_count; i++) {
-                len = pga[i]->count;
-                buf = kmap(pga[i]->pg);
+                len = pga[i]->bp_count;
+                buf = kmap(pga[i]->bp_page);
                 while (len != 0) {
                         rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                         if (rc < 0) {
-                                CERROR("%s: wanted to write %u but got %d "
-                                       "error\n", dbgcksum_file_name, len, rc);
+                                CERROR("%s: wanted to write %u but got error: rc = %d\n",
+                                       dbgcksum_file_name, len, rc);
                                 break;
                         }
                         len -= rc;
                         buf += rc;
-                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
-                               dbgcksum_file_name, rc);
                 }
-                kunmap(pga[i]->pg);
+                kunmap(pga[i]->bp_page);
         }
 
         rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
         if (rc)
                 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
         filp_close(filp, NULL);
+
+        libcfs_debug_dumplog();
 }
 
 static int
-check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
+check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
                      __u32 client_cksum, __u32 server_cksum,
                      struct osc_brw_async_args *aa)
 {
@@ -1803,10 +2104,10 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
         char *msg;
         int rc;
 
-        if (server_cksum == client_cksum) {
-                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
-                return 0;
-        }
+        if (server_cksum == client_cksum) {
+                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
+                return 0;
+        }
 
         if (aa->aa_cli->cl_checksum_dump)
                 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
@@ -1840,7 +2141,7 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
                                              aa->aa_page_count, aa->aa_ppga,
                                              OST_WRITE, fn, sector_size,
-                                             &new_cksum);
+                                             &new_cksum, true);
         else
                 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                        aa->aa_ppga, OST_WRITE, cksum_type,
@@ -1849,31 +2150,25 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
         if (rc < 0)
                 msg = "failed to calculate the client write checksum";
         else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
-                msg = "the server did not use the checksum type specified in "
-                      "the original request - likely a protocol problem";
-        else if (new_cksum == server_cksum)
-                msg = "changed on the client after we checksummed it - "
-                      "likely false positive due to mmap IO (bug 11742)";
-        else if (new_cksum == client_cksum)
-                msg = "changed in transit before arrival at OST";
-        else
-                msg = "changed in transit AND doesn't match the original - "
-                      "likely false positive due to mmap IO (bug 11742)";
-
-        LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
-                           DFID " object "DOSTID" extent [%llu-%llu], original "
-                           "client csum %x (type %x), server csum %x (type %x),"
-                           " client csum now %x\n",
-                           obd_name, msg, libcfs_nid2str(peer->nid),
libcfs_nid2str(peer->nid), - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, - oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, - POSTID(&oa->o_oi), aa->aa_ppga[0]->off, - aa->aa_ppga[aa->aa_page_count - 1]->off + - aa->aa_ppga[aa->aa_page_count-1]->count - 1, - client_cksum, - obd_cksum_type_unpack(aa->aa_oa->o_flags), - server_cksum, cksum_type, new_cksum); + msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"; + else if (new_cksum == server_cksum) + msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"; + else if (new_cksum == client_cksum) + msg = "changed in transit before arrival at OST"; + else + msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"; + + LCONSOLE_ERROR("%s: BAD WRITE CHECKSUM: %s: from %s inode " DFID " object " DOSTID " extent [%llu-%llu], original client csum %x (type %x), server csum %x (type %x), client csum now %x\n", + obd_name, msg, libcfs_nidstr(&peer->nid), + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, + oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, + POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off, + aa->aa_ppga[aa->aa_page_count - 1]->bp_off + + aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1, + client_cksum, + obd_cksum_type_unpack(aa->aa_oa->o_flags), + server_cksum, cksum_type, new_cksum); return 1; } @@ -1883,11 +2178,13 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) struct osc_brw_async_args *aa = (void *)&req->rq_async_args; struct client_obd *cli = aa->aa_cli; const char *obd_name = cli->cl_import->imp_obd->obd_name; - const struct lnet_process_id *peer = + const struct lnet_processid *peer = &req->rq_import->imp_connection->c_peer; struct ost_body *body; u32 client_cksum = 0; - struct inode *inode; + struct inode *inode = NULL; + unsigned int blockbits = 0, blocksize = 0; + struct cl_page *clpage; ENTRY; @@ -1906,15 +2203,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) /* set/clear over quota flag for a uid/gid/projid */ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && body->oa.o_valid & (OBD_MD_FLALLQUOTA)) { - unsigned qid[LL_MAXQUOTAS] = { + unsigned int qid[LL_MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid, body->oa.o_projid }; CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n", body->oa.o_uid, body->oa.o_gid, body->oa.o_projid, body->oa.o_valid, body->oa.o_flags); - osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid, - body->oa.o_flags); + osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid, + body->oa.o_flags); } osc_update_grant(cli, body); @@ -1984,13 +2281,13 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) nob = rc; while (nob > 0 && pg_count > 0) { unsigned char *ptr; - int count = aa->aa_ppga[i]->count > nob ? - nob : aa->aa_ppga[i]->count; + int count = aa->aa_ppga[i]->bp_count > nob ? 
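check_write_checksum() recomputes the checksum over the pages it still holds and compares it against both the value originally sent and the value the server computed, to decide where the data changed. The decision table in isolation (the checksum values are made up):

#include <stdint.h>
#include <stdio.h>

/* Classify a write-checksum mismatch the way the code above does. */
static const char *classify(uint32_t client, uint32_t server, uint32_t now)
{
	if (now == server)
		return "changed on the client after it was checksummed";
	if (now == client)
		return "changed in transit before arrival at the server";
	return "changed in transit AND does not match the original";
}

int main(void)
{
	printf("%s\n", classify(0x1234, 0x9999, 0x9999)); /* client side */
	printf("%s\n", classify(0x1234, 0x5678, 0x1234)); /* in transit */
	printf("%s\n", classify(0x1234, 0x5678, 0x9999)); /* both */
	return 0;
}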
+ nob : aa->aa_ppga[i]->bp_count; CDEBUG(D_CACHE, "page %p count %d\n", - aa->aa_ppga[i]->pg, count); - ptr = kmap_atomic(aa->aa_ppga[i]->pg); - memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf, + aa->aa_ppga[i]->bp_page, count); + ptr = kmap_atomic(aa->aa_ppga[i]->bp_page); + memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf, count); kunmap_atomic((void *) ptr); @@ -2001,35 +2298,40 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) } } - if (rc < aa->aa_requested_nob) - handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); + if (rc < aa->aa_requested_nob) + handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); - if (body->oa.o_valid & OBD_MD_FLCKSUM) { - static int cksum_counter; - u32 server_cksum = body->oa.o_cksum; - char *via = ""; - char *router = ""; + if (body->oa.o_valid & OBD_MD_FLCKSUM) { + static int cksum_counter; + u32 server_cksum = body->oa.o_cksum; + int nob = rc; + char *via = ""; + char *router = ""; enum cksum_types cksum_type; u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ? body->oa.o_flags : 0; cksum_type = obd_cksum_type_unpack(o_flags); - rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc, + rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob, aa->aa_page_count, aa->aa_ppga, - OST_READ, &client_cksum); + OST_READ, &client_cksum, false); if (rc < 0) GOTO(out, rc); if (req->rq_bulk != NULL && - peer->nid != req->rq_bulk->bd_sender) { + !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) { via = " via "; - router = libcfs_nid2str(req->rq_bulk->bd_sender); + router = libcfs_nidstr(&req->rq_bulk->bd_sender); } if (server_cksum != client_cksum) { struct ost_body *clbody; + __u32 client_cksum2; u32 page_count = aa->aa_page_count; + osc_checksum_bulk_rw(obd_name, cksum_type, nob, + page_count, aa->aa_ppga, + OST_READ, &client_cksum2, true); clbody = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (cli->cl_checksum_dump) @@ -2037,25 +2339,21 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga, server_cksum, client_cksum); - LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " - "%s%s%s inode "DFID" object "DOSTID - " extent [%llu-%llu], client %x, " - "server %x, cksum_type %x\n", - obd_name, - libcfs_nid2str(peer->nid), - via, router, - clbody->oa.o_valid & OBD_MD_FLFID ? - clbody->oa.o_parent_seq : 0ULL, - clbody->oa.o_valid & OBD_MD_FLFID ? - clbody->oa.o_parent_oid : 0, - clbody->oa.o_valid & OBD_MD_FLFID ? - clbody->oa.o_parent_ver : 0, - POSTID(&body->oa.o_oi), - aa->aa_ppga[0]->off, - aa->aa_ppga[page_count-1]->off + - aa->aa_ppga[page_count-1]->count - 1, - client_cksum, server_cksum, - cksum_type); + LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu], client %x/%x, server %x, cksum_type %x\n", + obd_name, libcfs_nidstr(&peer->nid), + via, router, + clbody->oa.o_valid & OBD_MD_FLFID ? + clbody->oa.o_parent_seq : 0ULL, + clbody->oa.o_valid & OBD_MD_FLFID ? + clbody->oa.o_parent_oid : 0, + clbody->oa.o_valid & OBD_MD_FLFID ? 
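On the read side the reply path recomputes a checksum across all page fragments with osc_checksum_bulk_rw() and compares it with the server's value. A toy fold over fragments shows the shape of that computation; the multiply-by-31 sum is a stand-in for the real CRC32/Adler/T10-PI algorithms:

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct frag { const unsigned char *buf; size_t count; };

/* Fold every fragment into one running value, in the spirit of
 * checksumming a bulk read spread over brw_page entries. */
static uint32_t cksum_frags(const struct frag *f, int nfrags)
{
	uint32_t sum = 0;

	for (int i = 0; i < nfrags; i++)
		for (size_t j = 0; j < f[i].count; j++)
			sum = sum * 31 + f[i].buf[j];	/* toy checksum */
	return sum;
}

int main(void)
{
	unsigned char a[] = "bulk", b[] = "read";
	struct frag pga[2] = { { a, 4 }, { b, 4 } };

	printf("checksum %#x\n", cksum_frags(pga, 2));
	return 0;
}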
+ clbody->oa.o_parent_ver : 0, + POSTID(&body->oa.o_oi), + aa->aa_ppga[0]->bp_off, + aa->aa_ppga[page_count-1]->bp_off + + aa->aa_ppga[page_count-1]->bp_count - 1, + client_cksum, client_cksum2, + server_cksum, cksum_type); cksum_counter = 0; aa->aa_oa->o_cksum = client_cksum; rc = -EAGAIN; @@ -2071,12 +2369,18 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if ((cksum_missed & (-cksum_missed)) == cksum_missed) CERROR("%s: checksum %u requested from %s but not sent\n", obd_name, cksum_missed, - libcfs_nid2str(peer->nid)); + libcfs_nidstr(&peer->nid)); } else { rc = 0; } - inode = page2inode(aa->aa_ppga[0]->pg); + /* get the inode from the first cl_page */ + clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0])); + inode = clpage->cp_inode; + if (clpage->cp_type == CPT_TRANSIENT && inode) { + blockbits = inode->i_blkbits; + blocksize = 1 << blockbits; + } if (inode && IS_ENCRYPTED(inode)) { int idx; @@ -2085,34 +2389,56 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) GOTO(out, rc); } for (idx = 0; idx < aa->aa_page_count; idx++) { - struct brw_page *pg = aa->aa_ppga[idx]; + struct brw_page *brwpg = aa->aa_ppga[idx]; unsigned int offs = 0; while (offs < PAGE_SIZE) { /* do not decrypt if page is all 0s */ - if (memchr_inv(page_address(pg->pg) + offs, 0, - LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) { + if (memchr_inv(page_address(brwpg->bp_page) + + offs, 0, + LUSTRE_ENCRYPTION_UNIT_SIZE) == + NULL) { /* if page is empty forward info to * upper layers (ll_io_zero_page) by * clearing PagePrivate2 */ if (!offs) - ClearPagePrivate2(pg->pg); + ClearPagePrivate2(brwpg->bp_page); break; } - /* The page is already locked when we arrive here, - * except when we deal with a twisted page for - * specific Direct IO support, in which case - * PageChecked flag is set on page. - */ - if (PageChecked(pg->pg)) - lock_page(pg->pg); - rc = llcrypt_decrypt_pagecache_blocks(pg->pg, - LUSTRE_ENCRYPTION_UNIT_SIZE, - offs); - if (PageChecked(pg->pg)) - unlock_page(pg->pg); + if (blockbits) { + /* This is direct IO case. Directly call + * decrypt function that takes inode as + * input parameter. Page does not need + * to be locked. + */ + u64 lblk_num; + unsigned int i; + + clpage = + oap2cl_page(brw_page2oap(brwpg)); + lblk_num = + ((u64)(clpage->cp_page_index) << + (PAGE_SHIFT - blockbits)) + + (offs >> blockbits); + for (i = offs; i < offs + + LUSTRE_ENCRYPTION_UNIT_SIZE; + i += blocksize, lblk_num++) { + rc = + llcrypt_decrypt_block_inplace( + inode, brwpg->bp_page, + blocksize, i, + lblk_num); + if (rc) + break; + } + } else { + rc = llcrypt_decrypt_pagecache_blocks( + brwpg->bp_page, + LUSTRE_ENCRYPTION_UNIT_SIZE, + offs); + } if (rc) GOTO(out, rc); @@ -2134,10 +2460,9 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, { struct ptlrpc_request *new_req; struct osc_brw_async_args *new_aa; - struct osc_async_page *oap; - ENTRY; - /* The below message is checked in replay-ost-single.sh test_8ae*/ + ENTRY; + /* The below message is checked in replay-ost-single.sh test_8ae */ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, "redo for recoverable error %d", rc); @@ -2145,18 +2470,14 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, OST_WRITE ? 
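For encrypted direct I/O the hunk above decrypts blocks in place, deriving the logical block number from the cl_page index and the byte offset inside the page. The arithmetic on its own, assuming 4K pages and 1K filesystem blocks (both assumptions of this sketch):

#include <stdint.h>
#include <stdio.h>

#define DEMO_PAGE_SHIFT 12u	/* 4096-byte pages (assumption) */

/* Convert (page index, byte offset in page) to a logical block. */
static uint64_t lblk_of(uint64_t page_index, unsigned int offs,
			unsigned int blockbits)
{
	return (page_index << (DEMO_PAGE_SHIFT - blockbits)) +
	       (offs >> blockbits);
}

int main(void)
{
	/* page 3, offset 2048 within the page, 1024-byte blocks -> 14 */
	printf("lblk = %llu\n", (unsigned long long)lblk_of(3, 2048, 10));
	return 0;
}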
OBD_BRW_WRITE : OBD_BRW_READ, aa->aa_cli, aa->aa_oa, aa->aa_page_count, aa->aa_ppga, &new_req, 1); - if (rc) - RETURN(rc); - - list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { - if (oap->oap_request != NULL) { - LASSERTF(request == oap->oap_request, - "request %p != oap_request %p\n", - request, oap->oap_request); - } - } - /* - * New request takes over pga and oaps from old request. + if (rc) + RETURN(rc); + + + LASSERTF(request == aa->aa_request, + "request %p != aa_request %p\n", + request, aa->aa_request); + /* New request takes over pga and oaps from old request. * Note that copying a list_head doesn't work, need to move it... */ aa->aa_resends++; @@ -2164,13 +2485,15 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, new_req->rq_async_args = request->rq_async_args; new_req->rq_commit_cb = request->rq_commit_cb; /* cap resend delay to the current request timeout, this is similar to - * what ptlrpc does (see after_reply()) */ + * what ptlrpc does (see after_reply()) + */ if (aa->aa_resends > new_req->rq_timeout) - new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout; + new_req->rq_sent = ktime_get_real_seconds() + + new_req->rq_timeout; else new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends; - new_req->rq_generation_set = 1; - new_req->rq_import_generation = request->rq_import_generation; + new_req->rq_generation_set = 1; + new_req->rq_import_generation = request->rq_import_generation; new_aa = ptlrpc_req_async_args(new_aa, new_req); @@ -2180,25 +2503,23 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, list_splice_init(&aa->aa_exts, &new_aa->aa_exts); new_aa->aa_resends = aa->aa_resends; - list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { - if (oap->oap_request) { - ptlrpc_req_finished(oap->oap_request); - oap->oap_request = ptlrpc_request_addref(new_req); - } - } + if (aa->aa_request) { + ptlrpc_req_put(aa->aa_request); + new_aa->aa_request = ptlrpc_request_addref(new_req); + } /* XXX: This code will run into problem if we're going to support * to add a series of BRW RPCs into a self-defined ptlrpc_request_set * and wait for all of them to be finished. We should inherit request - * set from old request. */ + * set from old request. + */ ptlrpcd_add_req(new_req); DEBUG_REQ(D_INFO, new_req, "new request"); RETURN(0); } -/* - * ugh, we want disk allocation on the target to happen in offset order. we'll +/* ugh, we want disk allocation on the target to happen in offset order. we'll * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do * fine for our small page arrays and doesn't require allocation. 
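osc_brw_redo_request() above delays each resend by the resend count in seconds, but never longer than the request timeout, mirroring what ptlrpc's after_reply() does. The same policy in isolation:

#include <stdio.h>
#include <time.h>

/* Cap the resend delay at the current request timeout. */
static time_t next_send_time(time_t now, int resends, int timeout)
{
	return now + (resends > timeout ? timeout : resends);
}

int main(void)
{
	time_t now = time(NULL);

	printf("delay after 3 resends:  %ld s\n",
	       (long)(next_send_time(now, 3, 40) - now));
	printf("delay after 99 resends: %ld s\n",
	       (long)(next_send_time(now, 99, 40) - now));
	return 0;
}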
its an * insertion sort that swaps elements that are strides apart, shrinking the @@ -2206,45 +2527,74 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, */ static void sort_brw_pages(struct brw_page **array, int num) { - int stride, i, j; - struct brw_page *tmp; - - if (num == 1) - return; - for (stride = 1; stride < num ; stride = (stride * 3) + 1) - ; + int stride, i, j; + struct brw_page *tmp; - do { - stride /= 3; - for (i = stride ; i < num ; i++) { - tmp = array[i]; - j = i; - while (j >= stride && array[j - stride]->off > tmp->off) { - array[j] = array[j - stride]; - j -= stride; - } - array[j] = tmp; - } - } while (stride > 1); + if (num == 1) + return; + for (stride = 1; stride < num ; stride = (stride * 3) + 1) + ; + + do { + stride /= 3; + for (i = stride ; i < num ; i++) { + tmp = array[i]; + j = i; + while (j >= stride && + array[j - stride]->bp_off > tmp->bp_off) { + array[j] = array[j - stride]; + j -= stride; + } + array[j] = tmp; + } + } while (stride > 1); } static void osc_release_ppga(struct brw_page **ppga, size_t count) { LASSERT(ppga != NULL); - OBD_FREE_PTR_ARRAY(ppga, count); + OBD_FREE_PTR_ARRAY_LARGE(ppga, count); +} + +/* this is trying to propogate async writeback errors back up to the + * application. As an async write fails we record the error code for later if + * the app does an fsync. As long as errors persist we force future rpcs to be + * sync so that the app can get a sync error and break the cycle of queueing + * pages for which writeback will fail. + */ +static void osc_process_ar(struct osc_async_rc *ar, __u64 xid, + int rc) +{ + if (rc) { + if (!ar->ar_rc) + ar->ar_rc = rc; + + ar->ar_force_sync = 1; + ar->ar_min_xid = ptlrpc_sample_next_xid(); + return; + + } + + if (ar->ar_force_sync && (xid >= ar->ar_min_xid)) + ar->ar_force_sync = 0; } static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *args, int rc) { struct osc_brw_async_args *aa = args; - struct osc_extent *ext; - struct osc_extent *tmp; struct client_obd *cli = aa->aa_cli; unsigned long transferred = 0; + struct cl_object *obj = NULL; + struct osc_async_page *last; + struct osc_extent *ext; + struct osc_extent *tmp; + struct lov_oinfo *loi; ENTRY; + ext = list_first_entry(&aa->aa_exts, struct osc_extent, oe_link); + rc = osc_brw_fini_request(req, rc); CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); @@ -2258,16 +2608,15 @@ static int brw_interpret(const struct lu_env *env, if (osc_recoverable_error(rc) && !req->rq_no_delay) { if (req->rq_import_generation != req->rq_import->imp_generation) { - CDEBUG(D_HA, "%s: resend cross eviction for object: " - ""DOSTID", rc = %d.\n", + CDEBUG(D_HA, + "%s: resend cross eviction for object: "DOSTID": rc = %d.\n", req->rq_import->imp_obd->obd_name, POSTID(&aa->aa_oa->o_oi), rc); } else if (rc == -EINPROGRESS || - client_should_resend(aa->aa_resends, aa->aa_cli)) { + client_should_resend(aa->aa_resends, aa->aa_cli)) { rc = osc_brw_redo_request(req, aa, rc); } else { - CERROR("%s: too many resent retries for object: " - "%llu:%llu, rc = %d.\n", + CERROR("%s: too many resent retries for object: %llu:%llu: rc = %d\n", req->rq_import->imp_obd->obd_name, POSTID(&aa->aa_oa->o_oi), rc); } @@ -2278,15 +2627,14 @@ static int brw_interpret(const struct lu_env *env, rc = -EIO; } + last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]); + obj = osc2cl(ext->oe_obj); + loi = cl2osc(obj)->oo_oinfo; + if (rc == 0) { struct obdo *oa = aa->aa_oa; struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned long 
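sort_brw_pages() above is the classic shellsort with the 3x+1 gap sequence, keyed on bp_off. The identical algorithm over plain integers, as a compilable sketch:

#include <stdio.h>

static void shellsort(long *a, int num)
{
	int stride, i, j;
	long tmp;

	if (num <= 1)
		return;
	for (stride = 1; stride < num; stride = stride * 3 + 1)
		;				/* grow 1, 4, 13, 40, ... */
	do {
		stride /= 3;
		for (i = stride; i < num; i++) {
			tmp = a[i];
			for (j = i; j >= stride && a[j - stride] > tmp;
			     j -= stride)
				a[j] = a[j - stride];
			a[j] = tmp;		/* insert into its gap */
		}
	} while (stride > 1);
}

int main(void)
{
	long off[] = { 40960, 0, 8192, 4096, 12288 };
	int i;

	shellsort(off, 5);
	for (i = 0; i < 5; i++)
		printf("%ld\n", off[i]);
	return 0;
}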
valid = 0; - struct cl_object *obj; - struct osc_async_page *last; - - last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]); - obj = osc2cl(last->oap_obj); cl_object_attr_lock(obj); if (oa->o_valid & OBD_MD_FLBLOCKS) { @@ -2307,12 +2655,12 @@ static int brw_interpret(const struct lu_env *env, } if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { - struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; loff_t last_off = last->oap_count + last->oap_obj_off + last->oap_page_off; /* Change file size if this is an out of quota or - * direct IO write and it extends the file size */ + * direct IO write and it extends the file size + */ if (loi->loi_lvb.lvb_size < last_off) { attr->cat_size = last_off; valid |= CAT_SIZE; @@ -2332,13 +2680,32 @@ static int brw_interpret(const struct lu_env *env, OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem); aa->aa_oa = NULL; - if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) { osc_inc_unstable_pages(req); + /* + * If req->rq_committed is set, it means that the dirty pages + * have already committed into the stable storage on OSTs + * (i.e. Direct I/O). + */ + if (!req->rq_committed) + cl_object_dirty_for_sync(env, cl_object_top(obj)); + } + + if (aa->aa_request) { + __u64 xid = ptlrpc_req_xid(req); + ptlrpc_req_put(req); + if (xid && lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { + spin_lock(&cli->cl_loi_list_lock); + osc_process_ar(&cli->cl_ar, xid, rc); + osc_process_ar(&loi->loi_ar, xid, rc); + spin_unlock(&cli->cl_loi_list_lock); + } + } list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 1, - rc && req->rq_no_delay ? -EWOULDBLOCK : rc); + rc && req->rq_no_delay ? -EAGAIN : rc); } LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); @@ -2353,7 +2720,8 @@ static int brw_interpret(const struct lu_env *env, spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more - * RPCs to complete */ + * RPCs to complete + */ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) cli->cl_w_in_flight--; else @@ -2370,7 +2738,8 @@ static void brw_commit(struct ptlrpc_request *req) /* If osc_inc_unstable_pages (via osc_extent_finish) races with * this called via the rq_commit_cb, I need to ensure * osc_dec_unstable_pages is still called. Otherwise unstable - * pages may be leaked. */ + * pages may be leaked. 
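osc_process_ar(), called from brw_interpret() above for writes, latches the first asynchronous write error and forces sync I/O until an RPC at or past the recorded xid completes cleanly, so fsync() can surface the failure. A standalone model; next_xid stands in for ptlrpc_sample_next_xid():

#include <stdint.h>
#include <stdio.h>

struct async_rc {
	int      ar_rc;		/* first error seen, for fsync() */
	int      ar_force_sync;	/* force sync I/O while set */
	uint64_t ar_min_xid;	/* xid that must pass to clear it */
};

static void process_ar(struct async_rc *ar, uint64_t xid, int rc,
		       uint64_t next_xid)
{
	if (rc) {
		if (!ar->ar_rc)
			ar->ar_rc = rc;		/* latch the first error */
		ar->ar_force_sync = 1;
		ar->ar_min_xid = next_xid;
		return;
	}
	if (ar->ar_force_sync && xid >= ar->ar_min_xid)
		ar->ar_force_sync = 0;	/* clean RPC past the error point */
}

int main(void)
{
	struct async_rc ar = { 0, 0, 0 };

	process_ar(&ar, 10, -5 /* -EIO */, 11);
	process_ar(&ar, 10, 0, 12);	/* older xid: stays forced */
	printf("forced=%d\n", ar.ar_force_sync);
	process_ar(&ar, 11, 0, 12);	/* xid >= min_xid: clears */
	printf("forced=%d rc=%d\n", ar.ar_force_sync, ar.ar_rc);
	return 0;
}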
+ */ spin_lock(&req->rq_lock); if (likely(req->rq_unstable)) { req->rq_unstable = 0; @@ -2391,33 +2760,35 @@ static void brw_commit(struct ptlrpc_request *req) int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct list_head *ext_list, int cmd) { - struct ptlrpc_request *req = NULL; - struct osc_extent *ext; - struct brw_page **pga = NULL; - struct osc_brw_async_args *aa = NULL; - struct obdo *oa = NULL; - struct osc_async_page *oap; - struct osc_object *obj = NULL; - struct cl_req_attr *crattr = NULL; - loff_t starting_offset = OBD_OBJECT_EOF; - loff_t ending_offset = 0; + struct ptlrpc_request *req = NULL; + struct osc_extent *ext; + struct brw_page **pga = NULL; + struct osc_brw_async_args *aa = NULL; + struct obdo *oa = NULL; + struct osc_async_page *oap; + struct osc_object *obj = NULL; + struct cl_req_attr *crattr = NULL; + loff_t starting_offset = OBD_OBJECT_EOF; + loff_t ending_offset = 0; /* '1' for consistency with code that checks !mpflag to restore */ int mpflag = 1; - int mem_tight = 0; - int page_count = 0; - bool soft_sync = false; - bool ndelay = false; - int i; - int grant = 0; - int rc; - __u32 layout_version = 0; + int mem_tight = 0; + int page_count = 0; + bool soft_sync = false; + bool ndelay = false; + int grant = 0; + int i, rc; + __u32 layout_version = 0; LIST_HEAD(rpc_list); - struct ost_body *body; + struct ost_body *body; + ENTRY; LASSERT(!list_empty(ext_list)); /* add pages into rpc_list to build BRW rpc */ list_for_each_entry(ext, ext_list, oe_link) { + struct cl_sub_dio *sdio = ext->oe_csd; + LASSERT(ext->oe_state == OES_RPC); mem_tight |= ext->oe_memalloc; grant += ext->oe_grants; @@ -2425,13 +2796,24 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, layout_version = max(layout_version, ext->oe_layout_version); if (obj == NULL) obj = ext->oe_obj; + + /* for unaligned writes, we do the data copying here */ + if (sdio && sdio->csd_unaligned && sdio->csd_write) { + rc = ll_dio_user_copy(sdio); + if (rc < 0) + GOTO(out, rc); + /* dio_user_copy has some concurrency handling in it, + * so we add this assert to ensure it did its job... 
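brw_commit() and the extent-finish path can race over unstable-page accounting; rq_unstable is tested and cleared under rq_lock so exactly one of the two does the decrement. A pthread rendition of that test-and-clear idiom:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int unstable;	/* set when pages were accounted */
static int dec_count;	/* how many times we decremented */

static void commit_or_interpret(void)
{
	int do_dec = 0;

	pthread_mutex_lock(&lock);
	if (unstable) {
		unstable = 0;	/* only the first caller sees it set */
		do_dec = 1;
	}
	pthread_mutex_unlock(&lock);
	if (do_dec)
		dec_count++;	/* stands in for osc_dec_unstable_pages() */
}

int main(void)
{
	unstable = 1;
	commit_or_interpret();	/* decrements */
	commit_or_interpret();	/* no-op: flag already cleared */
	printf("decremented %d time(s)\n", dec_count);
	return 0;
}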
+ */ + LASSERT(sdio->csd_write_copied); + } } soft_sync = osc_over_unstable_soft_limit(cli); if (mem_tight) mpflag = memalloc_noreclaim_save(); - OBD_ALLOC_PTR_ARRAY(pga, page_count); + OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count); if (pga == NULL) GOTO(out, rc = -ENOMEM); @@ -2447,28 +2829,32 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (soft_sync) oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; pga[i] = &oap->oap_brw_page; - pga[i]->off = oap->oap_obj_off + oap->oap_page_off; + pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off; i++; list_add_tail(&oap->oap_rpc_item, &rpc_list); if (starting_offset == OBD_OBJECT_EOF || - starting_offset > oap->oap_obj_off) + starting_offset > oap->oap_obj_off) { starting_offset = oap->oap_obj_off; - else + } else { + CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n", + i, oap->oap_obj_off, oap->oap_page_off); LASSERT(oap->oap_page_off == 0); - if (ending_offset < oap->oap_obj_off + oap->oap_count) + } + if (ending_offset < oap->oap_obj_off + oap->oap_count) { ending_offset = oap->oap_obj_off + oap->oap_count; - else + } else { LASSERT(oap->oap_page_off + oap->oap_count == PAGE_SIZE); + } } if (ext->oe_ndelay) ndelay = true; } /* first page in the list */ - oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item); + oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item); crattr = &osc_env_info(env)->oti_req_attr; memset(crattr, 0, sizeof(*crattr)); @@ -2492,40 +2878,44 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); if (rc != 0) { - CERROR("prep_req failed: %d\n", rc); + CERROR("%s: prep_req failed: rc = %d\n", + cli->cl_import->imp_obd->obd_name, rc); GOTO(out, rc); } req->rq_commit_cb = brw_commit; req->rq_interpret_reply = brw_interpret; req->rq_memalloc = mem_tight != 0; - oap->oap_request = ptlrpc_request_addref(req); if (ndelay) { req->rq_no_resend = req->rq_no_delay = 1; /* probably set a shorter timeout value. - * to handle ETIMEDOUT in brw_interpret() correctly. */ - /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */ + * to handle ETIMEDOUT in brw_interpret() correctly. + * lustre_msg_set_timeout(req, req->rq_timeout / 2); + */ } /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious - * way to do this in a single call. bug 10150 */ + * way to do this in a single call. 
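osc_build_rpc() above derives the byte range an RPC covers from its pages: the minimum object offset and the maximum of offset plus count. The same bookkeeping in a self-contained sketch, where OBJ_EOF mimics OBD_OBJECT_EOF as the "not yet set" sentinel:

#include <stdint.h>
#include <stdio.h>

#define OBJ_EOF UINT64_MAX	/* stand-in for OBD_OBJECT_EOF */

struct page_ext { uint64_t obj_off; unsigned int count; };

int main(void)
{
	struct page_ext pages[] = {
		{ 8192, 4096 }, { 0, 4096 }, { 4096, 4096 },
	};
	uint64_t start = OBJ_EOF, end = 0;
	unsigned int i;

	for (i = 0; i < 3; i++) {
		if (start == OBJ_EOF || start > pages[i].obj_off)
			start = pages[i].obj_off;
		if (end < pages[i].obj_off + pages[i].count)
			end = pages[i].obj_off + pages[i].count;
	}
	printf("rpc covers [%llu, %llu)\n",
	       (unsigned long long)start, (unsigned long long)end);
	return 0;
}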
bug 10150 + */ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); crattr->cra_oa = &body->oa; crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME; cl_req_attr_set(env, osc2cl(obj), crattr); - lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); + lustre_msg_set_jobinfo(req->rq_reqmsg, &crattr->cra_jobinfo); aa = ptlrpc_req_async_args(aa, req); INIT_LIST_HEAD(&aa->aa_oaps); list_splice_init(&rpc_list, &aa->aa_oaps); INIT_LIST_HEAD(&aa->aa_exts); list_splice_init(ext_list, &aa->aa_exts); + aa->aa_request = ptlrpc_request_addref(req); spin_lock(&cli->cl_loi_list_lock); starting_offset >>= PAGE_SHIFT; + ending_offset >>= PAGE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); @@ -2542,9 +2932,20 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, spin_unlock(&cli->cl_loi_list_lock); DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight", - page_count, aa, cli->cl_r_in_flight, - cli->cl_w_in_flight); - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val); + page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); + if (libcfs_debug & D_IOTRACE) { + struct lu_fid fid; + + fid.f_seq = crattr->cra_oa->o_parent_seq; + fid.f_oid = crattr->cra_oa->o_parent_oid; + fid.f_ver = crattr->cra_oa->o_parent_ver; + CDEBUG(D_IOTRACE, + DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n", + PFID(&fid), page_count, + cmd == OBD_BRW_READ ? "read" : "write", starting_offset, + ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight); + } + CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val); ptlrpcd_add_req(req); rc = 0; @@ -2564,10 +2965,11 @@ out: osc_release_ppga(pga, page_count); } /* this should happen rarely and is pretty bad, it makes the - * pending list not follow the dirty order */ - while (!list_empty(ext_list)) { - ext = list_entry(ext_list->next, struct osc_extent, - oe_link); + * pending list not follow the dirty order + */ + while ((ext = list_first_entry_or_null(ext_list, + struct osc_extent, + oe_link)) != NULL) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 0, rc); } @@ -2575,13 +2977,41 @@ out: RETURN(rc); } +/* This is to refresh our lock in face of no RPCs. */ +void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start) +{ + struct ptlrpc_request *req; + struct obdo oa; + struct brw_page bpg = { .bp_off = start, .bp_count = 1}; + struct brw_page *pga = &bpg; + int rc; + + memset(&oa, 0, sizeof(oa)); + oa.o_oi = osc->oo_oinfo->loi_oi; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS; + /* For updated servers - don't do a read */ + oa.o_flags = OBD_FL_NORPC; + + rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga, + &req, 0); + + /* If we succeeded we ship it off, if not there's no point in doing + * anything. Also no resends. + * No interpret callback, no commit callback. 
+ */ + if (!rc) { + req->rq_no_resend = 1; + ptlrpcd_add_req(req); + } +} + static int osc_set_lock_data(struct ldlm_lock *lock, void *data) { - int set = 0; + int set = 0; - LASSERT(lock != NULL); + LASSERT(lock != NULL); - lock_res_and_lock(lock); + lock_res_and_lock(lock); if (lock->l_ast_data == NULL) lock->l_ast_data = data; @@ -2593,15 +3023,16 @@ static int osc_set_lock_data(struct ldlm_lock *lock, void *data) return set; } -int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, - void *cookie, struct lustre_handle *lockh, - enum ldlm_mode mode, __u64 *flags, bool speculative, - int errcode) +static int osc_enqueue_fini(struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, + void *cookie, struct lustre_handle *lockh, + enum ldlm_mode mode, __u64 *flags, + bool speculative, int errcode) { bool intent = *flags & LDLM_FL_HAS_INTENT; int rc; - ENTRY; + ENTRY; /* The request was created before ldlm_cli_enqueue call. */ if (intent && errcode == ELDLM_LOCK_ABORTED) { struct ldlm_reply *rep; @@ -2619,7 +3050,7 @@ int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, *flags |= LDLM_FL_LVB_READY; } - /* Call the update callback. */ + /* Call the update callback. */ rc = (*upcall)(cookie, lockh, errcode); /* release the reference taken in ldlm_cli_enqueue() */ @@ -2631,8 +3062,9 @@ int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, RETURN(rc); } -int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, - void *args, int rc) +static int osc_enqueue_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + void *args, int rc) { struct osc_enqueue_args *aa = args; struct ldlm_lock *lock; @@ -2641,27 +3073,31 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct ost_lvb *lvb = aa->oa_lvb; __u32 lvb_len = sizeof(*lvb); __u64 flags = 0; + struct ldlm_enqueue_info einfo = { + .ei_type = aa->oa_type, + .ei_mode = mode, + }; ENTRY; - /* ldlm_cli_enqueue is holding a reference on the lock, so it must - * be valid. */ + /* ldlm_cli_enqueue holds a reference on the lock, it must be valid. */ lock = ldlm_handle2lock(lockh); LASSERTF(lock != NULL, - "lockh %#llx, req %p, aa %p - client evicted?\n", + "lockh %#llx, req %px, aa %px - client evicted?\n", lockh->cookie, req, aa); /* Take an additional reference so that a blocking AST that * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ + * osc_enqueue_fini(). + */ ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ - OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); + CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); /* Let CP AST to grant the lock first. */ - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); if (aa->oa_speculative) { LASSERT(aa->oa_lvb == NULL); @@ -2670,17 +3106,17 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, } /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, - aa->oa_mode, aa->oa_flags, lvb, lvb_len, - lockh, rc); + rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1, + aa->oa_flags, lvb, lvb_len, lockh, rc, + false); /* Complete osc stuff. 
*/ rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, aa->oa_flags, aa->oa_speculative, rc); - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); + CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(lock); + ldlm_lock_put(lock); RETURN(rc); } @@ -2690,7 +3126,8 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, * others may take a considerable amount of time in a case of ost failure; and * when other sync requests do not get released lock from a client, the client * is evicted from the cluster -- such scenarious make the life difficult, so - * release locks just after they are obtained. */ + * release locks just after they are obtained. + */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, union ldlm_policy_data *policy, struct ost_lvb *lvb, osc_enqueue_upcall_f upcall, @@ -2702,40 +3139,48 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, struct lustre_handle lockh = { 0 }; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; - __u64 match_flags = *flags; + __u64 search_flags = *flags; + __u64 match_flags = 0; enum ldlm_mode mode; int rc; - ENTRY; - /* Filesystem lock extents are extended to page boundaries so that - * dealing with the page cache is a little smoother. */ + ENTRY; + /* Filesystem lock extents are extended to page boundaries so that + * dealing with the page cache is a little smoother. + */ policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; policy->l_extent.end |= ~PAGE_MASK; - /* Next, search for already existing extent locks that will cover us */ - /* If we're trying to read, we also search for an existing PW lock. The - * VFS and page cache already protect us locally, so lots of readers/ - * writers can share a single PW lock. - * - * There are problems with conversion deadlocks, so instead of - * converting a read lock to a write lock, we'll just enqueue a new - * one. - * - * At some point we should cancel the read lock instead of making them - * send us a blocking callback, but there are problems with canceling - * locks out from other users right now, too. */ - mode = einfo->ei_mode; - if (einfo->ei_mode == LCK_PR) - mode |= LCK_PW; + /* Next, search for already existing extent locks that will cover us + * + * If we're trying to read, we also search for an existing PW lock. The + * VFS and page cache already protect us locally, so lots of readers/ + * writers can share a single PW lock. + * + * There are problems with conversion deadlocks, so instead of + * converting a read lock to a write lock, we'll just enqueue a new + * one. + * + * At some point we should cancel the read lock instead of making them + * send us a blocking callback, but there are problems with canceling + * locks out from other users right now, too. + */ + mode = einfo->ei_mode; + if (einfo->ei_mode == LCK_PR) + mode |= LCK_PW; /* Normal lock requests must wait for the LVB to be ready before * matching a lock; speculative lock requests do not need to, - * because they will not actually use the lock. */ + * because they will not actually use the lock. 
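Before matching, osc_enqueue_base() widens the requested extent to page boundaries with start -= start & ~PAGE_MASK and end |= ~PAGE_MASK, so locks line up with the page cache. Worked through with an assumed 4K page size:

#include <stdint.h>
#include <stdio.h>

#define DEMO_PAGE_MASK (~((uint64_t)4095))	/* 4K pages (assumption) */

int main(void)
{
	uint64_t start = 5000, end = 9000;

	start -= start & ~DEMO_PAGE_MASK;	/* 5000 -> 4096 */
	end   |= ~DEMO_PAGE_MASK;		/* 9000 -> 12287 */
	printf("[%llu, %llu]\n",
	       (unsigned long long)start, (unsigned long long)end);
	return 0;
}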
+ */ if (!speculative) - match_flags |= LDLM_FL_LVB_READY; + search_flags |= LDLM_FL_LVB_READY; if (intent != 0) - match_flags |= LDLM_FL_BLOCK_GRANTED; - mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, - einfo->ei_type, policy, mode, &lockh); + search_flags |= LDLM_FL_BLOCK_GRANTED; + if (mode == LCK_GROUP) + match_flags = LDLM_MATCH_GROUP; + mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0, + res_id, einfo->ei_type, policy, mode, + &lockh, match_flags); if (mode) { struct ldlm_lock *matched; @@ -2747,7 +3192,8 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* This DLM lock request is speculative, and does not * have an associated IO request. Therefore if there * is already a DLM lock, it wll just inform the - * caller to cancel the request for this stripe.*/ + * caller to cancel the request for this stripe. + */ lock_res_and_lock(matched); if (ldlm_extent_equal(&policy->l_extent, &matched->l_policy_data.l_extent)) @@ -2757,7 +3203,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, unlock_res_and_lock(matched); ldlm_lock_decref(&lockh, mode); - LDLM_LOCK_PUT(matched); + ldlm_lock_put(matched); RETURN(rc); } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; @@ -2766,42 +3212,26 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED); ldlm_lock_decref(&lockh, mode); - LDLM_LOCK_PUT(matched); + ldlm_lock_put(matched); RETURN(ELDLM_OK); } else { ldlm_lock_decref(&lockh, mode); - LDLM_LOCK_PUT(matched); + ldlm_lock_put(matched); } } if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); - if (intent) { - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_ENQUEUE_LVB); - if (req == NULL) - RETURN(-ENOMEM); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - sizeof *lvb); - ptlrpc_request_set_replen(req); - } - - /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ - *flags &= ~LDLM_FL_BLOCK_GRANTED; + /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ + *flags &= ~LDLM_FL_BLOCK_GRANTED; - rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, sizeof(*lvb), LVB_T_OST, &lockh, async); if (async) { if (!rc) { struct osc_enqueue_args *aa; + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; @@ -2816,23 +3246,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, } else { /* speculative locks are essentially to enqueue * a DLM lock in advance, so we don't care - * about the result of the enqueue. */ + * about the result of the enqueue. 
+ */ aa->oa_lvb = NULL; aa->oa_flags = NULL; } req->rq_interpret_reply = osc_enqueue_interpret; ptlrpc_set_add_req(rqset, req); - } else if (intent) { - ptlrpc_req_finished(req); } RETURN(rc); } rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, flags, speculative, rc); - if (intent) - ptlrpc_req_finished(req); RETURN(rc); } @@ -2841,31 +3268,26 @@ int osc_match_base(const struct lu_env *env, struct obd_export *exp, struct ldlm_res_id *res_id, enum ldlm_type type, union ldlm_policy_data *policy, enum ldlm_mode mode, __u64 *flags, struct osc_object *obj, - struct lustre_handle *lockh, enum ldlm_match_flags match_flags) + struct lustre_handle *lockh, + enum ldlm_match_flags match_flags) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; enum ldlm_mode rc; - ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) + ENTRY; + if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) RETURN(-EIO); /* Filesystem lock extents are extended to page boundaries so that - * dealing with the page cache is a little smoother */ + * dealing with the page cache is a little smoother + */ policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ - /* If we're trying to read, we also search for an existing PW lock. The - * VFS and page cache already protect us locally, so lots of readers/ - * writers can share a single PW lock. */ - rc = mode; - if (mode == LCK_PR) - rc |= LCK_PW; - rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0, - res_id, type, policy, rc, lockh, + res_id, type, policy, mode, lockh, match_flags); if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); @@ -2886,7 +3308,7 @@ int osc_match_base(const struct lu_env *env, struct obd_export *exp, ldlm_lock_decref(lockh, rc); rc = 0; } - LDLM_LOCK_PUT(lock); + ldlm_lock_put(lock); } RETURN(rc); } @@ -2915,7 +3337,7 @@ static int osc_statfs_interpret(const struct lu_env *env, msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); if (msfs == NULL) - GOTO(out, rc = -EPROTO); + GOTO(out, rc = -EPROTO); *aa->aa_oi->oi_osfs = *msfs; out: @@ -2926,14 +3348,14 @@ out: static int osc_statfs_async(struct obd_export *exp, struct obd_info *oinfo, time64_t max_age, - struct ptlrpc_request_set *rqset) + struct ptlrpc_request_set *rqset) { - struct obd_device *obd = class_exp2obd(exp); - struct ptlrpc_request *req; - struct osc_async_args *aa; + struct obd_device *obd = class_exp2obd(exp); + struct ptlrpc_request *req; + struct osc_async_args *aa; int rc; - ENTRY; + ENTRY; if (obd->obd_osfs_age >= max_age) { CDEBUG(D_SUPER, "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n", @@ -2950,21 +3372,22 @@ static int osc_statfs_async(struct obd_export *exp, RETURN(0); } - /* We could possibly pass max_age in the request (as an absolute - * timestamp or a "seconds.usec ago") so the target can avoid doing - * extra calls into the filesystem if that isn't necessary (e.g. - * during mount that would help a bit). Having relative timestamps - * is not so great if request processing is slow, while absolute - * timestamps are not ideal because they need time synchronization. 
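osc_statfs_async() above answers from the cached obd_osfs when its timestamp is at least the caller's max_age cutoff, avoiding an OST_STATFS round trip. The freshness check in miniature (struct and field names here are illustrative):

#include <stdio.h>
#include <time.h>

struct fs_stats { unsigned long long blocks_free; time_t age; };

/* Return 0 and fill *out from the cache if it is fresh enough;
 * otherwise the real code would issue an OST_STATFS RPC. */
static int get_stats(struct fs_stats *cache, time_t max_age,
		     unsigned long long *out)
{
	if (cache->age >= max_age) {	/* cached copy is fresh enough */
		*out = cache->blocks_free;
		return 0;
	}
	return 1;
}

int main(void)
{
	struct fs_stats cache = { 123456, time(NULL) };
	unsigned long long v;

	/* caller accepts anything fetched in the last 5 seconds */
	if (get_stats(&cache, time(NULL) - 5, &v) == 0)
		printf("cached: %llu blocks free\n", v);
	return 0;
}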
*/ - req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS); - if (req == NULL) - RETURN(-ENOMEM); - - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + /* We could possibly pass max_age in the request (as an absolute + * timestamp or a "seconds.usec ago") so the target can avoid doing + * extra calls into the filesystem if that isn't necessary (e.g. + * during mount that would help a bit). Having relative timestamps + * is not so great if request processing is slow, while absolute + * timestamps are not ideal because they need time synchronization. + */ + req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } ptlrpc_request_set_replen(req); req->rq_request_portal = OST_CREATE_PORTAL; ptlrpc_at_set_req_timeout(req); @@ -2986,29 +3409,28 @@ static int osc_statfs_async(struct obd_export *exp, static int osc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, time64_t max_age, __u32 flags) { - struct obd_device *obd = class_exp2obd(exp); - struct obd_statfs *msfs; + struct obd_device *obd = class_exp2obd(exp); + struct obd_statfs *msfs; struct ptlrpc_request *req; - struct obd_import *imp = NULL; + struct obd_import *imp, *imp0; int rc; - ENTRY; - - /*Since the request might also come from lprocfs, so we need - *sync this with client_disconnect_export Bug15684*/ - down_read(&obd->u.cli.cl_sem); - if (obd->u.cli.cl_import) - imp = class_import_get(obd->u.cli.cl_import); - up_read(&obd->u.cli.cl_sem); - if (!imp) - RETURN(-ENODEV); + ENTRY; + /* Since the request might also come from lprocfs, so we need + * sync this with client_disconnect_export Bug15684 + */ + with_imp_locked(obd, imp0, rc) + imp = class_import_get(imp0); + if (rc) + RETURN(rc); /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing * extra calls into the filesystem if that isn't necessary (e.g. * during mount that would help a bit). Having relative timestamps * is not so great if request processing is slow, while absolute - * timestamps are not ideal because they need time synchronization. */ + * timestamps are not ideal because they need time synchronization. 
+ */ req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); class_import_put(imp); @@ -3043,7 +3465,7 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, EXIT; out: - ptlrpc_req_finished(req); + ptlrpc_req_put(req); return rc; } @@ -3051,30 +3473,57 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - int rc = 0; + struct obd_ioctl_data *data; + int rc; ENTRY; + CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n", + obd->obd_name, cmd, len, karg, uarg); + if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); - return -EINVAL; + RETURN(-EINVAL); } + switch (cmd) { case OBD_IOC_CLIENT_RECOVER: + if (unlikely(karg == NULL)) { + OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", + rc = -EINVAL); + break; + } + data = karg; rc = ptlrpc_recover_import(obd->u.cli.cl_import, data->ioc_inlbuf1, 0); if (rc > 0) rc = 0; break; - case IOC_OSC_SET_ACTIVE: + case OBD_IOC_GETATTR: + if (unlikely(karg == NULL)) { + OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", + rc = -EINVAL); + break; + } + data = karg; + rc = obd_getattr(NULL, exp, &data->ioc_obdo1); + break; +#ifdef IOC_OSC_SET_ACTIVE + case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17); +#endif + case OBD_IOC_SET_ACTIVE: + if (unlikely(karg == NULL)) { + OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", + rc = -EINVAL); + break; + } + data = karg; rc = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); break; default: - rc = -ENOTTY; - CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n", - obd->obd_name, cmd, current->comm, rc); + rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized", + -ENOTTY); break; } @@ -3086,31 +3535,31 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, u32 keylen, void *key, u32 vallen, void *val, struct ptlrpc_request_set *set) { - struct ptlrpc_request *req; - struct obd_device *obd = exp->exp_obd; - struct obd_import *imp = class_exp2cliimp(exp); - char *tmp; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_device *obd = exp->exp_obd; + struct obd_import *imp = class_exp2cliimp(exp); + char *tmp; + int rc; - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); + ENTRY; + CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); - if (KEY_IS(KEY_CHECKSUM)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0; - RETURN(0); - } + if (KEY_IS(KEY_CHECKSUM)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0; + RETURN(0); + } - if (KEY_IS(KEY_SPTLRPC_CONF)) { - sptlrpc_conf_client_adapt(obd); - RETURN(0); - } + if (KEY_IS(KEY_SPTLRPC_CONF)) { + sptlrpc_conf_client_adapt(obd); + RETURN(0); + } - if (KEY_IS(KEY_FLUSH_CTX)) { - sptlrpc_import_flush_my_ctx(imp); - RETURN(0); - } + if (KEY_IS(KEY_FLUSH_CTX)) { + sptlrpc_import_flush_my_ctx(imp); + RETURN(0); + } if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; @@ -3122,15 +3571,38 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, RETURN(0); } - if (!set && !KEY_IS(KEY_GRANT_SHRINK)) - RETURN(-EINVAL); + if (KEY_IS(KEY_UNEVICT_CACHE_SHRINK)) { + struct client_obd *cli = &obd->u.cli; + long ret; - /* We pass all other commands directly to OST. 
Since nobody calls osc - methods directly and everybody is supposed to go through LOV, we - assume lov checked invalid values for us. - The only recognised values so far are evict_by_nid and mds_conn. - Even if something bad goes through, we'd get a -EINVAL from OST - anyway. */ + ret = osc_unevict_cache_shrink(env, cli); + if (ret > 0) + ret = 0; + + /* + * Clear unused cache pages and move mlock()ed pages from + * the normal LRU list into unevictable LRU list. + */ + ret = osc_lru_shrink(env, cli, + atomic_long_read(&cli->cl_lru_in_list), + true); + if (ret > 0) + ret = 0; + + RETURN(ret); + } + + if (!set && !KEY_IS(KEY_GRANT_SHRINK)) + RETURN(-EINVAL); + + /* + * We pass all other commands directly to OST. Since nobody calls osc + * methods directly and everybody is supposed to go through LOV, we + * assume lov checked invalid values for us. + * The only recognised values so far are evict_by_nid and mds_conn. + * Even if something bad goes through, we'd get a -EINVAL from OST + * anyway. + */ req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ? &RQF_OST_SET_GRANT_INFO : @@ -3163,7 +3635,7 @@ int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, aa = ptlrpc_req_async_args(aa, req); OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS); if (!oa) { - ptlrpc_req_finished(req); + ptlrpc_req_put(req); RETURN(-ENOMEM); } *oa = ((struct ost_body *)val)->oa; @@ -3208,9 +3680,10 @@ int osc_reconnect(const struct lu_env *env, struct obd_export *exp, cli->cl_lost_grant = 0; spin_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d" - " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, - data->ocd_version, data->ocd_grant, lost_grant); + CDEBUG(D_RPCTRACE, + "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n", + data->ocd_connect_flags, data->ocd_version, + data->ocd_grant, lost_grant); } RETURN(0); @@ -3250,8 +3723,8 @@ int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd, struct ldlm_resource *res = cfs_hash_object(hs, hnode); struct ldlm_lock *lock; struct osc_object *osc = NULL; - ENTRY; + ENTRY; lock_res(res); list_for_each_entry(lock, &res->lr_granted, l_res_link) { if (lock->l_ast_data != NULL && osc == NULL) { @@ -3261,7 +3734,8 @@ int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd, /* clear LDLM_FL_CLEANED flag to make sure it will be canceled * by the 2nd round of ldlm_namespace_clean() call in - * osc_import_event(). */ + * osc_import_event(). 
+ */ ldlm_clear_cleaned(lock); } unlock_res(res); @@ -3275,38 +3749,40 @@ int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd, } EXPORT_SYMBOL(osc_ldlm_resource_invalidate); -static int osc_import_event(struct obd_device *obd, - struct obd_import *imp, - enum obd_import_event event) +static int osc_import_event(struct obd_device *obd, struct obd_import *imp, + enum obd_import_event event) { - struct client_obd *cli; - int rc = 0; - - ENTRY; - LASSERT(imp->imp_obd == obd); + struct client_obd *cli; + int rc = 0; - switch (event) { - case IMP_EVENT_DISCON: { - cli = &obd->u.cli; + ENTRY; + if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd)) + RETURN(-ENODEV); + + switch (event) { + case IMP_EVENT_DISCON: { + cli = &obd->u.cli; + if (!cli) + RETURN(-ENODEV); spin_lock(&cli->cl_loi_list_lock); cli->cl_avail_grant = 0; cli->cl_lost_grant = 0; spin_unlock(&cli->cl_loi_list_lock); - break; - } - case IMP_EVENT_INACTIVE: { + break; + } + case IMP_EVENT_INACTIVE: { rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE); - break; - } - case IMP_EVENT_INVALIDATE: { - struct ldlm_namespace *ns = obd->obd_namespace; - struct lu_env *env; - __u16 refcheck; + break; + } + case IMP_EVENT_INVALIDATE: { + struct ldlm_namespace *ns = obd->obd_namespace; + struct lu_env *env; + __u16 refcheck; ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); - env = cl_env_get(&refcheck); - if (!IS_ERR(env)) { + env = cl_env_get(&refcheck); + if (!IS_ERR(env)) { osc_io_unplug(env, &obd->u.cli, NULL); cfs_hash_for_each_nolock(ns->ns_rs_hash, @@ -3315,40 +3791,43 @@ static int osc_import_event(struct obd_device *obd, cl_env_put(env, &refcheck); ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); - } else - rc = PTR_ERR(env); - break; - } - case IMP_EVENT_ACTIVE: { + } else { + rc = PTR_ERR(env); + } + break; + } + case IMP_EVENT_ACTIVE: { rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE); - break; - } - case IMP_EVENT_OCD: { - struct obd_connect_data *ocd = &imp->imp_connect_data; + break; + } + case IMP_EVENT_OCD: { + struct obd_connect_data *ocd = &imp->imp_connect_data; - if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT) - osc_init_grant(&obd->u.cli, ocd); + if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT) + osc_init_grant(&obd->u.cli, ocd); - /* See bug 7198 */ - if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) - imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL; + /* See bug 7198 */ + if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) + imp->imp_client->cli_request_portal = + OST_REQUEST_PORTAL; rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD); - break; - } - case IMP_EVENT_DEACTIVATE: { + break; + } + case IMP_EVENT_DEACTIVATE: { rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE); - break; - } - case IMP_EVENT_ACTIVATE: { + break; + } + case IMP_EVENT_ACTIVATE: { rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE); - break; - } - default: - CERROR("Unknown import event %d\n", event); - LBUG(); - } - RETURN(rc); + break; + } + default: + CERROR("%s: Unknown import event %d: rc = %d\n", + obd->obd_name, event, -EINVAL); + LBUG(); + } + RETURN(rc); } /** @@ -3413,6 +3892,7 @@ int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg) GOTO(out_ptlrpcd_work, rc); cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; + cli->cl_root_squash = 0; osc_update_next_shrink(cli); RETURN(rc); @@ -3436,10 +3916,10 @@ EXPORT_SYMBOL(osc_setup_common); int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { struct client_obd *cli = &obd->u.cli; - int adding; - int added; - 
int req_count; - int rc; + int adding; + int added; + int req_count; + int rc; ENTRY; @@ -3480,8 +3960,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) int osc_precleanup_common(struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; - ENTRY; + ENTRY; /* LU-464 * for echo client, export may be on zombie list, wait for * zombie thread to cull it, because cli.cl_import will be @@ -3530,7 +4010,7 @@ int osc_cleanup_common(struct obd_device *obd) /* lru cleanup */ if (cli->cl_cache != NULL) { - LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); + LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0); spin_lock(&cli->cl_cache->ccc_lru_lock); list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); @@ -3550,72 +4030,86 @@ int osc_cleanup_common(struct obd_device *obd) EXPORT_SYMBOL(osc_cleanup_common); static const struct obd_ops osc_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = osc_setup, - .o_precleanup = osc_precleanup, + .o_owner = THIS_MODULE, + .o_setup = osc_setup, + .o_precleanup = osc_precleanup, .o_cleanup = osc_cleanup_common, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, .o_connect = client_connect_import, - .o_reconnect = osc_reconnect, - .o_disconnect = osc_disconnect, - .o_statfs = osc_statfs, - .o_statfs_async = osc_statfs_async, - .o_create = osc_create, - .o_destroy = osc_destroy, - .o_getattr = osc_getattr, - .o_setattr = osc_setattr, - .o_iocontrol = osc_iocontrol, - .o_set_info_async = osc_set_info_async, - .o_import_event = osc_import_event, - .o_quotactl = osc_quotactl, + .o_reconnect = osc_reconnect, + .o_disconnect = osc_disconnect, + .o_statfs = osc_statfs, + .o_statfs_async = osc_statfs_async, + .o_create = osc_create, + .o_destroy = osc_destroy, + .o_getattr = osc_getattr, + .o_setattr = osc_setattr, + .o_iocontrol = osc_iocontrol, + .o_set_info_async = osc_set_info_async, + .o_import_event = osc_import_event, + .o_quotactl = osc_quotactl, }; -static struct shrinker *osc_cache_shrinker; LIST_HEAD(osc_shrink_list); DEFINE_SPINLOCK(osc_shrink_lock); +bool osc_page_cache_shrink_enabled = true; -#ifndef HAVE_SHRINKER_COUNT -static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) +#ifdef HAVE_SHRINKER_COUNT +static struct ll_shrinker_ops osc_cache_sh_ops = { + .count_objects = osc_cache_shrink_count, + .scan_objects = osc_cache_shrink_scan, + .seeks = DEFAULT_SEEKS, +}; +#else +static int osc_cache_shrink(struct shrinker *shrinker, + struct shrink_control *sc) { - struct shrink_control scv = { - .nr_to_scan = shrink_param(sc, nr_to_scan), - .gfp_mask = shrink_param(sc, gfp_mask) - }; - (void)osc_cache_shrink_scan(shrinker, &scv); + if (!osc_page_cache_shrink_enabled) + return 0; + + (void)osc_cache_shrink_scan(shrinker, sc); - return osc_cache_shrink_count(shrinker, &scv); + return osc_cache_shrink_count(shrinker, sc); } + +static struct ll_shrinker_ops osc_cache_sh_ops = { + .shrink = osc_cache_shrink, + .seeks = DEFAULT_SEEKS, +}; #endif +static struct shrinker *osc_cache_shrinker; + static int __init osc_init(void) { unsigned int reqpool_size; unsigned int reqsize; int rc; - DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink, - osc_cache_shrink_count, osc_cache_shrink_scan); - ENTRY; + ENTRY; /* print an address of _any_ initialized kernel symbol from this * module, to allow debugging with gdb that doesn't support data - * symbols from modules.*/ + * symbols from modules. 
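The shrinker glue above adapts to both kernel APIs: newer kernels call count_objects() and scan_objects() separately, while the legacy single ->shrink() wrapper scans and then reports what remains. A userspace model of that contract, with a plain counter standing in for the LRU:

#include <stdio.h>

static unsigned long cache_len = 100;

static unsigned long shrink_count(void)
{
	return cache_len;		/* how much could be freed */
}

static unsigned long shrink_scan(unsigned long nr_to_scan)
{
	unsigned long freed = nr_to_scan < cache_len ? nr_to_scan
						     : cache_len;
	cache_len -= freed;		/* actually free that much */
	return freed;
}

/* legacy single-entry-point style: scan, then report what is left */
static unsigned long legacy_shrink(unsigned long nr_to_scan)
{
	if (nr_to_scan)
		(void)shrink_scan(nr_to_scan);
	return shrink_count();
}

int main(void)
{
	printf("left after scanning 30: %lu\n", legacy_shrink(30));
	return 0;
}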
+ */ CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); - rc = lu_kmem_init(osc_caches); + rc = libcfs_setup(); if (rc) - RETURN(rc); + return rc; - rc = class_register_type(&osc_obd_ops, NULL, true, NULL, - LUSTRE_OSC_NAME, &osc_device_type); + rc = lu_kmem_init(osc_caches); if (rc) - GOTO(out_kmem, rc); + RETURN(rc); - osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar); + osc_cache_shrinker = ll_shrinker_create(&osc_cache_sh_ops, 0, + "osc_cache"); + if (IS_ERR(osc_cache_shrinker)) + GOTO(out_kmem, rc = PTR_ERR(osc_cache_shrinker)); /* This is obviously too much memory, only prevent overflow here */ if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) - GOTO(out_type, rc = -EINVAL); + GOTO(out_shrinker, rc = -EINVAL); reqpool_size = osc_reqpool_mem_max << 20; @@ -3636,18 +4130,25 @@ static int __init osc_init(void) ptlrpc_add_rqs_to_pool); if (osc_rq_pool == NULL) - GOTO(out_type, rc = -ENOMEM); + GOTO(out_shrinker, rc = -ENOMEM); rc = osc_start_grant_work(); if (rc != 0) GOTO(out_req_pool, rc); + rc = class_register_type(&osc_obd_ops, NULL, true, + LUSTRE_OSC_NAME, &osc_device_type); + if (rc < 0) + GOTO(out_stop_grant, rc); + RETURN(rc); +out_stop_grant: + osc_stop_grant_work(); out_req_pool: ptlrpc_free_rq_pool(osc_rq_pool); -out_type: - class_unregister_type(LUSTRE_OSC_NAME); +out_shrinker: + shrinker_free(osc_cache_shrinker); out_kmem: lu_kmem_fini(osc_caches); @@ -3656,11 +4157,11 @@ out_kmem: static void __exit osc_exit(void) { - osc_stop_grant_work(); - remove_shrinker(osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); - lu_kmem_fini(osc_caches); ptlrpc_free_rq_pool(osc_rq_pool); + osc_stop_grant_work(); + shrinker_free(osc_cache_shrinker); + lu_kmem_fini(osc_caches); } MODULE_AUTHOR("OpenSFS, Inc. ");
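osc_init() above acquires its resources in order and unwinds them in reverse through GOTO labels on failure (out_stop_grant, out_req_pool, out_shrinker, out_kmem). The shape of that error-unwinding pattern, reduced to two allocations:

#include <stdio.h>
#include <stdlib.h>

static int init_all(void)
{
	char *a = NULL, *b = NULL;
	int rc;

	a = malloc(16);
	if (!a) {
		rc = -1;
		goto out;
	}
	b = malloc(16);
	if (!b) {
		rc = -2;
		goto out_a;	/* undo only what already succeeded */
	}
	free(b);		/* success path; keep the sketch leak-free */
	free(a);
	return 0;

out_a:
	free(a);
out:
	return rc;
}

int main(void)
{
	printf("init_all() = %d\n", init_all());
	return 0;
}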