X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_io.c;h=b31fdf8c26516571f0ebd555397a83e165da6099;hp=4b4ae0a1ad296ef9002b3aa76f8cc389d9c187fb;hb=52c90605254df678dfb932c5297c1fcb7e743af1;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 4b4ae0a..b31fdf8 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -38,12 +38,14 @@ * Author: Nikita Danilov */ -/** \addtogroup osc osc @{ */ - #define DEBUG_SUBSYSTEM S_OSC #include "osc_cl_internal.h" +/** \addtogroup osc + * @{ + */ + /***************************************************************************** * * Type conversions. @@ -99,12 +101,6 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, } /** - * How many pages osc_io_submit() queues before checking whether an RPC is - * ready. - */ -#define OSC_QUEUE_GRAIN (32) - -/** * An implementation of cl_io_operations::cio_io_submit() method for osc * layer. Iterates over pages in the in-queue, prepares each for io by calling * cl_page_prep() and then either submits them through osc_io_submit_page() @@ -113,7 +109,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct cl_page *page; struct cl_page *tmp; @@ -148,12 +145,17 @@ static int osc_io_submit(const struct lu_env *env, osc = cl2osc(opg->ops_cl.cpl_obj); exp = osc_export(osc); + if (priority > CRP_NORMAL) { + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_HP; + cfs_spin_unlock(&oap->oap_lock); + } /* * This can be checked without cli->cl_loi_list_lock, because * ->oap_*_item are always manipulated when the page is owned. 
*/ - if (!list_empty(&oap->oap_urgent_item) || - !list_empty(&oap->oap_rpc_item)) { + if (!cfs_list_empty(&oap->oap_urgent_item) || + !cfs_list_empty(&oap->oap_rpc_item)) { result = -EBUSY; break; } @@ -169,7 +171,7 @@ static int osc_io_submit(const struct lu_env *env, result = cl_page_prep(env, io, page, crt); if (result == 0) { cl_page_list_move(qout, qin, page); - if (list_empty(&oap->oap_pending_item)) { + if (cfs_list_empty(&oap->oap_pending_item)) { osc_io_submit_page(env, cl2osc_io(env, ios), opg, crt); } else { @@ -177,9 +179,18 @@ static int osc_io_submit(const struct lu_env *env, osc->oo_oinfo, oap, OSC_FLAGS); - if (result != 0) - break; + /* + * bug 18881: we can't just break out here when + * error occurs after cl_page_prep has been + * called against the page. The correct + * way is to call page's completion routine, + * as in osc_oap_interrupted. For simplicity, + * we just force osc_set_async_flags_base() to + * not return error. + */ + LASSERT(result == 0); } + opg->ops_submit_time = cfs_time_current(); } else { LASSERT(result < 0); if (result != -EALREADY) @@ -191,17 +202,18 @@ static int osc_io_submit(const struct lu_env *env, */ result = 0; } + /* - * Don't keep client_obd_list_lock() for too long. + * We might hold client_obd_list_lock() for too long and cause + * soft-lockups (see bug 16651). But on the other hand, pages + * are queued here with ASYNC_URGENT flag, thus will be sent + * out immediately once osc_io_unplug() is called, possibly + * resulting in sub-optimal RPCs. * - * XXX lock_need_resched() should be used here, but it is not - * available in the older of supported kernels. + * We think creating optimal-sized RPCs is more important than + * avoiding the transient soft-lockups, plus I believe the + * soft-lockups only happen in full debug testing.
*/ - if (queued > OSC_QUEUE_GRAIN || cfs_need_resched()) { - queued = 0; - osc_io_unplug(env, osc, cli); - cfs_cond_resched(); - } } LASSERT(ergo(result == 0, cli != NULL)); @@ -345,16 +357,19 @@ static int osc_io_fault_start(const struct lu_env *env, RETURN(0); } -static int osc_punch_upcall(void *a, int rc) +static int osc_setattr_upcall(void *a, int rc) { - struct osc_punch_cbargs *args = a; + struct osc_setattr_cbargs *args = a; args->opc_rc = rc; - complete(&args->opc_sync); + cfs_complete(&args->opc_sync); return 0; } -#ifdef __KERNEL__ +/* Disable osc_trunc_check() because it is naturally racy between read and + * truncate. See bug 20645 for details. + */ +#if 0 && defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ @@ -381,7 +396,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, * XXX this is quite expensive check. */ cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0); cl_page_list_for_each(page, list) CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); @@ -389,8 +404,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, cl_page_list_disown(env, io, list); cl_page_list_fini(env, list); - spin_lock(&obj->oo_seatbelt); - list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], ops_inflight) { + cfs_spin_lock(&obj->oo_seatbelt); + cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], + ops_inflight) { page = cp->ops_cl.cpl_page; if (page->cp_index >= start + partial) { cfs_task_t *submitter; @@ -404,14 +420,14 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, libcfs_debug_dumpstack(submitter); } } - spin_unlock(&obj->oo_seatbelt); + cfs_spin_unlock(&obj->oo_seatbelt); } #else /* __KERNEL__ */ # define osc_trunc_check(env, io, oio, size) do {;} while (0) #endif -static int osc_io_trunc_start(const struct
lu_env *env, - const struct cl_io_slice *slice) +static int osc_io_setattr_start(const struct lu_env *env, + const struct cl_io_slice *slice) { struct cl_io *io = slice->cis_io; struct osc_io *oio = cl2osc_io(env, slice); @@ -419,23 +435,42 @@ static int osc_io_trunc_start(const struct lu_env *env, struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; struct cl_attr *attr = &osc_env_info(env)->oti_attr; struct obdo *oa = &oio->oi_oa; - struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; - struct obd_capa *capa; - loff_t size = io->u.ci_truncate.tr_size; - int result; - - memset(oa, 0, sizeof(*oa)); - - osc_trunc_check(env, io, oio, size); + struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; + loff_t size = io->u.ci_setattr.sa_attr.lvb_size; + unsigned int ia_valid = io->u.ci_setattr.sa_valid; + int result = 0; + struct obd_info oinfo = { { { 0 } } }; + + if (ia_valid & ATTR_SIZE) + osc_trunc_check(env, io, oio, size); + + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + unsigned int cl_valid = 0; - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - if (result == 0) { - attr->cat_size = attr->cat_kms = size; - result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS); + if (ia_valid & ATTR_SIZE) { + attr->cat_size = attr->cat_kms = size; + cl_valid = (CAT_SIZE | CAT_KMS); + } + if (ia_valid & ATTR_MTIME_SET) { + attr->cat_mtime = io->u.ci_setattr.sa_attr.lvb_mtime; + cl_valid |= CAT_MTIME; + } + if (ia_valid & ATTR_ATIME_SET) { + attr->cat_atime = io->u.ci_setattr.sa_attr.lvb_atime; + cl_valid |= CAT_ATIME; + } + if (ia_valid & ATTR_CTIME_SET) { + attr->cat_ctime = io->u.ci_setattr.sa_attr.lvb_ctime; + cl_valid |= CAT_CTIME; + } + result = cl_object_attr_set(env, obj, attr, cl_valid); + } + cl_object_attr_unlock(obj); } - cl_object_attr_unlock(obj); - + memset(oa, 0, sizeof(*oa)); if (result == 0) { oa->o_id = loi->loi_id; oa->o_gr = loi->loi_gr; @@ 
-444,86 +479,118 @@ static int osc_io_trunc_start(const struct lu_env *env, oa->o_ctime = attr->cat_ctime; oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME; - if (oio->oi_lockless) { - oa->o_flags = OBD_FL_TRUNCLOCK; - oa->o_valid |= OBD_MD_FLFLAGS; + if (ia_valid & ATTR_SIZE) { + oa->o_size = size; + oa->o_blocks = OBD_OBJECT_EOF; + oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + + if (oio->oi_lockless) { + oa->o_flags = OBD_FL_SRVLOCK; + oa->o_valid |= OBD_MD_FLFLAGS; + } + } else { + LASSERT(oio->oi_lockless == 0); } - oa->o_size = size; - oa->o_blocks = OBD_OBJECT_EOF; - oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - - capa = io->u.ci_truncate.tr_capa; - init_completion(&cbargs->opc_sync); - result = osc_punch_base(osc_export(cl2osc(obj)), oa, capa, - osc_punch_upcall, cbargs, PTLRPCD_SET); + + oinfo.oi_oa = oa; + oinfo.oi_capa = io->u.ci_setattr.sa_capa; + cfs_init_completion(&cbargs->opc_sync); + + if (ia_valid & ATTR_SIZE) + result = osc_punch_base(osc_export(cl2osc(obj)), + &oinfo, osc_setattr_upcall, + cbargs, PTLRPCD_SET); + else + result = osc_setattr_async_base(osc_export(cl2osc(obj)), + &oinfo, NULL, + osc_setattr_upcall, + cbargs, PTLRPCD_SET); } return result; } -static void osc_io_trunc_end(const struct lu_env *env, - const struct cl_io_slice *slice) +static void osc_io_setattr_end(const struct lu_env *env, + const struct cl_io_slice *slice) { struct cl_io *io = slice->cis_io; struct osc_io *oio = cl2osc_io(env, slice); - struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; - struct obdo *oa = &oio->oi_oa; + struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; int result; - wait_for_completion(&cbargs->opc_sync); + cfs_wait_for_completion(&cbargs->opc_sync); result = io->ci_result = cbargs->opc_rc; if (result == 0) { struct cl_object *obj = slice->cis_obj; - if (oio->oi_lockless == 0) { - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - int valid = 0; - - /* Update kms & size */ - if 
(oa->o_valid & OBD_MD_FLSIZE) { - attr->cat_size = oa->o_size; - attr->cat_kms = oa->o_size; - valid |= CAT_KMS|CAT_SIZE; - } - if (oa->o_valid & OBD_MD_FLBLOCKS) { - attr->cat_blocks = oa->o_blocks; - valid |= CAT_BLOCKS; - } - if (oa->o_valid & OBD_MD_FLMTIME) { - attr->cat_mtime = oa->o_mtime; - valid |= CAT_MTIME; - } - if (oa->o_valid & OBD_MD_FLCTIME) { - attr->cat_ctime = oa->o_ctime; - valid |= CAT_CTIME; - } - if (oa->o_valid & OBD_MD_FLATIME) { - attr->cat_atime = oa->o_atime; - valid |= CAT_ATIME; - } - cl_object_attr_lock(obj); - result = cl_object_attr_set(env, obj, attr, valid); - cl_object_attr_unlock(obj); - } else { /* lockless truncate */ + if (oio->oi_lockless) { + /* lockless truncate */ struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); + + LASSERT(cl_io_is_trunc(io)); /* XXX: Need a lock. */ osd->od_stats.os_lockless_truncates++; } } +} + +static int osc_io_read_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct osc_io *oio = cl2osc_io(env, slice); + struct cl_object *obj = slice->cis_obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + int result = 0; + ENTRY; + + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_atime = LTIME_S(CFS_CURRENT_TIME); + result = cl_object_attr_set(env, obj, attr, + CAT_ATIME); + } + cl_object_attr_unlock(obj); + } + RETURN(result); +} + +static int osc_io_write_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct osc_io *oio = cl2osc_io(env, slice); + struct cl_object *obj = slice->cis_obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + int result = 0; + ENTRY; - /* return result; */ + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_mtime = attr->cat_ctime = + LTIME_S(CFS_CURRENT_TIME); + result = cl_object_attr_set(env, obj, attr, + CAT_MTIME | CAT_CTIME); + } + 
cl_object_attr_unlock(obj); + } + RETURN(result); } static const struct cl_io_operations osc_io_ops = { .op = { [CIT_READ] = { + .cio_start = osc_io_read_start, .cio_fini = osc_io_fini }, [CIT_WRITE] = { + .cio_start = osc_io_write_start, .cio_fini = osc_io_fini }, - [CIT_TRUNC] = { - .cio_start = osc_io_trunc_start, - .cio_end = osc_io_trunc_end + [CIT_SETATTR] = { + .cio_start = osc_io_setattr_start, + .cio_end = osc_io_setattr_end }, [CIT_FAULT] = { .cio_fini = osc_io_fini, @@ -596,25 +663,36 @@ static void osc_req_attr_set(const struct lu_env *env, } if (flags & OBD_MD_FLHANDLE) { clerq = slice->crs_req; - LASSERT(!list_empty(&clerq->crq_pages)); + LASSERT(!cfs_list_empty(&clerq->crq_pages)); apage = container_of(clerq->crq_pages.next, struct cl_page, cp_flight); opg = osc_cl_page_osc(apage); apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); - if (lock != NULL) { - olck = osc_lock_at(lock); - LASSERT(olck != NULL); - /* check for lockless io. */ - if (olck->ols_lock != NULL) { - oa->o_handle = olck->ols_lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - } - cl_lock_put(env, lock); - } else { - /* Should only be possible with liblustre */ - LASSERT(LIBLUSTRE_CLIENT); + if (lock == NULL) { + struct cl_object_header *head; + struct cl_lock *scan; + + head = cl_object_header(apage->cp_obj); + cfs_list_for_each_entry(scan, &head->coh_locks, + cll_linkage) + CL_LOCK_DEBUG(D_ERROR, env, scan, + "no cover page!\n"); + CL_PAGE_DEBUG(D_ERROR, env, apage, + "dump uncover page!\n"); + libcfs_debug_dumpstack(NULL); + LBUG(); + } + + olck = osc_lock_at(lock); + LASSERT(olck != NULL); + LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); + /* check for lockless io. 
*/ + if (olck->ols_lock != NULL) { + oa->o_handle = olck->ols_lock->l_remote_handle; + oa->o_valid |= OBD_MD_FLHANDLE; } + cl_lock_put(env, lock); } } @@ -641,7 +719,7 @@ int osc_req_init(const struct lu_env *env, struct cl_device *dev, struct osc_req *or; int result; - OBD_SLAB_ALLOC_PTR(or, osc_req_kmem); + OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, CFS_ALLOC_IO); if (or != NULL) { cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); result = 0;