Whamcloud - gitweb
class/class_obd.c: small OBD_ATTACHED sanity cleanup; OBD_SET_UP fix.
authorbraam <braam>
Wed, 13 Feb 2002 23:17:05 +0000 (23:17 +0000)
committerbraam <braam>
Wed, 13 Feb 2002 23:17:05 +0000 (23:17 +0000)
include/linux/lustre_net.h: set the LUSTRE_NAL back to socknal
include/linux/class_obd.h: added full license boilerplate, #define cleanup
include/linux/obd_osc: there was some confusion with obd_ost; fixed.

- added bits to structures for ost_request
- added OSC/OST portals communication
- added osc_setattr()
- added rpc_unregister_service in an attempt to get a handle on cleanup
- brought tests/ostreq.sh into the 1990s
- fixed a trivial obdctl setattr bug

14 files changed:
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_osc.h
lustre/include/linux/obd_ost.h
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/obdclass/class_obd.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/rpc.c
lustre/tests/llmountcleanup.sh
lustre/tests/ostreq.sh
lustre/utils/obdctl.c

index cfb58a7..bd65d92 100644 (file)
@@ -123,5 +123,3 @@ int mdc_create(struct lustre_peer *peer, struct inode *dir, const char *name,
 #define IOC_REQUEST_MAX_NR               33
 
 #endif
-
-
index 686fcca..1d047e9 100644 (file)
@@ -47,16 +47,15 @@ struct ptlrpc_service {
         struct lustre_peer srv_self;
 
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
-        ptl_handle_eq_t srv_eq;
+        ptl_handle_eq_t srv_eq_h;
 
-        ptl_handle_me_t srv_me;
+        ptl_handle_me_t srv_me_h;
         ptl_process_id_t srv_id;
         ptl_md_t srv_md;
         ptl_handle_md_t srv_md_h;
         wait_queue_head_t *srv_wait_queue;
 };
 
-
 struct ptlrpc_request { 
        struct list_head rq_list;
        struct mds_obd *rq_obd;
@@ -99,9 +98,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                  int portal, int is_request);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
 int rpc_register_service(struct ptlrpc_service *service, char *uuid);
+int rpc_unregister_service(struct ptlrpc_service *service);
 
 /* FIXME */
-#if 0
+#if 1
 # define LUSTRE_NAL "ksocknal"
 #else
 # define LUSTRE_NAL "kqswnal"
index 39e93a3..81e22d9 100644 (file)
@@ -1,12 +1,28 @@
-#ifndef __LINUX_CLASS_OBD_H
-#define __LINUX_CLASS_OBD_H
-/*
- * Copyright (C) 2001  Cluster File Systems, Inc.
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
  */
 
+#ifndef __LINUX_CLASS_OBD_H
+#define __LINUX_CLASS_OBD_H
+
 #ifndef __KERNEL__
 #include <stdint.h>
 #define __KERNEL__
@@ -54,6 +70,7 @@ typedef struct {
 /* #include <linux/obd_fc.h> */
 #include <linux/obd_raid1.h>
 #include <linux/obd_ost.h>
+#include <linux/obd_osc.h>
 
 #ifdef __KERNEL__
 /* corresponds to one of the obdx */
@@ -186,12 +203,18 @@ static inline int obd_check_conn(struct obd_conn *conn)
 #define OBT(dev)        dev->obd_type->typ_ops
 #define OBP(dev,op)     dev->obd_type->typ_ops->o_ ## op
 
-#define OBD_CHECK_OP(conn,op) do { \
-        int rc = obd_check_conn(conn);\
-        if (rc) { printk("obd: error in operation: " #op "\n"); return rc; }\
-        if (!OBP(conn->oc_dev,op)) { printk("obd_" #op ": dev %d no operation\n", conn->oc_dev->obd_minor); \
-               return -EOPNOTSUPP;\
-       }\
+#define OBD_CHECK_OP(conn,op)                                  \
+do {                                                           \
+        int rc = obd_check_conn(conn);                         \
+        if (rc) {                                              \
+               printk("obd: error in operation: " #op "\n");   \
+               return rc;                                      \
+       }                                                       \
+        if (!OBP(conn->oc_dev,op)) {                           \
+               printk("obd_" #op ": dev %d no operation\n",    \
+                      conn->oc_dev->obd_minor);                \
+               return -EOPNOTSUPP;                             \
+       }                                                       \
 } while (0)
 
 static inline int obd_get_info(struct obd_conn *conn, obd_count keylen, void *key,
index e99c408..f341737 100644 (file)
  *
  */
 
-#ifndef _LUSTRE_OST_H
-#define _LUSTRE_OST_H
+#ifndef _LUSTRE_OSC_H
+#define _LUSTRE_OSC_H
 
 #include <linux/obd_support.h>
+#include <linux/lustre_net.h>
 
 #define OST_EXIT 1
 #define LUSTRE_OST_NAME "ost"
 
-struct ost_obd {
-       struct obd_device *ost_tgt;
-       struct obd_conn ost_conn;
-       struct task_struct *ost_thread;
-       wait_queue_head_t ost_waitq;
-       wait_queue_head_t ost_done_waitq;
-       int ost_flags;
-       spinlock_t ost_lock;
-       struct list_head ost_reqs;
-};
-
 struct osc_obd {
-       struct obd_device *ost_tgt;
-};
-
-struct ost_request { 
-       struct list_head rq_list;
-       struct ost_obd *rq_obd;
-       int rq_status;
-
-       char *rq_reqbuf;
-       int rq_reqlen;
-       struct ost_req_hdr *rq_reqhdr;
-       struct ost_req *rq_req;
-
-       char *rq_repbuf;
-       int rq_replen;
-       struct ost_rep_hdr *rq_rephdr;
-       struct ost_rep *rq_rep;
-
-        void *rq_reply_handle;
-       wait_queue_head_t rq_wait_for_rep;
+       struct obd_device *osc_tgt;
+        struct lustre_peer osc_peer;
 };
 
-/* ost/ost_pack.c */
-int ost_pack_req(char *buf1, int buflen1, char *buf2, int buflen2, struct ost_req_hdr **hdr, struct ost_req **req, int *len, char **buf);
-int ost_unpack_req(char *buf, int len, struct ost_req_hdr **hdr, struct ost_req **req);
-int ost_pack_rep(void *buf1, __u32 buflen1, void *buf2, __u32 buflen2, struct ost_rep_hdr **hdr, struct ost_rep **rep, int *len, char **buf);
-int ost_unpack_rep(char *buf, int len, struct ost_rep_hdr **hdr, struct ost_rep **rep);
-
 #endif
-
-
index da00eca..ab4a0e7 100644 (file)
@@ -42,10 +42,27 @@ struct ost_obd {
        int ost_flags;
        spinlock_t ost_lock;
        struct list_head ost_reqs;
+
+        struct ptlrpc_service *ost_service;
 };
 
-struct osc_obd {
-       struct obd_device *osc_tgt;
+struct ost_request { 
+       struct list_head rq_list;
+       struct ost_obd *rq_obd;
+       int rq_status;
+
+       char *rq_reqbuf;
+       int rq_reqlen;
+       struct ost_req_hdr *rq_reqhdr;
+       struct ost_req *rq_req;
+
+       char *rq_repbuf;
+       int rq_replen;
+       struct ost_rep_hdr *rq_rephdr;
+       struct ost_rep *rq_rep;
+
+        void *rq_reply_handle;
+       wait_queue_head_t rq_wait_for_rep;
 };
 
 /* ost/ost_pack.c */
index 654275f..ea996b6 100644 (file)
@@ -69,9 +69,6 @@ struct ptlrpc_request *mds_prep_req(int opcode, int namelen, char *name, int tgt
        return request;
 }
 
-
-
-
 static int mds_queue_wait(struct ptlrpc_request *req, struct lustre_peer *peer)
 {
        int rc;
@@ -114,7 +111,7 @@ static int mds_queue_wait(struct ptlrpc_request *req, struct lustre_peer *peer)
        return 0;
 }
 
-void mds_free_req(struct ptlrpc_request *request)
+void mdc_free_req(struct ptlrpc_request *request)
 {
        kfree(request);
 }
@@ -153,7 +150,7 @@ int mdc_getattr(struct lustre_peer *peer, ino_t ino, int type, int valid,
        }
 
  out: 
-       mds_free_req(request);
+       mdc_free_req(request);
        return rc;
 }
 
@@ -200,7 +197,7 @@ int mdc_readpage(struct lustre_peer *peer, ino_t ino, int type, __u64 offset,
        }
 
  out: 
-       mds_free_req(request);
+       mdc_free_req(request);
        return rc;
 }
 
index 19b9691..480d6d7 100644 (file)
@@ -71,7 +71,6 @@ static int mds_queue_req(struct ptlrpc_request *req)
        return 0;
 }
 
-/* XXX do this over the net */
 int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
                    __u64 offset, struct niobuf *dst)
 {
@@ -114,7 +113,6 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
        return 0;
 }
 
-/* XXX replace with networking code */
 int mds_reply(struct ptlrpc_request *req)
 {
        struct ptlrpc_request *clnt_req = req->rq_reply_handle;
@@ -468,7 +466,7 @@ int mds_main(void *arg)
                        while (1) {
                                struct ptlrpc_request request;
 
-                               rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
+                               rc = PtlEQGet(mds->mds_service->srv_eq_h, &ev);
                                if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
                                        break;
                                /* FIXME: If we move to an event-driven model,
@@ -632,7 +630,6 @@ static int mds_cleanup(struct obd_device * obddev)
         mds->mds_sb = 0;
        kfree(mds->mds_fstype);
        lock_kernel();
-       
 
         MOD_DEC_USE_COUNT;
         EXIT;
index 3202dff..cdd4be0 100644 (file)
@@ -174,7 +174,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
 
                 ENTRY;
                 /* have we attached a type to this device */
-                if ( obd->obd_type ||  (obd->obd_flags & OBD_ATTACHED) ){
+                if ( obd->obd_flags & OBD_ATTACHED ) {
                         printk("OBD: Device %d already typed as  %s.\n",
                                obd->obd_minor, MKSTR(obd->obd_type->typ_name));
                         return -EBUSY;
@@ -200,7 +200,6 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                }
 
                 if ( err ) {
-                        obd->obd_flags &= ~OBD_ATTACHED;
                         obd->obd_type = NULL;
                         EXIT;
                 } else {
@@ -272,6 +271,11 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
         case OBD_IOC_CLEANUP: {
                 ENTRY;
 
+               if ( !(obd->obd_flags & OBD_SET_UP) ) {
+                       EXIT;
+                       return -EINVAL;
+               }
+
                 err = obd_cleanup(obd);
                 if ( err ) {
                         EXIT;
index 38ab56d..20a5745 100644 (file)
@@ -32,6 +32,9 @@
 
 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
 
+/* FIXME: this belongs in some sort of service struct */
+static int osc_xid = 1;
+
 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
                                 int buflen2, char *buf2)
 {
@@ -45,6 +48,9 @@ struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
                return NULL;
        }
 
+       memset(request, 0, sizeof(*request));
+       request->rq_xid = osc_xid++;
+
        rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
                          &request->rq_reqhdr, &request->rq_req.ost, 
                          &request->rq_reqlen, &request->rq_reqbuf);
@@ -58,50 +64,66 @@ struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1,
        return request;
 }
 
+/* XXX: unify with mdc_queue_wait */
 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
 {
        struct obd_device *client = conn->oc_dev;
-       struct obd_device *target = client->u.osc.osc_tgt;
+       struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
        int rc;
 
        ENTRY;
+
        /* set the connection id */
        req->rq_req.ost->connid = conn->oc_id;
 
-       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
-              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
-              conn->oc_id, req->rq_reqhdr->opc, req);
-
        /* XXX fix the race here (wait_for_event?)*/
-       /* hand the packet over to the server */
-       rc =  ost_queue_req(target, req); 
+       if (peer == NULL) {
+               /* Local delivery */
+               CDEBUG(D_INODE, "\n");
+               rc = ost_queue_req(client, req); 
+       } else {
+               /* Remote delivery via portals. */
+               req->rq_req_portal = OST_REQUEST_PORTAL;
+               req->rq_reply_portal = OST_REPLY_PORTAL;
+               rc = ptl_send_rpc(req, peer);
+       }
        if (rc) { 
-               printk("osc_queue_wait: error %d, opcode %d\n", rc, 
+               printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
                       req->rq_reqhdr->opc); 
                return -rc;
        }
 
+       CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
+              &conn->oc_dev->u.osc.osc_tgt->u.ost, 
+              conn->oc_id, req->rq_reqhdr->opc, req);
+
        /* wait for the reply */
        init_waitqueue_head(&req->rq_wait_for_rep);
+       CDEBUG(D_INODE, "-- sleeping\n");
        interruptible_sleep_on(&req->rq_wait_for_rep);
+       CDEBUG(D_INODE, "-- done\n");
+
+       rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
+                           &req->rq_rep.ost); 
+       if (rc) {
+               printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
+               return rc;
+       }
 
-       ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
-                      &req->rq_rep.ost); 
-       printk("-->osc_queue_wait: buf %p len %d status %d\n"
-              req->rq_repbuf, req->rq_replen, req->rq_rephdr->status); 
+       if ( req->rq_rephdr->status == 0 )
+               CDEBUG(D_INODE, "buf %p len %d status %d\n", 
+                      req->rq_repbuf, req->rq_replen
+                      req->rq_rephdr->status); 
 
        EXIT;
-       return req->rq_rephdr->status;
+       return 0;
 }
 
 void osc_free_req(struct ptlrpc_request *request)
 {
-       if (request->rq_repbuf)
-               kfree(request->rq_repbuf);
        kfree(request);
 }
 
-
 int osc_connect(struct obd_conn *conn)
 {
        struct ptlrpc_request *request;
@@ -110,10 +132,13 @@ int osc_connect(struct obd_conn *conn)
        
        request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+
        rc = osc_queue_wait(conn, request);
        if (rc) { 
                EXIT;
@@ -137,10 +162,13 @@ int osc_disconnect(struct obd_conn *conn)
        
        request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+
        rc = osc_queue_wait(conn, request);
        if (rc) { 
                EXIT;
@@ -160,12 +188,14 @@ int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 
        request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
        if (!request) { 
-               printk("osc_connect: cannot pack req!\n"); 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
                return -ENOMEM;
        }
        
        memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
        request->rq_req.ost->oa.o_valid = ~0;
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
        rc = osc_queue_wait(conn, request);
        if (rc) { 
@@ -183,6 +213,32 @@ int osc_getattr(struct obd_conn *conn, struct obdo *oa)
        return 0;
 }
 
+int osc_setattr(struct obd_conn *conn, struct obdo *oa)
+{
+       struct ptlrpc_request *request;
+       int rc; 
+
+       request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
+       if (!request) { 
+               printk(__FUNCTION__ ": cannot pack req!\n"); 
+               return -ENOMEM;
+       }
+       
+       memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
+       
+       rc = osc_queue_wait(conn, request);
+       if (rc) { 
+               EXIT;
+               goto out;
+       }
+
+ out:
+       osc_free_req(request);
+       return 0;
+}
+
 int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
        struct ptlrpc_request *request;
@@ -199,6 +255,8 @@ int osc_create(struct obd_conn *conn, struct obdo *oa)
        
        memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
        request->rq_req.ost->oa.o_valid = ~0;
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
        
        rc = osc_queue_wait(conn, request);
        if (rc) { 
@@ -222,23 +280,34 @@ static int osc_setup(struct obd_device *obddev, obd_count len,
        struct osc_obd *osc = &obddev->u.osc;
         ENTRY;
 
-       if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
-               EXIT;
-               return -ENODEV;
+       if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
+               /* This is a local connection */
+               osc->osc_tgt = &obd_dev[data->ioc_dev];
+
+               printk("OSC: tgt %d ost at %p\n", data->ioc_dev,
+                      &osc->osc_tgt->u.ost);
+               if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
+                    ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
+                       printk("device not attached or not set up (%d)\n", 
+                              data->ioc_dev);
+                       EXIT;
+                       return -EINVAL;
+               }
+       } else {
+               int err;
+               /* This is a remote connection using Portals */
+
+               /* XXX: this should become something like ioc_inlbuf1 */
+               err = kportal_uuid_to_peer("ost", &osc->osc_peer);
+               if (err != 0) {
+                       printk("Cannot find 'ost' peer.\n");
+                       EXIT;
+                       return -EINVAL;
+               }
        }
 
-        osc->osc_tgt = &obd_dev[data->ioc_dev];
-       printk("OSC: tgt %d ost at %p\n", data->ioc_dev, &osc->osc_tgt->u.ost); 
-        if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
-             ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
-                printk("device not attached or not set up (%d)\n", 
-                       data->ioc_dev);
-                EXIT;
-               return -EINVAL;
-        } 
-
         MOD_INC_USE_COUNT;
-        EXIT; 
+        EXIT;
         return 0;
 } 
 
@@ -323,32 +392,20 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
 
        osc_free_req(request);
        return 0;
-
-
-
 }
 
-
 static int osc_cleanup(struct obd_device * obddev)
 {
-        ENTRY;
-
-        if ( !(obddev->obd_flags & OBD_SET_UP) ) {
-                EXIT;
-                return 0;
-        }
-
         MOD_DEC_USE_COUNT;
-        EXIT;
         return 0;
 }
 
-
 struct obd_ops osc_obd_ops = { 
        o_setup:   osc_setup,
        o_cleanup: osc_cleanup, 
        o_create: osc_create,
        o_getattr: osc_getattr,
+       o_setattr: osc_setattr,
        o_connect: osc_connect,
        o_disconnect: osc_disconnect
 };
index da3eed9..35c1be7 100644 (file)
@@ -4,17 +4,18 @@
  *  
  *  Lustre Object Server Module (OST)
  * 
- *  Copyright (C) 2001  Cluster File Systems, Inc.
+ *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
  *  See the file COPYING in this distribution
  *
  *  by Peter Braam <braam@clusterfs.com>
  * 
- *  This server is single threaded at present (but can easily be multi threaded). 
- *  For testing and management it is treated as an obd_device, although it does
- *  not export a full OBD method table (the requests are coming in over the wire, 
- *  so object target modules do not have a full method table.)
+ *  This server is single threaded at present (but can easily be multi
+ *  threaded). For testing and management it is treated as an
+ *  obd_device, although it does not export a full OBD method table
+ *  (the requests are coming in over the wire, so object target
+ *  modules do not have a full method table.)
  * 
  */
 
@@ -72,34 +73,34 @@ static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
        return 0;
 }
 
-
-/* XXX replace with networking code */
 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
 {
        struct ptlrpc_request *clnt_req = req->rq_reply_handle;
 
        ENTRY;
-       printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
 
-       /* free the request buffer */
-       kfree(req->rq_reqbuf);
-       req->rq_reqbuf = NULL; 
-       
-       /* move the reply to the client */ 
-       clnt_req->rq_replen = req->rq_replen;
-       clnt_req->rq_repbuf = req->rq_repbuf;
+       if (req->rq_ost->ost_service != NULL) {
+               /* This is a request that came from the network via portals. */
 
-       printk("---> client req %p repbuf %p len %d status %d\n", 
-              clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
-              req->rq_rephdr->status); 
+               /* FIXME: we need to increment the count of handled events */
+               ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0);
+       } else {
+               /* This is a local request that came from another thread. */
+
+               /* move the reply to the client */ 
+               clnt_req->rq_replen = req->rq_replen;
+               clnt_req->rq_repbuf = req->rq_repbuf;
+               req->rq_repbuf = NULL;
+               req->rq_replen = 0;
+
+               /* free the request buffer */
+               kfree(req->rq_reqbuf);
+               req->rq_reqbuf = NULL;
+
+               /* wake up the client */ 
+               wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
+       }
 
-       req->rq_repbuf = NULL;
-       req->rq_replen = 0;
-       
-       /* free the server request */
-       kfree(req); 
-       /* wake up the client */ 
-       wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
        EXIT;
        return 0;
 }
@@ -109,6 +110,7 @@ int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
        struct ptlrep_hdr *hdr;
 
        ENTRY;
+
        hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
        if (!hdr) { 
                EXIT;
@@ -223,7 +225,8 @@ static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
                return rc;
        }
 
-       memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
+       memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
+              sizeof(req->rq_req.ost->oa));
 
        req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
                (&conn, &req->rq_rep.ost->oa); 
@@ -532,35 +535,62 @@ int ost_main(void *arg)
 
        /* And now, wait forever for commit wakeup events. */
        while (1) {
-               struct ptlrpc_request *request;
                int rc; 
 
                if (ost->ost_flags & OST_EXIT)
                        break;
 
-
                wake_up(&ost->ost_done_waitq);
                interruptible_sleep_on(&ost->ost_waitq);
 
                CDEBUG(D_INODE, "lustre_ost wakes\n");
                CDEBUG(D_INODE, "pick up req here and continue\n"); 
 
-               if (list_empty(&ost->ost_reqs)) { 
-                       CDEBUG(D_INODE, "woke because of timer\n"); 
-               } else { 
-                       printk("---> %d\n", __LINE__);
-                       request = list_entry(ost->ost_reqs.next, 
-                                            struct ptlrpc_request, rq_list);
-                       printk("---> %d\n", __LINE__);
-                       list_del(&request->rq_list);
-                       rc = ost_handle(obddev, request); 
+
+               if (ost->ost_service != NULL) {
+                       ptl_event_t ev;
+
+                       while (1) {
+                               struct ptlrpc_request request;
+
+                               rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
+                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
+                                       break;
+                               /* FIXME: If we move to an event-driven model,
+                                * we should put the request on the stack of
+                                * mds_handle instead. */
+                               memset(&request, 0, sizeof(request));
+                               request.rq_reqbuf = ev.mem_desc.start +
+                                       ev.offset;
+                               request.rq_reqlen = ev.mem_desc.length;
+                               request.rq_ost = ost;
+                               request.rq_xid = ev.match_bits;
+
+                               request.rq_peer.peer_nid = ev.initiator.nid;
+                               /* FIXME: this NI should be the incoming NI.
+                                * We don't know how to find that from here. */
+                               request.rq_peer.peer_ni =
+                                       ost->ost_service->srv_self.peer_ni;
+                               rc = ost_handle(obddev, &request);
+                       }
+               } else {
+                       struct ptlrpc_request *request;
+
+                       if (list_empty(&ost->ost_reqs)) { 
+                               CDEBUG(D_INODE, "woke because of timer\n"); 
+                       } else { 
+                               request = list_entry(ost->ost_reqs.next,
+                                                    struct ptlrpc_request,
+                                                    rq_list);
+                               list_del(&request->rq_list);
+                               rc = ost_handle(obddev, request); 
+                       }
                }
        }
 
        /* XXX maintain a list of all managed devices: cleanup here */
-       printk("---> %d\n", __LINE__);
+
        ost->ost_thread = NULL;
-       printk("---> %d\n", __LINE__);
        wake_up(&ost->ost_done_waitq);
        printk("lustre_ost: exiting\n");
        return 0;
@@ -602,6 +632,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
        struct obd_ioctl_data* data = buf;
        struct ost_obd *ost = &obddev->u.ost;
        struct obd_device *tgt;
+       struct lustre_peer peer;
        int err; 
         ENTRY;
 
@@ -634,6 +665,20 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
 
        spin_lock_init(&obddev->u.ost.ost_lock);
 
+       err = kportal_uuid_to_peer("self", &peer);
+       if (err == 0) {
+               ost->ost_service = kmalloc(sizeof(*ost->ost_service),
+                                          GFP_KERNEL);
+               if (ost->ost_service == NULL)
+                       return -ENOMEM;
+               ost->ost_service->srv_buf_size = 64 * 1024;
+               ost->ost_service->srv_portal = OST_REQUEST_PORTAL;
+               memcpy(&ost->ost_service->srv_self, &peer, sizeof(peer));
+               ost->ost_service->srv_wait_queue = &ost->ost_waitq;
+
+               rpc_register_service(ost->ost_service, "self");
+       }
+
        ost_start_srv_thread(obddev);
 
         MOD_INC_USE_COUNT;
@@ -660,6 +705,8 @@ static int ost_cleanup(struct obd_device * obddev)
                 return -EBUSY;
         }
 
+       rpc_unregister_service(ost->ost_service);
+
        ost_stop_srv_thread(ost);
 
        if (!list_empty(&ost->ost_reqs)) {
index 9a19265..1059e89 100644 (file)
 
 static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
 
+/* This callback performs two functions:
+ *
+ * 1. Free the request buffer after it has gone out on the wire
+ * 2. Wake up the thread waiting for the reply once it comes in.
+ */
 static int request_callback(ptl_event_t *ev, void *data)
 {
         struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
@@ -239,14 +244,14 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
         service->srv_id.rid = PTL_ID_ANY;
 
        rc = PtlMEAttach(peer.peer_ni, service->srv_portal, service->srv_id,
-                         0, ~0, PTL_RETAIN, &service->srv_me);
+                         0, ~0, PTL_RETAIN, &service->srv_me_h);
         if (rc != PTL_OK) {
                 printk("PtlMEAttach failed: %d\n", rc);
                 return rc;
         }
 
         rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback, service,
-                        &service->srv_eq);
+                        &service->srv_eq_h);
         if (rc != PTL_OK) {
                 printk("PtlEQAlloc failed: %d\n", rc);
                 return rc;
@@ -260,9 +265,9 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
        service->srv_md.threshold       = PTL_MD_THRESH_INF;
        service->srv_md.options         = PTL_MD_OP_PUT;
        service->srv_md.user_ptr        = service;
-       service->srv_md.eventq          = service->srv_eq;
+       service->srv_md.eventq          = service->srv_eq_h;
 
-       rc = PtlMDAttach(service->srv_me, service->srv_md,
+       rc = PtlMDAttach(service->srv_me_h, service->srv_md,
                          PTL_RETAIN, &service->srv_md_h);
         if (rc != PTL_OK) {
                 printk("PtlMDAttach failed: %d\n", rc);
@@ -273,6 +278,23 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
         return 0;
 }
 
+int rpc_unregister_service(struct ptlrpc_service *service)
+{
+        int rc;
+
+        rc = PtlMDUnlink(service->srv_md_h);
+        if (rc)
+                printk(__FUNCTION__ ": PtlMDUnlink failed: %d\n", rc);
+        rc = PtlEQFree(service->srv_eq_h);
+        if (rc)
+                printk(__FUNCTION__ ": PtlEQFree failed: %d\n", rc);
+        rc = PtlMEUnlink(service->srv_me_h);
+        if (rc)
+                printk(__FUNCTION__ ": PtlMEUnlink failed: %d\n", rc);
+
+        kfree(service->srv_buf);
+}
+
 static int req_init_portals(void)
 {
         int rc;
index 7621fba..473aca8 100755 (executable)
@@ -36,6 +36,7 @@ setup tcp
 disconnect localhost
 del_uuid self
 del_uuid mds
+del_uuid ost
 EOF
 
 rmmod ksocknal
index bebc73a..685488f 100644 (file)
@@ -1,20 +1,35 @@
 #!/bin/sh
 
-R=/r
+SRCDIR="`dirname $0`"
+. $SRCDIR/common.sh
 
-# insmod /lib/modules/2.4.17/kernel/drivers/block/loop.o
-insmod $R/usr/src/obd/class/obdclass.o 
-insmod $R/usr/src/obd/ext2obd/obdext2.o
-insmod $R/usr/src/obd/ost/ost.o
-insmod $R/usr/src/obd/osc/osc.o
-insmod $R/usr/src/obd/mds/mds.o
-insmod $R/usr/src/obd/mdc/mdc.o
-insmod $R/usr/src/obd/llight/llight.o
+SERVER=localhost
 
+mknod /dev/portals c 10 240
+
+insmod $R/usr/src/portals/linux/oslib/portals.o || exit -1
+insmod $R/usr/src/portals/linux/socknal/ksocknal.o || exit -1
+
+$R/usr/src/portals/linux/utils/acceptor 1234 &
+
+$R/usr/src/portals/linux/utils/ptlctl <<EOF
+mynid
+setup tcp
+connect $SERVER 1234
+add_uuid ost
+add_uuid self
+quit
+EOF
+
+insmod $R/usr/src/obd/rpc/ptlrpc.o || exit -1
+insmod $R/usr/src/obd/class/obdclass.o || exit -1
+insmod $R/usr/src/obd/ext2obd/obdext2.o || exit -1
+insmod $R/usr/src/obd/ost/ost.o || exit -1
+insmod $R/usr/src/obd/osc/osc.o || exit -1
 
 dd if=/dev/zero of=/tmp/fs bs=1024 count=10000
 mke2fs -F /tmp/fs
-losetup /dev/loop/0 /tmp/fs
+losetup ${LOOP}0 /tmp/fs || exit -1
 
 echo 4095 > /proc/sys/obd/debug
 echo 4095 > /proc/sys/obd/trace
@@ -24,16 +39,12 @@ mknod /dev/obd c 10 241
 $R/usr/src/obd/utils/obdctl <<EOF
 device 0
 attach obdext2
-setup /dev/loop/0
+setup ${LOOP}0
 device 1
 attach ost
 setup 0
 device 2
 attach osc
-setup 1
+setup
 quit
 EOF
-
-
-
-
index f6552a6..f4074fb 100644 (file)
@@ -250,14 +250,18 @@ static int jt_setup(int argc, char **argv)
 
        IOCINIT(data);
 
-       if ( argc != 2 && argc != 3  ) {
-               fprintf(stderr, "Usage: %s device [fstype]\n", argv[0]);
+       if ( argc > 3  ) {
+               fprintf(stderr, "Usage: %s [device] [fstype]\n", argv[0]);
                return 1;
        }
 
-       data.ioc_inllen1 =  strlen(argv[1]) + 1;
-       data.ioc_inlbuf1 = argv[1];
-       data.ioc_dev = strtoul(argv[1], NULL, 0); 
+       if (argc > 1) {
+               data.ioc_inllen1 =  strlen(argv[1]) + 1;
+               data.ioc_inlbuf1 = argv[1];
+               data.ioc_dev = strtoul(argv[1], NULL, 0);
+       } else {
+               data.ioc_dev = -1;
+       }
        if ( argc == 3 ) { 
                data.ioc_inllen2 = strlen(argv[2]) + 1;
                data.ioc_inlbuf2 = argv[2];
@@ -330,7 +334,8 @@ static int jt_setattr(int argc, char **argv)
 
        IOCINIT(data);
        if (argc < 2) { 
-               printf("usage %s id mode\n", argv[0]); 
+               printf("usage: %s id mode\n", argv[0]); 
+               return -1;
        }
 
         data.ioc_obdo1.o_id = strtoul(argv[1], NULL, 0);
@@ -358,7 +363,7 @@ static int jt_destroy(int argc, char **argv)
 
        rc = ioctl(fd, OBD_IOC_DESTROY , &data);
        if (rc < 0) {
-               printf("setattr: %x %s\n", OBD_IOC_SETATTR, strerror(errno));
+               printf("setattr: %x %s\n", OBD_IOC_DESTROY, strerror(errno));
        }
        return rc;
 }