Whamcloud - gitweb
- fixed stupid bug with locking in MDS. It caused clients do not flush local
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 2f15e62..e14484f 100644 (file)
@@ -218,8 +218,7 @@ static int echo_create_object(struct obd_device *obd, int on_target,
                 oa->o_id = ++last_object_id;
 
         if (on_target) {
-                /* XXX get some filter group constants */
-                oa->o_gr = 2;
+                oa->o_gr = FILTER_GROUP_ECHO;
                 oa->o_valid |= OBD_MD_FLGROUP;
                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
                 if (rc != 0)
@@ -274,7 +273,8 @@ echo_get_object (struct ec_object **ecop, struct obd_device *obd,
         struct ec_object       *eco2;
         int                     rc;
 
-        if ((oa->o_valid & OBD_MD_FLID) == 0)
+        if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+            oa->o_id == 0)                      /* disallow use of object id 0 */
         {
                 CERROR ("No valid oid\n");
                 return (-EINVAL);
@@ -423,47 +423,68 @@ echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
         *offp = offset * stripe_size + woffset % stripe_size;
 }
 
-static void echo_page_debug_setup(struct lov_stripe_md *lsm, 
-                                  struct page *page, int rw, obd_id id, 
-                                  obd_off offset, obd_off count)
+static void 
+echo_client_page_debug_setup(struct lov_stripe_md *lsm, 
+                             struct page *page, int rw, obd_id id, 
+                             obd_off offset, obd_off count)
 {
-        void *addr;
-        obd_off stripe_off;
-        obd_id stripe_id;
+        char    *addr;
+        obd_off  stripe_off;
+        obd_id   stripe_id;
+        int      delta;
 
-        if (id == 0)
-                return;
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
 
         addr = kmap(page);
 
-        if (rw == OBD_BRW_WRITE) {
-                stripe_off = offset;
-                stripe_id = id;
-                echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
-        } else {
-                stripe_off = 0xdeadbeef00c0ffeeULL;
-                stripe_id = 0xdeadbeef00c0ffeeULL;
+        for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                if (rw == OBD_BRW_WRITE) {
+                        stripe_off = offset + delta;
+                        stripe_id = id;
+                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+                } else {
+                        stripe_off = 0xdeadbeef00c0ffeeULL;
+                        stripe_id = 0xdeadbeef00c0ffeeULL;
+                }
+                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                  stripe_off, stripe_id);
         }
-        page_debug_setup(addr, count, stripe_off, stripe_id);
 
         kunmap(page);
 }
 
-static int echo_page_debug_check(struct lov_stripe_md *lsm, 
-                                  struct page *page, obd_id id, 
-                                  obd_off offset, obd_off count)
+static int
+echo_client_page_debug_check(struct lov_stripe_md *lsm, 
+                             struct page *page, obd_id id, 
+                             obd_off offset, obd_off count)
 {
-        obd_off stripe_off = offset;
-        obd_id stripe_id = id;
-        void *addr;
-        int rc;
+        obd_off stripe_off;
+        obd_id  stripe_id;
+        char   *addr;
+        int     delta;
+        int     rc;
+        int     rc2;
 
-        if (id == 0)
-                return 0;
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
 
         addr = kmap(page);
-        echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
-        rc = page_debug_check("test_brw", addr, count, stripe_off, stripe_id);
+
+        for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                stripe_off = offset + delta;
+                stripe_id = id;
+                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+
+                rc2 = block_debug_check("test_brw", 
+                                        addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                        stripe_off, stripe_id);
+                if (rc2 != 0) {
+                        CERROR ("Error in echo object "LPX64"\n", id);
+                        rc = rc2;
+                }
+        }
+
         kunmap(page);
         return rc;
 }
@@ -482,10 +503,11 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
         int                     verify = 0;
         int                     gfp_mask;
 
-        /* oa_id  == 0    => speed test (no verification) else...
-         * oa & 1         => use HIGHMEM
-         */
-        gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+        gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 
         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
 
@@ -514,18 +536,19 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                         goto out;
 
                 pgp->count = PAGE_SIZE;
-                pgp->off = off;
+                pgp->disk_offset = pgp->page_offset = off;
                 pgp->flag = 0;
 
-                echo_page_debug_setup(lsm, pgp->pg, rw, oa->o_id, off, 
-                                      pgp->count);
+                if (verify)
+                        echo_client_page_debug_setup(lsm, pgp->pg, rw, 
+                                                     oa->o_id, off, pgp->count);
         }
 
         rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
 
  out:
-        if (rc == 0 && rw == OBD_BRW_READ)
-                verify = 1;
+        if (rc != 0 || rw != OBD_BRW_READ)
+                verify = 0;
 
         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
                 if (pgp->pg == NULL)
@@ -533,8 +556,9 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
 
                 if (verify) {
                         int vrc;
-                        vrc = echo_page_debug_check(lsm, pgp->pg, oa->o_id,
-                                                    pgp->off, pgp->count);
+                        vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
+                                                           pgp->page_offset, 
+                                                           pgp->count);
                         if (vrc != 0 && rc == 0)
                                 rc = vrc;
                 }
@@ -593,7 +617,7 @@ static int echo_client_ubrw(struct obd_device *obd, int rw,
         for (i = 0, off = offset, pgp = pga;
              i < npages;
              i++, off += PAGE_SIZE, pgp++) {
-                pgp->off = off;
+                pgp->disk_offset = pgp->page_offset = off;
                 pgp->pg = kiobuf->maplist[i];
                 pgp->count = PAGE_SIZE;
                 pgp->flag = 0;
@@ -688,7 +712,8 @@ static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
 
         memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
 }
-static void ec_ap_completion(void *data, int cmd, int rc)
+
+static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
         struct echo_async_page *eap = eap_from_cookie(data);
         struct echo_async_state *eas;
@@ -698,10 +723,13 @@ static void ec_ap_completion(void *data, int cmd, int rc)
                 return;
         eas = eap->eap_eas;
 
-        if (cmd == OBD_BRW_READ)
-                echo_page_debug_check(eas->eas_lsm, eap->eap_page, 
-                                      eas->eas_oa.o_id, eap->eap_off, 
-                                      PAGE_SIZE);
+        if (cmd == OBD_BRW_READ &&
+            eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+            (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+            (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                echo_client_page_debug_check(eas->eas_lsm, eap->eap_page, 
+                                             eas->eas_oa.o_id, eap->eap_off,
+                                             PAGE_SIZE);
 
         spin_lock_irqsave(&eas->eas_lock, flags);
         if (rc && !eas->eas_rc)
@@ -770,7 +798,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                         GOTO(out, rc = -ENOMEM);
 
                 page->private = 0;
-                list_add_tail(&page->list, &pages);
+                list_add_tail(&PAGE_LIST(page), &pages);
 
                 OBD_ALLOC(eap, sizeof(*eap));
                 if (eap == NULL)
@@ -823,9 +851,12 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                         break;
                 }
 
-                if (rw == OBD_BRW_WRITE)
-                        echo_page_debug_setup(lsm, eap->eap_page, rw, oa->o_id,
-                                              eap->eap_off, PAGE_SIZE);
+                if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+                    (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                    (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                        echo_client_page_debug_setup(lsm, eap->eap_page, rw, 
+                                                     oa->o_id, 
+                                                     eap->eap_off, PAGE_SIZE);
 
                 /* always asserts urgent, which isn't quite right */
                 rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
@@ -855,9 +886,10 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
 out:
         list_for_each_safe(pos, n, &pages) {
-                struct page *page = list_entry(pos, struct page, list);
+                struct page *page = list_entry(pos, struct page, 
+                                               PAGE_LIST_ENTRY);
 
-                list_del(&page->list);
+                list_del(&PAGE_LIST(page));
                 if (page->private != 0) {
                         eap = (struct echo_async_page *)page->private;
                         if (eap->eap_cookie != NULL)
@@ -881,7 +913,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         struct niobuf_remote *rnb;
         obd_off off;
         obd_size npages, tot_pages;
-        int i, ret = 0, err = 0;
+        int i, ret = 0;
         ENTRY;
 
         if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
@@ -922,24 +954,30 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                         struct page *page = lnb[i].page;
 
                         /* read past eof? */
-                        if (page == NULL && lnb[i].rc == 0) 
+                        if (page == NULL && lnb[i].rc == 0)
                                 continue;
 
-                        if (rw == OBD_BRW_WRITE) 
-                                echo_page_debug_setup(lsm, page, rw, oa->o_id, 
-                                                      rnb[i].offset, 
-                                                      rnb[i].len);
+                        if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
+                                continue;
+
+
+                        if (rw == OBD_BRW_WRITE)
+                                echo_client_page_debug_setup(lsm, page, rw,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
                         else
-                                echo_page_debug_check(lsm, page, oa->o_id, 
-                                                      rnb[i].offset, 
-                                                      rnb[i].len);
+                                echo_client_page_debug_check(lsm, page,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
                 }
 
-                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti);
+                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
                 if (ret != 0)
                         GOTO(out, ret);
-                if (err)
-                        GOTO(out, ret = err);
         }
 
 out:
@@ -950,7 +988,7 @@ out:
         RETURN(ret);
 }
 
-int echo_client_brw_ioctl(int rw, struct obd_export *exp, 
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
                           struct obd_ioctl_data *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
@@ -968,7 +1006,7 @@ int echo_client_brw_ioctl(int rw, struct obd_export *exp,
 
         data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
         data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
-        data->ioc_obdo1.o_gr = 2;
+        data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
 
         switch((long)data->ioc_pbuf1) {
         case 1:
@@ -1078,14 +1116,15 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
 
         ecl->ecl_mode = mode;
         ecl->ecl_object = eco;
-        ecl->ecl_extent.start = offset;
-        ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+        ecl->ecl_policy.l_extent.start = offset;
+        ecl->ecl_policy.l_extent.end =
+                (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
 
         flags = 0;
-        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, NULL, LDLM_EXTENT,
-                         &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
-                         &flags, echo_ldlm_callback, eco,
-                         &ecl->ecl_lock_handle);
+        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
+                         &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
+                         ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
+                         lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
         if (rc != 0)
                 goto failed_1;
 
@@ -1186,7 +1225,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
                 if (rc == 0) {
                         oa = &data->ioc_obdo1;
-                        oa->o_gr = 2;
+                        oa->o_gr = FILTER_GROUP_ECHO;
                         oa->o_valid |= OBD_MD_FLGROUP;
                         rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm, 
                                          &dummy_oti);
@@ -1312,7 +1351,7 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         INIT_LIST_HEAD (&ec->ec_objects);
         ec->ec_unique = 0;
 
-        rc = obd_connect(&conn, tgt, &echo_uuid);
+        rc = obd_connect(&conn, tgt, &echo_uuid, 0);
         if (rc) {
                 CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
                 return (rc);
@@ -1354,7 +1393,8 @@ static int echo_client_cleanup(struct obd_device *obddev, int flags)
 }
 
 static int echo_client_connect(struct lustre_handle *conn,
-                               struct obd_device *src, struct obd_uuid *cluuid)
+                               struct obd_device *src, struct obd_uuid *cluuid,
+                               unsigned long connect_flags)
 {
         struct obd_export *exp;
         int                rc;
@@ -1406,12 +1446,12 @@ static int echo_client_disconnect(struct obd_export *exp, int flags)
 }
 
 static struct obd_ops echo_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_setup:       echo_client_setup,
-        o_cleanup:     echo_client_cleanup,
-        o_iocontrol:   echo_client_iocontrol,
-        o_connect:     echo_client_connect,
-        o_disconnect:  echo_client_disconnect
+        .o_owner       = THIS_MODULE,
+        .o_setup       = echo_client_setup,
+        .o_cleanup     = echo_client_cleanup,
+        .o_iocontrol   = echo_client_iocontrol,
+        .o_connect     = echo_client_connect,
+        .o_disconnect  = echo_client_disconnect
 };
 
 int echo_client_init(void)
@@ -1419,7 +1459,7 @@ int echo_client_init(void)
         struct lprocfs_static_vars lvars;
 
         lprocfs_init_vars(echo, &lvars);
-        return class_register_type(&echo_obd_ops, lvars.module_vars,
+        return class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
                                    OBD_ECHO_CLIENT_DEVICENAME);
 }