From 8e9971aa38dc8e2bc88dda1f507c790abafcfdf4 Mon Sep 17 00:00:00 2001 From: Hongchao Zhang Date: Sat, 9 Feb 2013 01:05:13 +0800 Subject: [PATCH] LU-2703 osc: cancel lock outside of spinlock during calling *_ap_completion, there is a ldlm lock needed to be cancelled if the page belongs to readahead. this patch move the cancellation out of the spin_lock cl_loi_list_lock for it is a big operation containing lock acquisition (mutex) and memory allocation. Change-Id: Iecd4c27d405ffa968cc74f264dfc48cb2e2f5671 Signed-off-by: Hongchao Zhang Reviewed-on: http://review.whamcloud.com/5285 Tested-by: Hudson Reviewed-by: Johann Lombardi Tested-by: Maloo --- lustre/include/lustre/lustre_idl.h | 4 ++ lustre/include/lustre_net.h | 4 +- lustre/include/obd_ost.h | 20 ++++---- lustre/llite/rw.c | 24 ++++----- lustre/osc/osc_request.c | 102 ++++++++++++++++++++++++++----------- 5 files changed, 98 insertions(+), 56 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index bb193b4..a1ea431 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -537,6 +537,9 @@ enum obdo_flags { OBD_FL_LOCAL_MASK = 0xF0000000, /* temporary OBDO used by osc_brw_async (see bug 18364) */ OBD_FL_TEMPORARY = 0x10000000, + /* during calling *_ap_completion, top layer sets this flag to indicate + * there is a lock needed to cancel (see bug LU-2703) */ + OBD_FL_HAVE_LOCK = 0x20000000, }; #define LOV_MAGIC_V1 0x0BD10BD0 @@ -673,6 +676,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_BRW_SRVLOCK 0x200 /* Client holds no lock over this page */ #define OBD_BRW_ASYNC 0x400 /* Server may delay commit to disk */ #define OBD_BRW_MEMALLOC 0x800 /* Client runs in the "kswapd" context */ +#define OBD_BRW_READA 0x1000 /* read ahead */ #define OBD_OBJECT_EOF 0xffffffffffffffffULL diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 8770cf6..6374eca 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -190,8 +190,8 @@ union ptlrpc_async_args { * big enough. For _tons_ of context, OBD_ALLOC a struct and store * a pointer to it here. The pointer_arg ensures this struct is at * least big enough for that. */ - void *pointer_arg[9]; - __u64 space[5]; + void *pointer_arg[10]; + __u64 space[6]; }; struct ptlrpc_request_set; diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 3195586..1102f03 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -46,15 +46,17 @@ #include struct osc_brw_async_args { - struct obdo *aa_oa; - int aa_requested_nob; - int aa_nio_count; - obd_count aa_page_count; - int aa_resends; - int aa_pshift; - struct brw_page **aa_ppga; - struct client_obd *aa_cli; - struct list_head aa_oaps; + struct obdo *aa_oa; + int aa_requested_nob; + int aa_nio_count; + obd_count aa_page_count; + int aa_resends; + int aa_pshift; + struct brw_page **aa_ppga; + struct client_obd *aa_cli; + struct list_head aa_oaps; + obd_count aa_handle_count; + struct lustre_handle *aa_handle; }; #define osc_grant_args osc_brw_async_args diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 2926e7d..788946e 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -1190,8 +1190,6 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) { struct ll_async_page *llap; struct page *page; - struct obd_export *exp; - obd_off end; int ret = 0; ENTRY; @@ -1202,16 +1200,13 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc) LL_CDEBUG_PAGE(D_PAGE, page, "completing cmd %d with %d\n", cmd, rc); - if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate) { - ll_ra_count_put(ll_i2sbi(page->mapping->host), 1); - - LASSERT(lustre_handle_is_used(&llap->llap_lockh_granted)); - exp = ll_i2obdexp(page->mapping->host); - end = ((loff_t)page->index) << CFS_PAGE_SHIFT; - end += CFS_PAGE_SIZE - 1; - obd_cancel(exp, ll_i2info(page->mapping->host)->lli_smd, LCK_PR, - &llap->llap_lockh_granted, OBD_FAST_LOCK, end); - } + if (cmd & OBD_BRW_READ && llap->llap_defer_uptodate) { + ll_ra_count_put(ll_i2sbi(page->mapping->host), 1); + + LASSERT(lustre_handle_is_used(&llap->llap_lockh_granted)); + oa->o_flags |= OBD_FL_HAVE_LOCK; + oa->o_handle = llap->llap_lockh_granted; + } if (rc == 0) { if (cmd & OBD_BRW_READ) { @@ -1336,8 +1331,9 @@ static int ll_issue_page_read(struct obd_export *exp, llap->llap_defer_uptodate = defer; llap->llap_ra_used = 0; rc = obd_queue_group_io(exp, ll_i2info(page->mapping->host)->lli_smd, - NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0, - CFS_PAGE_SIZE, 0, ASYNC_COUNT_STABLE | + NULL, oig, llap->llap_cookie, + OBD_BRW_READ | (defer == 0 ? 0 : OBD_BRW_READA), + 0, CFS_PAGE_SIZE, 0, ASYNC_COUNT_STABLE | ASYNC_READY | ASYNC_URGENT); if (rc) { LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 0244f35..a5f2049 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1690,10 +1690,12 @@ static int osc_brw_redo_request(struct ptlrpc_request *request, new_aa = ptlrpc_req_async_args(new_req); - CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); - list_splice(&aa->aa_oaps, &new_aa->aa_oaps); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); - new_aa->aa_resends = aa->aa_resends; + CFS_INIT_LIST_HEAD(&new_aa->aa_oaps); + list_splice(&aa->aa_oaps, &new_aa->aa_oaps); + CFS_INIT_LIST_HEAD(&aa->aa_oaps); + new_aa->aa_resends = aa->aa_resends; + new_aa->aa_handle_count = aa->aa_handle_count; + new_aa->aa_handle = aa->aa_handle; list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { if (oap->oap_request) { @@ -2290,9 +2292,10 @@ static void osc_ap_completion(struct client_obd *cli, struct obdo *oa, static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) { - struct osc_brw_async_args *aa = data; - struct client_obd *cli; - ENTRY; + struct osc_brw_async_args *aa = data; + struct client_obd *cli; + int index = 0; + ENTRY; rc = osc_brw_fini_request(request, rc); CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc); @@ -2342,13 +2345,21 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) else cli->cl_r_in_flight--; - /* the caller may re-use the oap after the completion call so - * we need to clean it up a little */ - list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) { - list_del_init(&oap->oap_rpc_item); - osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); - } - OBDO_FREE(aa->aa_oa); + /* the caller may re-use the oap after the completion call so + * we need to clean it up a little */ + list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) { + list_del_init(&oap->oap_rpc_item); + + aa->aa_oa->o_flags &= ~OBD_FL_HAVE_LOCK; + osc_ap_completion(cli, aa->aa_oa, oap, 1, rc); + if (aa->aa_oa->o_flags & OBD_FL_HAVE_LOCK) { + LASSERT(!(aa->aa_oa->o_valid & + OBD_MD_FLHANDLE)); + LASSERT(index < aa->aa_handle_count); + aa->aa_handle[index++] = aa->aa_oa->o_handle; + } + } + OBDO_FREE(aa->aa_oa); } else { /* from async_internal() */ obd_count i; for (i = 0; i < aa->aa_page_count; i++) @@ -2367,25 +2378,40 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) osc_check_rpcs(cli); client_obd_list_unlock(&cli->cl_loi_list_lock); - osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + if (index > 0) { + obd_count i; - RETURN(rc); + LASSERT(index <= aa->aa_handle_count); + for (i = 0; i < index; i++) + ldlm_lock_decref(aa->aa_handle + i, LCK_PR); + + OBD_FREE(aa->aa_handle, + sizeof(struct lustre_handle) * aa->aa_handle_count); + } else { + LASSERT(aa->aa_handle_count == 0 && aa->aa_handle == NULL); + } + + osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + + RETURN(rc); } static struct ptlrpc_request *osc_build_req(struct client_obd *cli, struct list_head *rpc_list, int page_count, int cmd) { - struct ptlrpc_request *req; - struct brw_page **pga = NULL; - struct osc_brw_async_args *aa; - struct obdo *oa = NULL; - struct obd_async_page_ops *ops = NULL; - void *caller_data = NULL; - struct osc_async_page *oap; - struct ldlm_lock *lock = NULL; - obd_valid valid; - int i, rc, mpflag = 0; + struct ptlrpc_request *req; + struct brw_page **pga = NULL; + struct osc_brw_async_args *aa; + struct obdo *oa = NULL; + struct obd_async_page_ops *ops = NULL; + void *caller_data = NULL; + struct osc_async_page *oap; + struct ldlm_lock *lock = NULL; + obd_valid valid; + int handle_count = 0; + struct lustre_handle *handles = NULL; + int i, rc, mpflag = 0; ENTRY; LASSERT(!list_empty(rpc_list)); @@ -2408,6 +2434,9 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, caller_data = oap->oap_caller_data; lock = oap->oap_ldlm_lock; } + if (oap->oap_cmd & OBD_BRW_READA) + handle_count++; + pga[i] = &oap->oap_brw_page; pga[i]->off = oap->oap_obj_off + oap->oap_page_off; CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", @@ -2415,6 +2444,12 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, i++; } + if (handle_count > 0) { + OBD_ALLOC(handles, sizeof(struct lustre_handle) * handle_count); + if (handles == NULL) + GOTO(out, req = ERR_PTR(-ENOMEM)); + } + /* always get the data for the obdo for the rpc */ LASSERT(ops != NULL); ops->ap_fill_obdo(caller_data, cmd, oa); @@ -2460,11 +2495,13 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, } ops->ap_update_obdo(caller_data, cmd, oa, valid); - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); - CFS_INIT_LIST_HEAD(&aa->aa_oaps); - list_splice(rpc_list, &aa->aa_oaps); - CFS_INIT_LIST_HEAD(rpc_list); + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_handle_count = handle_count; + aa->aa_handle = handles; + CFS_INIT_LIST_HEAD(&aa->aa_oaps); + list_splice(rpc_list, &aa->aa_oaps); + CFS_INIT_LIST_HEAD(rpc_list); out: if (cmd & OBD_BRW_MEMALLOC) @@ -2475,6 +2512,9 @@ out: OBDO_FREE(oa); if (pga) OBD_FREE(pga, sizeof(*pga) * page_count); + if (handles) + OBD_FREE(handles, + sizeof(struct lustre_handle) * handle_count); } RETURN(req); } -- 1.8.3.1