X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_io.c;h=5fe0eb26c898b6c8f8e3e7738fde9d56c4031267;hp=0df758c1f7f47b1b17af4f58f0238c82b2afc357;hb=0a859380c36ac24871f221b35042f76c56b04438;hpb=b401ba0c214b184424146dda994470aa3c4356c9 diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 0df758c..5fe0eb2 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -38,12 +38,14 @@ * Author: Nikita Danilov */ -/** \addtogroup osc osc @{ */ - #define DEBUG_SUBSYSTEM S_OSC #include "osc_cl_internal.h" +/** \addtogroup osc + * @{ + */ + /***************************************************************************** * * Type conversions. @@ -99,12 +101,6 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, } /** - * How many pages osc_io_submit() queues before checking whether an RPC is - * ready. - */ -#define OSC_QUEUE_GRAIN (32) - -/** * An implementation of cl_io_operations::cio_io_submit() method for osc * layer. Iterates over pages in the in-queue, prepares each for io by calling * cl_page_prep() and then either submits them through osc_io_submit_page() @@ -113,7 +109,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc, */ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, - enum cl_req_type crt, struct cl_2queue *queue) + enum cl_req_type crt, struct cl_2queue *queue, + enum cl_req_priority priority) { struct cl_page *page; struct cl_page *tmp; @@ -148,12 +145,17 @@ static int osc_io_submit(const struct lu_env *env, osc = cl2osc(opg->ops_cl.cpl_obj); exp = osc_export(osc); + if (priority > CRP_NORMAL) { + cfs_spin_lock(&oap->oap_lock); + oap->oap_async_flags |= ASYNC_HP; + cfs_spin_unlock(&oap->oap_lock); + } /* * This can be checked without cli->cl_loi_list_lock, because * ->oap_*_item are always manipulated when the page is owned. 
*/ - if (!list_empty(&oap->oap_urgent_item) || - !list_empty(&oap->oap_rpc_item)) { + if (!cfs_list_empty(&oap->oap_urgent_item) || + !cfs_list_empty(&oap->oap_rpc_item)) { result = -EBUSY; break; } @@ -169,7 +171,7 @@ static int osc_io_submit(const struct lu_env *env, result = cl_page_prep(env, io, page, crt); if (result == 0) { cl_page_list_move(qout, qin, page); - if (list_empty(&oap->oap_pending_item)) { + if (cfs_list_empty(&oap->oap_pending_item)) { osc_io_submit_page(env, cl2osc_io(env, ios), opg, crt); } else { @@ -177,9 +179,18 @@ static int osc_io_submit(const struct lu_env *env, osc->oo_oinfo, oap, OSC_FLAGS); - if (result != 0) - break; + /* + * bug 18881: we can't just break out here when + * error occurs after cl_page_prep has been + * called against the page. The correct + * way is to call page's completion routine, + * as in osc_oap_interrupted. For simplicity, + * we just force osc_set_async_flags_base() to + * not return error. + */ + LASSERT(result == 0); } + opg->ops_submit_time = cfs_time_current(); } else { LASSERT(result < 0); if (result != -EALREADY) @@ -191,17 +202,18 @@ static int osc_io_submit(const struct lu_env *env, */ result = 0; } + /* - * Don't keep client_obd_list_lock() for too long. + * We might hold client_obd_list_lock() for too long and cause + * soft-lockups (see bug 16651). But on the other hand, pages + * are queued here with ASYNC_URGENT flag, thus will be sent + * out immediately once osc_io_unplug() is called, possibly + * resulting in sub-optimal RPCs. * - * XXX lock_need_resched() should be used here, but it is not - * available in the older of supported kernels. + * We think creating optimal-sized RPCs is more important than + * avoiding the transient soft-lockups, plus I believe the + * soft-lockups only happen in full debug testing. 
*/ - if (queued > OSC_QUEUE_GRAIN || cfs_need_resched()) { - queued = 0; - osc_io_unplug(env, osc, cli); - cfs_cond_resched(); - } } LASSERT(ergo(result == 0, cli != NULL)); @@ -345,16 +357,19 @@ static int osc_io_fault_start(const struct lu_env *env, RETURN(0); } -static int osc_punch_upcall(void *a, int rc) +static int osc_setattr_upcall(void *a, int rc) { - struct osc_punch_cbargs *args = a; + struct osc_setattr_cbargs *args = a; args->opc_rc = rc; - complete(&args->opc_sync); + cfs_complete(&args->opc_sync); return 0; } -#ifdef __KERNEL__ +/* Disable osc_trunc_check() because it is inherently racy between read and + * truncate. See bug 20645 for details. + */ +#if 0 && defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ @@ -381,7 +396,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, * XXX this is quite expensive check. */ cl_page_list_init(list); - cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list); + cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0); cl_page_list_for_each(page, list) CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start); @@ -389,8 +404,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, cl_page_list_disown(env, io, list); cl_page_list_fini(env, list); - spin_lock(&obj->oo_seatbelt); - list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], ops_inflight) { + cfs_spin_lock(&obj->oo_seatbelt); + cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], + ops_inflight) { page = cp->ops_cl.cpl_page; if (page->cp_index >= start + partial) { cfs_task_t *submitter; @@ -404,14 +420,14 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, libcfs_debug_dumpstack(submitter); } } - spin_unlock(&obj->oo_seatbelt); + cfs_spin_unlock(&obj->oo_seatbelt); } #else /* __KERNEL__ */ # define osc_trunc_check(env, io, oio, size) do {;} while (0) #endif -static int osc_io_trunc_start(const struct 
lu_env *env, - const struct cl_io_slice *slice) +static int osc_io_setattr_start(const struct lu_env *env, + const struct cl_io_slice *slice) { struct cl_io *io = slice->cis_io; struct osc_io *oio = cl2osc_io(env, slice); @@ -419,115 +435,162 @@ static int osc_io_trunc_start(const struct lu_env *env, struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; struct cl_attr *attr = &osc_env_info(env)->oti_attr; struct obdo *oa = &oio->oi_oa; - struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; - struct obd_capa *capa; - loff_t size = io->u.ci_truncate.tr_size; + struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; + loff_t size = io->u.ci_setattr.sa_attr.lvb_size; + unsigned int ia_valid = io->u.ci_setattr.sa_valid; int result = 0; + struct obd_info oinfo = { { { 0 } } }; - - memset(oa, 0, sizeof(*oa)); - - osc_trunc_check(env, io, oio, size); + if (ia_valid & ATTR_SIZE) + osc_trunc_check(env, io, oio, size); if (oio->oi_lockless == 0) { cl_object_attr_lock(obj); result = cl_object_attr_get(env, obj, attr); if (result == 0) { - attr->cat_size = attr->cat_kms = size; - result = cl_object_attr_set(env, obj, attr, - CAT_SIZE|CAT_KMS); + unsigned int cl_valid = 0; + + if (ia_valid & ATTR_SIZE) { + attr->cat_size = attr->cat_kms = size; + cl_valid = (CAT_SIZE | CAT_KMS); + } + if (ia_valid & ATTR_MTIME_SET) { + attr->cat_mtime = io->u.ci_setattr.sa_attr.lvb_mtime; + cl_valid |= CAT_MTIME; + } + if (ia_valid & ATTR_ATIME_SET) { + attr->cat_atime = io->u.ci_setattr.sa_attr.lvb_atime; + cl_valid |= CAT_ATIME; + } + if (ia_valid & ATTR_CTIME_SET) { + attr->cat_ctime = io->u.ci_setattr.sa_attr.lvb_ctime; + cl_valid |= CAT_CTIME; + } + result = cl_object_attr_set(env, obj, attr, cl_valid); } cl_object_attr_unlock(obj); } - + memset(oa, 0, sizeof(*oa)); if (result == 0) { oa->o_id = loi->loi_id; - oa->o_gr = loi->loi_gr; + oa->o_seq = loi->loi_seq; oa->o_mtime = attr->cat_mtime; oa->o_atime = attr->cat_atime; oa->o_ctime = attr->cat_ctime; oa->o_valid = OBD_MD_FLID | 
OBD_MD_FLGROUP | OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME; - if (oio->oi_lockless) { - oa->o_flags = OBD_FL_TRUNCLOCK; - oa->o_valid |= OBD_MD_FLFLAGS; + if (ia_valid & ATTR_SIZE) { + oa->o_size = size; + oa->o_blocks = OBD_OBJECT_EOF; + oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + + if (oio->oi_lockless) { + oa->o_flags = OBD_FL_SRVLOCK; + oa->o_valid |= OBD_MD_FLFLAGS; + } + } else { + LASSERT(oio->oi_lockless == 0); } - oa->o_size = size; - oa->o_blocks = OBD_OBJECT_EOF; - oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - - capa = io->u.ci_truncate.tr_capa; - init_completion(&cbargs->opc_sync); - result = osc_punch_base(osc_export(cl2osc(obj)), oa, capa, - osc_punch_upcall, cbargs, PTLRPCD_SET); + + oinfo.oi_oa = oa; + oinfo.oi_capa = io->u.ci_setattr.sa_capa; + cfs_init_completion(&cbargs->opc_sync); + + if (ia_valid & ATTR_SIZE) + result = osc_punch_base(osc_export(cl2osc(obj)), + &oinfo, osc_setattr_upcall, + cbargs, PTLRPCD_SET); + else + result = osc_setattr_async_base(osc_export(cl2osc(obj)), + &oinfo, NULL, + osc_setattr_upcall, + cbargs, PTLRPCD_SET); } return result; } -static void osc_io_trunc_end(const struct lu_env *env, - const struct cl_io_slice *slice) +static void osc_io_setattr_end(const struct lu_env *env, + const struct cl_io_slice *slice) { struct cl_io *io = slice->cis_io; struct osc_io *oio = cl2osc_io(env, slice); - struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg; - struct obdo *oa = &oio->oi_oa; + struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg; int result; - wait_for_completion(&cbargs->opc_sync); + cfs_wait_for_completion(&cbargs->opc_sync); result = io->ci_result = cbargs->opc_rc; if (result == 0) { struct cl_object *obj = slice->cis_obj; - if (oio->oi_lockless == 0) { - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - int valid = 0; - - /* Update kms & size */ - if (oa->o_valid & OBD_MD_FLSIZE) { - attr->cat_size = oa->o_size; - attr->cat_kms = oa->o_size; - valid |= CAT_KMS|CAT_SIZE; - } - if 
(oa->o_valid & OBD_MD_FLBLOCKS) { - attr->cat_blocks = oa->o_blocks; - valid |= CAT_BLOCKS; - } - if (oa->o_valid & OBD_MD_FLMTIME) { - attr->cat_mtime = oa->o_mtime; - valid |= CAT_MTIME; - } - if (oa->o_valid & OBD_MD_FLCTIME) { - attr->cat_ctime = oa->o_ctime; - valid |= CAT_CTIME; - } - if (oa->o_valid & OBD_MD_FLATIME) { - attr->cat_atime = oa->o_atime; - valid |= CAT_ATIME; - } - cl_object_attr_lock(obj); - result = cl_object_attr_set(env, obj, attr, valid); - cl_object_attr_unlock(obj); - } else { /* lockless truncate */ + if (oio->oi_lockless) { + /* lockless truncate */ struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); + + LASSERT(cl_io_is_trunc(io)); /* XXX: Need a lock. */ osd->od_stats.os_lockless_truncates++; } } +} + +static int osc_io_read_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct osc_io *oio = cl2osc_io(env, slice); + struct cl_object *obj = slice->cis_obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + int result = 0; + ENTRY; + + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_atime = LTIME_S(CFS_CURRENT_TIME); + result = cl_object_attr_set(env, obj, attr, + CAT_ATIME); + } + cl_object_attr_unlock(obj); + } + RETURN(result); +} + +static int osc_io_write_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct osc_io *oio = cl2osc_io(env, slice); + struct cl_object *obj = slice->cis_obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + int result = 0; + ENTRY; - /* return result; */ + if (oio->oi_lockless == 0) { + cl_object_attr_lock(obj); + result = cl_object_attr_get(env, obj, attr); + if (result == 0) { + attr->cat_mtime = attr->cat_ctime = + LTIME_S(CFS_CURRENT_TIME); + result = cl_object_attr_set(env, obj, attr, + CAT_MTIME | CAT_CTIME); + } + cl_object_attr_unlock(obj); + } + RETURN(result); } static const struct cl_io_operations osc_io_ops = { .op = { [CIT_READ] = { + 
.cio_start = osc_io_read_start, .cio_fini = osc_io_fini }, [CIT_WRITE] = { + .cio_start = osc_io_write_start, .cio_fini = osc_io_fini }, - [CIT_TRUNC] = { - .cio_start = osc_io_trunc_start, - .cio_end = osc_io_trunc_end + [CIT_SETATTR] = { + .cio_start = osc_io_setattr_start, + .cio_end = osc_io_setattr_end }, [CIT_FAULT] = { .cio_fini = osc_io_fini, @@ -572,7 +635,7 @@ static void osc_req_completion(const struct lu_env *env, /** * Implementation of struct cl_req_operations::cro_attr_set() for osc - * layer. osc is responsible for struct obdo::o_id and struct obdo::o_gr + * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq * fields. */ static void osc_req_attr_set(const struct lu_env *env, @@ -595,20 +658,35 @@ static void osc_req_attr_set(const struct lu_env *env, oa->o_valid |= OBD_MD_FLID; } if (flags & OBD_MD_FLGROUP) { - oa->o_gr = oinfo->loi_gr; + oa->o_seq = oinfo->loi_seq; oa->o_valid |= OBD_MD_FLGROUP; } if (flags & OBD_MD_FLHANDLE) { clerq = slice->crs_req; - LASSERT(!list_empty(&clerq->crq_pages)); + LASSERT(!cfs_list_empty(&clerq->crq_pages)); apage = container_of(clerq->crq_pages.next, struct cl_page, cp_flight); opg = osc_cl_page_osc(apage); apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); - LASSERT(lock != NULL); + if (lock == NULL) { + struct cl_object_header *head; + struct cl_lock *scan; + + head = cl_object_header(apage->cp_obj); + cfs_list_for_each_entry(scan, &head->coh_locks, + cll_linkage) + CL_LOCK_DEBUG(D_ERROR, env, scan, + "no cover page!\n"); + CL_PAGE_DEBUG(D_ERROR, env, apage, + "dump uncover page!\n"); + libcfs_debug_dumpstack(NULL); + LBUG(); + } + olck = osc_lock_at(lock); LASSERT(olck != NULL); + LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); /* check for lockless io. */ if (olck->ols_lock != NULL) { oa->o_handle = olck->ols_lock->l_remote_handle;