Whamcloud - gitweb
- documentation update for MDS recovery
[fs/lustre-release.git] / lustre / osc / osc_request.c
index b8869ba..7b33e6c 100644 (file)
  */
 
 #define EXPORT_SYMTAB
-
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/mm.h>
-#include <linux/string.h>
-#include <linux/stat.h>
-#include <linux/errno.h>
-#include <linux/locks.h>
-#include <linux/unistd.h>
-
-#include <asm/system.h>
-#include <asm/uaccess.h>
-
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <asm/uaccess.h>
-#include <asm/segment.h>
-#include <linux/miscdevice.h>
-
 #define DEBUG_SUBSYSTEM S_OSC
 
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_net.h>
+#include <linux/module.h>
+#include <linux/lustre_dlm.h>
 #include <linux/obd_ost.h>
 
-struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
+static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
+                       struct ptlrpc_connection **connection)
 {
         struct osc_obd *osc = &conn->oc_dev->u.osc;
-        return osc->osc_client;
+        *cl = osc->osc_client;
+        *connection = osc->osc_conn;
 }
 
 static int osc_connect(struct obd_conn *conn)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CONNECT, 0, NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -81,12 +64,14 @@ static int osc_connect(struct obd_conn *conn)
 static int osc_disconnect(struct obd_conn *conn)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_DISCONNECT, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -95,7 +80,7 @@ static int osc_disconnect(struct obd_conn *conn)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         GOTO(out, rc);
  out:
         ptlrpc_free_req(request);
@@ -105,12 +90,14 @@ static int osc_disconnect(struct obd_conn *conn)
 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_GETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -121,7 +108,7 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -139,12 +126,14 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 static int osc_open(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_OPEN, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -156,7 +145,7 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -174,12 +163,14 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa)
 static int osc_close(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CLOSE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -189,7 +180,7 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -207,12 +198,14 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa)
 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_SETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -222,7 +215,7 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         GOTO(out, rc);
 
  out:
@@ -233,7 +226,8 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 static int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -242,7 +236,8 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa)
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_CREATE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -253,7 +248,7 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -270,7 +265,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
                      obd_off offset)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -279,7 +275,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_PUNCH, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -292,7 +289,7 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -308,7 +305,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -317,7 +315,8 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_DESTROY, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -328,7 +327,7 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -344,7 +343,10 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                  struct niobuf *dst, struct niobuf *src)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
+
+        osc_con2cl(conn, &cl, &connection);
 
         if (cl->cli_obd) {
                 /* local sendpage */
@@ -354,9 +356,9 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                 struct ptlrpc_bulk_desc *bulk;
                 int rc;
 
-                bulk = ptlrpc_prep_bulk(&cl->cli_server);
+                bulk = ptlrpc_prep_bulk(connection);
                 if (bulk == NULL)
-                        return -ENOMEM;
+                        RETURN(-ENOMEM);
 
                 bulk->b_buf = (void *)(unsigned long)src->addr;
                 bulk->b_buflen = src->len;
@@ -364,19 +366,19 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
                 if (rc != 0) {
                         CERROR("send_bulk failed: %d\n", rc);
+                        ptlrpc_free_bulk(bulk);
                         LBUG();
-                        return rc;
+                        RETURN(rc);
                 }
                 wait_event_interruptible(bulk->b_waitq,
                                          ptlrpc_check_bulk_sent(bulk));
 
-                if (bulk->b_flags == PTL_RPC_INTR) {
-                        EXIT;
-                        /* FIXME: hey hey, we leak here. */
-                        return -EINTR;
+                if (bulk->b_flags & PTL_RPC_FL_INTR) {
+                        ptlrpc_free_bulk(bulk);
+                        RETURN(-EINTR);
                 }
 
-                OBD_FREE(bulk, sizeof(*bulk));
+                ptlrpc_free_bulk(bulk);
         }
 
         return 0;
@@ -386,7 +388,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                  obd_count *oa_bufs, struct page **buf, obd_size *count,
                  obd_off *offset, obd_flag *flags)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ptlrpc_request *request;
         struct ost_body *body;
         struct obd_ioobj ioo;
@@ -406,7 +409,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         if (bulk == NULL)
                 RETURN(-ENOMEM);
 
-        request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
@@ -418,13 +422,13 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server);
+                        bulk[pages] = ptlrpc_prep_bulk(connection);
                         if (bulk[pages] == NULL)
                                 GOTO(out, rc = -ENOMEM);
 
-                        spin_lock(&cl->cli_lock);
-                        bulk[pages]->b_xid = cl->cli_xid++;
-                        spin_unlock(&cl->cli_lock);
+                        spin_lock(&connection->c_lock);
+                        bulk[pages]->b_xid = ++connection->c_xid_out;
+                        spin_unlock(&connection->c_lock);
 
                         bulk[pages]->b_buf = kmap(buf[pages]);
                         bulk[pages]->b_buflen = PAGE_SIZE;
@@ -440,7 +444,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         }
 
         request->rq_replen = lustre_msg_size(1, size);
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         GOTO(out, rc);
 
  out:
@@ -452,7 +456,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                         if (bulk[pages] == NULL)
                                 continue;
                         kunmap(buf[pages]);
-                        OBD_FREE(bulk[pages], sizeof(**bulk));
+                        ptlrpc_free_bulk(bulk[pages]);
                 }
         }
 
@@ -465,7 +469,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                   obd_count *oa_bufs, struct page **buf, obd_size *count,
                   obd_off *offset, obd_flag *flags)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct ptlrpc_connection *connection;
         struct ptlrpc_request *request;
         struct obd_ioobj ioo;
         struct ost_body *body;
@@ -484,7 +489,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         if (!src)
                 RETURN(-ENOMEM);
 
-        request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+        osc_con2cl(conn, &cl, &connection);
+        request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
         body = lustre_msg_buf(request->rq_reqmsg, 0);
@@ -504,7 +510,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         size[1] = pages * sizeof(struct niobuf);
         request->rq_replen = lustre_msg_size(2, size);
 
-        rc = ptlrpc_queue_wait(cl, request);
+        rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out, rc);
 
@@ -530,7 +536,7 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
  out:
         for (pages = 0, i = 0; i < num_oa; i++)
                 for (j = 0; j < oa_bufs[i]; j++, pages++)
-                        kunmap(buf[n]);
+                        kunmap(buf[pages]);
 
         ptlrpc_free_req(request);
         return 0;
@@ -548,36 +554,48 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                                      offset, flags);
 }
 
-/* mount the file system (secretly) */
-static int osc_setup(struct obd_device *obddev, obd_count len,
-                        void *buf)
-
+static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct osc_obd *osc = &obddev->u.osc;
-        struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
         int rc;
-        int dev = data->ioc_dev;
         ENTRY;
 
+        osc->osc_conn = ptlrpc_uuid_to_connection("ost");
+        if (!osc->osc_conn)
+                RETURN(-EINVAL);
+
         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
         if (osc->osc_client == NULL)
-                RETURN(-ENOMEM);
+                GOTO(out_conn, rc = -ENOMEM);
 
-        ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
-                                   osc->osc_client);
-        rc = ptlrpc_connect_client(dev, "ost", osc->osc_client);
+        OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
+        if (osc->osc_ldlm_client == NULL)
+                GOTO(out_client, rc = -ENOMEM);
 
-        if (rc == 0)
-                MOD_INC_USE_COUNT;
-        RETURN(rc);
+        ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+                           osc->osc_client);
+        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           osc->osc_ldlm_client);
+
+        MOD_INC_USE_COUNT;
+        RETURN(0);
+
+ out_client:
+        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+ out_conn:
+        ptlrpc_put_connection(osc->osc_conn);
+        return rc;
 }
 
 static int osc_cleanup(struct obd_device * obddev)
 {
         struct osc_obd *osc = &obddev->u.osc;
 
-        if (osc->osc_client != NULL)
-                OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+        ptlrpc_cleanup_client(osc->osc_client);
+        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
+        ptlrpc_cleanup_client(osc->osc_ldlm_client);
+        OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
+        ptlrpc_put_connection(osc->osc_conn);
 
         MOD_DEC_USE_COUNT;
         return 0;