Whamcloud - gitweb
LU-834 echo_client: fix page_is_vmlocked
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 237c146..cd56684 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -82,6 +85,7 @@ struct echo_object_conf {
 
 struct echo_page {
         struct cl_page_slice   ep_cl;
+        cfs_mutex_t            ep_lock;
         cfs_page_t            *ep_vmpage;
 };
 
@@ -169,8 +173,10 @@ struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 static inline void lsm2fid(struct lov_stripe_md *lsm, struct lu_fid *fid)
 {
         fid_zero(fid);
-        fid->f_seq = lsm->lsm_object_gr << 16 | lsm->lsm_object_id >> 32;
+        fid->f_seq = FID_SEQ_ECHO;
+        /* truncated to 32 bits by assignment */
         fid->f_oid = lsm->lsm_object_id;
+        fid->f_ver = lsm->lsm_object_id >> 32;
 }
 /** @} echo_helpers */
 
@@ -257,6 +263,29 @@ cfs_page_t *echo_page_vmpage(const struct lu_env *env,
         return cl2echo_page(slice)->ep_vmpage;
 }
 
+static int echo_page_own(const struct lu_env *env,
+                         const struct cl_page_slice *slice,
+                         struct cl_io *io, int nonblock)
+{
+        struct echo_page *ep = cl2echo_page(slice);
+
+        if (!nonblock)
+                cfs_mutex_lock(&ep->ep_lock);
+        else if (!cfs_mutex_trylock(&ep->ep_lock))
+                return -EAGAIN;
+        return 0;
+}
+
+static void echo_page_disown(const struct lu_env *env,
+                             const struct cl_page_slice *slice,
+                             struct cl_io *io)
+{
+        struct echo_page *ep = cl2echo_page(slice);
+
+        LASSERT(cfs_mutex_is_locked(&ep->ep_lock));
+        cfs_mutex_unlock(&ep->ep_lock);
+}
+
 static void echo_page_discard(const struct lu_env *env,
                               const struct cl_page_slice *slice,
                               struct cl_io *unused)
@@ -267,7 +296,9 @@ static void echo_page_discard(const struct lu_env *env,
 static int echo_page_is_vmlocked(const struct lu_env *env,
                                  const struct cl_page_slice *slice)
 {
-        return 1;
+        if (cfs_mutex_is_locked(&cl2echo_page(slice)->ep_lock))
+                return -EBUSY;
+        return -ENODATA;
 }
 
 static void echo_page_completion(const struct lu_env *env,
@@ -304,12 +335,14 @@ static int echo_page_print(const struct lu_env *env,
 {
         struct echo_page *ep = cl2echo_page(slice);
 
-        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p vm@%p\n",
-                   ep, ep->ep_vmpage);
+        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
+                   ep, cfs_mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
         return 0;
 }
 
 static const struct cl_page_operations echo_page_ops = {
+        .cpo_own           = echo_page_own,
+        .cpo_disown        = echo_page_disown,
         .cpo_discard       = echo_page_discard,
         .cpo_vmpage        = echo_page_vmpage,
         .cpo_fini          = echo_page_fini,
@@ -385,6 +418,7 @@ static struct cl_page *echo_page_init(const struct lu_env *env,
                 struct echo_object *eco = cl2echo_obj(obj);
                 ep->ep_vmpage = vmpage;
                 page_cache_get(vmpage);
+                cfs_mutex_init(&ep->ep_lock);
                 cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
                 cfs_atomic_inc(&eco->eo_npages);
         }
@@ -791,29 +825,8 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
         struct echo_object     *eco;
         struct lu_device       *next = ed->ed_next;
 
-        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", ed, next);
-
-        /* destroy locks */
-        cfs_spin_lock(&ec->ec_lock);
-        while (!cfs_list_empty(&ec->ec_locks)) {
-                struct echo_lock *ecl = cfs_list_entry(ec->ec_locks.next,
-                                                       struct echo_lock,
-                                                       el_chain);
-                int still_used = 0;
-
-                if (cfs_atomic_dec_and_test(&ecl->el_refcount))
-                        cfs_list_del_init(&ecl->el_chain);
-                else
-                        still_used = 1;
-                cfs_spin_unlock(&ec->ec_lock);
-
-                CERROR("echo client: pending lock %p refs %d\n",
-                       ecl, cfs_atomic_read(&ecl->el_refcount));
-
-                echo_lock_release(env, ecl, still_used);
-                cfs_spin_lock(&ec->ec_lock);
-        }
-        cfs_spin_unlock(&ec->ec_lock);
+        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
+               ed, next);
 
         LASSERT(ed->ed_site);
         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
@@ -846,6 +859,8 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
         }
         cfs_spin_unlock(&ec->ec_lock);
 
+        LASSERT(cfs_list_empty(&ec->ec_locks));
+
         CDEBUG(D_INFO, "No object exists, exiting...\n");
 
         echo_client_cleanup(d->ld_obd);
@@ -923,7 +938,7 @@ static struct echo_object *cl_echo_object_find(struct echo_device *d,
                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
                         LASSERT(oinfo != NULL);
                         oinfo->loi_id = lsm->lsm_object_id;
-                        oinfo->loi_gr = lsm->lsm_object_gr;
+                        oinfo->loi_seq = lsm->lsm_object_seq;
                         conf->eoc_cl.u.coc_oinfo = oinfo;
                 } else {
                         struct lustre_md *md;
@@ -969,7 +984,6 @@ static int cl_echo_object_put(struct echo_object *eco)
                 struct lu_object_header *loh = obj->co_lu.lo_header;
                 LASSERT(&eco->eo_hdr == luh2coh(loh));
                 cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
-                cl_object_prune(env, obj);
         }
 
         cl_object_put(env, obj);
@@ -1306,7 +1320,7 @@ static int echo_create_object(struct echo_device *ed, int on_target,
                 if (lsm->lsm_stripe_size == 0)
                         lsm->lsm_stripe_size = CFS_PAGE_SIZE;
 
-                idx = ll_rand();
+                idx = cfs_rand();
 
                 /* setup stripes: indices + default ids if required */
                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
@@ -1329,7 +1343,7 @@ static int echo_create_object(struct echo_device *ed, int on_target,
         if (on_target) {
                 /* Only echo objects are allowed to be created */
                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
-                        (oa->o_gr == FILTER_GROUP_ECHO));
+                        (oa->o_seq == FID_SEQ_ECHO));
                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
                 if (rc != 0) {
                         CERROR("Cannot create objects, rc = %d\n", rc);
@@ -1382,9 +1396,9 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
 
         lsm->lsm_object_id = oa->o_id;
         if (oa->o_valid & OBD_MD_FLGROUP)
-                lsm->lsm_object_gr = oa->o_gr;
+                lsm->lsm_object_seq = oa->o_seq;
         else
-                lsm->lsm_object_gr = FILTER_GROUP_ECHO;
+                lsm->lsm_object_seq = FID_SEQ_ECHO;
 
         rc = 0;
         eco = cl_echo_object_find(ed, &lsm);
@@ -1513,6 +1527,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
         int                     rc;
         int                     verify;
         int                     gfp_mask;
+        int                     brw_flags = 0;
         ENTRY;
 
         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
@@ -1532,6 +1547,9 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
         /* XXX think again with misaligned I/O */
         npages = count >> CFS_PAGE_SHIFT;
 
+        if (rw == OBD_BRW_WRITE)
+                brw_flags = OBD_BRW_ASYNC;
+
         OBD_ALLOC(pga, npages * sizeof(*pga));
         if (pga == NULL)
                 RETURN(-ENOMEM);
@@ -1556,7 +1574,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                 pages[i] = pgp->pg;
                 pgp->count = CFS_PAGE_SIZE;
                 pgp->off = off;
-                pgp->flag = 0;
+                pgp->flag = brw_flags;
 
                 if (verify)
                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
@@ -1691,7 +1709,7 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
         struct obd_device *obd = class_exp2obd(exp);
         struct echo_device *ed = obd2echo_dev(obd);
         struct echo_client_obd *ec = ed->ed_ec;
-        struct obd_trans_info dummy_oti = { .oti_thread = NULL };
+        struct obd_trans_info dummy_oti = { 0 };
         struct obdo *oa = &data->ioc_obdo1;
         struct echo_object *eco;
         int rc;
@@ -1788,22 +1806,28 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
         struct obd_trans_info   dummy_oti;
         struct oti_req_ack_lock *ack_lock;
         struct obdo            *oa;
+        struct lu_fid           fid;
         int                     rw = OBD_BRW_READ;
         int                     rc = 0;
         int                     i;
         ENTRY;
 
+#ifndef HAVE_UNLOCKED_IOCTL
         cfs_unlock_kernel();
+#endif
 
         memset(&dummy_oti, 0, sizeof(dummy_oti));
 
         oa = &data->ioc_obdo1;
         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
                 oa->o_valid |= OBD_MD_FLGROUP;
-                oa->o_gr = FILTER_GROUP_ECHO;
+                oa->o_seq = FID_SEQ_ECHO;
         }
-        /* assume we can touch filter native objects with echo device. */
-        /* LASSERT(oa->o_gr == FILTER_GROUP_ECHO); */
+
+        /* This FID is unpacked just for validation at this point */
+        rc = fid_ostid_unpack(&fid, &oa->o_oi, 0);
+        if (rc < 0)
+                RETURN(rc);
 
         switch (cmd) {
         case OBD_IOC_CREATE:                    /* may create echo object */
@@ -1921,7 +1945,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
         }
 
+#ifndef HAVE_UNLOCKED_IOCTL
         cfs_lock_kernel();
+#endif
 
         return rc;
 }
@@ -1961,9 +1987,10 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         }
 
         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
-                                 OBD_CONNECT_GRANT;
+                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
+                                 OBD_CONNECT_64BITHASH;
         ocd->ocd_version = LUSTRE_VERSION_CODE;
-        ocd->ocd_group = FILTER_GROUP_ECHO;
+        ocd->ocd_group = FID_SEQ_ECHO;
 
         rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
         if (rc == 0) {
@@ -2079,10 +2106,16 @@ int echo_client_init(void)
         int rc;
 
         lprocfs_echo_init_vars(&lvars);
-        rc = class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
-                                 LUSTRE_ECHO_CLIENT_NAME, &echo_device_type);
-        if (rc == 0)
-                lu_kmem_init(echo_caches);
+
+        rc = lu_kmem_init(echo_caches);
+        if (rc == 0) {
+                rc = class_register_type(&echo_obd_ops, NULL,
+                                         lvars.module_vars,
+                                         LUSTRE_ECHO_CLIENT_NAME,
+                                         &echo_device_type);
+                if (rc)
+                        lu_kmem_fini(echo_caches);
+        }
         return rc;
 }