Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / osc / osc_request.c
index ea205a6..515aa70 100644 (file)
@@ -523,6 +523,8 @@ static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
         LASSERT(desc->bd_brw_set != NULL);
         LASSERT(desc->bd_brw_set->brw_callback != NULL);
 
+        /* It's important that you don't use desc->bd_brw_set after this
+         * callback runs.  If you do, take a reference on it. */
         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
 
         /* We can't kunmap the desc from interrupt context, so we do it from
@@ -547,7 +549,17 @@ static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
 
         LASSERT(desc->bd_brw_set != NULL);
 
-        ptlrpc_abort_bulk(desc);
+        /* XXX reconcile this with ll_sync_brw_timeout() handling, and/or
+         *     just make osc_ptl_ev_hdlr() check desc->bd_flags for either
+         *     PTL_BULK_FL_RCVD or PTL_BULK_FL_SENT, and pass CB_PHASE_ABORT
+         *     to brw_callback() and do the rest of the cleanup there.  I
+         *     also think ll_sync_brw_timeout() is missing an PtlMEUnlink,
+         *     but I could be wrong.
+         */
+        if (ptlrpc_abort_bulk(desc)) {
+                EXIT;
+                return;
+        }
         obd_brw_set_del(desc);
         unmap_and_decref_bulk_desc(desc);
 
@@ -565,7 +577,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        struct niobuf_remote *nioptr;
         __u32 xid;
         ENTRY;
 
@@ -589,26 +601,27 @@ restart_bulk:
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, lsm, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
         /* end almost identical to brw_write case */
 
         xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL) {
                         unmap_and_decref_bulk_desc(desc);
                         GOTO(out_req, rc = -ENOMEM);
                 }
 
-                bulk->bp_xid = xid;           /* single xid for all pages */
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
+                bulk->bp_xid = xid;           /* single xid for all pages */
                 bulk->bp_buf = kmap(pga[mapped].pg);
                 bulk->bp_page = pga[mapped].pg;
                 bulk->bp_buflen = PAGE_SIZE;
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, bulk->bp_xid);
         }
 
@@ -703,7 +716,7 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        struct niobuf_remote *nioptr;
         __u32 xid;
 #if CHECKSUM_BULK
         __u64 cksum = 0;
@@ -729,26 +742,29 @@ restart_bulk:
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, lsm, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
         /* end almost identical to brw_read case */
 
         xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL) {
                         unmap_and_decref_bulk_desc(desc);
                         GOTO(out_req, rc = -ENOMEM);
                 }
 
-                bulk->bp_xid = xid;           /* single xid for all pages */
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
+                bulk->bp_xid = xid;           /* single xid for all pages */
                 bulk->bp_buf = kmap(pga[mapped].pg);
                 bulk->bp_page = pga[mapped].pg;
+                /* matching ptlrpc_bulk_get assert */
+                LASSERT(pga[mapped].count > 0);
                 bulk->bp_buflen = pga[mapped].count;
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, bulk->bp_xid);
                 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
         }
@@ -808,6 +824,10 @@ restart_bulk:
 #define OSC_BRW_MAX_SIZE 65536
 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
 
+#warning "FIXME: make these values dynamic based on a get_info call at setup"
+#define OSC_BRW_MAX_SIZE 65536
+#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+
 static int osc_brw(int cmd, struct lustre_handle *conn,
                    struct lov_stripe_md *md, obd_count page_count,
                    struct brw_page *pga, struct obd_brw_set *set,
@@ -843,21 +863,20 @@ static int osc_brw(int cmd, struct lustre_handle *conn,
 /* Note: caller will lock/unlock, and set uptodate on the pages */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 static int sanosc_brw_read(struct lustre_handle *conn,
-                           struct lov_stripe_md *md,
+                           struct lov_stripe_md *lsm,
                            obd_count page_count,
                            struct brw_page *pga,
                            struct obd_brw_set *set)
 {
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
-        struct niobuf_remote *remote, *nio_rep;
-        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct niobuf_remote *nioptr;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
-        size[2] = page_count * sizeof(*remote);
+        size[2] = page_count * sizeof(*nioptr);
 
         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
                                   size, NULL);
@@ -867,19 +886,20 @@ static int sanosc_brw_read(struct lustre_handle *conn,
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, md, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
 
         obd_kmap_get(page_count, 0);
 
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
                 kmap(pga[mapped].pg);
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, 0);
         }
 
-        size[1] = page_count * sizeof(*remote);
+        size[1] = page_count * sizeof(*nioptr);
         request->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(request);
@@ -896,25 +916,19 @@ static int sanosc_brw_read(struct lustre_handle *conn,
                 GOTO(out_unmap, rc = -EINVAL);
         }
 
-        for (j = 0; j < page_count; j++) {
-                ost_unpack_niobuf(&nioptr, &remote);
-        }
-
-        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
-        nio_rep = (struct niobuf_remote*)nioptr;
-
         /* actual read */
-        for (j = 0; j < page_count; j++) {
+        for (j = 0; j < page_count; j++, nioptr++) {
                 struct page *page = pga[j].pg;
                 struct buffer_head *bh;
                 kdev_t dev;
 
+                ost_unpack_niobuf(nioptr, nioptr);
                 /* got san device associated */
                 LASSERT(class_conn2obd(conn));
                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
 
                 /* hole */
-                if (!nio_rep[j].offset) {
+                if (!nioptr->offset) {
                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
                                         page->mapping->host->i_ino,
                                         page->index);
@@ -928,7 +942,7 @@ static int sanosc_brw_read(struct lustre_handle *conn,
 
                         clear_bit(BH_New, &bh->b_state);
                         set_bit(BH_Mapped, &bh->b_state);
-                        bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+                        bh->b_blocknr = (unsigned long)nioptr->offset;
 
                         clear_bit(BH_Uptodate, &bh->b_state);
 
@@ -940,8 +954,7 @@ static int sanosc_brw_read(struct lustre_handle *conn,
                          * one we mapped before, check it */
                         LASSERT(!test_bit(BH_New, &bh->b_state));
                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
-                        LASSERT(bh->b_blocknr ==
-                                (unsigned long)nio_rep[j].offset);
+                        LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
 
                         /* wait it's io completion */
                         if (test_bit(BH_Lock, &bh->b_state))
@@ -976,21 +989,20 @@ out_unmap:
 }
 
 static int sanosc_brw_write(struct lustre_handle *conn,
-                            struct lov_stripe_md *md,
+                            struct lov_stripe_md *lsm,
                             obd_count page_count,
                             struct brw_page *pga,
                             struct obd_brw_set *set)
 {
         struct ptlrpc_request *request = NULL;
         struct ost_body *body;
-        struct niobuf_remote *remote, *nio_rep;
-        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct niobuf_remote *nioptr;
         struct obd_ioobj *iooptr;
-        void *nioptr;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
-        size[2] = page_count * sizeof(*remote);
+        size[2] = page_count * sizeof(*nioptr);
 
         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
                                   3, size, NULL);
@@ -1000,19 +1012,20 @@ static int sanosc_brw_write(struct lustre_handle *conn,
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
-        ost_pack_ioo(&iooptr, md, page_count);
+        ost_pack_ioo(iooptr, lsm, page_count);
 
         /* map pages, and pack request */
         obd_kmap_get(page_count, 0);
-        for (mapped = 0; mapped < page_count; mapped++) {
+        for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
+                LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
 
                 kmap(pga[mapped].pg);
-                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                ost_pack_niobuf(nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, 0);
         }
 
-        size[1] = page_count * sizeof(*remote);
+        size[1] = page_count * sizeof(*nioptr);
         request->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(request);
@@ -1029,19 +1042,13 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                 GOTO(out_unmap, rc = -EINVAL);
         }
 
-        for (j = 0; j < page_count; j++) {
-                ost_unpack_niobuf(&nioptr, &remote);
-        }
-
-        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
-        nio_rep = (struct niobuf_remote*)nioptr;
-
         /* actual write */
-        for (j = 0; j < page_count; j++) {
+        for (j = 0; j < page_count; j++, nioptr++) {
                 struct page *page = pga[j].pg;
                 struct buffer_head *bh;
                 kdev_t dev;
 
+                ost_unpack_niobuf(nioptr, nioptr);
                 /* got san device associated */
                 LASSERT(class_conn2obd(conn));
                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
@@ -1053,7 +1060,7 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
                         LASSERT(page->buffers->b_blocknr ==
-                                (unsigned long)nio_rep[j].offset);
+                                (unsigned long)nioptr->offset);
                 }
                 bh = page->buffers;
 
@@ -1067,7 +1074,7 @@ static int sanosc_brw_write(struct lustre_handle *conn,
                 set_bit(BH_Mapped, &bh->b_state);
 
                 /* override the block nr */
-                bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+                bh->b_blocknr = (unsigned long)nioptr->offset;
 
                 /* we are about to write it, so set it
                  * uptodate/dirty
@@ -1099,30 +1106,9 @@ out_unmap:
 
         goto out_req;
 }
-#else
-static int sanosc_brw_read(struct lustre_handle *conn,
-                           struct lov_stripe_md *md,
-                           obd_count page_count,
-                           struct brw_page *pga,
-                           struct obd_brw_set *set)
-{
-        LBUG();
-        return 0;
-}
-
-static int sanosc_brw_write(struct lustre_handle *conn,
-                            struct lov_stripe_md *md,
-                            obd_count page_count,
-                            struct brw_page *pga,
-                            struct obd_brw_set *set)
-{
-        LBUG();
-        return 0;
-}
-#endif
 
 static int sanosc_brw(int cmd, struct lustre_handle *conn,
-                      struct lov_stripe_md *md, obd_count page_count,
+                      struct lov_stripe_md *lsm, obd_count page_count,
                       struct brw_page *pga, struct obd_brw_set *set,
                       struct obd_trans_info *oti)
 {
@@ -1138,10 +1124,10 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
                         pages_per_brw = page_count;
 
                 if (cmd & OBD_BRW_WRITE)
-                        rc = sanosc_brw_write(conn, md, pages_per_brw,
+                        rc = sanosc_brw_write(conn, lsm, pages_per_brw,
                                               pga, set);
                 else
-                        rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
+                        rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga,set);
 
                 if (rc != 0)
                         RETURN(rc);
@@ -1152,6 +1138,7 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
         RETURN(0);
 }
 #endif
+#endif
 
 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
@@ -1178,7 +1165,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                              sizeof(extent), mode, lockh);
         if (rc == 1)
                 /* We already have a lock, and it's referenced */
-                RETURN(ELDLM_OK);
+                RETURN(ELDLM_LOCK_MATCHED);
 
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
@@ -1202,7 +1189,7 @@ static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
 
-                        RETURN(ELDLM_OK);
+                        RETURN(ELDLM_LOCK_MATCHED);
                 }
         }