Whamcloud - gitweb
- fixed stupid bug with locking in MDS. It caused clients do not flush local
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index fa591b2..e14484f 100644 (file)
@@ -218,8 +218,7 @@ static int echo_create_object(struct obd_device *obd, int on_target,
                 oa->o_id = ++last_object_id;
 
         if (on_target) {
-                /* XXX get some filter group constants */
-                oa->o_gr = 2;
+                oa->o_gr = FILTER_GROUP_ECHO;
                 oa->o_valid |= OBD_MD_FLGROUP;
                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
                 if (rc != 0)
@@ -274,7 +273,8 @@ echo_get_object (struct ec_object **ecop, struct obd_device *obd,
         struct ec_object       *eco2;
         int                     rc;
 
-        if ((oa->o_valid & OBD_MD_FLID) == 0)
+        if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+            oa->o_id == 0)                      /* disallow use of object id 0 */
         {
                 CERROR ("No valid oid\n");
                 return (-EINVAL);
@@ -423,6 +423,72 @@ echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
         *offp = offset * stripe_size + woffset % stripe_size;
 }
 
+static void 
+echo_client_page_debug_setup(struct lov_stripe_md *lsm, 
+                             struct page *page, int rw, obd_id id, 
+                             obd_off offset, obd_off count)
+{
+        char    *addr;
+        obd_off  stripe_off;
+        obd_id   stripe_id;
+        int      delta;
+
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
+
+        addr = kmap(page);
+
+        for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                if (rw == OBD_BRW_WRITE) {
+                        stripe_off = offset + delta;
+                        stripe_id = id;
+                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+                } else {
+                        stripe_off = 0xdeadbeef00c0ffeeULL;
+                        stripe_id = 0xdeadbeef00c0ffeeULL;
+                }
+                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                  stripe_off, stripe_id);
+        }
+
+        kunmap(page);
+}
+
+static int
+echo_client_page_debug_check(struct lov_stripe_md *lsm, 
+                             struct page *page, obd_id id, 
+                             obd_off offset, obd_off count)
+{
+        obd_off stripe_off;
+        obd_id  stripe_id;
+        char   *addr;
+        int     delta;
+        int     rc;
+        int     rc2;
+
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
+
+        addr = kmap(page);
+
+        for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                stripe_off = offset + delta;
+                stripe_id = id;
+                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+
+                rc2 = block_debug_check("test_brw", 
+                                        addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                        stripe_off, stripe_id);
+                if (rc2 != 0) {
+                        CERROR ("Error in echo object "LPX64"\n", id);
+                        rc = rc2;
+                }
+        }
+
+        kunmap(page);
+        return rc;
+}
+
 static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                             struct lov_stripe_md *lsm, obd_off offset,
                             obd_size count, struct obd_trans_info *oti)
@@ -434,14 +500,14 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
         obd_off                 off;
         int                     i;
         int                     rc;
-        int                     verify;
+        int                     verify = 0;
         int                     gfp_mask;
 
-        /* oa_id  == 0    => speed test (no verification) else...
-         * oa & 1         => use HIGHMEM
-         */
-        verify = (oa->o_id != 0);
-        gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+        gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 
         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
 
@@ -470,32 +536,18 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                         goto out;
 
                 pgp->count = PAGE_SIZE;
-                pgp->off = off;
+                pgp->disk_offset = pgp->page_offset = off;
                 pgp->flag = 0;
 
-                if (verify) {
-                        void *addr = kmap(pgp->pg);
-                        obd_off      stripe_off = off;
-                        obd_id       stripe_id = oa->o_id;
-
-                        if (rw == OBD_BRW_WRITE) {
-                                echo_get_stripe_off_id(lsm, &stripe_off,
-                                                       &stripe_id);
-                                page_debug_setup(addr, pgp->count,
-                                                 stripe_off, stripe_id);
-                        } else {
-                                page_debug_setup(addr, pgp->count,
-                                                 0xdeadbeef00c0ffeeULL,
-                                                 0xdeadbeef00c0ffeeULL);
-                        }
-                        kunmap(pgp->pg);
-                }
+                if (verify)
+                        echo_client_page_debug_setup(lsm, pgp->pg, rw, 
+                                                     oa->o_id, off, pgp->count);
         }
 
         rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
 
  out:
-        if (rc != 0)
+        if (rc != 0 || rw != OBD_BRW_READ)
                 verify = 0;
 
         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
@@ -503,18 +555,12 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                         continue;
 
                 if (verify) {
-                        void    *addr = kmap(pgp->pg);
-                        obd_off  stripe_off = pgp->off;
-                        obd_id   stripe_id  = oa->o_id;
-                        int      vrc;
-
-                        echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
-                        vrc = page_debug_check("test_brw", addr, pgp->count,
-                                               stripe_off, stripe_id);
+                        int vrc;
+                        vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
+                                                           pgp->page_offset, 
+                                                           pgp->count);
                         if (vrc != 0 && rc == 0)
                                 rc = vrc;
-
-                        kunmap(pgp->pg);
                 }
                 __free_pages(pgp->pg, 0);
         }
@@ -571,7 +617,7 @@ static int echo_client_ubrw(struct obd_device *obd, int rw,
         for (i = 0, off = offset, pgp = pga;
              i < npages;
              i++, off += PAGE_SIZE, pgp++) {
-                pgp->off = off;
+                pgp->disk_offset = pgp->page_offset = off;
                 pgp->pg = kiobuf->maplist[i];
                 pgp->count = PAGE_SIZE;
                 pgp->flag = 0;
@@ -623,6 +669,7 @@ struct echo_async_state {
         wait_queue_head_t       eas_waitq;
         struct list_head        eas_avail;
         struct obdo             eas_oa;
+        struct lov_stripe_md    *eas_lsm;
 };
 
 static int eas_should_wake(struct echo_async_state *eas)
@@ -665,7 +712,8 @@ static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
 
         memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
 }
-static void ec_ap_completion(void *data, int cmd, int rc)
+
+static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
         struct echo_async_page *eap = eap_from_cookie(data);
         struct echo_async_state *eas;
@@ -675,6 +723,14 @@ static void ec_ap_completion(void *data, int cmd, int rc)
                 return;
         eas = eap->eap_eas;
 
+        if (cmd == OBD_BRW_READ &&
+            eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+            (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+            (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                echo_client_page_debug_check(eas->eas_lsm, eap->eap_page, 
+                                             eas->eas_oa.o_id, eap->eap_off,
+                                             PAGE_SIZE);
+
         spin_lock_irqsave(&eas->eas_lock, flags);
         if (rc && !eas->eas_rc)
                 eas->eas_rc = rc;
@@ -731,6 +787,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
         init_waitqueue_head(&eas.eas_waitq);
         eas.eas_in_flight = 0;
         eas.eas_rc = 0;
+        eas.eas_lsm = lsm;
         INIT_LIST_HEAD(&eas.eas_avail);
 
         /* prepare the group of pages that we're going to be keeping
@@ -740,7 +797,8 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 if (page == NULL)
                         GOTO(out, rc = -ENOMEM);
 
-                list_add_tail(&page->list, &pages);
+                page->private = 0;
+                list_add_tail(&PAGE_LIST(page), &pages);
 
                 OBD_ALLOC(eap, sizeof(*eap));
                 if (eap == NULL)
@@ -749,7 +807,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 eap->eap_magic = EAP_MAGIC;
                 eap->eap_page = page;
                 eap->eap_eas = &eas;
-                eap->eap_cookie = ERR_PTR(-ENOENT);
+                page->private = (unsigned long)eap;
                 list_add_tail(&eap->eap_item, &eas.eas_avail);
         }
 
@@ -775,17 +833,17 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 spin_unlock_irqrestore(&eas.eas_lock, flags);
 
                 /* unbind the eap from its old page offset */
-                if (!IS_ERR(eap->eap_cookie)) {
+                if (eap->eap_cookie != NULL) {
                         obd_teardown_async_page(exp, lsm, NULL, 
                                                 eap->eap_cookie);
-                        eap->eap_cookie = ERR_PTR(-ENOENT);
+                        eap->eap_cookie = NULL;
                 }
 
                 eas.eas_next_offset += PAGE_SIZE;
                 eap->eap_off = eas.eas_next_offset;
 
-                rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, 
-                                         eap->eap_off, &ec_async_page_ops, 
+                rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
+                                         eap->eap_off, &ec_async_page_ops,
                                          eap, &eap->eap_cookie);
                 if (rc) {
                         spin_lock_irqsave(&eas.eas_lock, flags);
@@ -793,9 +851,16 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                         break;
                 }
 
+                if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+                    (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                    (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                        echo_client_page_debug_setup(lsm, eap->eap_page, rw, 
+                                                     oa->o_id, 
+                                                     eap->eap_off, PAGE_SIZE);
+
                 /* always asserts urgent, which isn't quite right */
-                rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, 
-                                        rw, 0, PAGE_SIZE, 0, 
+                rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
+                                        rw, 0, PAGE_SIZE, 0,
                                         ASYNC_READY | ASYNC_URGENT |
                                         ASYNC_COUNT_STABLE);
                 spin_lock_irqsave(&eas.eas_lock, flags);
@@ -821,12 +886,13 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
 out:
         list_for_each_safe(pos, n, &pages) {
-                struct page *page = list_entry(pos, struct page, list);
+                struct page *page = list_entry(pos, struct page, 
+                                               PAGE_LIST_ENTRY);
 
-                list_del(&page->list);
-                if (page->private) {
+                list_del(&PAGE_LIST(page));
+                if (page->private != 0) {
                         eap = (struct echo_async_page *)page->private;
-                        if (!IS_ERR(eap->eap_cookie))
+                        if (eap->eap_cookie != NULL)
                                 obd_teardown_async_page(exp, lsm, NULL, 
                                                         eap->eap_cookie);
                         OBD_FREE(eap, sizeof(*eap));
@@ -847,7 +913,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         struct niobuf_remote *rnb;
         obd_off off;
         obd_size npages, tot_pages;
-        int i, ret = 0, err = 0;
+        int i, ret = 0;
         ENTRY;
 
         if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
@@ -886,30 +952,32 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
 
                 for (i = 0; i < npages; i++) {
                         struct page *page = lnb[i].page;
-                        void *addr;
 
                         /* read past eof? */
-                        if (page == NULL && lnb[i].rc == 0) 
+                        if (page == NULL && lnb[i].rc == 0)
                                 continue;
 
-                        addr = kmap(lnb[i].page);
+                        if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
+                                continue;
 
-                        if (rw == OBD_BRW_WRITE) 
-                                page_debug_setup(addr, PAGE_SIZE,
-                                                 rnb[i].offset, oa->o_id);
-                        else 
-                                err = page_debug_check("prep_commit", addr, 
-                                                 PAGE_SIZE, rnb[i].offset,
-                                                 oa->o_id);
 
-                        kunmap(lnb[i].page);
+                        if (rw == OBD_BRW_WRITE)
+                                echo_client_page_debug_setup(lsm, page, rw,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
+                        else
+                                echo_client_page_debug_check(lsm, page,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
                 }
 
-                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti);
+                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
                 if (ret != 0)
                         GOTO(out, ret);
-                if (err)
-                        GOTO(out, ret = err);
         }
 
 out:
@@ -920,7 +988,7 @@ out:
         RETURN(ret);
 }
 
-int echo_client_brw_ioctl(int rw, struct obd_export *exp, 
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
                           struct obd_ioctl_data *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
@@ -938,7 +1006,7 @@ int echo_client_brw_ioctl(int rw, struct obd_export *exp,
 
         data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
         data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
-        data->ioc_obdo1.o_gr = 2;
+        data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
 
         switch((long)data->ioc_pbuf1) {
         case 1:
@@ -1048,14 +1116,15 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
 
         ecl->ecl_mode = mode;
         ecl->ecl_object = eco;
-        ecl->ecl_extent.start = offset;
-        ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+        ecl->ecl_policy.l_extent.start = offset;
+        ecl->ecl_policy.l_extent.end =
+                (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
 
         flags = 0;
-        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, NULL, LDLM_EXTENT,
-                         &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
-                         &flags, echo_ldlm_callback, eco,
-                         &ecl->ecl_lock_handle);
+        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
+                         &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
+                         ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
+                         lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
         if (rc != 0)
                 goto failed_1;
 
@@ -1156,7 +1225,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
                 if (rc == 0) {
                         oa = &data->ioc_obdo1;
-                        oa->o_gr = 2;
+                        oa->o_gr = FILTER_GROUP_ECHO;
                         oa->o_valid |= OBD_MD_FLGROUP;
                         rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm, 
                                          &dummy_oti);
@@ -1282,7 +1351,7 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         INIT_LIST_HEAD (&ec->ec_objects);
         ec->ec_unique = 0;
 
-        rc = obd_connect(&conn, tgt, &echo_uuid);
+        rc = obd_connect(&conn, tgt, &echo_uuid, 0);
         if (rc) {
                 CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
                 return (rc);
@@ -1324,7 +1393,8 @@ static int echo_client_cleanup(struct obd_device *obddev, int flags)
 }
 
 static int echo_client_connect(struct lustre_handle *conn,
-                               struct obd_device *src, struct obd_uuid *cluuid)
+                               struct obd_device *src, struct obd_uuid *cluuid,
+                               unsigned long connect_flags)
 {
         struct obd_export *exp;
         int                rc;
@@ -1376,12 +1446,12 @@ static int echo_client_disconnect(struct obd_export *exp, int flags)
 }
 
 static struct obd_ops echo_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_setup:       echo_client_setup,
-        o_cleanup:     echo_client_cleanup,
-        o_iocontrol:   echo_client_iocontrol,
-        o_connect:     echo_client_connect,
-        o_disconnect:  echo_client_disconnect
+        .o_owner       = THIS_MODULE,
+        .o_setup       = echo_client_setup,
+        .o_cleanup     = echo_client_cleanup,
+        .o_iocontrol   = echo_client_iocontrol,
+        .o_connect     = echo_client_connect,
+        .o_disconnect  = echo_client_disconnect
 };
 
 int echo_client_init(void)
@@ -1389,7 +1459,7 @@ int echo_client_init(void)
         struct lprocfs_static_vars lvars;
 
         lprocfs_init_vars(echo, &lvars);
-        return class_register_type(&echo_obd_ops, lvars.module_vars,
+        return class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
                                    OBD_ECHO_CLIENT_DEVICENAME);
 }