Whamcloud - gitweb
- elimininate the system calls from filter obd
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 38ab56d..f1cfd3f 100644 (file)
@@ -1,6 +1,17 @@
-/*
+/* 
  * Copryright (C) 2001 Cluster File Systems, Inc.
  *
+ *  This code is issued under the GNU General Public License.
+ *  See the file COPYING in this distribution
+ *
+ *  Author Peter Braam <braam@clusterfs.com>
+ * 
+ *  This server is single threaded at present (but can easily be multi
+ *  threaded). For testing and management it is treated as an
+ *  obd_device, although it does not export a full OBD method table
+ *  (the requests are coming in over the wire, so object target
+ *  modules do not have a full method table.)
+ * 
  */
 
 #define EXPORT_SYMTAB
@@ -32,6 +43,9 @@
 
 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
 
+/* FIXME: this belongs in some sort of service struct */
+static int osc_xid = 1;
+
 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
                                 int buflen2, char *buf2)
 {
@@ -45,6 +59,9 @@ struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
                return NULL;
        }
 
+       memset(request, 0, sizeof(*request));
+       request->rq_xid = osc_xid++;
+
        rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
                          &request->rq_reqhdr, &request->rq_req.ost, 
                          &request->rq_reqlen, &request->rq_reqbuf);
@@ -58,51 +75,67 @@ struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
        return request;
 }
 
+/* XXX: unify with mdc_queue_wait */
 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
 {
        struct obd_device *client = conn->oc_dev;
-       struct obd_device *target = client->u.osc.osc_tgt;
+       struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
        int rc;
 
        ENTRY;
+
        /* set the connection id */
        req->rq_req.ost->connid = conn->oc_id;
 
-       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
-              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
-              conn->oc_id, req->rq_reqhdr->opc, req);
-
        /* XXX fix the race here (wait_for_event?)*/
-       /* hand the packet over to the server */
-       rc =  ost_queue_req(target, req); 
+       if (peer == NULL) {
+               /* Local delivery */
+               CDEBUG(D_INODE, "\n");
+               rc = ost_queue_req(client, req); 
+       } else {
+               /* Remote delivery via portals. */
+               req->rq_req_portal = OST_REQUEST_PORTAL;
+               req->rq_reply_portal = OST_REPLY_PORTAL;
+               rc = ptl_send_rpc(req, peer);
+       }
        if (rc) { 
-               printk("osc_queue_wait: error %d, opcode %d\n", rc, 
+               printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
                       req->rq_reqhdr->opc); 
                return -rc;
        }
 
+       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
+              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
+              conn->oc_id, req->rq_reqhdr->opc, req);
+
        /* wait for the reply */
        init_waitqueue_head(&req->rq_wait_for_rep);
+       CDEBUG(D_INODE, "-- sleeping\n");
        interruptible_sleep_on(&req->rq_wait_for_rep);
+       CDEBUG(D_INODE, "-- done\n");
+
+       rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
+                           &req->rq_rep.ost); 
+       if (rc) {
+               printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
+               return rc;
+       }
 
-       ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
-                      &req->rq_rep.ost); 
-       printk("-->osc_queue_wait: buf %p len %d status %d\n"
-              req->rq_repbuf, req->rq_replen, req->rq_rephdr->status); 
+       if ( req->rq_rephdr->status == 0 )
+               CDEBUG(D_INODE, "buf %p len %d status %d\n", 
+                      req->rq_repbuf, req->rq_replen
+                      req->rq_rephdr->status); 
 
        EXIT;
-       return req->rq_rephdr->status;
+       return 0;
 }
 
-void osc_free_req(struct ptlrpc_request *request)
+static void osc_free_req(struct ptlrpc_request *request)
 {
-       if (request->rq_repbuf)
-               kfree(request->rq_repbuf);
        kfree(request);
 }
 
-
-int osc_connect(struct obd_conn *conn)
+static int osc_connect(struct obd_conn *conn)
 {
        struct ptlrpc_request *request;
        int rc; 
@@ -110,10 +143,13 @@ int osc_connect(struct obd_conn *conn)
        
        request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+
        rc = osc_queue_wait(conn, request);
        if (rc) { 
                EXIT;
@@ -129,7 +165,7 @@ int osc_connect(struct obd_conn *conn)
        return rc;
 }
 
-int osc_disconnect(struct obd_conn *conn)
+static int osc_disconnect(struct obd_conn *conn)
 {
        struct ptlrpc_request *request;
        int rc; 
@@ -137,10 +173,13 @@ int osc_disconnect(struct obd_conn *conn)
        
        request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+
        rc = osc_queue_wait(conn, request);
        if (rc) { 
                EXIT;
@@ -153,19 +192,21 @@ int osc_disconnect(struct obd_conn *conn)
 }
 
 
-int osc_getattr(struct obd_conn *conn, struct obdo *oa)
+static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
        int rc; 
 
        request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
        
        memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
        request->rq_req.ost->oa.o_valid = ~0;
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
        rc = osc_queue_wait(conn, request);
        if (rc) { 
@@ -183,7 +224,33 @@ int osc_getattr(struct obd_conn *conn, struct obdo *oa)
        return 0;
 }
 
-int osc_create(struct obd_conn *conn, struct obdo *oa)
+static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
+{
+       struct ptlrpc_request *request;
+       int rc; 
+
+       request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
+       if (!request) { 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
+               return -ENOMEM;
+       }
+       
+       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+       
+       rc = osc_queue_wait(conn, request);
+       if (rc) { 
+               EXIT;
+               goto out;
+       }
+
+ out:
+       osc_free_req(request);
+       return 0;
+}
+
+static int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
        int rc; 
@@ -199,6 +266,39 @@ int osc_create(struct obd_conn *conn, struct obdo *oa)
        
        memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
        request->rq_req.ost->oa.o_valid = ~0;
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+       
+       rc = osc_queue_wait(conn, request);
+       if (rc) { 
+               EXIT;
+               goto out;
+       }
+       memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
+
+ out:
+       osc_free_req(request);
+       return 0;
+}
+
+static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
+{
+       struct ptlrpc_request *request;
+       int rc; 
+
+       if (!oa) { 
+               printk(__FUNCTION__ ": oa NULL\n"); 
+       }
+       request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
+       if (!request) { 
+               printk("osc_connect: cannot pack req!\n"); 
+               return -ENOMEM;
+       }
+       
+       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
+       request->rq_req.ost->oa.o_valid = ~0;
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
        rc = osc_queue_wait(conn, request);
        if (rc) { 
@@ -222,23 +322,34 @@ static int osc_setup(struct obd_device *obddev, obd_count len,
        struct osc_obd *osc = &obddev->u.osc;
         ENTRY;
 
-       if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
-               EXIT;
-               return -ENODEV;
+       if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
+               /* This is a local connection */
+               osc->osc_tgt = &obd_dev[data->ioc_dev];
+
+               printk("OSC: tgt %d ost at %p\n", data->ioc_dev,
+                      &osc->osc_tgt->u.ost);
+               if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
+                    ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
+                       printk("device not attached or not set up (%d)\n", 
+                              data->ioc_dev);
+                       EXIT;
+                       return -EINVAL;
+               }
+       } else {
+               int err;
+               /* This is a remote connection using Portals */
+
+               /* XXX: this should become something like ioc_inlbuf1 */
+               err = kportal_uuid_to_peer("ost", &osc->osc_peer);
+               if (err != 0) {
+                       printk("Cannot find 'ost' peer.\n");
+                       EXIT;
+                       return -EINVAL;
+               }
        }
 
-        osc->osc_tgt = &obd_dev[data->ioc_dev];
-       printk("OSC: tgt %d ost at %p\n", data->ioc_dev, &osc->osc_tgt->u.ost); 
-        if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
-             ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
-                printk("device not attached or not set up (%d)\n", 
-                       data->ioc_dev);
-                EXIT;
-               return -EINVAL;
-        } 
-
         MOD_INC_USE_COUNT;
-        EXIT; 
+        EXIT;
         return 0;
 } 
 
@@ -268,13 +379,14 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                size2 += oa_bufs[i] * sizeof(src);
        }
 
-       request = ost_prep_req(OST_PREPW, size1, NULL, size2, NULL);
+       request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
        if (!request) { 
                printk("osc_connect: cannot pack req!\n"); 
                return -ENOMEM;
        }
 
        n = 0;
+       request->rq_req.ost->cmd = rw;
        ptr1 = ost_req_buf1(request->rq_req.ost);
        ptr2 = ost_req_buf2(request->rq_req.ost);
        for (i=0; i < num_oa; i++) { 
@@ -286,18 +398,25 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                }
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
+
        rc = osc_queue_wait(conn, request);
        if (rc) { 
                EXIT;
                goto out;
        }
 
+#if 0
        ptr2 = ost_rep_buf2(request->rq_rep.ost); 
        if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
                printk(__FUNCTION__ ": buffer length wrong\n"); 
                goto out;
        }
 
+       if (rw == OBD_BRW_READ)
+               goto out;
+
        for (i=0; i < num_oa; i++) { 
                for (j = 0 ; j < oa_bufs[i] ; j++) { 
                        struct niobuf *dst;
@@ -308,7 +427,7 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
                        n++;
                }
        }
-       //ost_complete_brw(rep); 
+#endif
 
  out:
        if (request->rq_rephdr)
@@ -323,34 +442,24 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
 
        osc_free_req(request);
        return 0;
-
-
-
 }
 
-
 static int osc_cleanup(struct obd_device * obddev)
 {
-        ENTRY;
-
-        if ( !(obddev->obd_flags & OBD_SET_UP) ) {
-                EXIT;
-                return 0;
-        }
-
         MOD_DEC_USE_COUNT;
-        EXIT;
         return 0;
 }
 
-
 struct obd_ops osc_obd_ops = { 
        o_setup:   osc_setup,
        o_cleanup: osc_cleanup, 
        o_create: osc_create,
+       o_destroy: osc_destroy,
        o_getattr: osc_getattr,
+       o_setattr: osc_setattr,
        o_connect: osc_connect,
-       o_disconnect: osc_disconnect
+       o_disconnect: osc_disconnect,
+       o_brw: osc_brw
 };
 
 static int __init osc_init(void)