Whamcloud - gitweb
* new network config snapshot
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 87b2f92..806788d 100644 (file)
@@ -29,6 +29,7 @@
 #include <linux/iobuf.h>
 #endif
 #include <asm/div64.h>
+#include <linux/smp_lock.h>
 #else
 #include <liblustre.h>
 #endif
@@ -274,7 +275,8 @@ echo_get_object (struct ec_object **ecop, struct obd_device *obd,
         struct ec_object       *eco2;
         int                     rc;
 
-        if ((oa->o_valid & OBD_MD_FLID) == 0)
+        if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+            oa->o_id == 0)                      /* disallow use of object id 0 */
         {
                 CERROR ("No valid oid\n");
                 return (-EINVAL);
@@ -423,6 +425,70 @@ echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
         *offp = offset * stripe_size + woffset % stripe_size;
 }
 
+static void 
+echo_client_page_debug_setup(struct lov_stripe_md *lsm, 
+                             struct page *page, int rw, obd_id id, 
+                             obd_off offset, obd_off count)
+{
+        char    *addr;
+        obd_off  stripe_off;
+        obd_id   stripe_id;
+        int      delta;
+
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
+
+        addr = kmap(page);
+
+        for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                if (rw == OBD_BRW_WRITE) {
+                        stripe_off = offset + delta;
+                        stripe_id = id;
+                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+                } else {
+                        stripe_off = 0xdeadbeef00c0ffeeULL;
+                        stripe_id = 0xdeadbeef00c0ffeeULL;
+                }
+                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                  stripe_off, stripe_id);
+        }
+
+        kunmap(page);
+}
+
+static int
+echo_client_page_debug_check(struct lov_stripe_md *lsm, 
+                             struct page *page, obd_id id, 
+                             obd_off offset, obd_off count)
+{
+        obd_off stripe_off;
+        obd_id  stripe_id;
+        char   *addr;
+        int     delta;
+        int     rc;
+        int     rc2;
+
+        /* no partial pages on the client */
+        LASSERT(count == PAGE_SIZE);
+
+        addr = kmap(page);
+
+        for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+                stripe_off = offset + delta;
+                stripe_id = id;
+                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+
+                rc2 = block_debug_check("test_brw", 
+                                        addr + delta, OBD_ECHO_BLOCK_SIZE, 
+                                        stripe_off, stripe_id);
+                if (rc2 != 0)
+                        rc = rc2;
+        }
+
+        kunmap(page);
+        return rc;
+}
+
 static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                             struct lov_stripe_md *lsm, obd_off offset,
                             obd_size count, struct obd_trans_info *oti)
@@ -437,11 +503,11 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
         int                     verify;
         int                     gfp_mask;
 
-        /* oa_id  == 0    => speed test (no verification) else...
-         * oa & 1         => use HIGHMEM
-         */
-        verify = (oa->o_id != 0);
-        gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+        gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 
         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
 
@@ -473,29 +539,15 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                 pgp->off = off;
                 pgp->flag = 0;
 
-                if (verify) {
-                        void *addr = kmap(pgp->pg);
-                        obd_off      stripe_off = off;
-                        obd_id       stripe_id = oa->o_id;
-
-                        if (rw == OBD_BRW_WRITE) {
-                                echo_get_stripe_off_id(lsm, &stripe_off,
-                                                       &stripe_id);
-                                page_debug_setup(addr, pgp->count,
-                                                 stripe_off, stripe_id);
-                        } else {
-                                page_debug_setup(addr, pgp->count,
-                                                 0xdeadbeef00c0ffeeULL,
-                                                 0xdeadbeef00c0ffeeULL);
-                        }
-                        kunmap(pgp->pg);
-                }
+                if (verify)
+                        echo_client_page_debug_setup(lsm, pgp->pg, rw, 
+                                                     oa->o_id, off, pgp->count);
         }
 
         rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
 
  out:
-        if (rc != 0)
+        if (rc != 0 || rw != OBD_BRW_READ)
                 verify = 0;
 
         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
@@ -503,18 +555,11 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
                         continue;
 
                 if (verify) {
-                        void    *addr = kmap(pgp->pg);
-                        obd_off  stripe_off = pgp->off;
-                        obd_id   stripe_id  = oa->o_id;
-                        int      vrc;
-
-                        echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
-                        vrc = page_debug_check("test_brw", addr, pgp->count,
-                                               stripe_off, stripe_id);
+                        int vrc;
+                        vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
+                                                           pgp->off, pgp->count);
                         if (vrc != 0 && rc == 0)
                                 rc = vrc;
-
-                        kunmap(pgp->pg);
                 }
                 __free_pages(pgp->pg, 0);
         }
@@ -614,6 +659,10 @@ struct echo_async_page {
         struct list_head        eap_item;
 };
 
+#define EAP_FROM_COOKIE(c)                                                      \
+        (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC),      \
+         (struct echo_async_page *)(c))
+
 struct echo_async_state {
         spinlock_t              eas_lock;
         obd_off                 eas_next_offset;
@@ -623,6 +672,7 @@ struct echo_async_state {
         wait_queue_head_t       eas_waitq;
         struct list_head        eas_avail;
         struct obdo             eas_oa;
+        struct lov_stripe_md    *eas_lsm;
 };
 
 static int eas_should_wake(struct echo_async_state *eas)
@@ -636,14 +686,6 @@ static int eas_should_wake(struct echo_async_state *eas)
         return rc;
 };
 
-struct echo_async_page *eap_from_cookie(void *cookie)
-{
-        struct echo_async_page *eap = cookie;
-        if (eap->eap_magic != EAP_MAGIC)
-                return ERR_PTR(-EINVAL);
-        return eap;
-};
-
 static int ec_ap_make_ready(void *data, int cmd)
 {
         /* our pages are issued ready */
@@ -658,23 +700,27 @@ static int ec_ap_refresh_count(void *data, int cmd)
 }
 static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
 {
-        struct echo_async_page *eap;
-        eap = eap_from_cookie(data);
-        if (IS_ERR(eap))
-                return;
+        struct echo_async_page *eap = EAP_FROM_COOKIE(data);
 
         memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
 }
-static void ec_ap_completion(void *data, int cmd, int rc)
+
+static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
 {
-        struct echo_async_page *eap = eap_from_cookie(data);
+        struct echo_async_page *eap = EAP_FROM_COOKIE(data);
         struct echo_async_state *eas;
         unsigned long flags;
 
-        if (IS_ERR(eap))
-                return;
         eas = eap->eap_eas;
 
+        if (cmd == OBD_BRW_READ &&
+            eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+            (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+            (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                echo_client_page_debug_check(eas->eas_lsm, eap->eap_page, 
+                                             eas->eas_oa.o_id, eap->eap_off,
+                                             PAGE_SIZE);
+
         spin_lock_irqsave(&eas->eas_lock, flags);
         if (rc && !eas->eas_rc)
                 eas->eas_rc = rc;
@@ -706,11 +752,12 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 #if 0
         int                     verify;
         int                     gfp_mask;
-        /* oa_id  == 0    => speed test (no verification) else...
-         * oa & 1         => use HIGHMEM
-         */
-        verify = (oa->o_id != 0);
-        gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+
+        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+        gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 #endif
 
         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
@@ -731,6 +778,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
         init_waitqueue_head(&eas.eas_waitq);
         eas.eas_in_flight = 0;
         eas.eas_rc = 0;
+        eas.eas_lsm = lsm;
         INIT_LIST_HEAD(&eas.eas_avail);
 
         /* prepare the group of pages that we're going to be keeping
@@ -740,7 +788,8 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 if (page == NULL)
                         GOTO(out, rc = -ENOMEM);
 
-                list_add_tail(&page->list, &pages);
+                page->private = 0;
+                list_add_tail(&PAGE_LIST(page), &pages);
 
                 OBD_ALLOC(eap, sizeof(*eap));
                 if (eap == NULL)
@@ -749,7 +798,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 eap->eap_magic = EAP_MAGIC;
                 eap->eap_page = page;
                 eap->eap_eas = &eas;
-                eap->eap_cookie = ERR_PTR(-ENOENT);
+                page->private = (unsigned long)eap;
                 list_add_tail(&eap->eap_item, &eas.eas_avail);
         }
 
@@ -775,17 +824,17 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                 spin_unlock_irqrestore(&eas.eas_lock, flags);
 
                 /* unbind the eap from its old page offset */
-                if (!IS_ERR(eap->eap_cookie)) {
+                if (eap->eap_cookie != NULL) {
                         obd_teardown_async_page(exp, lsm, NULL, 
                                                 eap->eap_cookie);
-                        eap->eap_cookie = ERR_PTR(-ENOENT);
+                        eap->eap_cookie = NULL;
                 }
 
                 eas.eas_next_offset += PAGE_SIZE;
                 eap->eap_off = eas.eas_next_offset;
 
-                rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, 
-                                         eap->eap_off, &ec_async_page_ops, 
+                rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
+                                         eap->eap_off, &ec_async_page_ops,
                                          eap, &eap->eap_cookie);
                 if (rc) {
                         spin_lock_irqsave(&eas.eas_lock, flags);
@@ -793,9 +842,16 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
                         break;
                 }
 
+                if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+                    (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                    (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+                        echo_client_page_debug_setup(lsm, eap->eap_page, rw, 
+                                                     oa->o_id, 
+                                                     eap->eap_off, PAGE_SIZE);
+
                 /* always asserts urgent, which isn't quite right */
-                rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, 
-                                        rw, 0, PAGE_SIZE, 0, 
+                rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
+                                        rw, 0, PAGE_SIZE, 0,
                                         ASYNC_READY | ASYNC_URGENT |
                                         ASYNC_COUNT_STABLE);
                 spin_lock_irqsave(&eas.eas_lock, flags);
@@ -821,12 +877,13 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
 out:
         list_for_each_safe(pos, n, &pages) {
-                struct page *page = list_entry(pos, struct page, list);
+                struct page *page = list_entry(pos, struct page, 
+                                               PAGE_LIST_ENTRY);
 
-                list_del(&page->list);
-                if (page->private) {
+                list_del(&PAGE_LIST(page));
+                if (page->private != 0) {
                         eap = (struct echo_async_page *)page->private;
-                        if (!IS_ERR(eap->eap_cookie))
+                        if (eap->eap_cookie != NULL)
                                 obd_teardown_async_page(exp, lsm, NULL, 
                                                         eap->eap_cookie);
                         OBD_FREE(eap, sizeof(*eap));
@@ -847,7 +904,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         struct niobuf_remote *rnb;
         obd_off off;
         obd_size npages, tot_pages;
-        int i, ret = 0, err = 0;
+        int i, ret = 0;
         ENTRY;
 
         if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
@@ -864,7 +921,6 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                 GOTO(out, ret = -ENOMEM);
 
         obdo_to_ioobj(oa, &ioo);
-        ioo.ioo_bufcnt = npages;
 
         off = offset;
 
@@ -879,6 +935,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
 
                 /* XXX this can't be the best.. */
                 memset(oti, 0, sizeof(*oti));
+                ioo.ioo_bufcnt = npages;
 
                 ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti);
                 if (ret != 0)
@@ -886,30 +943,31 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
 
                 for (i = 0; i < npages; i++) {
                         struct page *page = lnb[i].page;
-                        void *addr;
 
                         /* read past eof? */
-                        if (page == NULL && lnb[i].rc == 0) 
+                        if (page == NULL && lnb[i].rc == 0)
                                 continue;
 
-                        addr = kmap(lnb[i].page);
-
-                        if (rw == OBD_BRW_WRITE) 
-                                page_debug_setup(addr, PAGE_SIZE,
-                                                 rnb[i].offset, oa->o_id);
-                        else 
-                                err = page_debug_check("prep_commit", addr, 
-                                                 PAGE_SIZE, rnb[i].offset,
-                                                 oa->o_id);
+                        if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
+                                continue;
 
-                        kunmap(lnb[i].page);
+                        if (rw == OBD_BRW_WRITE)
+                                echo_client_page_debug_setup(lsm, page, rw,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
+                        else
+                                echo_client_page_debug_check(lsm, page,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
+                                                             rnb[i].len);
                 }
 
-                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti);
+                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
                 if (ret != 0)
                         GOTO(out, ret);
-                if (err)
-                        GOTO(out, ret = err);
         }
 
 out:
@@ -920,7 +978,7 @@ out:
         RETURN(ret);
 }
 
-int echo_client_brw_ioctl(int rw, struct obd_export *exp, 
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
                           struct obd_ioctl_data *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
@@ -1048,14 +1106,15 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
 
         ecl->ecl_mode = mode;
         ecl->ecl_object = eco;
-        ecl->ecl_extent.start = offset;
-        ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+        ecl->ecl_policy.l_extent.start = offset;
+        ecl->ecl_policy.l_extent.end =
+                (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
 
         flags = 0;
-        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, NULL, LDLM_EXTENT,
-                         &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
-                         &flags, echo_ldlm_callback, eco,
-                         &ecl->ecl_lock_handle);
+        rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
+                         &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
+                         ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
+                         lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
         if (rc != 0)
                 goto failed_1;
 
@@ -1134,6 +1193,8 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
         int                     i;
         ENTRY;
 
+        unlock_kernel();
+
         memset(&dummy_oti, 0, sizeof(dummy_oti));
 
         obd = exp->exp_obd;
@@ -1252,6 +1313,8 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
         }
 
+        lock_kernel();
+
         return rc;
 }
 
@@ -1266,15 +1329,15 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         int rc;
         ENTRY;
 
-        if (lcfg->lcfg_inllen1 < 1) {
+        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
                 CERROR("requires a TARGET OBD name\n");
                 RETURN(-EINVAL);
         }
 
-        tgt = class_name2obd(lcfg->lcfg_inlbuf1);
+        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
                 CERROR("device not attached or not set up (%s)\n",
-                       lcfg->lcfg_inlbuf1);
+                       lustre_cfg_string(lcfg, 1));
                 RETURN(-EINVAL);
         }
 
@@ -1282,9 +1345,10 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         INIT_LIST_HEAD (&ec->ec_objects);
         ec->ec_unique = 0;
 
-        rc = obd_connect(&conn, tgt, &echo_uuid);
+        rc = obd_connect(&conn, tgt, &echo_uuid, NULL /* obd_connect_data */);
         if (rc) {
-                CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
+                CERROR("fail to connect to device %s\n",
+                       lustre_cfg_string(lcfg, 1));
                 return (rc);
         }
         ec->ec_exp = class_conn2export(&conn);
@@ -1292,7 +1356,7 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         RETURN(rc);
 }
 
-static int echo_client_cleanup(struct obd_device *obddev, int flags)
+static int echo_client_cleanup(struct obd_device *obddev)
 {
         struct list_head       *el;
         struct ec_object       *eco;
@@ -1316,7 +1380,7 @@ static int echo_client_cleanup(struct obd_device *obddev, int flags)
                 echo_put_object(eco);
         }
 
-        rc = obd_disconnect(ec->ec_exp, 0);
+        rc = obd_disconnect(ec->ec_exp);
         if (rc != 0)
                 CERROR("fail to disconnect device: %d\n", rc);
 
@@ -1324,7 +1388,8 @@ static int echo_client_cleanup(struct obd_device *obddev, int flags)
 }
 
 static int echo_client_connect(struct lustre_handle *conn,
-                               struct obd_device *src, struct obd_uuid *cluuid)
+                               struct obd_device *src, struct obd_uuid *cluuid,
+                               struct obd_connect_data *data)
 {
         struct obd_export *exp;
         int                rc;
@@ -1339,7 +1404,7 @@ static int echo_client_connect(struct lustre_handle *conn,
         RETURN (rc);
 }
 
-static int echo_client_disconnect(struct obd_export *exp, int flags)
+static int echo_client_disconnect(struct obd_export *exp)
 {
         struct obd_device      *obd;
         struct echo_client_obd *ec;
@@ -1369,19 +1434,19 @@ static int echo_client_disconnect(struct obd_export *exp, int flags)
                 OBD_FREE (ecl, sizeof (*ecl));
         }
 
-        rc = class_disconnect(exp, 0);
+        rc = class_disconnect(exp);
         GOTO(out, rc);
  out:
         return rc;
 }
 
 static struct obd_ops echo_obd_ops = {
-        o_owner:       THIS_MODULE,
-        o_setup:       echo_client_setup,
-        o_cleanup:     echo_client_cleanup,
-        o_iocontrol:   echo_client_iocontrol,
-        o_connect:     echo_client_connect,
-        o_disconnect:  echo_client_disconnect
+        .o_owner       = THIS_MODULE,
+        .o_setup       = echo_client_setup,
+        .o_cleanup     = echo_client_cleanup,
+        .o_iocontrol   = echo_client_iocontrol,
+        .o_connect     = echo_client_connect,
+        .o_disconnect  = echo_client_disconnect
 };
 
 int echo_client_init(void)