X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_io.c;h=d4eb1e31576799daf9610259c78ad4d0b0f9ecf0;hp=a9bad93fa0e344b4ad4761d59e93981715395614;hb=65e067d5d90270d4237a7271008561a4b432b94d;hpb=4fcbd1af9ec3b1e5f6424d925f43f0cb2910c3ec diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index a9bad93..d4eb1e3 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -27,7 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -67,14 +67,18 @@ static struct osc_io *cl2osc_io(const struct lu_env *env, return oio; } -static struct osc_page *osc_cl_page_osc(struct cl_page *page) +static struct osc_page *osc_cl_page_osc(struct cl_page *page, + struct osc_object *osc) { - const struct cl_page_slice *slice; + const struct cl_page_slice *slice; - slice = cl_page_at(page, &osc_device_type); - LASSERT(slice != NULL); + if (osc != NULL) + slice = cl_object_page_slice(&osc->oo_cl, page); + else + slice = cl_page_at(page, &osc_device_type); + LASSERT(slice != NULL); - return cl2osc_page(slice); + return cl2osc_page(slice); } @@ -99,21 +103,21 @@ static int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, enum cl_req_type crt, struct cl_2queue *queue) { - struct cl_page *page; - struct cl_page *tmp; - struct client_obd *cli = NULL; - struct osc_object *osc = NULL; /* to keep gcc happy */ - struct osc_page *opg; - struct cl_io *io; - CFS_LIST_HEAD (list); + struct cl_page *page; + struct cl_page *tmp; + struct client_obd *cli = NULL; + struct osc_object *osc = NULL; /* to keep gcc happy */ + struct osc_page *opg; + struct cl_io *io; + struct list_head list = LIST_HEAD_INIT(list); struct cl_page_list *qin = &queue->c2_qin; struct cl_page_list *qout = &queue->c2_qout; - int queued = 0; + unsigned int queued = 0; int result = 0; int cmd; int brw_flags; - int max_pages; + unsigned int max_pages; LASSERT(qin->pl_nr > 0); @@ -137,12 +141,12 @@ static int osc_io_submit(const struct lu_env *env, io = page->cp_owner; LASSERT(io != NULL); - opg = osc_cl_page_osc(page); - oap = &opg->ops_oap; + opg = osc_cl_page_osc(page, osc); + oap = &opg->ops_oap; LASSERT(osc == oap->oap_obj); - if (!cfs_list_empty(&oap->oap_pending_item) || - !cfs_list_empty(&oap->oap_rpc_item)) { + if (!list_empty(&oap->oap_pending_item) || + !list_empty(&oap->oap_rpc_item)) { CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n", oap, opg); result = -EBUSY; @@ -163,14 +167,19 @@ static int osc_io_submit(const struct lu_env *env, continue; } - cl_page_list_move(qout, qin, page); spin_lock(&oap->oap_lock); oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY; oap->oap_async_flags |= ASYNC_COUNT_STABLE; spin_unlock(&oap->oap_lock); osc_page_submit(env, opg, crt, brw_flags); - cfs_list_add_tail(&oap->oap_pending_item, &list); + list_add_tail(&oap->oap_pending_item, &list); + + if (page->cp_sync_io != NULL) + cl_page_list_move(qout, qin, page); + else /* async IO */ + cl_page_list_del(env, qin, page); + if (++queued == max_pages) { queued = 0; result = osc_queue_sync_pages(env, osc, &list, cmd, @@ -195,7 +204,7 @@ static int osc_io_submit(const struct lu_env *env, * Expand stripe KMS if necessary. */ static void osc_page_touch_at(const struct lu_env *env, - struct cl_object *obj, pgoff_t idx, unsigned to) + struct cl_object *obj, pgoff_t idx, size_t to) { struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; struct cl_attr *attr = &osc_env_info(env)->oti_attr; @@ -219,16 +228,16 @@ static void osc_page_touch_at(const struct lu_env *env, attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME); valid = CAT_MTIME | CAT_CTIME; - if (kms > loi->loi_kms) { - attr->cat_kms = kms; - valid |= CAT_KMS; - } - if (kms > loi->loi_lvb.lvb_size) { - attr->cat_size = kms; - valid |= CAT_SIZE; - } - cl_object_attr_set(env, obj, attr, valid); - cl_object_attr_unlock(obj); + if (kms > loi->loi_kms) { + attr->cat_kms = kms; + valid |= CAT_KMS; + } + if (kms > loi->loi_lvb.lvb_size) { + attr->cat_size = kms; + valid |= CAT_SIZE; + } + cl_object_attr_update(env, obj, attr, valid); + cl_object_attr_unlock(obj); } static int osc_io_commit_async(const struct lu_env *env, @@ -261,18 +270,14 @@ static int osc_io_commit_async(const struct lu_env *env, } } - /* - * NOTE: here @page is a top-level page. This is done to avoid - * creation of sub-page-list. - */ while (qin->pl_nr > 0) { struct osc_async_page *oap; page = cl_page_list_first(qin); - opg = osc_cl_page_osc(page); + opg = osc_cl_page_osc(page, osc); oap = &opg->ops_oap; - if (!cfs_list_empty(&oap->oap_rpc_item)) { + if (!list_empty(&oap->oap_rpc_item)) { CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n", oap, opg); result = -EBUSY; @@ -280,14 +285,13 @@ static int osc_io_commit_async(const struct lu_env *env, } /* The page may be already in dirty cache. */ - if (cfs_list_empty(&oap->oap_pending_item)) { + if (list_empty(&oap->oap_pending_item)) { result = osc_page_cache_add(env, &opg->ops_cl, io); if (result != 0) break; } - osc_page_touch_at(env, osc2cl(osc), - opg->ops_cl.cpl_page->cp_index, + osc_page_touch_at(env, osc2cl(osc), osc_index(opg), page == last_page ? to : PAGE_SIZE); cl_page_list_del(env, qin, page); @@ -317,8 +321,8 @@ static int osc_io_rw_iter_init(const struct lu_env *env, struct osc_object *osc = cl2osc(ios->cis_obj); struct client_obd *cli = osc_cli(osc); unsigned long c; - unsigned int npages; - unsigned int max_pages; + unsigned long npages; + unsigned long max_pages; ENTRY; if (cl_io_is_append(io)) @@ -332,15 +336,15 @@ static int osc_io_rw_iter_init(const struct lu_env *env, if (npages > max_pages) npages = max_pages; - c = cfs_atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); if (c < npages && osc_lru_reclaim(cli) > 0) - c = cfs_atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); while (c >= npages) { - if (c == cfs_atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) { + if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) { oio->oi_lru_reserved = npages; break; } - c = cfs_atomic_read(cli->cl_lru_left); + c = atomic_long_read(cli->cl_lru_left); } RETURN(0); @@ -354,32 +358,32 @@ static void osc_io_rw_iter_fini(const struct lu_env *env, struct client_obd *cli = osc_cli(osc); if (oio->oi_lru_reserved > 0) { - cfs_atomic_add(oio->oi_lru_reserved, cli->cl_lru_left); + atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left); oio->oi_lru_reserved = 0; } + oio->oi_write_osclock = NULL; } static int osc_io_fault_start(const struct lu_env *env, - const struct cl_io_slice *ios) + const struct cl_io_slice *ios) { - struct cl_io *io; - struct cl_fault_io *fio; - - ENTRY; + struct cl_io *io; + struct cl_fault_io *fio; + ENTRY; - io = ios->cis_io; - fio = &io->u.ci_fault; - CDEBUG(D_INFO, "%lu %d %d\n", - fio->ft_index, fio->ft_writable, fio->ft_nob); - /* - * If mapping is writeable, adjust kms to cover this page, - * but do not extend kms beyond actual file size. - * See bug 10919. - */ - if (fio->ft_writable) - osc_page_touch_at(env, ios->cis_obj, - fio->ft_index, fio->ft_nob); - RETURN(0); + io = ios->cis_io; + fio = &io->u.ci_fault; + CDEBUG(D_INFO, "%lu %d %zu\n", + fio->ft_index, fio->ft_writable, fio->ft_nob); + /* + * If mapping is writeable, adjust kms to cover this page, + * but do not extend kms beyond actual file size. + * See bug 10919. + */ + if (fio->ft_writable) + osc_page_touch_at(env, ios->cis_obj, + fio->ft_index, fio->ft_nob); + RETURN(0); } static int osc_async_upcall(void *a, int rc) @@ -391,7 +395,6 @@ static int osc_async_upcall(void *a, int rc) return 0; } -#if defined(__KERNEL__) /** * Checks that there are no pages being written in the extent being truncated. */ @@ -404,19 +407,13 @@ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io, oap = &ops->ops_oap; if (oap->oap_cmd & OBD_BRW_WRITE && - !cfs_list_empty(&oap->oap_pending_item)) + !list_empty(&oap->oap_pending_item)) CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n", start, current->comm); -#ifdef __linux__ - { - struct page *vmpage = cl_page_vmpage(env, page); - if (PageLocked(vmpage)) - CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n", - ops, page->cp_index, - (oap->oap_cmd & OBD_BRW_RWMASK)); - } -#endif + if (PageLocked(page->cp_vmpage)) + CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n", + ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK); return CLP_GANG_OKAY; } @@ -439,13 +436,6 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, start + partial, CL_PAGE_EOF, trunc_check_cb, (void *)&size); } -#else /* __KERNEL__ */ -static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, - struct osc_io *oio, __u64 size) -{ - return; -} -#endif static int osc_io_setattr_start(const struct lu_env *env, const struct cl_io_slice *slice) @@ -487,14 +477,15 @@ static int osc_io_setattr_start(const struct lu_env *env, } if (ia_valid & ATTR_CTIME_SET) { attr->cat_ctime = lvb->lvb_ctime; - cl_valid |= CAT_CTIME; - } - result = cl_object_attr_set(env, obj, attr, cl_valid); - } - cl_object_attr_unlock(obj); - } - memset(oa, 0, sizeof(*oa)); - if (result == 0) { + cl_valid |= CAT_CTIME; + } + result = cl_object_attr_update(env, obj, attr, + cl_valid); + } + cl_object_attr_unlock(obj); + } + memset(oa, 0, sizeof(*oa)); + if (result == 0) { oa->o_oi = loi->loi_oi; oa->o_mtime = attr->cat_mtime; oa->o_atime = attr->cat_atime; @@ -569,16 +560,15 @@ static void osc_io_setattr_end(const struct lu_env *env, static int osc_io_read_start(const struct lu_env *env, const struct cl_io_slice *slice) { - struct osc_io *oio = cl2osc_io(env, slice); struct cl_object *obj = slice->cis_obj; struct cl_attr *attr = &osc_env_info(env)->oti_attr; int rc = 0; ENTRY; - if (oio->oi_lockless == 0 && !slice->cis_io->ci_noatime) { + if (!slice->cis_io->ci_noatime) { cl_object_attr_lock(obj); attr->cat_atime = LTIME_S(CFS_CURRENT_TIME); - rc = cl_object_attr_set(env, obj, attr, CAT_ATIME); + rc = cl_object_attr_update(env, obj, attr, CAT_ATIME); cl_object_attr_unlock(obj); } @@ -588,25 +578,18 @@ static int osc_io_read_start(const struct lu_env *env, static int osc_io_write_start(const struct lu_env *env, const struct cl_io_slice *slice) { - struct osc_io *oio = cl2osc_io(env, slice); - struct cl_object *obj = slice->cis_obj; - struct cl_attr *attr = &osc_env_info(env)->oti_attr; - int result = 0; - ENTRY; - - if (oio->oi_lockless == 0) { - OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1); - cl_object_attr_lock(obj); - result = cl_object_attr_get(env, obj, attr); - if (result == 0) { - attr->cat_mtime = attr->cat_ctime = - LTIME_S(CFS_CURRENT_TIME); - result = cl_object_attr_set(env, obj, attr, - CAT_MTIME | CAT_CTIME); - } - cl_object_attr_unlock(obj); - } - RETURN(result); + struct cl_object *obj = slice->cis_obj; + struct cl_attr *attr = &osc_env_info(env)->oti_attr; + int rc = 0; + ENTRY; + + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1); + cl_object_attr_lock(obj); + attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME); + rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME); + cl_object_attr_unlock(obj); + + RETURN(rc); } static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj, @@ -783,8 +766,7 @@ static void osc_req_attr_set(const struct lu_env *env, struct lov_oinfo *oinfo; struct cl_req *clerq; struct cl_page *apage; /* _some_ page in @clerq */ - struct cl_lock *lock; /* _some_ lock protecting @apage */ - struct osc_lock *olck; + struct ldlm_lock *lock; /* _some_ lock protecting @apage */ struct osc_page *opg; struct obdo *oa; struct ost_lvb *lvb; @@ -814,38 +796,37 @@ static void osc_req_attr_set(const struct lu_env *env, oa->o_valid |= OBD_MD_FLID; } if (flags & OBD_MD_FLHANDLE) { - clerq = slice->crs_req; - LASSERT(!cfs_list_empty(&clerq->crq_pages)); - apage = container_of(clerq->crq_pages.next, - struct cl_page, cp_flight); - opg = osc_cl_page_osc(apage); - apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ - lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); - if (lock == NULL) { - struct cl_object_header *head; - struct cl_lock *scan; - - head = cl_object_header(apage->cp_obj); - cfs_list_for_each_entry(scan, &head->coh_locks, - cll_linkage) - CL_LOCK_DEBUG(D_ERROR, env, scan, - "no cover page!\n"); - CL_PAGE_DEBUG(D_ERROR, env, apage, - "dump uncover page!\n"); - libcfs_debug_dumpstack(NULL); - LBUG(); - } + clerq = slice->crs_req; + LASSERT(!list_empty(&clerq->crq_pages)); + apage = container_of(clerq->crq_pages.next, + struct cl_page, cp_flight); + opg = osc_cl_page_osc(apage, NULL); + lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg), + 1, 1); + if (lock == NULL && !opg->ops_srvlock) { + struct ldlm_resource *res; + struct ldlm_res_id *resname; + + CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n"); + + resname = &osc_env_info(env)->oti_resname; + ostid_build_res_name(&oinfo->loi_oi, resname); + res = ldlm_resource_get( + osc_export(cl2osc(obj))->exp_obd->obd_namespace, + NULL, resname, LDLM_EXTENT, 0); + ldlm_resource_dump(D_ERROR, res); + + libcfs_debug_dumpstack(NULL); + LBUG(); + } - olck = osc_lock_at(lock); - LASSERT(olck != NULL); - LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); - /* check for lockless io. */ - if (olck->ols_lock != NULL) { - oa->o_handle = olck->ols_lock->l_remote_handle; - oa->o_valid |= OBD_MD_FLHANDLE; - } - cl_lock_put(env, lock); - } + /* check for lockless io. */ + if (lock != NULL) { + oa->o_handle = lock->l_remote_handle; + oa->o_valid |= OBD_MD_FLHANDLE; + LDLM_LOCK_PUT(lock); + } + } } static const struct cl_req_operations osc_req_ops = { @@ -866,18 +847,18 @@ int osc_io_init(const struct lu_env *env, } int osc_req_init(const struct lu_env *env, struct cl_device *dev, - struct cl_req *req) + struct cl_req *req) { - struct osc_req *or; - int result; - - OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, __GFP_IO); - if (or != NULL) { - cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); - result = 0; - } else - result = -ENOMEM; - return result; + struct osc_req *or; + int result; + + OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS); + if (or != NULL) { + cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); + result = 0; + } else + result = -ENOMEM; + return result; } /** @} osc */