Whamcloud - gitweb
WARNING: This commit breaks everything. It will be back in shape within 12
[fs/lustre-release.git] / lustre / osc / osc_request.c
index fe83d9b..444ff05 100644 (file)
@@ -82,7 +82,8 @@ static int osc_disconnect(struct obd_conn *conn)
         ENTRY;
 
         osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
+        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -360,26 +361,32 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
         return 0;
 }
 
-static int osc_sendpage(struct ptlrpc_bulk_desc *desc,
-                        struct niobuf_remote *dst, struct niobuf_local *src)
+struct osc_brw_cb_data {
+        struct page **buf;
+        struct ptlrpc_request *req;
+        bulk_callback_t callback;
+        void *cb_data;
+};
+
+static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
 {
-        struct ptlrpc_bulk_page *page;
-        ENTRY;
+        struct osc_brw_cb_data *cb_data = data;
 
-        page = ptlrpc_prep_bulk_page(desc);
-        if (page == NULL)
-                RETURN(-ENOMEM);
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                CERROR("got signal\n");
 
-        page->b_buf = (void *)(unsigned long)src->addr;
-        page->b_buflen = src->len;
-        page->b_xid = dst->xid;
+        (cb_data->callback)(desc, cb_data->cb_data);
 
-        RETURN(0);
+        ptlrpc_free_bulk(desc);
+        ptlrpc_free_req(cb_data->req);
+
+        OBD_FREE(cb_data, sizeof(*cb_data));
 }
 
 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                        obd_size *count, obd_off *offset, obd_flag *flags)
+                        obd_size *count, obd_off *offset, obd_flag *flags,
+                        bulk_callback_t callback)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -414,6 +421,7 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
+                /* FIXME: this inner loop is wrong for multiple OAs */
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
                         struct ptlrpc_bulk_page *bulk;
                         bulk = ptlrpc_prep_bulk_page(desc);
@@ -457,10 +465,30 @@ static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
         return rc;
 }
 
+static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+        struct osc_brw_cb_data *cb_data = data;
+        int i;
+
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                CERROR("got signal\n");
+
+        for (i = 0; i < desc->b_page_count; i++)
+                kunmap(cb_data->buf[i]);
+
+        (cb_data->callback)(desc, cb_data->cb_data);
+
+        ptlrpc_free_bulk(desc);
+        ptlrpc_free_req(cb_data->req);
+
+        OBD_FREE(cb_data, sizeof(*cb_data));
+}
+
 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
                          struct obdo **oa, obd_count *oa_bufs,
-                         struct page **pagearray, obd_size *count, obd_off *offset,
-                         obd_flag *flags)
+                         struct page **pagearray, obd_size *count,
+                         obd_off *offset, obd_flag *flags,
+                         bulk_callback_t callback)
 {
         struct ptlrpc_client *cl;
         struct ptlrpc_connection *connection;
@@ -470,6 +498,7 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
         struct ost_body *body;
         struct niobuf_local *local;
         struct niobuf_remote *remote;
+        struct osc_brw_cb_data *cb_data;
         long pages;
         int rc, i, j, size[3] = {sizeof(*body)};
         void *ptr1, *ptr2;
@@ -527,17 +556,44 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
 
         desc = ptlrpc_prep_bulk(connection);
         desc->b_portal = OSC_BULK_PORTAL;
+        if (callback) {
+                desc->b_cb = brw_write_finish;
+                OBD_ALLOC(cb_data, sizeof(*cb_data));
+                cb_data->buf = pagearray;
+                cb_data->callback = callback;
+                desc->b_cb_data = cb_data;
+        }
 
         for (pages = 0, i = 0; i < num_oa; i++) {
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
+                        struct ptlrpc_bulk_page *page;
+
                         ost_unpack_niobuf(&ptr2, &remote);
-                        rc = osc_sendpage(desc, remote, &local[pages]);
-                        if (rc)
-                                GOTO(out3, rc);
+
+                        page = ptlrpc_prep_bulk_page(desc);
+                        if (page == NULL)
+                                GOTO(out3, rc = -ENOMEM);
+
+                        page->b_buf = (void *)(unsigned long)local[pages].addr;
+                        page->b_buflen = local[pages].len;
+                        page->b_xid = remote->xid;
                 }
         }
 
+        if (desc->b_page_count != pages)
+                LBUG();
+
         rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out3, rc);
+        if (callback)
+                GOTO(out, rc);
+
+        /* If there's no callback function, sleep here until complete. */
+        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
+        if (desc->b_flags & PTL_RPC_FL_INTR)
+                rc = -EINTR;
+
         GOTO(out3, rc);
 
  out3:
@@ -555,14 +611,18 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
 
 static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                   obd_size *count, obd_off *offset, obd_flag *flags)
+                   obd_size *count, obd_off *offset, obd_flag *flags,
+                   void *callback)
 {
+        if (num_oa != 1)
+                LBUG();
+
         if (rw == OBD_BRW_READ)
                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
-                                    offset, flags);
+                                    offset, flags, (bulk_callback_t)callback);
         else
                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
-                                     offset, flags);
+                                     offset, flags, (bulk_callback_t)callback);
 }
 
 static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns,