Fix RPC request leak and potential refcounting problems on the bulk descriptor.
author     adilger <adilger>    Wed, 24 Jul 2002 20:05:29 +0000
committer  adilger <adilger>    Wed, 24 Jul 2002 20:05:29 +0000
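
This commit replaces the old pattern of pre-loading the descriptor's
refcount (atomic_set(&desc->b_refcount, callback ? 1 : 2)) with an
explicit ptlrpc_bulk_addref() taken only when a synchronous caller
intends to sleep on the descriptor.  Below is a minimal standalone
sketch of that discipline, assuming hypothetical names (bulk_prep,
bulk_addref, bulk_decref) and C11 atomics in place of the kernel's
atomic_t:

#include <stdatomic.h>
#include <stdlib.h>

struct bulk_desc {
        atomic_int refcount;
};

static struct bulk_desc *bulk_prep(void)
{
        struct bulk_desc *d = malloc(sizeof(*d));
        if (d)
                atomic_init(&d->refcount, 1);   /* the callback's reference */
        return d;
}

static void bulk_addref(struct bulk_desc *d)
{
        atomic_fetch_add(&d->refcount, 1);
}

static void bulk_decref(struct bulk_desc *d)
{
        /* Whoever drops the last reference frees the descriptor. */
        if (atomic_fetch_sub(&d->refcount, 1) == 1)
                free(d);
}

int main(void)
{
        int have_callback = 0;                  /* synchronous caller */
        struct bulk_desc *desc = bulk_prep();

        if (!desc)
                return 1;
        if (!have_callback)
                bulk_addref(desc);              /* waiter's extra reference */

        bulk_decref(desc);                      /* brw_finish runs */
        if (!have_callback)
                bulk_decref(desc);              /* dropped after the wait */
        return 0;
}

The invariant: exactly one reference belongs to the completion callback
(brw_finish here), and at most one more to a waiter, so the last decref
frees the descriptor no matter which side drops it.
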
lustre/osc/osc_request.c

index 8594c94..e7e6d54 100644
@@ -501,23 +501,24 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
 
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
-                GOTO(out_free, rc = -ENOMEM);
+                GOTO(out_req, rc = -ENOMEM);
         desc->b_portal = OST_BULK_PORTAL;
         desc->b_cb = brw_finish;
         OBD_ALLOC(cb_data, sizeof(*cb_data));
         if (!cb_data)
-                GOTO(out_free, rc = -ENOMEM);
+                GOTO(out_desc, rc = -ENOMEM);
+
         cb_data->callback = callback;
         cb_data->cb_data = data;
         desc->b_cb_data = cb_data;
-        /* end almost identical to brw_write case */
 
         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
         ost_pack_ioo(&iooptr, md, page_count);
+        /* end almost identical to brw_write case */
+
         for (mapped = 0; mapped < page_count; mapped++) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = ptlrpc_prep_bulk_page(desc);
+                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
                         GOTO(out_unmap, rc = -ENOMEM);
 
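
In the hunk above, the allocation failures now jump to out_req and
out_desc instead of a single out_free, so each error path releases
exactly what was set up before it.  A sketch of that ladder, assuming a
simplified stand-in for Lustre's GOTO macro (the real one also logs the
jump) and malloc/free stubs for the ptlrpc calls:

#include <errno.h>
#include <stdlib.h>

/* Simplified stand-in for Lustre's GOTO(): evaluate the expression
 * (typically "rc = -ERRNO") and jump to the cleanup label. */
#define GOTO(label, expr) do { (void)(expr); goto label; } while (0)

struct req  { int unused; };
struct desc { int unused; };

static struct req *prep_req(void) { return malloc(sizeof(struct req)); }
static void finish_req(struct req *r) { free(r); }
static struct desc *prep_bulk(void) { return malloc(sizeof(struct desc)); }
static void decref_bulk(struct desc *d) { free(d); }

/* Each failure jumps to the label that undoes exactly what was
 * acquired before it; success falls through the same labels. */
static int brw_read_alloc_sketch(void)
{
        int rc = 0;
        struct req *request = prep_req();
        struct desc *desc;
        void *cb_data;

        if (!request)
                return -ENOMEM;

        desc = prep_bulk();
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);    /* only the request exists */

        cb_data = malloc(16);
        if (!cb_data)
                GOTO(out_desc, rc = -ENOMEM);   /* request and desc exist */

        /* ... register bulk, queue the request, wait ... */
        free(cb_data);
out_desc:
        decref_bulk(desc);
out_req:
        finish_req(request);
        return rc;
}

int main(void)
{
        return brw_read_alloc_sketch() ? 1 : 0;
}
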
@@ -536,41 +537,64 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
          * Register the bulk first, because the reply could arrive out of order,
          * and we want to be ready for the bulk data.
          *
-         * One reference is released by the bulk callback, the other when
-         * we finish sleeping on it (if we don't have a callback).
+         * One reference is released when brw_finish is complete, the
+         * other here when we finish waiting on it if we don't have a callback.
+         *
+         * We don't reference the bulk descriptor again here if there is a
+         * callback, so we don't need an additional refcount on it.
+         *
+         * On error, we never do the brw_finish, so we handle all decrefs.
          */
-        atomic_set(&desc->b_refcount, callback ? 1 : 2);
+        if (!callback)
+                ptlrpc_bulk_addref(desc);
         rc = ptlrpc_register_bulk(desc);
         if (rc)
-                GOTO(out_unmap, rc);
+                GOTO(out_desc2, rc);
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        if (rc) {
-                ptlrpc_bulk_decref(desc);
-                GOTO(out_unmap, rc);
-        }
+
+        /* XXX: Mike, this is the only place I'm not sure of.  If we had
+         *      an error here, will we always call brw_finish?  If yes, then
+         *      out_desc2 will do too much and we should jump to out_desc.
+         *      If maybe, then we are screwed, and we need to set things up
+         *      so that bulk_sink_callback is called for each bulk page,
+         *      even on error so brw_finish is always called.  It would need
+         *      to be passed an error code as a parameter to know what to do.
+         *
+         *      That would also help with the partial completion case, so
+         *      we could say in brw_finish "these pages are done, don't
+         *      restart them" and osc_brw callers can know this.
+         */
+        if (rc)
+                GOTO(out_desc2, rc);
 
         /* Callbacks cause asynchronous handling. */
         if (callback)
-                RETURN(0);
+                GOTO(out_req, rc = 0);
 
+        /* If there's no callback function, sleep here until complete. */
         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_received(desc));
-        rc = desc->b_flags & PTL_RPC_FL_INTR ? -EINTR : 0;
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                GOTO(out_desc, rc = -EINTR);
+
+        EXIT;
+out_desc:
         ptlrpc_bulk_decref(desc);
+out_req:
+        ptlrpc_req_finished(request);
         RETURN(rc);
 
         /* Clean up on error. */
- out_unmap:
+out_desc2:
+        if (!callback)
+                ptlrpc_bulk_decref(desc);
+out_unmap:
         while (mapped-- > 0)
                 kunmap(page_array[mapped]);
- out_free:
-        if (cb_data)
-                OBD_FREE(cb_data, sizeof(*cb_data));
-        ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(request);
-        return rc;
+        OBD_FREE(cb_data, sizeof(*cb_data));
+        goto out_desc;
 }
 
 static int osc_brw_write(struct lustre_handle *conn,
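
The XXX comment above records the open question behind out_desc2: if
ptlrpc_queue_wait() fails, does brw_finish still run and consume the
callback's reference?  If it does, the extra decref in out_desc2 would
underflow the count and free the descriptor twice.  A hypothetical
debug-style decref (not part of the Lustre API) that traps exactly that
case:

#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>

struct bulk_desc {
        atomic_int refcount;
};

/* Trap a double-drop: if brw_finish already consumed the callback's
 * reference, another decref on the error path would underflow. */
static void bulk_decref_checked(struct bulk_desc *d)
{
        int old = atomic_fetch_sub(&d->refcount, 1);

        assert(old > 0);        /* catch the over-decref immediately */
        if (old == 1)
                free(d);
}

int main(void)
{
        struct bulk_desc *d = malloc(sizeof(*d));

        if (!d)
                return 1;
        atomic_init(&d->refcount, 2);   /* callback ref + waiter ref */
        bulk_decref_checked(d);         /* brw_finish drops its reference */
        bulk_decref_checked(d);         /* the waiter's drop frees it */
        return 0;
}
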
@@ -605,18 +629,15 @@ static int osc_brw_write(struct lustre_handle *conn,
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         body->data = OBD_BRW_WRITE;
 
-        OBD_ALLOC(local, page_count * sizeof(*local));
-        if (!local)
-                GOTO(out_free, rc = -ENOMEM);
-
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
-                GOTO(out_free, rc = -ENOMEM);
+                GOTO(out_req, rc = -ENOMEM);
         desc->b_portal = OSC_BULK_PORTAL;
         desc->b_cb = brw_finish;
         OBD_ALLOC(cb_data, sizeof(*cb_data));
         if (!cb_data)
-                GOTO(out_free, rc = -ENOMEM);
+                GOTO(out_desc, rc = -ENOMEM);
+
         cb_data->callback = callback;
         cb_data->cb_data = data;
         desc->b_cb_data = cb_data;
@@ -626,6 +647,10 @@ static int osc_brw_write(struct lustre_handle *conn,
         ost_pack_ioo(&iooptr, md, page_count);
         /* end almost identical to brw_read case */
 
+        OBD_ALLOC(local, page_count * sizeof(*local));
+        if (!local)
+                GOTO(out_cb, rc = -ENOMEM);
+
         cb_data->obd_data = local;
         cb_data->obd_size = page_count * sizeof(*local);
 
@@ -672,39 +697,48 @@ static int osc_brw_write(struct lustre_handle *conn,
                 LBUG();
 
         /*
-         * One is released when the bulk is complete, the other when we finish
-         * waiting on it.  (Callback cases don't sleep, so only one ref for
-         * them.)
+         * One reference is released when brw_finish is complete, the
+         * other here when we finish waiting on it if we don't have a callback.
+         *
+         * We don't reference the bulk descriptor again here if there is a
+         * callback, so we don't need an additional refcount on it.
          */
-        atomic_set(&desc->b_refcount, callback ? 1 : 2);
-        CDEBUG(D_PAGE, "Set refcount of %p to %d\n", desc,
-               atomic_read(&desc->b_refcount));
+        if (!callback)
+                ptlrpc_bulk_addref(desc);
         rc = ptlrpc_send_bulk(desc);
+
+        /* XXX: Mike, same question as in osc_brw_read. */
         if (rc)
-                GOTO(out_unmap, rc);
+                GOTO(out_desc2, rc);
 
         /* Callbacks cause asynchronous handling. */
         if (callback)
-                RETURN(0);
+                GOTO(out_req, rc = 0);
 
         /* If there's no callback function, sleep here until complete. */
         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        ptlrpc_bulk_decref(desc);
         if (desc->b_flags & PTL_RPC_FL_INTR)
-                RETURN(-EINTR);
-        RETURN(0);
+                GOTO(out_desc, rc = -EINTR);
+
+        EXIT;
+out_desc:
+        ptlrpc_bulk_decref(desc);
+out_req:
+        ptlrpc_req_finished(request);
+        return rc;
 
         /* Clean up on error. */
- out_unmap:
+out_desc2:
+        if (!callback)
+                ptlrpc_bulk_decref(desc);
+out_unmap:
         while (mapped-- > 0)
                 kunmap(pagearray[mapped]);
 
- out_free:
-        OBD_FREE(cb_data, sizeof(*cb_data));
         OBD_FREE(local, page_count * sizeof(*local));
-        ptlrpc_free_bulk(desc);
-        ptlrpc_req_finished(request);
-        return rc;
+out_cb:
+        OBD_FREE(cb_data, sizeof(*cb_data));
+        goto out_desc;
 }
 
 static int osc_brw(int cmd, struct lustre_handle *conn,
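
The write path mirrors the read path's unwind ladder; note that the
OBD_ALLOC of local moves below the cb_data setup so its failure can
target the new out_cb label, and resources are then released in reverse
order of acquisition.  A compact userspace sketch of that ordering,
assuming malloc/free in place of OBD_ALLOC/OBD_FREE and a hypothetical
brw_write_cleanup_sketch() wrapper:

#include <errno.h>
#include <stdlib.h>

struct cb_data_t {
        void *obd_data;
};

static int brw_write_cleanup_sketch(int page_count)
{
        int rc = 0;
        struct cb_data_t *cb_data;
        long *local;

        cb_data = malloc(sizeof(*cb_data));
        if (!cb_data)
                return -ENOMEM;

        local = malloc(page_count * sizeof(*local));
        if (!local) {
                rc = -ENOMEM;
                goto out_cb;            /* cb_data is the only thing to undo */
        }
        cb_data->obd_data = local;

        /* ... prep pages, send bulk, wait for completion ... */

        free(local);                    /* release in reverse order */
out_cb:
        free(cb_data);
        return rc;
}

int main(void)
{
        return brw_write_cleanup_sketch(4) ? 1 : 0;
}
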