X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=6b90effc33c323f7f3650e2dbe98af970cf62e32;hp=7a055870a5fbaa7ba9f07124231ce4dfc407db31;hb=4311cdaa832fc4a444d48e50174dde09f21146d2;hpb=1f1d3a376d488d715dd1b0c94d5b66ea05c1e6ca diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 7a05587..6b90eff 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -38,7 +38,6 @@ #include - #include #include #include @@ -46,10 +45,10 @@ #include #include #include -#include #include #include #include +#include #include "osc_internal.h" #include "osc_cl_internal.h" @@ -86,82 +85,21 @@ struct osc_fsync_args { }; struct osc_enqueue_args { - struct obd_export *oa_exp; - __u64 *oa_flags; - obd_enqueue_update_f oa_upcall; - void *oa_cookie; - struct ost_lvb *oa_lvb; - struct lustre_handle *oa_lockh; - struct ldlm_enqueue_info *oa_ei; - unsigned int oa_agl:1; + struct obd_export *oa_exp; + ldlm_type_t oa_type; + ldlm_mode_t oa_mode; + __u64 *oa_flags; + osc_enqueue_upcall_f oa_upcall; + void *oa_cookie; + struct ost_lvb *oa_lvb; + struct lustre_handle oa_lockh; + unsigned int oa_agl:1; }; static void osc_release_ppga(struct brw_page **ppga, obd_count count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); -/* Unpack OSC object metadata from disk storage (LE byte order). */ -static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, - struct lov_mds_md *lmm, int lmm_bytes) -{ - int lsm_size; - struct obd_import *imp = class_exp2cliimp(exp); - ENTRY; - - if (lmm != NULL) { - if (lmm_bytes < sizeof(*lmm)) { - CERROR("%s: lov_mds_md too small: %d, need %d\n", - exp->exp_obd->obd_name, lmm_bytes, - (int)sizeof(*lmm)); - RETURN(-EINVAL); - } - /* XXX LOV_MAGIC etc check? */ - - if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { - CERROR("%s: zero lmm_object_id: rc = %d\n", - exp->exp_obd->obd_name, -EINVAL); - RETURN(-EINVAL); - } - } - - lsm_size = lov_stripe_md_size(1); - if (lsmp == NULL) - RETURN(lsm_size); - - if (*lsmp != NULL && lmm == NULL) { - OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - OBD_FREE(*lsmp, lsm_size); - *lsmp = NULL; - RETURN(0); - } - - if (*lsmp == NULL) { - OBD_ALLOC(*lsmp, lsm_size); - if (unlikely(*lsmp == NULL)) - RETURN(-ENOMEM); - OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); - if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { - OBD_FREE(*lsmp, lsm_size); - RETURN(-ENOMEM); - } - loi_init((*lsmp)->lsm_oinfo[0]); - } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { - RETURN(-EBADF); - } - - if (lmm != NULL) - /* XXX zero *lsmp? 
*/ - ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); - - if (imp != NULL && - (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) - (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; - else - (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; - - RETURN(lsm_size); -} - static inline void osc_pack_capa(struct ptlrpc_request *req, struct ost_body *body, void *capa) { @@ -429,24 +367,17 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, oinfo->oi_cb_up, oinfo, rqset); } -int osc_real_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +static int osc_create(const struct lu_env *env, struct obd_export *exp, + struct obdo *oa, struct obd_trans_info *oti) { struct ptlrpc_request *req; struct ost_body *body; - struct lov_stripe_md *lsm; int rc; ENTRY; - LASSERT(oa); - LASSERT(ea); - - lsm = *ea; - if (!lsm) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } + LASSERT(oa != NULL); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi))); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); if (req == NULL) @@ -487,13 +418,6 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, oa->o_blksize = cli_brw_size(exp->exp_obd); oa->o_valid |= OBD_MD_FLBLKSZ; - /* XXX LOV STACKING: the lsm that is passed to us from LOV does not - * have valid lsm_oinfo data structs, so don't go touching that. - * This needs to be fixed in a big way. - */ - lsm->lsm_oi = oa->o_oi; - *ea = lsm; - if (oti != NULL) { if (oa->o_valid & OBD_MD_FLCOOKIE) { if (oti->oti_logcookies == NULL) @@ -508,9 +432,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, out_req: ptlrpc_req_finished(req); out: - if (rc && !*ea) - obd_free_memmd(exp, &lsm); - RETURN(rc); + RETURN(rc); } int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, @@ -690,31 +612,6 @@ static int osc_can_send_destroy(struct client_obd *cli) return 0; } -int osc_create(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md **ea, - struct obd_trans_info *oti) -{ - int rc = 0; - ENTRY; - - LASSERT(oa); - LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - RETURN(osc_real_create(exp, oa, ea, oti)); - } - - if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) - RETURN(osc_real_create(exp, oa, ea, oti)); - - /* we should not get here anymore */ - LBUG(); - - RETURN(rc); -} - /* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. @@ -726,9 +623,7 @@ int osc_create(const struct lu_env *env, struct obd_export *exp, * it will retrieve the llog unlink logs and then sends the log cancellation * cookies to the MDS after committing destroy transactions. 
*/ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, - struct obdo *oa, struct lov_stripe_md *ea, - struct obd_trans_info *oti, struct obd_export *md_export, - void *capa) + struct obdo *oa, struct obd_trans_info *oti) { struct client_obd *cli = &exp->exp_obd->u.cli; struct ptlrpc_request *req; @@ -751,7 +646,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, RETURN(-ENOMEM); } - osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); + osc_set_capa_size(req, &RMF_CAPA1, NULL); rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 0, &cancels, count); if (rc) { @@ -768,7 +663,6 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp, LASSERT(body); lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); - osc_pack_capa(req, body, (struct obd_capa *)capa); ptlrpc_request_set_replen(req); /* If osc_destory is for destroying the unlink orphan, @@ -803,7 +697,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, LASSERT(!(oa->o_valid & bits)); oa->o_valid |= bits; - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT; if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit > cli->cl_dirty_max_pages)) { @@ -811,16 +705,16 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, cli->cl_dirty_pages, cli->cl_dirty_transit, cli->cl_dirty_max_pages); oa->o_undirty = 0; - } else if (unlikely(atomic_read(&obd_dirty_pages) - - atomic_read(&obd_dirty_transit_pages) > - (long)(obd_max_dirty_pages + 1))) { + } else if (unlikely(atomic_long_read(&obd_dirty_pages) - + atomic_long_read(&obd_dirty_transit_pages) > + (obd_max_dirty_pages + 1))) { /* The atomic_read() allowing the atomic_inc() are * not covered by a lock thus they may safely race and trip * this CERROR() unless we add in a small fudge factor (+1). 
*/ - CERROR("%s: dirty %d - %d > system dirty_max %d\n", + CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n", cli->cl_import->imp_obd->obd_name, - atomic_read(&obd_dirty_pages), - atomic_read(&obd_dirty_transit_pages), + atomic_long_read(&obd_dirty_pages), + atomic_long_read(&obd_dirty_transit_pages), obd_max_dirty_pages); oa->o_undirty = 0; } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages > @@ -838,7 +732,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); @@ -854,9 +748,9 @@ void osc_update_next_shrink(struct client_obd *cli) static void __osc_update_grant(struct client_obd *cli, obd_size grant) { - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant += grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant += grant; + spin_unlock(&cli->cl_loi_list_lock); } static void osc_update_grant(struct client_obd *cli, struct ost_body *body) @@ -894,10 +788,10 @@ out: static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) { - client_obd_list_lock(&cli->cl_loi_list_lock); - oa->o_grant = cli->cl_avail_grant / 4; - cli->cl_avail_grant -= oa->o_grant; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + oa->o_grant = cli->cl_avail_grant / 4; + cli->cl_avail_grant -= oa->o_grant; + spin_unlock(&cli->cl_loi_list_lock); if (!(oa->o_valid & OBD_MD_FLFLAGS)) { oa->o_valid |= OBD_MD_FLFLAGS; oa->o_flags = 0; @@ -915,10 +809,10 @@ static int osc_shrink_grant(struct client_obd *cli) __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); if (cli->cl_avail_grant <= target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); return osc_shrink_grant_to_target(cli, target_bytes); } @@ -929,7 +823,7 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) struct ost_body *body; ENTRY; - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* Don't shrink if we are already above or below the desired limit * We don't want to shrink below a single RPC, as that will negatively * impact block allocation and long-term performance. 
*/ @@ -937,10 +831,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; if (target_bytes >= cli->cl_avail_grant) { - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); RETURN(0); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); OBD_ALLOC_PTR(body); if (!body) @@ -948,10 +842,10 @@ int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) osc_announce_cached(cli, &body->oa, 0); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); body->oa.o_grant = cli->cl_avail_grant - target_bytes; cli->cl_avail_grant = target_bytes; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { body->oa.o_valid |= OBD_MD_FLFLAGS; body->oa.o_flags = 0; @@ -1039,7 +933,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) * race is tolerable here: if we're evicted, but imp_state already * left EVICTED state, then cl_dirty_pages must be 0 already. */ - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) cli->cl_avail_grant = ocd->ocd_grant; else @@ -1057,7 +951,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) /* determine the appropriate chunk size used by osc_extent. */ cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, @@ -1413,7 +1307,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, if (ocapa && reserve) aa->aa_ocapa = capa_get(ocapa); - *reqp = req; + *reqp = req; + niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); + CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n", + req, POSTID(&oa->o_oi), niobuf[0].rnb_offset, + niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len); RETURN(0); out: @@ -1838,7 +1736,7 @@ static int brw_interpret(const struct lu_env *env, osc_release_ppga(aa->aa_ppga, aa->aa_page_count); ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more * RPCs to complete */ @@ -1847,7 +1745,7 @@ static int brw_interpret(const struct lu_env *env, else cli->cl_r_in_flight--; osc_wake_cache_waiters(cli); - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); RETURN(rc); @@ -1899,7 +1797,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, int i; int rc; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); - + struct ost_body *body; ENTRY; LASSERT(!list_empty(ext_list)); @@ -1991,6 +1889,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious * way to do this in a single call. 
bug 10150 */ + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + crattr->cra_oa = &body->oa; cl_req_attr_set(env, clerq, crattr, OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); @@ -2020,7 +1920,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (tmp != NULL) tmp->oap_request = ptlrpc_request_addref(req); - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); starting_offset >>= PAGE_CACHE_SHIFT; if (cmd == OBD_BRW_READ) { cli->cl_r_in_flight++; @@ -2035,7 +1935,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, starting_offset + 1); } - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_unlock(&cli->cl_loi_list_lock); DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight", page_count, aa, cli->cl_r_in_flight, @@ -2100,14 +2000,12 @@ static int osc_set_lock_data_with_check(struct ldlm_lock *lock, LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); lock_res_and_lock(lock); - spin_lock(&osc_ast_guard); if (lock->l_ast_data == NULL) lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; - spin_unlock(&osc_ast_guard); unlock_res_and_lock(lock); return set; @@ -2159,37 +2057,41 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, return(rc); } -static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, - obd_enqueue_update_f upcall, void *cookie, - __u64 *flags, int agl, int rc) +static int osc_enqueue_fini(struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, void *cookie, + struct lustre_handle *lockh, ldlm_mode_t mode, + __u64 *flags, int agl, int errcode) { - int intent = *flags & LDLM_FL_HAS_INTENT; - ENTRY; - - if (intent) { - /* The request was created before ldlm_cli_enqueue call. */ - if (rc == ELDLM_LOCK_ABORTED) { - struct ldlm_reply *rep; - rep = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_REP); - - LASSERT(rep != NULL); - rep->lock_policy_res1 = - ptlrpc_status_ntoh(rep->lock_policy_res1); - if (rep->lock_policy_res1) - rc = rep->lock_policy_res1; - } - } + bool intent = *flags & LDLM_FL_HAS_INTENT; + int rc; + ENTRY; - if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || - (rc == 0)) { - *flags |= LDLM_FL_LVB_READY; - CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", - lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); - } + /* The request was created before ldlm_cli_enqueue call. */ + if (intent && errcode == ELDLM_LOCK_ABORTED) { + struct ldlm_reply *rep; + + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + LASSERT(rep != NULL); + + rep->lock_policy_res1 = + ptlrpc_status_ntoh(rep->lock_policy_res1); + if (rep->lock_policy_res1) + errcode = rep->lock_policy_res1; + if (!agl) + *flags |= LDLM_FL_LVB_READY; + } else if (errcode == ELDLM_OK) { + *flags |= LDLM_FL_LVB_READY; + } /* Call the update callback. 
*/ - rc = (*upcall)(cookie, rc); + rc = (*upcall)(cookie, lockh, errcode); + + /* release the reference taken in ldlm_cli_enqueue() */ + if (errcode == ELDLM_LOCK_MATCHED) + errcode = ELDLM_OK; + if (errcode == ELDLM_OK && lustre_handle_is_used(lockh)) + ldlm_lock_decref(lockh, mode); + RETURN(rc); } @@ -2197,65 +2099,53 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct osc_enqueue_args *aa, int rc) { - struct ldlm_lock *lock; - struct lustre_handle handle; - __u32 mode; - struct ost_lvb *lvb; - __u32 lvb_len; - __u64 *flags = aa->oa_flags; + struct ldlm_lock *lock; + struct lustre_handle *lockh = &aa->oa_lockh; + ldlm_mode_t mode = aa->oa_mode; + struct ost_lvb *lvb = aa->oa_lvb; + __u32 lvb_len = sizeof(*lvb); + __u64 flags = 0; - /* Make a local copy of a lock handle and a mode, because aa->oa_* - * might be freed anytime after lock upcall has been called. */ - lustre_handle_copy(&handle, aa->oa_lockh); - mode = aa->oa_ei->ei_mode; + ENTRY; - /* ldlm_cli_enqueue is holding a reference on the lock, so it must - * be valid. */ - lock = ldlm_handle2lock(&handle); + /* ldlm_cli_enqueue is holding a reference on the lock, so it must + * be valid. */ + lock = ldlm_handle2lock(lockh); + LASSERTF(lock != NULL, + "lockh "LPX64", req %p, aa %p - client evicted?\n", + lockh->cookie, req, aa); - /* Take an additional reference so that a blocking AST that - * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed - * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ - ldlm_lock_addref(&handle, mode); + /* Take an additional reference so that a blocking AST that + * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed + * to arrive after an upcall has been executed by + * osc_enqueue_fini(). */ + ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2); - /* Let CP AST to grant the lock first. */ - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + /* Let CP AST to grant the lock first. */ + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); - if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { - lvb = NULL; - lvb_len = 0; - } else { - lvb = aa->oa_lvb; - lvb_len = sizeof(*aa->oa_lvb); - } + if (aa->oa_agl) { + LASSERT(aa->oa_lvb == NULL); + LASSERT(aa->oa_flags == NULL); + aa->oa_flags = &flags; + } - /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, flags, lvb, lvb_len, &handle, rc); - /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, - flags, aa->oa_agl, rc); + /* Complete obtaining the lock procedure. */ + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, + aa->oa_mode, aa->oa_flags, lvb, lvb_len, + lockh, rc); + /* Complete osc stuff. */ + rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, + aa->oa_flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); - /* Release the lock for async request. 
*/ - if (lustre_handle_is_used(&handle) && rc == ELDLM_OK) - /* - * Releases a reference taken by ldlm_cli_enqueue(), if it is - * not already released by - * ldlm_cli_enqueue_fini()->failed_lock_cleanup() - */ - ldlm_lock_decref(&handle, mode); - - LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n", - aa->oa_lockh, req, aa); - ldlm_lock_decref(&handle, mode); - LDLM_LOCK_PUT(lock); - return rc; + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(lock); + RETURN(rc); } struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; @@ -2265,20 +2155,20 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; * other synchronous requests, however keeping some locks and trying to obtain * others may take a considerable amount of time in a case of ost failure; and * when other sync requests do not get released lock from a client, the client - * is excluded from the cluster -- such scenarious make the life difficult, so + * is evicted from the cluster -- such scenarious make the life difficult, so * release locks just after they are obtained. */ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, ldlm_policy_data_t *policy, struct ost_lvb *lvb, int kms_valid, - obd_enqueue_update_f upcall, void *cookie, + osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, - struct lustre_handle *lockh, struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; + struct lustre_handle lockh = { 0 }; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; - __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); + __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY; ldlm_mode_t mode; int rc; ENTRY; @@ -2313,50 +2203,41 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, - einfo->ei_type, policy, mode, lockh, 0); - if (mode) { - struct ldlm_lock *matched = ldlm_handle2lock(lockh); - - if ((agl != 0) && !ldlm_is_lvb_ready(matched)) { - /* For AGL, if enqueue RPC is sent but the lock is not - * granted, then skip to process this strpe. - * Return -ECANCELED to tell the caller. */ - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - RETURN(-ECANCELED); - } else if (osc_set_lock_data_with_check(matched, einfo)) { - *flags |= LDLM_FL_LVB_READY; - /* addref the lock only if not async requests and PW - * lock is matched whereas we asked for PR. */ - if (!rqset && einfo->ei_mode != mode) - ldlm_lock_addref(lockh, LCK_PR); - if (intent) { - /* I would like to be able to ASSERT here that - * rss <= kms, but I can't, for reasons which - * are explained in lov_enqueue() */ - } + einfo->ei_type, policy, mode, &lockh, 0); + if (mode) { + struct ldlm_lock *matched; + + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(ELDLM_OK); + + matched = ldlm_handle2lock(&lockh); + if (agl) { + /* AGL enqueues DLM locks speculatively. Therefore if + * it already exists a DLM lock, it wll just inform the + * caller to cancel the AGL process for this stripe. */ + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; + + /* We already have a lock, and it's referenced. 
*/ + (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED); + + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(ELDLM_OK); + } else { + ldlm_lock_decref(&lockh, mode); + LDLM_LOCK_PUT(matched); + } + } - /* We already have a lock, and it's referenced. - * - * At this point, the cl_lock::cll_state is CLS_QUEUING, - * AGL upcall may change it to CLS_HELD directly. */ - (*upcall)(cookie, ELDLM_OK); - - if (einfo->ei_mode != mode) - ldlm_lock_decref(lockh, LCK_PW); - else if (rqset) - /* For async requests, decref the lock. */ - ldlm_lock_decref(lockh, einfo->ei_mode); - LDLM_LOCK_PUT(matched); - RETURN(ELDLM_OK); - } else { - ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); - } - } +no_match: + if (*flags & LDLM_FL_TEST_LOCK) + RETURN(-ENOLCK); - no_match: - if (intent) { + if (intent) { req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE_LVB); if (req == NULL) @@ -2377,20 +2258,29 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), LVB_T_OST, lockh, async); - if (rqset) { - if (!rc) { - struct osc_enqueue_args *aa; - CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - aa->oa_ei = einfo; - aa->oa_exp = exp; - aa->oa_flags = flags; - aa->oa_upcall = upcall; - aa->oa_cookie = cookie; - aa->oa_lvb = lvb; - aa->oa_lockh = lockh; - aa->oa_agl = !!agl; + sizeof(*lvb), LVB_T_OST, &lockh, async); + if (async) { + if (!rc) { + struct osc_enqueue_args *aa; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->oa_exp = exp; + aa->oa_mode = einfo->ei_mode; + aa->oa_type = einfo->ei_type; + lustre_handle_copy(&aa->oa_lockh, &lockh); + aa->oa_upcall = upcall; + aa->oa_cookie = cookie; + aa->oa_agl = !!agl; + if (!agl) { + aa->oa_flags = flags; + aa->oa_lvb = lvb; + } else { + /* AGL is essentially to enqueue an DLM lock + * in advance, so we don't care about the + * result of AGL enqueue. 
*/ + aa->oa_lvb = NULL; + aa->oa_flags = NULL; + } req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; @@ -2404,11 +2294,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); - if (intent) - ptlrpc_req_finished(req); + rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + flags, agl, rc); + if (intent) + ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc); } int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, @@ -2656,48 +2547,7 @@ static int osc_get_info(const struct lu_env *env, struct obd_export *exp, if (!vallen || !val) RETURN(-EFAULT); - if (KEY_IS(KEY_LOCK_TO_STRIPE)) { - __u32 *stripe = val; - *vallen = sizeof(*stripe); - *stripe = 0; - RETURN(0); - } else if (KEY_IS(KEY_LAST_ID)) { - struct ptlrpc_request *req; - obd_id *reply; - char *tmp; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_OST_GET_INFO_LAST_ID); - if (req == NULL) - RETURN(-ENOMEM); - - req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, - RCL_CLIENT, keylen); - rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); - memcpy(tmp, key, keylen); - - req->rq_no_delay = req->rq_no_resend = 1; - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); - if (reply == NULL) - GOTO(out, rc = -EPROTO); - - *((obd_id *)val) = *reply; - out: - ptlrpc_req_finished(req); - RETURN(rc); - } else if (KEY_IS(KEY_FIEMAP)) { + if (KEY_IS(KEY_FIEMAP)) { struct ll_fiemap_info_key *fm_key = (struct ll_fiemap_info_key *)key; struct ldlm_res_id res_id; @@ -2833,11 +2683,11 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { struct client_obd *cli = &obd->u.cli; - int nr = atomic_read(&cli->cl_lru_in_list) >> 1; - int target = *(int *)val; + long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1; + long target = *(long *)val; nr = osc_lru_shrink(env, cli, min(nr, target), true); - *(int *)val -= nr; + *(long *)val -= nr; RETURN(0); } @@ -2913,13 +2763,13 @@ static int osc_reconnect(const struct lu_env *env, if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { long lost_grant; - client_obd_list_lock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); data->ocd_grant = (cli->cl_avail_grant + (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?: 2 * cli_brw_size(obd); - lost_grant = cli->cl_lost_grant; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + lost_grant = cli->cl_lost_grant; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, @@ -2931,22 +2781,8 @@ static int osc_reconnect(const struct lu_env *env, static int osc_disconnect(struct obd_export *exp) { - struct obd_device *obd = class_exp2obd(exp); - struct llog_ctxt *ctxt; - int rc; - - ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); - if (ctxt) { - if (obd->u.cli.cl_conn_count == 1) { - /* Flush any remaining cancel messages out to the - * target */ - llog_sync(ctxt, exp, 0); - } - llog_ctxt_put(ctxt); - } else { - CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", - obd); - } + struct obd_device *obd = class_exp2obd(exp); + 
int rc; rc = client_disconnect_export(exp); /** @@ -2984,10 +2820,10 @@ static int osc_import_event(struct obd_device *obd, switch (event) { case IMP_EVENT_DISCON: { cli = &obd->u.cli; - client_obd_list_lock(&cli->cl_loi_list_lock); - cli->cl_avail_grant = 0; - cli->cl_lost_grant = 0; - client_obd_list_unlock(&cli->cl_loi_list_lock); + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); break; } case IMP_EVENT_INACTIVE: { @@ -3126,7 +2962,7 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_proc_entry = NULL; } } else { - rc = lprocfs_seq_obd_setup(obd); + rc = lprocfs_obd_setup(obd); } /* If the basic OSC proc tree construction succeeded then @@ -3207,9 +3043,6 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); - rc = obd_llog_finish(obd, 0); - if (rc != 0) - CERROR("failed to cleanup llogging subsystems\n"); break; } } @@ -3245,17 +3078,16 @@ int osc_cleanup(struct obd_device *obd) int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) { - int rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars, - lcfg, obd); + int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd); return rc > 0 ? 0: rc; } -static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) +static int osc_process_config(struct obd_device *obd, size_t len, void *buf) { return osc_process_config_base(obd, buf); } -struct obd_ops osc_obd_ops = { +static struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = osc_setup, .o_precleanup = osc_precleanup, @@ -3267,7 +3099,6 @@ struct obd_ops osc_obd_ops = { .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, .o_statfs_async = osc_statfs_async, - .o_unpackmd = osc_unpackmd, .o_create = osc_create, .o_destroy = osc_destroy, .o_getattr = osc_getattr, @@ -3285,11 +3116,7 @@ struct obd_ops osc_obd_ops = { .o_quotacheck = osc_quotacheck, }; -extern struct lu_kmem_descr osc_caches[]; -extern spinlock_t osc_ast_guard; -extern struct lock_class_key osc_ast_guard_class; - -int __init osc_init(void) +static int __init osc_init(void) { bool enable_proc = true; struct obd_type *type; @@ -3310,18 +3137,12 @@ int __init osc_init(void) enable_proc = false; rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL, -#ifndef HAVE_ONLY_PROCFS_SEQ - NULL, -#endif LUSTRE_OSC_NAME, &osc_device_type); if (rc) { lu_kmem_fini(osc_caches); RETURN(rc); } - spin_lock_init(&osc_ast_guard); - lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); - RETURN(rc); }
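
The enqueue hunks above change the upcall convention: osc_enqueue_fini() now invokes the upcall as (*upcall)(cookie, lockh, errcode), and both it and the ldlm_lock_match() path in osc_enqueue_base() release their reference on the lock only after the upcall returns, with ELDLM_LOCK_MATCHED treated as success. The sketch below shows the shape a caller-side upcall would take under that convention; it is illustrative only — my_enqueue_upcall and struct my_lock_ctx are hypothetical names and are not part of this patch.

/* Hypothetical caller-side upcall matching the way the patched
 * osc_enqueue_fini() invokes it: (*upcall)(cookie, lockh, errcode).
 * The names my_lock_ctx and my_enqueue_upcall are illustrative only
 * and do not appear in the patch. */
struct my_lock_ctx {
	struct lustre_handle	mlc_lockh;	/* handle of granted lock */
	int			mlc_rc;		/* enqueue result */
};

static int my_enqueue_upcall(void *cookie, struct lustre_handle *lockh,
			     int errcode)
{
	struct my_lock_ctx *ctx = cookie;

	/* ELDLM_LOCK_MATCHED is reported when osc_enqueue_base() reused an
	 * already granted lock found by ldlm_lock_match(). */
	if (errcode == ELDLM_LOCK_MATCHED)
		errcode = ELDLM_OK;

	/* The enqueue path keeps its own reference on the lock while the
	 * upcall runs and drops it only afterwards, so anything the caller
	 * wants to keep (here just the handle) must be taken or copied
	 * before returning. */
	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
		lustre_handle_copy(&ctx->mlc_lockh, lockh);

	ctx->mlc_rc = errcode;
	return errcode;
}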