Whamcloud - gitweb
- Use refcounting to control lifetime of bulk descriptors.
author shaver <shaver>
Sat, 13 Jul 2002 23:38:45 +0000 (23:38 +0000)
committer shaver <shaver>
Sat, 13 Jul 2002 23:38:45 +0000 (23:38 +0000)
- Add some LDLM timeout #warnings for future reference.
- Make all bulk-desc cleanup happen asynchronously, for cleaner interrupt (and
  later timeout) handling.
- Have OSC wait for brw_write reply before beginning bulk operations, for
  symmetry with brw_read and general niceness.
- Fix reply-for-freed-request check to use proper XID width.
- Use l_killable_pending in ptlrpc_check_bulk_sent, for consistency. (Also in the
  new ptlrpc_check_bulk_received sibling.)

lustre/doc/.cvsignore
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_request.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/rpc.c

index 249de7d..43841ad 100644 (file)
@@ -12,3 +12,4 @@ OBD-HOWTO.txt
 lustre-HOWTO.txt
 *.eps
 master.pdf
+lustre.lyx
index dc07883..65dd815 100644 (file)
@@ -162,6 +162,7 @@ struct ptlrpc_bulk_desc {
         struct list_head b_page_list;
         __u32 b_page_count;
         atomic_t b_pages_remaining;
+        atomic_t b_refcount;
         void *b_desc_private;
 };
 
@@ -226,6 +227,7 @@ void ptlrpc_cleanup_connection(void);
 
 /* rpc/niobuf.c */
 int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk);
+int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *bulk);
 int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
@@ -282,4 +284,16 @@ int lustre_pack_msg(int count, int *lens, char **bufs, int *len,
 int lustre_msg_size(int count, int *lengths);
 int lustre_unpack_msg(struct lustre_msg *m, int len);
 void *lustre_msg_buf(struct lustre_msg *m, int n);
+
+static inline void ptlrpc_bulk_decref(struct ptlrpc_bulk_desc *desc)
+{
+        if (atomic_dec_and_test(&desc->b_refcount)) {
+                CDEBUG(D_PAGE, "Released last ref on %p, freeing\n", desc);
+                ptlrpc_free_bulk(desc);
+        } else {
+                CDEBUG(D_PAGE, "%p -> %d\n", desc,
+                       atomic_read(&desc->b_refcount));
+        }
+}
+
 #endif
index f632587..eddf484 100644 (file)
@@ -134,6 +134,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
                            " sleeping");
                 ldlm_lock_dump(lock);
+#warning ldlm needs to time out
                 wait_event(lock->l_waitq,
                            lock->l_req_mode == lock->l_granted_mode);
                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
index c8a7e39..70d40c2 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
  *  See the file COPYING in this distribution
@@ -395,24 +395,32 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
 
 struct osc_brw_cb_data {
         struct page **buf;
-        struct ptlrpc_request *req;
         bulk_callback_t callback;
         void *cb_data;
+        void *obd_data;
+        size_t obd_size;
 };
 
-static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
 {
         struct osc_brw_cb_data *cb_data = data;
+        int i;
+        ENTRY;
 
         if (desc->b_flags & PTL_RPC_FL_INTR)
                 CERROR("got signal\n");
 
-        (cb_data->callback)(desc, cb_data->cb_data);
+        for (i = 0; i < desc->b_page_count; i++)
+                kunmap(cb_data->buf[i]);
 
-        ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
+        if (cb_data->callback)
+                (cb_data->callback)(desc, cb_data->cb_data);
 
+        ptlrpc_bulk_decref(desc);
+        if (cb_data->obd_data)
+                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
         OBD_FREE(cb_data, sizeof(*cb_data));
+        EXIT;
 }
 
 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
@@ -422,15 +430,17 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_request *request;
         struct ost_body *body;
-        struct list_head *tmp;
-        int pages, rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
-        struct ptlrpc_bulk_desc *desc;
+        struct osc_brw_cb_data *cb_data = NULL;
+        long pages;
+        int rc, i, j, size[3] = {sizeof(*body)};
+        void *iooptr, *nioptr;
         ENTRY;
 
+        /* XXX almost identical to brw_write case */
         size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
@@ -439,28 +449,36 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_BRW, 3, size, NULL);
+                                   OST_BRW, 3, size, NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_READ;
 
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
-                GOTO(out2, rc = -ENOMEM);
+                GOTO(out_free, rc = -ENOMEM);
         desc->b_portal = OST_BULK_PORTAL;
-
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        desc->b_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_free, rc = -ENOMEM);
+        cb_data->buf = NULL;
+        cb_data->callback = callback;
+        desc->b_cb_data = cb_data;
+        /* XXX end almost identical to brw_write case */
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
                 /* FIXME: this inner loop is wrong for multiple OAs */
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *bulk;
                         bulk = ptlrpc_prep_bulk_page(desc);
                         if (bulk == NULL)
-                                GOTO(out3, rc = -ENOMEM);
+                                GOTO(out_unmap, rc = -ENOMEM);
 
                         spin_lock(&connection->c_lock);
                         bulk->b_xid = ++connection->c_xid_out;
@@ -469,55 +487,52 @@ static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
                         bulk->b_buf = kmap(buf[pages]);
                         bulk->b_page = buf[pages];
                         bulk->b_buflen = PAGE_SIZE;
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                        ost_pack_niobuf(&nioptr, offset[pages], count[pages],
                                         flags[pages], bulk->b_xid);
                 }
         }
 
+        /* 
+         * Register the bulk first, because the reply could arrive out of order,
+         * and we want to be ready for the bulk data.
+         *
+         * One reference is released by the bulk callback, the other when
+         * we finish sleeping on it (if we don't have a callback).
+         */
+        atomic_set(&desc->b_refcount, callback ? 1 : 2);
         rc = ptlrpc_register_bulk(desc);
         if (rc)
-                GOTO(out3, rc);
+                GOTO(out_unmap, rc);
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        if (rc)
-                ptlrpc_abort_bulk(desc);
-        GOTO(out3, rc);
-
- out3:
-        list_for_each(tmp, &desc->b_page_list) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-                if (bulk->b_page != NULL)
-                        kunmap(bulk->b_page);
+        if (rc) {
+                ptlrpc_bulk_decref(desc);
+                GOTO(out_unmap, rc);
         }
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
- out:
-        return rc;
-}
 
-static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
-{
-        struct osc_brw_cb_data *cb_data = data;
-        int i;
-        ENTRY;
+        /* Callbacks cause asynchronous handling. */
+        if (callback)
+                RETURN(0);
 
+        l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
+        ptlrpc_bulk_decref(desc);
         if (desc->b_flags & PTL_RPC_FL_INTR)
-                CERROR("got signal\n");
-
-        for (i = 0; i < desc->b_page_count; i++)
-                kunmap(cb_data->buf[i]);
-
-        (cb_data->callback)(desc, cb_data->cb_data);
-
+                RETURN(-EINTR);
+        RETURN(0);
+        
+        /* Clean up on error. */
+ out_unmap:
+        for (pages = 0, i = 0; i < num_oa; i++)
+                for (j = 0; j < oa_bufs[i]; j++, pages++)
+                        kunmap(pagearray[pages]);
+ out_free:
+        if (cb_data)
+                OBD_FREE(cb_data, sizeof(*cb_data));
         ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
-
-        OBD_FREE(cb_data, sizeof(*cb_data));
-        EXIT;
+        ptlrpc_free_req(request);
+        return rc;
 }
 
 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
@@ -528,89 +543,94 @@ static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
-        struct ptlrpc_request *request;
-        struct ptlrpc_bulk_desc *desc;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct obd_ioobj ioo;
         struct ost_body *body;
-        struct niobuf_local *local;
+        struct niobuf_local *local = NULL;
         struct niobuf_remote *remote;
-        struct osc_brw_cb_data *cb_data;
+        struct osc_brw_cb_data *cb_data = NULL;
         long pages;
         int rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
+        void *iooptr, *nioptr;
         ENTRY;
 
-        size[1] = num_oa * sizeof(ioo);
+        /* XXX almost identical to brw_read case */
+        size[1] = num_oa * sizeof(struct obd_ioobj);
         pages = 0;
         for (i = 0; i < num_oa; i++)
                 pages += oa_bufs[i];
-        size[2] = pages * sizeof(*remote);
-
-        OBD_ALLOC(local, pages * sizeof(*local));
-        if (local == NULL)
-                RETURN(-ENOMEM);
+        size[2] = pages * sizeof(struct niobuf_remote);
 
         osc_con2cl(conn, &cl, &connection);
         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_BRW, 3, size, NULL);
+                                   OST_BRW, 3, size, NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
+
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_WRITE;
 
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
+        OBD_ALLOC(local, pages * sizeof(*local));
+        if (!local)
+                GOTO(out_free, rc = -ENOMEM);
+
+        desc = ptlrpc_prep_bulk(connection);
+        if (!desc)
+                GOTO(out_free, rc = -ENOMEM);
+        desc->b_portal = OSC_BULK_PORTAL;
+        desc->b_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_free, rc = -ENOMEM);
+        cb_data->buf = pagearray;
+        cb_data->callback = callback;
+        desc->b_cb_data = cb_data;
+        /* XXX end almost identical to brw_read case */
+        cb_data->obd_data = local;
+        cb_data->obd_size = pages * sizeof(*local);
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                ost_pack_ioo(&iooptr, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         local[pages].addr = kmap(pagearray[pages]);
                         local[pages].offset = offset[pages];
                         local[pages].len = count[pages];
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
+                        ost_pack_niobuf(&nioptr, offset[pages], count[pages],
                                         flags[pages], 0);
                 }
         }
 
         size[1] = pages * sizeof(struct niobuf_remote);
         request->rq_replen = lustre_msg_size(2, size);
-
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
         if (rc)
-                GOTO(out2, rc);
+                GOTO(out_unmap, rc);
 
-        ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
-        if (ptr2 == NULL)
-                GOTO(out2, rc = -EINVAL);
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        if (!nioptr)
+                GOTO(out_unmap, rc = -EINVAL);
 
         if (request->rq_repmsg->buflens[1] !=
             pages * sizeof(struct niobuf_remote)) {
                 CERROR("buffer length wrong (%d vs. %ld)\n",
                        request->rq_repmsg->buflens[1],
                        pages * sizeof(struct niobuf_remote));
-                GOTO(out2, rc = -EINVAL);
-        }
-
-        desc = ptlrpc_prep_bulk(connection);
-        desc->b_portal = OSC_BULK_PORTAL;
-        if (callback) {
-                desc->b_cb = brw_write_finish;
-                OBD_ALLOC(cb_data, sizeof(*cb_data));
-                cb_data->buf = pagearray;
-                cb_data->callback = callback;
-                desc->b_cb_data = cb_data;
+                GOTO(out_unmap, rc = -EINVAL);
         }
 
         for (pages = 0, i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *page;
 
-                        ost_unpack_niobuf(&ptr2, &remote);
+                        ost_unpack_niobuf(&nioptr, &remote);
 
                         page = ptlrpc_prep_bulk_page(desc);
-                        if (page == NULL)
-                                GOTO(out3, rc = -ENOMEM);
+                        if (!page)
+                                GOTO(out_unmap, rc = -ENOMEM);
 
                         page->b_buf = (void *)(unsigned long)local[pages].addr;
                         page->b_buflen = local[pages].len;
@@ -621,29 +641,42 @@ static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
         if (desc->b_page_count != pages)
                 LBUG();
 
+        /*
+         * One is released when the bulk is complete, the other when we finish
+         * waiting on it.  (Callback cases don't sleep, so only one ref for
+         * them.)
+         */
+        atomic_set(&desc->b_refcount, callback ? 1 : 2);
+        CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
+               atomic_read(&desc->b_refcount));
         rc = ptlrpc_send_bulk(desc);
         if (rc)
-                GOTO(out3, rc);
+                GOTO(out_unmap, rc);
+
+        /* Callbacks cause asynchronous handling. */
         if (callback)
-                GOTO(out, rc);
+                RETURN(0);
 
         /* If there's no callback function, sleep here until complete. */
         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+        ptlrpc_bulk_decref(desc);
         if (desc->b_flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
-
-        GOTO(out3, rc);
+                RETURN(-EINTR);
+        RETURN(0);
 
- out3:
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
+        /* Clean up on error. */
+ out_unmap:
         for (pages = 0, i = 0; i < num_oa; i++)
                 for (j = 0; j < oa_bufs[i]; j++, pages++)
                         kunmap(pagearray[pages]);
- out:
-        OBD_FREE(local, pages * sizeof(*local));
 
+ out_free:
+        if (cb_data)
+                OBD_FREE(cb_data, sizeof(*cb_data));
+        if (local)
+                OBD_FREE(local, pages * sizeof(*local));
+        ptlrpc_free_bulk(desc);
+        ptlrpc_req_finished(request);
         return rc;
 }
 
index ce69521..2f8c1f4 100644 (file)
@@ -469,7 +469,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         /* XXX probably both an import and connection level are needed */
         if (req->rq_level > req->rq_connection->c_level) { 
-                CERROR("process %d waiting for recovery\n", current->pid);
+                CERROR("process %d waiting for recovery (%d > %d)\n", 
+                       current->pid, req->rq_level, req->rq_connection->c_level);
                 spin_lock(&cli->cli_lock);
                 list_del_init(&req->rq_list);
                 list_add(&req->rq_list, cli->cli_delayed_head.prev); 
index ca4e566..b819c29 100644 (file)
@@ -73,7 +73,7 @@ static int reply_in_callback(ptl_event_t *ev)
         struct ptlrpc_request *req = ev->mem_desc.user_ptr;
         ENTRY;
 
-        if (req->rq_xid == 0x5a5a5a5a) {
+        if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
                 CERROR("Reply received for freed request!  Probably a missing "
                        "ptlrpc_abort()\n");
                 LBUG();
index a72d14e..072592f 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
+#include <linux/lustre_lib.h>
 
 extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
         bulk_source_eq, bulk_sink_eq;
@@ -36,8 +37,23 @@ int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk)
         if (bulk->b_flags & PTL_BULK_FL_SENT)
                 RETURN(1);
 
-        if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGINT)) {
+        if (l_killable_pending(current)) {
+                bulk->b_flags |= PTL_RPC_FL_INTR;
+                RETURN(1);
+        }
+
+        CDEBUG(D_NET, "no event yet\n");
+        RETURN(0);
+}
+
+int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *bulk)
+{
+        ENTRY;
+
+        if (bulk->b_flags & PTL_BULK_FL_RCVD)
+                RETURN(1);
+        
+        if (l_killable_pending(current)) {
                 bulk->b_flags |= PTL_RPC_FL_INTR;
                 RETURN(1);
         }
index 578b99e..c8e268a 100644 (file)
@@ -147,6 +147,7 @@ static void __exit ptlrpc_exit(void)
 
 /* events.c */
 EXPORT_SYMBOL(ptlrpc_check_bulk_sent);
+EXPORT_SYMBOL(ptlrpc_check_bulk_received);
 
 /* connmgr.c */
 EXPORT_SYMBOL(ptlrpc_connmgr);