X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=f84101436db9eb792c972ed58e18ec6736775c31;hp=6575d79627e4ff8feee54f86b3ff0d731dc76134;hb=6ea4de1b7f237d331c9e0d66f4cc53365d036e5d;hpb=df497dc560062a0a0c7178498cba8853843d39f1 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 6575d79..f841014 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -38,26 +38,17 @@ #include -#ifndef __KERNEL__ -# include -#endif - #include #include #include #include - -#ifdef __CYGWIN__ -# include -#endif - #include #include #include -#include #include #include #include +#include #include "osc_internal.h" #include "osc_cl_internal.h" @@ -65,7 +56,7 @@ struct osc_brw_async_args { struct obdo *aa_oa; int aa_requested_nob; int aa_nio_count; - obd_count aa_page_count; + u32 aa_page_count; int aa_resends; struct brw_page **aa_ppga; struct client_obd *aa_cli; @@ -77,10 +68,6 @@ struct osc_brw_async_args { #define osc_grant_args osc_brw_async_args -struct osc_async_args { - struct obd_info *aa_oi; -}; - struct osc_setattr_args { struct obdo *sa_oa; obd_enqueue_update_f sa_upcall; @@ -94,82 +81,21 @@ struct osc_fsync_args { }; struct osc_enqueue_args { - struct obd_export *oa_exp; - __u64 *oa_flags; - obd_enqueue_update_f oa_upcall; - void *oa_cookie; - struct ost_lvb *oa_lvb; - struct lustre_handle *oa_lockh; - struct ldlm_enqueue_info *oa_ei; - unsigned int oa_agl:1; + struct obd_export *oa_exp; + ldlm_type_t oa_type; + ldlm_mode_t oa_mode; + __u64 *oa_flags; + osc_enqueue_upcall_f oa_upcall; + void *oa_cookie; + struct ost_lvb *oa_lvb; + struct lustre_handle oa_lockh; + unsigned int oa_agl:1; }; -static 
void osc_release_ppga(struct brw_page **ppga, obd_count count); +static void osc_release_ppga(struct brw_page **ppga, size_t count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -/* Unpack OSC object metadata from disk storage (LE byte order). */ -static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_bytes) -{ - int lsm_size; - struct obd_import *imp = class_exp2cliimp(exp); - ENTRY; - - if (lmm != NULL) { - if (lmm_bytes < sizeof(*lmm)) { - CERROR("%s: lov_mds_md too small: %d, need %d\n", - exp->exp_obd->obd_name, lmm_bytes, - (int)sizeof(*lmm)); - RETURN(-EINVAL); - } - /* XXX LOV_MAGIC etc check? */ - - if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { - CERROR("%s: zero lmm_object_id: rc = %d\n", - exp->exp_obd->obd_name, -EINVAL); - RETURN(-EINVAL); - } - } - - lsm_size = lov_stripe_md_size(1); - if (lsmp == NULL) - RETURN(lsm_size); - - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - OBD_FREE(*lsmp, lsm_size); - *lsmp = NULL; - RETURN(0); - } - - if (*lsmp == NULL) { - OBD_ALLOC(*lsmp, lsm_size); - if (unlikely(*lsmp == NULL)) - RETURN(-ENOMEM); - OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { - OBD_FREE(*lsmp, lsm_size); - RETURN(-ENOMEM); - } - loi_init((*lsmp)->lsm_oinfo[0]); - } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { - RETURN(-EBADF); - } - - if (lmm != NULL) - /* XXX zero *lsmp? 
*/ - ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); - - if (imp != NULL && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) - (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; - else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - - RETURN(lsm_size); -} - static inline void osc_pack_capa(struct ptlrpc_request *req, struct ost_body *body, void *capa) { @@ -186,8 +112,7 @@ static inline void osc_pack_capa(struct ptlrpc_request *req, DEBUG_CAPA(D_SEC, c, "pack"); } -static inline void osc_pack_req_body(struct ptlrpc_request *req, - struct obd_info *oinfo) +void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { struct ost_body *body; @@ -199,9 +124,9 @@ static inline void osc_pack_req_body(struct ptlrpc_request *req, osc_pack_capa(req, body, oinfo->oi_capa); } -static inline void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) +void osc_set_capa_size(struct ptlrpc_request *req, + const struct req_msg_field *field, + struct obd_capa *oc) { if (oc == NULL) req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); @@ -210,9 +135,9 @@ static inline void osc_set_capa_size(struct ptlrpc_request *req, ; } -static int osc_getattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) +int osc_getattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { struct ost_body *body; ENTRY; @@ -239,38 +164,6 @@ out: RETURN(rc); } -static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct ptlrpc_request_set *set) -{ - struct ptlrpc_request *req; - struct osc_async_args *aa; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); - - osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); - if (rc) { - 
ptlrpc_request_free(req); - RETURN(rc); - } - - osc_pack_req_body(req, oinfo); - - ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; - - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->aa_oi = oinfo; - - ptlrpc_set_add_req(set, req); - RETURN(0); -} - static int osc_getattr(const struct lu_env *env, struct obd_export *exp, struct obd_info *oinfo) { @@ -378,10 +271,9 @@ out: RETURN(rc); } -int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - obd_enqueue_update_f upcall, void *cookie, - struct ptlrpc_request_set *rqset) +int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, + obd_enqueue_update_f upcall, void *cookie, + struct ptlrpc_request_set *rqset) { struct ptlrpc_request *req; struct osc_setattr_args *sa; @@ -399,9 +291,6 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } - if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) - oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; - osc_pack_req_body(req, oinfo); ptlrpc_request_set_replen(req); @@ -429,32 +318,17 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, RETURN(0); } -static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, - struct obd_trans_info *oti, - struct ptlrpc_request_set *rqset) -{ - return osc_setattr_async_base(exp, oinfo, oti, - oinfo->oi_cb_up, oinfo, rqset); -} - -int osc_real_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +static int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct obd_trans_info *oti) { struct ptlrpc_request *req; struct ost_body *body; - struct lov_stripe_md *lsm; int rc; ENTRY; - LASSERT(oa); - LASSERT(ea); - - lsm = *ea; - if (!lsm) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } + LASSERT(oa != NULL); + 
LASSERT(oa->o_valid & OBD_MD_FLGROUP); + LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi))); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); if (req == NULL) @@ -495,19 +369,11 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - /* XXX LOV STACKING: the lsm that is passed to us from LOV does not - * have valid lsm_oinfo data structs, so don't go touching that. - * This needs to be fixed in a big way. - */ - lsm->lsm_oi = oa->o_oi; - *ea = lsm; - if (oti != NULL) { - oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); - if (oa->o_valid & OBD_MD_FLCOOKIE) { - if (!oti->oti_logcookies) - oti_alloc_cookies(oti, 1); + if (oti->oti_logcookies == NULL) + oti->oti_logcookies = &oti->oti_onecookie; + *oti->oti_logcookies = oa->o_lcookie; } } @@ -517,9 +383,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, out_req: ptlrpc_req_finished(req); out: - if (rc && !*ea) - obd_free_memmd(exp, &lsm); - RETURN(rc); + RETURN(rc); } int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, @@ -659,7 +523,7 @@ static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, ostid_build_res_name(&oa->o_oi, &res_id); res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); - if (res == NULL) + if (IS_ERR(res)) RETURN(0); LDLM_RESOURCE_ADDREF(res); @@ -699,31 +563,6 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - int rc = 0; - ENTRY; - - LASSERT(oa); - LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - RETURN(osc_real_create(exp, oa, ea, oti)); - } - - if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) - RETURN(osc_real_create(exp, oa, ea, oti)); - - /* we should not get here anymore */ - LBUG(); - - 
RETURN(rc); -} - /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -735,9 +574,7 @@ int osc_create(const struct lu_env *env, struct obd_export *exp, * it will retrieve the llog unlink logs and then sends the log cancellation * cookies to the MDS after committing destroy transactions. */ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md *ea, - struct obd_trans_info *oti, struct obd_export *md_export, - void *capa) + struct obdo *oa, struct obd_trans_info *oti) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -760,7 +597,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); } - osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); + osc_set_capa_size(req, &RMF_CAPA1, NULL); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -777,7 +614,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); /* If osc_destory is for destroying the unlink orphan, @@ -807,46 +643,47 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT; - LASSERT(!(oa->o_valid & bits)); + LASSERT(!(oa->o_valid & bits)); - oa->o_valid |= bits; - client_obd_list_lock(&cli->cl_loi_list_lock); - oa->o_dirty = cli->cl_dirty; - if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > - cli->cl_dirty_max)) { + oa->o_valid |= bits; + spin_lock(&cli->cl_loi_list_lock); + oa->o_dirty = 
cli->cl_dirty_pages << PAGE_CACHE_SHIFT; + if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > + cli->cl_dirty_max_pages)) { CERROR("dirty %lu - %lu > dirty_max %lu\n", - cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_transit, + cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_read(&obd_unstable_pages) + - atomic_read(&obd_dirty_pages) - - atomic_read(&obd_dirty_transit_pages) > - (long)(obd_max_dirty_pages + 1))) { + } else if (unlikely(atomic_long_read(&obd_dirty_pages) - + atomic_long_read(&obd_dirty_transit_pages) > + (obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). */ - CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n", + CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n", cli->cl_import->imp_obd->obd_name, - atomic_read(&obd_unstable_pages), - atomic_read(&obd_dirty_pages), - atomic_read(&obd_dirty_transit_pages), + atomic_long_read(&obd_dirty_pages), + atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; - } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { + } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > + 0x7fffffff)) { CERROR("dirty %lu - dirty_max %lu too big???\n", - cli->cl_dirty, cli->cl_dirty_max); + cli->cl_dirty_pages, cli->cl_dirty_max_pages); oa->o_undirty = 0; } else { - long max_in_flight = (cli->cl_max_pages_per_rpc << + unsigned long max_in_flight = (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT) * (cli->cl_max_rpcs_in_flight + 1); - oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); + oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT, + max_in_flight); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; - 
client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); @@ -860,11 +697,11 @@ void osc_update_next_shrink(struct client_obd *cli) cli->cl_next_shrink_grant); } -static void __osc_update_grant(struct client_obd *cli, obd_size grant) +static void __osc_update_grant(struct client_obd *cli, u64 grant) { - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant += grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant += grant; + spin_unlock(&cli->cl_loi_list_lock); } static void osc_update_grant(struct client_obd *cli, struct ost_body *body) @@ -876,8 +713,9 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body) } static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set); + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set); static int osc_shrink_grant_interpret(const struct lu_env *env, struct ptlrpc_request *req, @@ -902,10 +740,10 @@ out: static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) { - client_obd_list_lock(&cli->cl_loi_list_lock); - oa->o_grant = cli->cl_avail_grant / 4; - cli->cl_avail_grant -= oa->o_grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + oa->o_grant = cli->cl_avail_grant / 4; + cli->cl_avail_grant -= oa->o_grant; + spin_unlock(&cli->cl_loi_list_lock); if (!(oa->o_valid & OBD_MD_FLFLAGS)) { oa->o_valid |= OBD_MD_FLFLAGS; oa->o_flags = 0; @@ -923,10 +761,10 @@ static int osc_shrink_grant(struct client_obd *cli) __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); - client_obd_list_lock(&cli->cl_loi_list_lock); + 
spin_lock(&cli->cl_loi_list_lock); if (cli->cl_avail_grant <= target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); return osc_shrink_grant_to_target(cli, target_bytes); } @@ -937,7 +775,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) struct ost_body *body; ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* Don't shrink if we are already above or below the desired limit * We don't want to shrink below a single RPC, as that will negatively * impact block allocation and long-term performance. */ @@ -945,10 +783,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; if (target_bytes >= cli->cl_avail_grant) { - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); RETURN(0); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); OBD_ALLOC_PTR(body); if (!body) @@ -956,10 +794,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) osc_announce_cached(cli, &body->oa, 0); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); body->oa.o_grant = cli->cl_avail_grant - target_bytes; cli->cl_avail_grant = target_bytes; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { body->oa.o_valid |= OBD_MD_FLFLAGS; body->oa.o_flags = 0; @@ -1038,24 +876,26 @@ static int osc_del_shrink_grant(struct client_obd *client) static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) { - /* - * ocd_grant is the total grant amount we're expect to hold: if we've - * been evicted, it's the new avail_grant amount, cl_dirty will drop - * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty. 
- * - * race is tolerable here: if we're evicted, but imp_state already - * left EVICTED state, then cl_dirty must be 0 already. - */ - client_obd_list_lock(&cli->cl_loi_list_lock); - if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) - cli->cl_avail_grant = ocd->ocd_grant; - else - cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; + /* + * ocd_grant is the total grant amount we're expect to hold: if we've + * been evicted, it's the new avail_grant amount, cl_dirty_pages will + * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant + + * dirty. + * + * race is tolerable here: if we're evicted, but imp_state already + * left EVICTED state, then cl_dirty_pages must be 0 already. + */ + spin_lock(&cli->cl_loi_list_lock); + if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) + cli->cl_avail_grant = ocd->ocd_grant; + else + cli->cl_avail_grant = ocd->ocd_grant - + (cli->cl_dirty_pages << PAGE_CACHE_SHIFT); if (cli->cl_avail_grant < 0) { CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, - ocd->ocd_grant, cli->cl_dirty); + ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT); /* workaround for servers which do not have the patch from * LU-2679 */ cli->cl_avail_grant = ocd->ocd_grant; @@ -1063,7 +903,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) /* determine the appropriate chunk size used by osc_extent. */ cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, @@ -1078,7 +918,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) * beyond the end of a stripe file; i.e. 
lustre is reading a sparse file * via the LOV, and it _knows_ it's reading inside the file, it's just that * this stripe never got written at or beyond this stripe offset yet. */ -static void handle_short_read(int nob_read, obd_count page_count, +static void handle_short_read(int nob_read, size_t page_count, struct brw_page **pga) { char *ptr; @@ -1091,7 +931,7 @@ static void handle_short_read(int nob_read, obd_count page_count, if (pga[i]->count > nob_read) { /* EOF inside this page */ ptr = kmap(pga[i]->pg) + - (pga[i]->off & ~CFS_PAGE_MASK); + (pga[i]->off & ~PAGE_MASK); memset(ptr + nob_read, 0, pga[i]->count - nob_read); kunmap(pga[i]->pg); page_count--; @@ -1106,7 +946,7 @@ static void handle_short_read(int nob_read, obd_count page_count, /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); + ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK); memset(ptr, 0, pga[i]->count); kunmap(pga[i]->pg); i++; @@ -1114,8 +954,8 @@ static void handle_short_read(int nob_read, obd_count page_count, } static int check_write_rcs(struct ptlrpc_request *req, - int requested_nob, int niocount, - obd_count page_count, struct brw_page **pga) + int requested_nob, int niocount, + size_t page_count, struct brw_page **pga) { int i; __u32 *remote_rcs; @@ -1160,7 +1000,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) * safe to combine */ if (unlikely((p1->flag & mask) != (p2->flag & mask))) { CWARN("Saw flags 0x%x and 0x%x in the same brw, please " - "report this at http://bugs.whamcloud.com/\n", + "report this at https://jira.hpdd.intel.com/\n", p1->flag, p2->flag); } return 0; @@ -1169,11 +1009,11 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } -static obd_count osc_checksum_bulk(int nob, obd_count pg_count, - struct brw_page **pga, int opc, - cksum_type_t cksum_type) +static u32 osc_checksum_bulk(int nob, size_t 
pg_count, + struct brw_page **pga, int opc, + cksum_type_t cksum_type) { - __u32 cksum; + u32 cksum; int i = 0; struct cfs_crypto_hash_desc *hdesc; unsigned int bufsize; @@ -1190,34 +1030,32 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, } while (nob > 0 && pg_count > 0) { - int count = pga[i]->count > nob ? nob : pga[i]->count; + unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { unsigned char *ptr = kmap(pga[i]->pg); - int off = pga[i]->off & ~CFS_PAGE_MASK; - memcpy(ptr + off, "bad1", min(4, nob)); + int off = pga[i]->off & ~PAGE_MASK; + + memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob)); kunmap(pga[i]->pg); } cfs_crypto_hash_update_page(hdesc, pga[i]->pg, - pga[i]->off & ~CFS_PAGE_MASK, - count); + pga[i]->off & ~PAGE_MASK, + count); LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", - (int)(pga[i]->off & ~CFS_PAGE_MASK)); + (int)(pga[i]->off & ~PAGE_MASK)); nob -= pga[i]->count; pg_count--; i++; } - bufsize = 4; + bufsize = sizeof(cksum); err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize); - if (err) - cfs_crypto_hash_final(hdesc, NULL, NULL); - /* For sending we only compute the wrong checksum instead * of corrupting the data so it is still correct on a redo */ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) @@ -1227,11 +1065,11 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, } static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, - struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page **pga, - struct ptlrpc_request **reqp, - struct obd_capa *ocapa, int reserve, - int resend) + struct lov_stripe_md *lsm, u32 page_count, + struct brw_page **pga, + struct ptlrpc_request **reqp, + struct obd_capa *ocapa, int reserve, + int resend) { struct ptlrpc_request 
*req; struct ptlrpc_bulk_desc *desc; @@ -1313,7 +1151,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = pga[i]; - int poff = pg->off & ~CFS_PAGE_MASK; + int poff = pg->off & ~PAGE_MASK; LASSERT(pg->count > 0); /* make sure there is no gap in the middle of page array */ @@ -1324,7 +1162,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, ergo(i == page_count - 1, poff == 0)), "i: %d/%d pg: %p off: "LPU64", count: %u\n", i, page_count, pg, pg->off, pg->count); -#ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n", @@ -1332,10 +1169,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, pg->pg, page_private(pg->pg), pg->pg->index, pg->off, pg_prev->pg, page_private(pg_prev->pg), pg_prev->pg->index, pg_prev->off); -#else - LASSERTF(i == 0 || pg->off > pg_prev->off, - "i %d p_c %u\n", i, page_count); -#endif LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); @@ -1344,11 +1177,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; - niobuf->len += pg->count; - } else { - niobuf->offset = pg->off; - niobuf->len = pg->count; - niobuf->flags = pg->flag; + niobuf->rnb_len += pg->count; + } else { + niobuf->rnb_offset = pg->off; + niobuf->rnb_len = pg->count; + niobuf->rnb_flags = pg->flag; } pg_prev = pg; } @@ -1426,7 +1259,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (ocapa && reserve) aa->aa_ocapa = capa_get(ocapa); - *reqp = req; + *reqp = req; + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n", + req, POSTID(&oa->o_oi), 
niobuf[0].rnb_offset, + niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len); RETURN(0); out: @@ -1435,9 +1272,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, } static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, - __u32 client_cksum, __u32 server_cksum, int nob, - obd_count page_count, struct brw_page **pga, - cksum_type_t client_cksum_type) + __u32 client_cksum, __u32 server_cksum, int nob, + size_t page_count, struct brw_page **pga, + cksum_type_t client_cksum_type) { __u32 new_cksum; char *msg; @@ -1487,7 +1324,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) &req->rq_import->imp_connection->c_peer; struct client_obd *cli = aa->aa_cli; struct ost_body *body; - __u32 client_cksum = 0; + u32 client_cksum = 0; ENTRY; if (rc < 0 && rc != -EDQUOT) { @@ -1567,9 +1404,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (body->oa.o_valid & OBD_MD_FLCKSUM) { static int cksum_counter; - __u32 server_cksum = body->oa.o_cksum; - char *via; - char *router; + u32 server_cksum = body->oa.o_cksum; + char *via = ""; + char *router = ""; cksum_type_t cksum_type; cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? @@ -1578,19 +1415,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga, OST_READ, cksum_type); - if (peer->nid == req->rq_bulk->bd_sender) { - via = router = ""; - } else { - via = " via "; - router = libcfs_nid2str(req->rq_bulk->bd_sender); - } + if (peer->nid != req->rq_bulk->bd_sender) { + via = " via "; + router = libcfs_nid2str(req->rq_bulk->bd_sender); + } - if (server_cksum == ~0 && rc > 0) { - CERROR("Protocol error: server %s set the 'checksum' " - "bit, but didn't send a checksum. 
Not fatal, " - "but please notify on http://bugs.whamcloud.com/\n", - libcfs_nid2str(peer->nid)); - } else if (server_cksum != client_cksum) { + if (server_cksum != client_cksum) { LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " "%s%s%s inode "DFID" object "DOSTID " extent ["LPU64"-"LPU64"]\n", @@ -1741,7 +1571,7 @@ static void sort_brw_pages(struct brw_page **array, int num) } while (stride > 1); } -static void osc_release_ppga(struct brw_page **ppga, obd_count count) +static void osc_release_ppga(struct brw_page **ppga, size_t count) { LASSERT(ppga != NULL); OBD_FREE(ppga, sizeof(*ppga) * count); @@ -1818,7 +1648,8 @@ static int brw_interpret(const struct lu_env *env, if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; - loff_t last_off = last->oap_count + last->oap_obj_off; + loff_t last_off = last->oap_count + last->oap_obj_off + + last->oap_page_off; /* Change file size if this is an out of quota or * direct IO write and it extends the file size */ @@ -1835,11 +1666,14 @@ static int brw_interpret(const struct lu_env *env, } if (valid != 0) - cl_object_attr_set(env, obj, attr, valid); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); } OBDO_FREE(aa->aa_oa); + if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) + osc_inc_unstable_pages(req); + list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 1, rc); @@ -1852,7 +1686,7 @@ static int brw_interpret(const struct lu_env *env, osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more * RPCs to complete */ @@ -1861,7 +1695,7 @@ static int brw_interpret(const struct lu_env *env, else 
cli->cl_r_in_flight--; osc_wake_cache_waiters(cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); RETURN(rc); @@ -1903,17 +1737,17 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, struct cl_req *clerq = NULL; enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; - struct ldlm_lock *lock = NULL; struct cl_req_attr *crattr = NULL; - obd_off starting_offset = OBD_OBJECT_EOF; - obd_off ending_offset = 0; + loff_t starting_offset = OBD_OBJECT_EOF; + loff_t ending_offset = 0; int mpflag = 0; int mem_tight = 0; int page_count = 0; + bool soft_sync = false; int i; int rc; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); - + struct ost_body *body; ENTRY; LASSERT(!list_empty(ext_list)); @@ -1924,7 +1758,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { ++page_count; list_add_tail(&oap->oap_rpc_item, &rpc_list); - if (starting_offset > oap->oap_obj_off) + if (starting_offset == OBD_OBJECT_EOF || + starting_offset > oap->oap_obj_off) starting_offset = oap->oap_obj_off; else LASSERT(oap->oap_page_off == 0); @@ -1937,6 +1772,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, } } + soft_sync = osc_over_unstable_soft_limit(cli); if (mem_tight) mpflag = cfs_memory_pressure_get_and_set(); @@ -1960,10 +1796,11 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, 1 /* only 1-object rpcs for now */); if (IS_ERR(clerq)) GOTO(out, rc = PTR_ERR(clerq)); - lock = oap->oap_ldlm_lock; } if (mem_tight) oap->oap_brw_flags |= OBD_BRW_MEMALLOC; + if (soft_sync) + oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC; pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", @@ -1977,10 +1814,6 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, LASSERT(clerq 
!= NULL); crattr->cra_oa = oa; cl_req_attr_set(env, clerq, crattr, ~0ULL); - if (lock) { - oa->o_handle = lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - } rc = cl_req_prep(env, clerq); if (rc != 0) { @@ -2007,6 +1840,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious * way to do this in a single call. bug 10150 */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + crattr->cra_oa = &body->oa; cl_req_attr_set(env, clerq, crattr, OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); @@ -2036,7 +1871,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (tmp != NULL) tmp->oap_request = ptlrpc_request_addref(req); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); starting_offset >>= PAGE_CACHE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; @@ -2051,9 +1886,9 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, starting_offset + 1); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); - DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight", + DEBUG_REQ(D_INODE, req, "%d pages, aa %p. 
now %ur/%uw in flight", page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); @@ -2116,14 +1951,12 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock, LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); lock_res_and_lock(lock); - spin_lock(&osc_ast_guard); if (lock->l_ast_data == NULL) lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; - spin_unlock(&osc_ast_guard); unlock_res_and_lock(lock); return set; @@ -2144,68 +1977,41 @@ static int osc_set_data_with_check(struct lustre_handle *lockh, return set; } -static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) -{ - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - - ostid_build_res_name(&lsm->lsm_oi, &res_id); - ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - return 0; -} - -/* find any ldlm lock of the inode in osc - * return 0 not find - * 1 find one - * < 0 error */ -static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, - ldlm_iterator_t replace, void *data) +static int osc_enqueue_fini(struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, void *cookie, + struct lustre_handle *lockh, ldlm_mode_t mode, + __u64 *flags, int agl, int errcode) { - struct ldlm_res_id res_id; - struct obd_device *obd = class_exp2obd(exp); - int rc = 0; - - ostid_build_res_name(&lsm->lsm_oi, &res_id); - rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); - if (rc == LDLM_ITER_STOP) - return(1); - if (rc == LDLM_ITER_CONTINUE) - return(0); - return(rc); -} + bool intent = *flags & LDLM_FL_HAS_INTENT; + int rc; + ENTRY; -static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, - obd_enqueue_update_f upcall, void *cookie, - __u64 *flags, int agl, int rc) -{ - int intent = *flags & LDLM_FL_HAS_INTENT; - ENTRY; + /* The request was created before ldlm_cli_enqueue call. 
*/ + if (intent && errcode == ELDLM_LOCK_ABORTED) { + struct ldlm_reply *rep; + + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(rep != NULL); + + rep->lock_policy_res1 = + ptlrpc_status_ntoh(rep->lock_policy_res1); + if (rep->lock_policy_res1) + errcode = rep->lock_policy_res1; + if (!agl) + *flags |= LDLM_FL_LVB_READY; + } else if (errcode == ELDLM_OK) { + *flags |= LDLM_FL_LVB_READY; + } - if (intent) { - /* The request was created before ldlm_cli_enqueue call. */ - if (rc == ELDLM_LOCK_ABORTED) { - struct ldlm_reply *rep; - rep = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_REP); - - LASSERT(rep != NULL); - rep->lock_policy_res1 = - ptlrpc_status_ntoh(rep->lock_policy_res1); - if (rep->lock_policy_res1) - rc = rep->lock_policy_res1; - } - } + /* Call the update callback. */ + rc = (*upcall)(cookie, lockh, errcode); - if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || - (rc == 0)) { - *flags |= LDLM_FL_LVB_READY; - CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", - lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); - } + /* release the reference taken in ldlm_cli_enqueue() */ + if (errcode == ELDLM_LOCK_MATCHED) + errcode = ELDLM_OK; + if (errcode == ELDLM_OK && lustre_handle_is_used(lockh)) + ldlm_lock_decref(lockh, mode); - /* Call the update callback. */ - rc = (*upcall)(cookie, rc); RETURN(rc); } @@ -2213,65 +2019,53 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_enqueue_args *aa, int rc) { - struct ldlm_lock *lock; - struct lustre_handle handle; - __u32 mode; - struct ost_lvb *lvb; - __u32 lvb_len; - __u64 *flags = aa->oa_flags; + struct ldlm_lock *lock; + struct lustre_handle *lockh = &aa->oa_lockh; + ldlm_mode_t mode = aa->oa_mode; + struct ost_lvb *lvb = aa->oa_lvb; + __u32 lvb_len = sizeof(*lvb); + __u64 flags = 0; - /* Make a local copy of a lock handle and a mode, because aa->oa_* - * might be freed anytime after lock upcall has been called. 
*/ - lustre_handle_copy(&handle, aa->oa_lockh); - mode = aa->oa_ei->ei_mode; + ENTRY; - /* ldlm_cli_enqueue is holding a reference on the lock, so it must - * be valid. */ - lock = ldlm_handle2lock(&handle); + /* ldlm_cli_enqueue is holding a reference on the lock, so it must + * be valid. */ + lock = ldlm_handle2lock(lockh); + LASSERTF(lock != NULL, + "lockh "LPX64", req %p, aa %p - client evicted?\n", + lockh->cookie, req, aa); - /* Take an additional reference so that a blocking AST that - * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed - * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ - ldlm_lock_addref(&handle, mode); + /* Take an additional reference so that a blocking AST that + * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed + * to arrive after an upcall has been executed by + * osc_enqueue_fini(). */ + ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); - /* Let CP AST to grant the lock first. */ - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + /* Let CP AST to grant the lock first. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); - if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { - lvb = NULL; - lvb_len = 0; - } else { - lvb = aa->oa_lvb; - lvb_len = sizeof(*aa->oa_lvb); - } + if (aa->oa_agl) { + LASSERT(aa->oa_lvb == NULL); + LASSERT(aa->oa_flags == NULL); + aa->oa_flags = &flags; + } - /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, flags, lvb, lvb_len, &handle, rc); - /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, - flags, aa->oa_agl, rc); + /* Complete obtaining the lock procedure. */ + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, + aa->oa_mode, aa->oa_flags, lvb, lvb_len, + lockh, rc); + /* Complete osc stuff. 
*/ + rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, + aa->oa_flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); - /* Release the lock for async request. */ - if (lustre_handle_is_used(&handle) && rc == ELDLM_OK) - /* - * Releases a reference taken by ldlm_cli_enqueue(), if it is - * not already released by - * ldlm_cli_enqueue_fini()->failed_lock_cleanup() - */ - ldlm_lock_decref(&handle, mode); - - LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n", - aa->oa_lockh, req, aa); - ldlm_lock_decref(&handle, mode); - LDLM_LOCK_PUT(lock); - return rc; + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(lock); + RETURN(rc); } struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; @@ -2281,28 +2075,28 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * other synchronous requests, however keeping some locks and trying to obtain * others may take a considerable amount of time in a case of ost failure; and * when other sync requests do not get released lock from a client, the client - * is excluded from the cluster -- such scenarious make the life difficult, so + * is evicted from the cluster -- such scenarios make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, ldlm_policy_data_t *policy, struct ost_lvb *lvb, int kms_valid, - obd_enqueue_update_f upcall, void *cookie, + osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, - struct lustre_handle *lockh, struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; + struct lustre_handle lockh = { 0 }; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; - __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); + __u64 match_lvb = agl ?
0 : LDLM_FL_LVB_READY; ldlm_mode_t mode; int rc; ENTRY; /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* * kms is not valid when either object is completely fresh (so that no @@ -2329,50 +2123,41 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, - einfo->ei_type, policy, mode, lockh, 0); - if (mode) { - struct ldlm_lock *matched = ldlm_handle2lock(lockh); - - if ((agl != 0) && !ldlm_is_lvb_ready(matched)) { - /* For AGL, if enqueue RPC is sent but the lock is not - * granted, then skip to process this strpe. - * Return -ECANCELED to tell the caller. */ - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - RETURN(-ECANCELED); - } else if (osc_set_lock_data_with_check(matched, einfo)) { - *flags |= LDLM_FL_LVB_READY; - /* addref the lock only if not async requests and PW - * lock is matched whereas we asked for PR. */ - if (!rqset && einfo->ei_mode != mode) - ldlm_lock_addref(lockh, LCK_PR); - if (intent) { - /* I would like to be able to ASSERT here that - * rss <= kms, but I can't, for reasons which - * are explained in lov_enqueue() */ - } + einfo->ei_type, policy, mode, &lockh, 0); + if (mode) { + struct ldlm_lock *matched; + + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(ELDLM_OK); + + matched = ldlm_handle2lock(&lockh); + if (agl) { + /* AGL enqueues DLM locks speculatively. Therefore if + * a DLM lock already exists, it will just inform the + * caller to cancel the AGL process for this stripe.
*/ + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; + + /* We already have a lock, and it's referenced. */ + (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED); + + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(ELDLM_OK); + } else { + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + } + } - /* We already have a lock, and it's referenced. - * - * At this point, the cl_lock::cll_state is CLS_QUEUING, - * AGL upcall may change it to CLS_HELD directly. */ - (*upcall)(cookie, ELDLM_OK); - - if (einfo->ei_mode != mode) - ldlm_lock_decref(lockh, LCK_PW); - else if (rqset) - /* For async requests, decref the lock. */ - ldlm_lock_decref(lockh, einfo->ei_mode); - LDLM_LOCK_PUT(matched); - RETURN(ELDLM_OK); - } else { - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - } - } +no_match: + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(-ENOLCK); - no_match: - if (intent) { + if (intent) { req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) @@ -2393,20 +2178,29 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), LVB_T_OST, lockh, async); - if (rqset) { - if (!rc) { - struct osc_enqueue_args *aa; - CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->oa_ei = einfo; - aa->oa_exp = exp; - aa->oa_flags = flags; - aa->oa_upcall = upcall; - aa->oa_cookie = cookie; - aa->oa_lvb = lvb; - aa->oa_lockh = lockh; - aa->oa_agl = !!agl; + sizeof(*lvb), LVB_T_OST, &lockh, async); + if (async) { + if (!rc) { + struct osc_enqueue_args *aa; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->oa_exp = exp; + aa->oa_mode = einfo->ei_mode; + aa->oa_type = einfo->ei_type; + 
lustre_handle_copy(&aa->oa_lockh, &lockh); + aa->oa_upcall = upcall; + aa->oa_cookie = cookie; + aa->oa_agl = !!agl; + if (!agl) { + aa->oa_flags = flags; + aa->oa_lvb = lvb; + } else { + /* AGL is essentially to enqueue a DLM lock + * in advance, so we don't care about the + * result of AGL enqueue. */ + aa->oa_lvb = NULL; + aa->oa_flags = NULL; + } req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; @@ -2420,11 +2214,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); - if (intent) - ptlrpc_req_finished(req); + rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + flags, agl, rc); + if (intent) + ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc); } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, @@ -2442,8 +2237,8 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother */ - policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; - policy->l_extent.end |= ~CFS_PAGE_MASK; + policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; + policy->l_extent.end |= ~PAGE_MASK; /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock.
The @@ -2471,18 +2266,6 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } -int osc_cancel_base(struct lustre_handle *lockh, __u32 mode) -{ - ENTRY; - - if (unlikely(mode == LCK_GROUP)) - ldlm_lock_decref_and_cancel(lockh, mode); - else - ldlm_lock_decref(lockh, mode); - - RETURN(0); -} - static int osc_statfs_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_async_args *aa, int rc) @@ -2633,7 +2416,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, ENTRY; if (!try_module_get(THIS_MODULE)) { - CERROR("Can't get module. Is it alive?"); + CERROR("%s: cannot get module '%s'\n", obd->obd_name, + module_name(THIS_MODULE)); return -EINVAL; } switch (cmd) { @@ -2663,145 +2447,10 @@ out: return err; } -static int osc_get_info(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) -{ - ENTRY; - if (!vallen || !val) - RETURN(-EFAULT); - - if (KEY_IS(KEY_LOCK_TO_STRIPE)) { - __u32 *stripe = val; - *vallen = sizeof(*stripe); - *stripe = 0; - RETURN(0); - } else if (KEY_IS(KEY_LAST_ID)) { - struct ptlrpc_request *req; - obd_id *reply; - char *tmp; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_LAST_ID); - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT, keylen); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - memcpy(tmp, key, keylen); - - req->rq_no_delay = req->rq_no_resend = 1; - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); - if (reply == NULL) - GOTO(out, rc = -EPROTO); - - *((obd_id *)val) = *reply; - out: - ptlrpc_req_finished(req); - RETURN(rc); - } else if 
(KEY_IS(KEY_FIEMAP)) { - struct ll_fiemap_info_key *fm_key = - (struct ll_fiemap_info_key *)key; - struct ldlm_res_id res_id; - ldlm_policy_data_t policy; - struct lustre_handle lockh; - ldlm_mode_t mode = 0; - struct ptlrpc_request *req; - struct ll_user_fiemap *reply; - char *tmp; - int rc; - - if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC)) - goto skip_locking; - - policy.l_extent.start = fm_key->fiemap.fm_start & - CFS_PAGE_MASK; - - if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <= - fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1) - policy.l_extent.end = OBD_OBJECT_EOF; - else - policy.l_extent.end = (fm_key->fiemap.fm_start + - fm_key->fiemap.fm_length + - PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK; - - ostid_build_res_name(&fm_key->oa.o_oi, &res_id); - mode = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED | - LDLM_FL_LVB_READY, - &res_id, LDLM_EXTENT, &policy, - LCK_PR | LCK_PW, &lockh, 0); - if (mode) { /* lock is cached on client */ - if (mode != LCK_PR) { - ldlm_lock_addref(&lockh, LCK_PR); - ldlm_lock_decref(&lockh, LCK_PW); - } - } else { /* no cached lock, needs acquire lock on server side */ - fm_key->oa.o_valid |= OBD_MD_FLFLAGS; - fm_key->oa.o_flags |= OBD_FL_SRVLOCK; - } - -skip_locking: - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_FIEMAP); - if (req == NULL) - GOTO(drop_lock, rc = -ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, - RCL_CLIENT, keylen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_CLIENT, *vallen); - req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, - RCL_SERVER, *vallen); - - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - GOTO(drop_lock, rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); - memcpy(tmp, key, keylen); - tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); - memcpy(tmp, val, *vallen); - - ptlrpc_request_set_replen(req); - rc = 
ptlrpc_queue_wait(req); - if (rc) - GOTO(fini_req, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); - if (reply == NULL) - GOTO(fini_req, rc = -EPROTO); - - memcpy(val, reply, *vallen); -fini_req: - ptlrpc_req_finished(req); -drop_lock: - if (mode) - ldlm_lock_decref(&lockh, LCK_PR); - RETURN(rc); - } - - RETURN(-EINVAL); -} - static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, - obd_count keylen, void *key, obd_count vallen, - void *val, struct ptlrpc_request_set *set) + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set) { struct ptlrpc_request *req; struct obd_device *obd = exp->exp_obd; @@ -2834,7 +2483,7 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, LASSERT(cli->cl_cache == NULL); /* only once */ cli->cl_cache = (struct cl_client_cache *)val; - atomic_inc(&cli->cl_cache->ccc_users); + cl_cache_incref(cli->cl_cache); cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; /* add this osc into entity list */ @@ -2848,11 +2497,11 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; - int nr = atomic_read(&cli->cl_lru_in_list) >> 1; - int target = *(int *)val; + long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1; + long target = *(long *)val; nr = osc_lru_shrink(env, cli, min(nr, target), true); - *(int *)val -= nr; + *(long *)val -= nr; RETURN(0); } @@ -2928,12 +2577,13 @@ static int osc_reconnect(const struct lu_env *env, if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { long lost_grant; - client_obd_list_lock(&cli->cl_loi_list_lock); - data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: - 2 * cli_brw_size(obd); - lost_grant = cli->cl_lost_grant; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + data->ocd_grant = (cli->cl_avail_grant + + (cli->cl_dirty_pages 
<< PAGE_CACHE_SHIFT)) ?: + 2 * cli_brw_size(obd); + lost_grant = cli->cl_lost_grant; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, @@ -2945,22 +2595,8 @@ static int osc_reconnect(const struct lu_env *env, static int osc_disconnect(struct obd_export *exp) { - struct obd_device *obd = class_exp2obd(exp); - struct llog_ctxt *ctxt; - int rc; - - ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); - if (ctxt) { - if (obd->u.cli.cl_conn_count == 1) { - /* Flush any remaining cancel messages out to the - * target */ - llog_sync(ctxt, exp, 0); - } - llog_ctxt_put(ctxt); - } else { - CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", - obd); - } + struct obd_device *obd = class_exp2obd(exp); + int rc; rc = client_disconnect_export(exp); /** @@ -2998,10 +2634,10 @@ static int osc_import_event(struct obd_device *obd, switch (event) { case IMP_EVENT_DISCON: { cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant = 0; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); break; } case IMP_EVENT_INACTIVE: { @@ -3121,7 +2757,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS obd->obd_vars = lprocfs_osc_obd_vars; #endif /* If this is true then both client (osc) and server (osp) are on the @@ -3130,9 +2766,9 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. 
*/ type = class_search_type(LUSTRE_OSP_NAME); if (type && type->typ_procsym) { - obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name, - type->typ_procsym, - obd->obd_vars, obd); + obd->obd_proc_entry = lprocfs_register(obd->obd_name, + type->typ_procsym, + obd->obd_vars, obd); if (IS_ERR(obd->obd_proc_entry)) { rc = PTR_ERR(obd->obd_proc_entry); CERROR("error %d setting up lprocfs for %s\n", rc, @@ -3140,7 +2776,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_proc_entry = NULL; } } else { - rc = lprocfs_seq_obd_setup(obd); + rc = lprocfs_obd_setup(obd); } /* If the basic OSC proc tree construction succeeded then @@ -3221,9 +2857,6 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); - rc = obd_llog_finish(obd, 0); - if (rc != 0) - CERROR("failed to cleanup llogging subsystems\n"); break; } } @@ -3244,7 +2877,7 @@ int osc_cleanup(struct obd_device *obd) list_del_init(&cli->cl_lru_osc); spin_unlock(&cli->cl_cache->ccc_lru_lock); cli->cl_lru_left = NULL; - atomic_dec(&cli->cl_cache->ccc_users); + cl_cache_decref(cli->cl_cache); cli->cl_cache = NULL; } @@ -3259,17 +2892,16 @@ int osc_cleanup(struct obd_device *obd) int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) { - int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars, - lcfg, obd); + int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd); return rc > 0 ? 
0: rc; } -static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) +static int osc_process_config(struct obd_device *obd, size_t len, void *buf) { return osc_process_config_base(obd, buf); } -struct obd_ops osc_obd_ops = { +static struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = osc_setup, .o_precleanup = osc_precleanup, @@ -3281,17 +2913,11 @@ struct obd_ops osc_obd_ops = { .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, .o_statfs_async = osc_statfs_async, - .o_unpackmd = osc_unpackmd, .o_create = osc_create, .o_destroy = osc_destroy, .o_getattr = osc_getattr, - .o_getattr_async = osc_getattr_async, .o_setattr = osc_setattr, - .o_setattr_async = osc_setattr_async, - .o_change_cbdata = osc_change_cbdata, - .o_find_cbdata = osc_find_cbdata, .o_iocontrol = osc_iocontrol, - .o_get_info = osc_get_info, .o_set_info_async = osc_set_info_async, .o_import_event = osc_import_event, .o_process_config = osc_process_config, @@ -3299,11 +2925,7 @@ struct obd_ops osc_obd_ops = { .o_quotacheck = osc_quotacheck, }; -extern struct lu_kmem_descr osc_caches[]; -extern spinlock_t osc_ast_guard; -extern struct lock_class_key osc_ast_guard_class; - -int __init osc_init(void) +static int __init osc_init(void) { bool enable_proc = true; struct obd_type *type; @@ -3324,22 +2946,15 @@ int __init osc_init(void) enable_proc = false; rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, -#ifndef HAVE_ONLY_PROCFS_SEQ - NULL, -#endif LUSTRE_OSC_NAME, &osc_device_type); if (rc) { lu_kmem_fini(osc_caches); RETURN(rc); } - spin_lock_init(&osc_ast_guard); - lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); - RETURN(rc); } -#ifdef __KERNEL__ static void /*__exit*/ osc_exit(void) { class_unregister_type(LUSTRE_OSC_NAME); @@ -3351,4 +2966,3 @@ MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); MODULE_LICENSE("GPL"); cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); -#endif