X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdecho%2Fecho_client.c;h=cd56684ce4a9d73090736b5d540c4042878debc3;hp=237c14659ebf3146d83fcb656ab8aa9dba48964c;hb=c965680bf2b891a5372b994e685848fd8f0e255a;hpb=90dfed8469d46f4b79ebdff8121eb109b5db6746 diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 237c146..cd56684 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -26,8 +26,11 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -82,6 +85,7 @@ struct echo_object_conf { struct echo_page { struct cl_page_slice ep_cl; + cfs_mutex_t ep_lock; cfs_page_t *ep_vmpage; }; @@ -169,8 +173,10 @@ struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c) static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid) { fid_zero(fid); - fid->f_seq = lsm->lsm_object_gr << 16 | lsm->lsm_object_id >> 32; + fid->f_seq = FID_SEQ_ECHO; + /* truncated to 32 bits by assignment */ fid->f_oid = lsm->lsm_object_id; + fid->f_ver = lsm->lsm_object_id >> 32; } /** @} echo_helpers */ @@ -257,6 +263,29 @@ cfs_page_t *echo_page_vmpage(const struct lu_env *env, return cl2echo_page(slice)->ep_vmpage; } +static int echo_page_own(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io, int nonblock) +{ + struct echo_page *ep = cl2echo_page(slice); + + if (!nonblock) + cfs_mutex_lock(&ep->ep_lock); + else if (!cfs_mutex_trylock(&ep->ep_lock)) + return -EAGAIN; + return 0; +} + +static void echo_page_disown(const struct lu_env *env, + const struct cl_page_slice *slice, + struct cl_io *io) +{ + struct echo_page *ep = cl2echo_page(slice); + + LASSERT(cfs_mutex_is_locked(&ep->ep_lock)); + cfs_mutex_unlock(&ep->ep_lock); +} + static void echo_page_discard(const struct lu_env *env, const struct cl_page_slice *slice, struct cl_io *unused) @@ -267,7 +296,9 @@ static void echo_page_discard(const struct lu_env *env, static int echo_page_is_vmlocked(const struct lu_env *env, const struct cl_page_slice *slice) { - return 1; + if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock)) + return -EBUSY; + return -ENODATA; } static void echo_page_completion(const struct lu_env *env, @@ -304,12 +335,14 @@ static int echo_page_print(const struct lu_env *env, { struct echo_page *ep = cl2echo_page(slice); - (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p vm@%p\n", - ep, ep->ep_vmpage); + (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", + ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); return 0; } static const struct cl_page_operations echo_page_ops = { + .cpo_own = echo_page_own, + .cpo_disown = echo_page_disown, .cpo_discard = echo_page_discard, .cpo_vmpage = echo_page_vmpage, .cpo_fini = echo_page_fini, @@ -385,6 +418,7 @@ static struct cl_page *echo_page_init(const struct lu_env *env, struct echo_object *eco = cl2echo_obj(obj); ep->ep_vmpage = vmpage; page_cache_get(vmpage); + cfs_mutex_init(&ep->ep_lock); cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); cfs_atomic_inc(&eco->eo_npages); } @@ -791,29 +825,8 @@ static struct lu_device *echo_device_free(const struct lu_env *env, struct echo_object *eco; struct lu_device *next = ed->ed_next; - CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", ed, next); - - /* destroy locks */ - cfs_spin_lock(&ec->ec_lock); - while (!cfs_list_empty(&ec->ec_locks)) { - struct echo_lock *ecl = cfs_list_entry(ec->ec_locks.next, - struct echo_lock, - el_chain); - int still_used = 0; - - if (cfs_atomic_dec_and_test(&ecl->el_refcount)) - cfs_list_del_init(&ecl->el_chain); - else - still_used = 1; - cfs_spin_unlock(&ec->ec_lock); - - CERROR("echo client: pending lock %p refs %d\n", - ecl, cfs_atomic_read(&ecl->el_refcount)); - - echo_lock_release(env, ecl, still_used); - cfs_spin_lock(&ec->ec_lock); - } - cfs_spin_unlock(&ec->ec_lock); + CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", + ed, next); LASSERT(ed->ed_site); lu_site_purge(env, &ed->ed_site->cs_lu, -1); @@ -846,6 +859,8 @@ static struct lu_device *echo_device_free(const struct lu_env *env, } cfs_spin_unlock(&ec->ec_lock); + LASSERT(cfs_list_empty(&ec->ec_locks)); + CDEBUG(D_INFO, "No object exists, exiting...\n"); echo_client_cleanup(d->ld_obd); @@ -923,7 +938,7 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d, struct lov_oinfo *oinfo = lsm->lsm_oinfo[0]; LASSERT(oinfo != NULL); oinfo->loi_id = lsm->lsm_object_id; - oinfo->loi_gr = lsm->lsm_object_gr; + oinfo->loi_seq = lsm->lsm_object_seq; conf->eoc_cl.u.coc_oinfo = oinfo; } else { struct lustre_md *md; @@ -969,7 +984,6 @@ static int cl_echo_object_put(struct echo_object *eco) struct lu_object_header *loh = obj->co_lu.lo_header; LASSERT(&eco->eo_hdr == luh2coh(loh)); cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); - cl_object_prune(env, obj); } cl_object_put(env, obj); @@ -1306,7 +1320,7 @@ static int echo_create_object(struct echo_device *ed, int on_target, if (lsm->lsm_stripe_size == 0) lsm->lsm_stripe_size = CFS_PAGE_SIZE; - idx = ll_rand(); + idx = cfs_rand(); /* setup stripes: indices + default ids if required */ for (i = 0; i < lsm->lsm_stripe_count; i++) { @@ -1329,7 +1343,7 @@ static int echo_create_object(struct echo_device *ed, int on_target, if (on_target) { /* Only echo objects are allowed to be created */ LASSERT((oa->o_valid & OBD_MD_FLGROUP) && - (oa->o_gr == FILTER_GROUP_ECHO)); + (oa->o_seq == FID_SEQ_ECHO)); rc = obd_create(ec->ec_exp, oa, &lsm, oti); if (rc != 0) { CERROR("Cannot create objects, rc = %d\n", rc); @@ -1382,9 +1396,9 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, lsm->lsm_object_id = oa->o_id; if (oa->o_valid & OBD_MD_FLGROUP) - lsm->lsm_object_gr = oa->o_gr; + lsm->lsm_object_seq = oa->o_seq; else - lsm->lsm_object_gr = FILTER_GROUP_ECHO; + lsm->lsm_object_seq = FID_SEQ_ECHO; rc = 0; eco = cl_echo_object_find(ed, &lsm); @@ -1513,6 +1527,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, int rc; int verify; int gfp_mask; + int brw_flags = 0; ENTRY; verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID && @@ -1532,6 +1547,9 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, /* XXX think again with misaligned I/O */ npages = count >> CFS_PAGE_SHIFT; + if (rw == OBD_BRW_WRITE) + brw_flags = OBD_BRW_ASYNC; + OBD_ALLOC(pga, npages * sizeof(*pga)); if (pga == NULL) RETURN(-ENOMEM); @@ -1556,7 +1574,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, pages[i] = pgp->pg; pgp->count = CFS_PAGE_SIZE; pgp->off = off; - pgp->flag = 0; + pgp->flag = brw_flags; if (verify) echo_client_page_debug_setup(lsm, pgp->pg, rw, @@ -1691,7 +1709,7 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; - struct obd_trans_info dummy_oti = { .oti_thread = NULL }; + struct obd_trans_info dummy_oti = { 0 }; struct obdo *oa = &data->ioc_obdo1; struct echo_object *eco; int rc; @@ -1788,22 +1806,28 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, struct obd_trans_info dummy_oti; struct oti_req_ack_lock *ack_lock; struct obdo *oa; + struct lu_fid fid; int rw = OBD_BRW_READ; int rc = 0; int i; ENTRY; +#ifndef HAVE_UNLOCKED_IOCTL cfs_unlock_kernel(); +#endif memset(&dummy_oti, 0, sizeof(dummy_oti)); oa = &data->ioc_obdo1; if (!(oa->o_valid & OBD_MD_FLGROUP)) { oa->o_valid |= OBD_MD_FLGROUP; - oa->o_gr = FILTER_GROUP_ECHO; + oa->o_seq = FID_SEQ_ECHO; } - /* assume we can touch filter native objects with echo device. */ - /* LASSERT(oa->o_gr == FILTER_GROUP_ECHO); */ + + /* This FID is unpacked just for validation at this point */ + rc = fid_ostid_unpack(&fid, &oa->o_oi, 0); + if (rc < 0) + RETURN(rc); switch (cmd) { case OBD_IOC_CREATE: /* may create echo object */ @@ -1921,7 +1945,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, ldlm_lock_decref(&ack_lock->lock, ack_lock->mode); } +#ifndef HAVE_UNLOCKED_IOCTL cfs_lock_kernel(); +#endif return rc; } @@ -1961,9 +1987,10 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) } ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | - OBD_CONNECT_GRANT; + OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | + OBD_CONNECT_64BITHASH; ocd->ocd_version = LUSTRE_VERSION_CODE; - ocd->ocd_group = FILTER_GROUP_ECHO; + ocd->ocd_group = FID_SEQ_ECHO; rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); if (rc == 0) { @@ -2079,10 +2106,16 @@ int echo_client_init(void) int rc; lprocfs_echo_init_vars(&lvars); - rc = class_register_type(&echo_obd_ops, NULL, lvars.module_vars, - LUSTRE_ECHO_CLIENT_NAME, &echo_device_type); - if (rc == 0) - lu_kmem_init(echo_caches); + + rc = lu_kmem_init(echo_caches); + if (rc == 0) { + rc = class_register_type(&echo_obd_ops, NULL, + lvars.module_vars, + LUSTRE_ECHO_CLIENT_NAME, + &echo_device_type); + if (rc) + lu_kmem_fini(echo_caches); + } return rc; }