X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_io.c;h=6190433311bf65eeb191033929af9d0fc1ab57b8;hb=d031e92fe730792e3a4dba2f1e8ae90a085c96c5;hp=0df758c1f7f47b1b17af4f58f0238c82b2afc357;hpb=b401ba0c214b184424146dda994470aa3c4356c9;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 0df758c..6190433 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -38,12 +38,14 @@ * Author: Nikita Danilov */ -/** \addtogroup osc osc @{ */ - #define DEBUG_SUBSYSTEM S_OSC #include "osc_cl_internal.h" +/** \addtogroup osc + * @{ + */ + /***************************************************************************** * * Type conversions. @@ -113,7 +115,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct cl_page *page; struct cl_page *tmp; @@ -148,12 +151,17 @@ static int osc_io_submit(const struct lu_env *env, osc = cl2osc(opg->ops_cl.cpl_obj); exp = osc_export(osc); + if (priority > CRP_NORMAL) { + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_HP; + cfs_spin_unlock(&oap->oap_lock); + } /* * This can be checked without cli->cl_loi_list_lock, because * ->oap_*_item are always manipulated when the page is owned. 
*/ - if (!list_empty(&oap->oap_urgent_item) || - !list_empty(&oap->oap_rpc_item)) { + if (!cfs_list_empty(&oap->oap_urgent_item) || + !cfs_list_empty(&oap->oap_rpc_item)) { result = -EBUSY; break; } @@ -169,7 +177,7 @@ static int osc_io_submit(const struct lu_env *env, result = cl_page_prep(env, io, page, crt); if (result == 0) { cl_page_list_move(qout, qin, page); - if (list_empty(&oap->oap_pending_item)) { + if (cfs_list_empty(&oap->oap_pending_item)) { osc_io_submit_page(env, cl2osc_io(env, ios), opg, crt); } else { @@ -177,9 +185,18 @@ static int osc_io_submit(const struct lu_env *env, osc->oo_oinfo, oap, OSC_FLAGS); - if (result != 0) - break; + /* + * bug 18881: we can't just break out here when + * error occurs after cl_page_prep has been + * called against the page. The correct + * way is to call page's completion routine, + * as in osc_oap_interrupted. For simplicity, + * we just force osc_set_async_flags_base() to + * not return error. + */ + LASSERT(result == 0); } + opg->ops_submit_time = cfs_time_current(); } else { LASSERT(result < 0); if (result != -EALREADY) @@ -194,6 +211,19 @@ static int osc_io_submit(const struct lu_env *env, /* * Don't keep client_obd_list_lock() for too long. * + * XXX client_obd_list lock has to be unlocked periodically to + * avoid soft-lockups that tend to happen otherwise (see bug + * 16651). On the other hand, osc_io_submit_page() queues a + * page with ASYNC_URGENT flag and so all pages queued up + * until this point are sent out immediately by + * osc_io_unplug() resulting in sub-optimal RPCs (sub-optimal + * RPCs only happen during `warm up' phase when less than + * cl_max_rpcs_in_flight RPCs are in flight). To balance these + * conflicting requirements, one might unplug once enough + * pages to form a large RPC were queued (i.e., use + * cli->cl_max_pages_per_rpc as OSC_QUEUE_GRAIN, see + * lop_makes_rpc()), or ignore soft-lockup issue altogether. 
+ * + * XXX lock_need_resched() should be used here, but it is not * available in the older of supported kernels. */ @@ -350,11 +380,14 @@ static int osc_punch_upcall(void *a, int rc) struct osc_punch_cbargs *args = a; args->opc_rc = rc; - complete(&args->opc_sync); + cfs_complete(&args->opc_sync); return 0; } -#ifdef __KERNEL__ +/* Disable osc_trunc_check() because there is a natural race between read and + * truncate. See bug 20645 for details. + */ +#if 0 && defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ @@ -381,7 +414,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, * XXX this is quite expensive check. */ cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0); cl_page_list_for_each(page, list) CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); @@ -389,8 +422,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, cl_page_list_disown(env, io, list); cl_page_list_fini(env, list); - spin_lock(&obj->oo_seatbelt); - list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], ops_inflight) { + cfs_spin_lock(&obj->oo_seatbelt); + cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], + ops_inflight) { page = cp->ops_cl.cpl_page; if (page->cp_index >= start + partial) { cfs_task_t *submitter; @@ -404,7 +438,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, libcfs_debug_dumpstack(submitter); } } - spin_unlock(&obj->oo_seatbelt); + cfs_spin_unlock(&obj->oo_seatbelt); } #else /* __KERNEL__ */ # define osc_trunc_check(env, io, oio, size) do {;} while (0) @@ -424,9 +458,6 @@ static int osc_io_trunc_start(const struct lu_env *env, loff_t size = io->u.ci_truncate.tr_size; int result = 0; - - memset(oa, 0, sizeof(*oa)); - osc_trunc_check(env, io, oio, size); if (oio->oi_lockless == 0) { @@ -440,6 +471,7 @@ static int 
osc_io_trunc_start(const struct lu_env *env, cl_object_attr_unlock(obj); } + memset(oa, 0, sizeof(*oa)); if (result == 0) { oa->o_id = loi->loi_id; oa->o_gr = loi->loi_gr; @@ -449,7 +481,7 @@ static int osc_io_trunc_start(const struct lu_env *env, oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME; if (oio->oi_lockless) { - oa->o_flags = OBD_FL_TRUNCLOCK; + oa->o_flags = OBD_FL_SRVLOCK; oa->o_valid |= OBD_MD_FLFLAGS; } oa->o_size = size; @@ -457,7 +489,7 @@ static int osc_io_trunc_start(const struct lu_env *env, oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; capa = io->u.ci_truncate.tr_capa; - init_completion(&cbargs->opc_sync); + cfs_init_completion(&cbargs->opc_sync); result = osc_punch_base(osc_export(cl2osc(obj)), oa, capa, osc_punch_upcall, cbargs, PTLRPCD_SET); } @@ -473,7 +505,7 @@ static void osc_io_trunc_end(const struct lu_env *env, struct obdo *oa = &oio->oi_oa; int result; - wait_for_completion(&cbargs->opc_sync); + cfs_wait_for_completion(&cbargs->opc_sync); result = io->ci_result = cbargs->opc_rc; if (result == 0) { @@ -600,15 +632,30 @@ static void osc_req_attr_set(const struct lu_env *env, } if (flags & OBD_MD_FLHANDLE) { clerq = slice->crs_req; - LASSERT(!list_empty(&clerq->crq_pages)); + LASSERT(!cfs_list_empty(&clerq->crq_pages)); apage = container_of(clerq->crq_pages.next, struct cl_page, cp_flight); opg = osc_cl_page_osc(apage); apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); - LASSERT(lock != NULL); + if (lock == NULL) { + struct cl_object_header *head; + struct cl_lock *scan; + + head = cl_object_header(apage->cp_obj); + cfs_list_for_each_entry(scan, &head->coh_locks, + cll_linkage) + CL_LOCK_DEBUG(D_ERROR, env, scan, + "no cover page!\n"); + CL_PAGE_DEBUG(D_ERROR, env, apage, + "dump uncover page!\n"); + libcfs_debug_dumpstack(NULL); + LBUG(); + } + olck = osc_lock_at(lock); LASSERT(olck != NULL); + 
LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); /* check for lockless io. */ if (olck->ols_lock != NULL) { oa->o_handle = olck->ols_lock->l_remote_handle;