X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fosc%2Fosc_io.c;h=c9ccf7dc3b92d0d8c9d454c43e59b11b115a7815;hb=42c04e8ee918adb6ce658334c12610e925466752;hp=4b4ae0a1ad296ef9002b3aa76f8cc389d9c187fb;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 4b4ae0a..c9ccf7d 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -38,12 +38,14 @@ * Author: Nikita Danilov */ -/** \addtogroup osc osc @{ */ - #define DEBUG_SUBSYSTEM S_OSC #include "osc_cl_internal.h" +/** \addtogroup osc + * @{ + */ + /***************************************************************************** * * Type conversions. @@ -113,7 +115,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct cl_page *page; struct cl_page *tmp; @@ -148,6 +151,11 @@ static int osc_io_submit(const struct lu_env *env, osc = cl2osc(opg->ops_cl.cpl_obj); exp = osc_export(osc); + if (priority > CRP_NORMAL) { + spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_HP; + spin_unlock(&oap->oap_lock); + } /* * This can be checked without cli->cl_loi_list_lock, because * ->oap_*_item are always manipulated when the page is owned. @@ -177,9 +185,18 @@ static int osc_io_submit(const struct lu_env *env, osc->oo_oinfo, oap, OSC_FLAGS); - if (result != 0) - break; + /* + * bug 18881: we can't just break out here when + * error occurrs after cl_page_prep has been + * called against the page. The correct + * way is to call page's completion routine, + * as in osc_oap_interrupted. For simplicity, + * we just force osc_set_async_flags_base() to + * not return error. + */ + LASSERT(result == 0); } + opg->ops_submit_time = cfs_time_current(); } else { LASSERT(result < 0); if (result != -EALREADY) @@ -194,6 +211,19 @@ static int osc_io_submit(const struct lu_env *env, /* * Don't keep client_obd_list_lock() for too long. * + * XXX client_obd_list lock has to be unlocked periodically to + * avoid soft-lockups that tend to happen otherwise (see bug + * 16651). On the other hand, osc_io_submit_page() queues a + * page with ASYNC_URGENT flag and so all pages queued up + * until this point are sent out immediately by + * osc_io_unplug() resulting in sub-optimal RPCs (sub-optimal + * RPCs only happen during `warm up' phase when less than + * cl_max_rpcs_in_flight RPCs are in flight). To balance these + * conflicting requirements, one might unplug once enough + * pages to form a large RPC were queued (i.e., use + * cli->cl_max_pages_per_rpc as OSC_QUEUE_GRAIN, see + * lop_makes_rpc()), or ignore soft-lockup issue altogether. + * * XXX lock_need_resched() should be used here, but it is not * available in the older of supported kernels. */ @@ -354,7 +384,10 @@ static int osc_punch_upcall(void *a, int rc) return 0; } -#ifdef __KERNEL__ +/* Disable osc_trunc_check() because it is naturally race between read and + * truncate. See bug 20645 for details. + */ +#if 0 && defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ @@ -381,7 +414,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, * XXX this is quite expensive check. */ cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0); cl_page_list_for_each(page, list) CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); @@ -422,20 +455,22 @@ static int osc_io_trunc_start(const struct lu_env *env, struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; struct obd_capa *capa; loff_t size = io->u.ci_truncate.tr_size; - int result; - - memset(oa, 0, sizeof(*oa)); + int result = 0; osc_trunc_check(env, io, oio, size); - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - if (result == 0) { - attr->cat_size = attr->cat_kms = size; - result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS); + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_size = attr->cat_kms = size; + result = cl_object_attr_set(env, obj, attr, + CAT_SIZE|CAT_KMS); + } + cl_object_attr_unlock(obj); } - cl_object_attr_unlock(obj); + memset(oa, 0, sizeof(*oa)); if (result == 0) { oa->o_id = loi->loi_id; oa->o_gr = loi->loi_gr; @@ -445,7 +480,7 @@ static int osc_io_trunc_start(const struct lu_env *env, oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME; if (oio->oi_lockless) { - oa->o_flags = OBD_FL_TRUNCLOCK; + oa->o_flags = OBD_FL_SRVLOCK; oa->o_valid |= OBD_MD_FLFLAGS; } oa->o_size = size; @@ -602,19 +637,29 @@ static void osc_req_attr_set(const struct lu_env *env, opg = osc_cl_page_osc(apage); apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); - if (lock != NULL) { - olck = osc_lock_at(lock); - LASSERT(olck != NULL); - /* check for lockless io. */ - if (olck->ols_lock != NULL) { - oa->o_handle = olck->ols_lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - } - cl_lock_put(env, lock); - } else { - /* Should only be possible with liblustre */ - LASSERT(LIBLUSTRE_CLIENT); + if (lock == NULL) { + struct cl_object_header *head; + struct cl_lock *scan; + + head = cl_object_header(apage->cp_obj); + list_for_each_entry(scan, &head->coh_locks, cll_linkage) + CL_LOCK_DEBUG(D_ERROR, env, scan, + "no cover page!\n"); + CL_PAGE_DEBUG(D_ERROR, env, apage, + "dump uncover page!\n"); + libcfs_debug_dumpstack(NULL); + LBUG(); + } + + olck = osc_lock_at(lock); + LASSERT(olck != NULL); + LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); + /* check for lockless io. */ + if (olck->ols_lock != NULL) { + oa->o_handle = olck->ols_lock->l_remote_handle; + oa->o_valid |= OBD_MD_FLHANDLE; } + cl_lock_put(env, lock); } } @@ -641,7 +686,7 @@ int osc_req_init(const struct lu_env *env, struct cl_device *dev, struct osc_req *or; int result; - OBD_SLAB_ALLOC_PTR(or, osc_req_kmem); + OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, CFS_ALLOC_IO); if (or != NULL) { cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); result = 0;