Whamcloud - gitweb
No new functionality outside of the DLM. ptlrpc_client no longer contains
authorpschwan <pschwan>
Wed, 10 Apr 2002 16:27:43 +0000 (16:27 +0000)
committerpschwan <pschwan>
Wed, 10 Apr 2002 16:27:43 +0000 (16:27 +0000)
a peer, so the peer is passed in and managed separately.

- All of the DLM calls are hooked up to the RPC system now
- Some unmaintained local processing code removed
- Added ldlm network-related bits to the test scripts

27 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_support.h
lustre/ldlm/Makefile.am
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c [new file with mode: 0644]
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/tests/common.sh
lustre/tests/llmountcleanup.sh

index c720b3a..9fd38b5 100644 (file)
@@ -5,30 +5,29 @@
 #ifndef _LUSTRE_DLM_H__
 #define _LUSTRE_DLM_H__
 
-#include <linux/kp30.h>
-#include <linux/list.h>
+#ifdef __KERNEL__
 
 #include <linux/obd_class.h>
-
-#ifdef __KERNEL__
+#include <linux/lustre_net.h>
 
 #define OBD_LDLM_DEVICENAME  "ldlm"
 
-typedef  int cluster_host;
-typedef  int cluster_pid;
+typedef int cluster_host;
+typedef int cluster_pid;
 
 typedef enum {
         ELDLM_OK = 0,
-        ELDLM_BLOCK_GRANTED = 600,
-        ELDLM_BLOCK_CONV,
-        ELDLM_BLOCK_WAIT,
-        ELDLM_BAD_NAMESPACE,
-        ELDLM_LOCK_CHANGED
+
+        ELDLM_LOCK_CHANGED = 300,
+
+        ELDLM_NAMESPACE_EXISTS = 400,
+        ELDLM_BAD_NAMESPACE    = 401
 } ldlm_error_t;
 
 #define LDLM_FL_LOCK_CHANGED   (1 << 0)
-#define LDLM_FL_COMPLETION_AST (1 << 1)
-#define LDLM_FL_BLOCKING_AST   (1 << 2)
+#define LDLM_FL_BLOCK_GRANTED  (1 << 1)
+#define LDLM_FL_BLOCK_CONV     (1 << 2)
+#define LDLM_FL_BLOCK_WAIT     (1 << 3)
 
 #define L2B(c) (1 << c)
 
@@ -108,10 +107,11 @@ struct ldlm_lock {
         ldlm_mode_t           l_granted_mode;
         ldlm_lock_callback    l_completion_ast;
         ldlm_lock_callback    l_blocking_ast;
-        struct lustre_peer   *l_peer;
+        struct lustre_peer    l_peer;
         void                 *l_data;
         __u32                 l_data_len;
         struct ldlm_extent    l_extent;
+        struct ldlm_handle    l_remote_handle;
         //void                 *l_event;
         //XXX cluster_host    l_holder;
         __u32                 l_version[RES_VERSION_SIZE];
@@ -188,6 +188,7 @@ int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *,
 
 /* ldlm_lock.c */
 void ldlm_lock_free(struct ldlm_lock *lock);
+void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                                      __u32 ns_id,
                                      struct ldlm_handle *parent_lock_handle,
@@ -213,9 +214,9 @@ int ldlm_test(struct obd_device *device);
 
 /* resource.c */
 struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id);
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id);
+ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id,
+                                struct ldlm_namespace **);
 int ldlm_namespace_free(struct ldlm_namespace *ns);
-void ldlm_resource_dump(struct ldlm_resource *res);
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
                                         __u64 *name, __u32 type, int create);
@@ -223,6 +224,27 @@ int ldlm_resource_put(struct ldlm_resource *res);
 void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
                             struct ldlm_lock *lock);
 void ldlm_resource_del_lock(struct ldlm_lock *lock);
+void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc);
+void ldlm_resource_dump(struct ldlm_resource *res);
+
+/* ldlm_request.c */
+int ldlm_cli_namespace_new(struct ptlrpc_client *, struct lustre_peer *,
+                           __u32 ns_id, struct ptlrpc_request **);
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+                     __u32 ns_id,
+                     struct ldlm_handle *parent_lock_handle,
+                     __u64 *res_id,
+                     __u32 type,
+                     struct ldlm_extent *req_ex,
+                     ldlm_mode_t mode,
+                     int *flags,
+                     void *data,
+                     __u32 data_len,
+                     struct ldlm_handle *lockh,
+                     struct ptlrpc_request **request);
+int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+                      void *data, __u32 data_len);
+
 
 #endif /* __KERNEL__ */
 
index 9e4d842..067a834 100644 (file)
@@ -155,6 +155,7 @@ struct obd_ioobj {
  *   MDS REQ RECORDS
  */
 
+/* opcodes */
 #define MDS_GETATTR   1
 #define MDS_OPEN      2
 #define MDS_CLOSE     3
@@ -274,10 +275,11 @@ static inline void ll_inode2fid(struct ll_fid *fid, struct inode *inode)
  */
 
 /* opcodes */
-#define LDLM_ENQUEUE   1
-#define LDLM_CONVERT   2
-#define LDLM_CANCEL    3
-#define LDLM_CALLBACK  4
+#define LDLM_NAMESPACE_NEW 1
+#define LDLM_ENQUEUE       2
+#define LDLM_CONVERT       3
+#define LDLM_CANCEL        4
+#define LDLM_CALLBACK      5
 
 #define RES_NAME_SIZE 3
 #define RES_VERSION_SIZE 4
@@ -302,19 +304,31 @@ struct ldlm_extent {
         __u64 end;
 };
 
+struct ldlm_resource_desc {
+        __u32 lr_ns_id;
+        __u32 lr_type;
+        __u64 lr_name[RES_NAME_SIZE];
+        __u64 lr_version[RES_VERSION_SIZE];
+};
+
+struct ldlm_lock_desc {
+        struct ldlm_resource_desc l_resource;
+        ldlm_mode_t l_req_mode;
+        ldlm_mode_t l_granted_mode;
+        struct ldlm_extent l_extent;
+        __u32 l_version[RES_VERSION_SIZE];
+};
+
 struct ldlm_request {
-        __u32 ns_id;
-        __u64 res_id[RES_NAME_SIZE];
-        __u32 res_type;
         __u32 flags;
-        struct ldlm_extent lock_extent;
-        struct ldlm_handle parent_res_handle;
-        struct ldlm_handle parent_lock_handle;
-        ldlm_mode_t mode;
+        struct ldlm_lock_desc lock_desc;
+        struct ldlm_handle lock_handle1;
+        struct ldlm_handle lock_handle2;
 };
 
 struct ldlm_reply {
         struct ldlm_handle lock_handle;
+        struct ldlm_extent lock_extent;
 };
 
 /*
index 80dc7cd..41a222b 100644 (file)
@@ -43,7 +43,9 @@ struct ll_sb_info {
         unsigned long            ll_cache_count;
         struct semaphore         ll_list_mutex;
         struct ptlrpc_client     ll_mds_client;
+        struct lustre_peer       ll_mds_peer;
         struct ptlrpc_client     ll_ost_client;
+        struct lustre_peer       ll_ost_peer;
 };
 
 
index 52a1d97..6696006 100644 (file)
@@ -79,31 +79,30 @@ void mds_rename_pack(struct mds_rec_rename *, struct inode *srcdir,
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
 
 /* llight/request.c */
-int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid, 
-                struct ptlrpc_request **);
-int mdc_setattr(struct ptlrpc_client *peer, struct inode *inode,
+int mdc_getattr(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+                int type, int valid, struct ptlrpc_request **);
+int mdc_setattr(struct ptlrpc_client *, struct lustre_peer *, struct inode *,
                 struct iattr *iattr, struct ptlrpc_request **);
-int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags,
-             __u64 *fh, struct ptlrpc_request **req);
-int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh, 
-              struct ptlrpc_request **req);
-int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset,
-                 char *addr, struct ptlrpc_request **);
-int mdc_create(struct ptlrpc_client *peer, 
+int mdc_open(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, int type,
+             int flags, __u64 *fh, struct ptlrpc_request **req);
+int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+              int type, __u64 fh,  struct ptlrpc_request **req);
+int mdc_readpage(struct ptlrpc_client *, struct lustre_peer *, ino_t ino,
+                 int type, __u64 offset, char *addr, struct ptlrpc_request **);
+int mdc_create(struct ptlrpc_client *, struct lustre_peer *,
                struct inode *dir, const char *name, int namelen, 
                const char *tgt, int tgtlen, 
                int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, 
                struct ptlrpc_request **);
-int mdc_unlink(struct ptlrpc_client *peer, struct inode *dir,
+int mdc_unlink(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir,
                struct inode *child, const char *name, int namelen, 
                struct ptlrpc_request **);
-int mdc_link(struct ptlrpc_client *peer, struct dentry *src, 
+int mdc_link(struct ptlrpc_client *, struct lustre_peer *, struct dentry *src, 
                struct inode *dir, const char *name, int namelen, 
                struct ptlrpc_request **);
-int mdc_rename(struct ptlrpc_client *peer, struct inode *src, 
+int mdc_rename(struct ptlrpc_client *, struct lustre_peer *, struct inode *src, 
                struct inode *tgt, const char *old, int oldlen, 
-               const char *new, int newlen, 
-               struct ptlrpc_request **);
+               const char *new, int newlen, struct ptlrpc_request **);
 int mdc_create_client(char *uuid, struct ptlrpc_client *cl);
 
 struct mds_fs_operations {
index c211ec1..bd37fce 100644 (file)
 #define MDS_REPLY_PORTAL   11
 #define MDS_BULK_PORTAL    12
 
-#define LDLM_REQUEST_PORTAL 13
-#define LDLM_REPLY_PORTAL   14
+#define LDLM_REQUEST_PORTAL     13
+#define LDLM_REPLY_PORTAL       14
+#define LDLM_CLI_REQUEST_PORTAL 15
+#define LDLM_CLI_REPLY_PORTAL   16
 
 /* default rpc ring length */
 #define RPC_RING_LENGTH    2
 
 #define SVC_STOPPING 1
-#define SVC_RUNNING 2
-#define SVC_STOPPED 4
-#define SVC_KILLED  8
-#define SVC_EVENT  16
-#define SVC_LIST   32
-#define SVC_SIGNAL 64
+#define SVC_RUNNING  2
+#define SVC_STOPPED  4
+#define SVC_KILLED   8
+#define SVC_EVENT   16
+#define SVC_LIST    32
+#define SVC_SIGNAL  64
 
 struct ptlrpc_client {
-        struct lustre_peer cli_server;
         struct obd_device *cli_obd;
         struct list_head cli_sending_head;
         struct list_head cli_sent_head;
@@ -192,20 +193,20 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal);
 int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal);
 int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *);
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
-int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
-                 struct ptlrpc_request *req);
-int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
-                 struct ptlrpc_request *req);
+int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req);
+int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req);
 int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl);
 void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
 
 /* rpc/client.c */
 void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
-                       struct ptlrpc_client *cl);
-int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl);
+                        struct ptlrpc_client *cl);
+int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
+                          struct lustre_peer *peer);
 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req);
 int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req);
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       struct lustre_peer *peer, int opcode,
                                        int count, int *lengths, char **bufs);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *);
index d97bdba..4d41e8a 100644 (file)
@@ -79,6 +79,8 @@ struct mds_obd {
 
 struct ldlm_obd {
         struct ptlrpc_service *ldlm_service;
+        struct ptlrpc_client *ldlm_client;
+        struct lustre_peer ldlm_server_peer;
 
         struct list_head ldlm_namespaces;
         spinlock_t       ldlm_lock;
@@ -126,6 +128,7 @@ struct ost_obd {
 struct osc_obd {
         struct obd_device *osc_tgt;
         struct ptlrpc_client *osc_client;
+        struct lustre_peer osc_peer;
 };
 
 /* corresponds to one of the obd's */
index 5e47065..1ce67bd 100644 (file)
 #define __LINUX_CLASS_OBD_H
 
 #ifndef __KERNEL__
-#include <stdint.h>
-#define __KERNEL__
-#include <linux/list.h>
-#undef __KERNEL__
+# include <stdint.h>
+# define __KERNEL__
+# include <linux/list.h>
+# undef __KERNEL__
 #else 
 #include <asm/segment.h>
 #include <asm/uaccess.h>
index 94fb231..6e57009 100644 (file)
@@ -69,10 +69,11 @@ extern unsigned long obd_fail_loc;
 #define OBD_FAIL_OST_PUNCH_NET           0x20b
 
 #define OBB_FAIL_LDLM                    0x300
-#define OBD_FAIL_LDLM_ENQUEUE            0x301
-#define OBD_FAIL_LDLM_CONVERT            0x302
-#define OBD_FAIL_LDLM_CANCEL             0x303
-#define OBD_FAIL_LDLM_CALLBACK           0x304
+#define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
+#define OBD_FAIL_LDLM_ENQUEUE            0x302
+#define OBD_FAIL_LDLM_CONVERT            0x303
+#define OBD_FAIL_LDLM_CANCEL             0x304
+#define OBD_FAIL_LDLM_CALLBACK           0x305
 
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
@@ -95,6 +96,7 @@ do {                                                                         \
         }                                                                    \
 } while(0)
 
+#include <linux/types.h>
 #include <linux/blkdev.h>
 
 static inline void OBD_FAIL_WRITE(int id, kdev_t dev)
index 925775b..da6256a 100644 (file)
@@ -9,7 +9,7 @@ modulefs_DATA = ldlm.o
 EXTRA_PROGRAMS = ldlm
 
 ldlm_SOURCES = ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \
-ldlm_extent.c
+ldlm_extent.c ldlm_request.c
 
 include $(top_srcdir)/Rules
 
index ef47d1e..bddb89d 100644 (file)
@@ -84,6 +84,15 @@ void ldlm_lock_free(struct ldlm_lock *lock)
         kmem_cache_free(ldlm_lock_slab, lock);
 }
 
+void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
+{
+        ldlm_res2desc(lock->l_resource, &desc->l_resource);
+        desc->l_req_mode = lock->l_req_mode;
+        desc->l_granted_mode = lock->l_granted_mode;
+        memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
+        memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
+}
+
 static int ldlm_lock_compat(struct ldlm_lock *lock)
 {
         struct list_head *tmp;
@@ -119,7 +128,8 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
                 res->lr_most_restr = lock->l_granted_mode;
 
         if (lock->l_completion_ast)
-                lock->l_completion_ast(lock, NULL, NULL, 0);
+                lock->l_completion_ast(lock, NULL,
+                                       lock->l_data, lock->l_data_len);
 }
 
 /* XXX: Revisit the error handling; we do not, for example, do
@@ -181,10 +191,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
                 memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
         lock->l_data = data;
         lock->l_data_len = data_len;
-        if ((*flags) & LDLM_FL_COMPLETION_AST)
-                lock->l_completion_ast = completion;
-        if ((*flags) & LDLM_FL_BLOCKING_AST)
-                lock->l_blocking_ast = blocking;
+        lock->l_completion_ast = completion;
+        lock->l_blocking_ast = blocking;
         ldlm_object2handle(lock, lockh);
         spin_lock(&res->lr_lock);
 
@@ -192,25 +200,27 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
 
         if (!list_empty(&res->lr_converting)) {
                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
-                GOTO(out, rc = -ELDLM_BLOCK_CONV);
+                *flags |= LDLM_FL_BLOCK_CONV;
+                GOTO(out, ELDLM_OK);
         }
         if (!list_empty(&res->lr_waiting)) {
                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
-                GOTO(out, rc = -ELDLM_BLOCK_WAIT);
+                *flags |= LDLM_FL_BLOCK_WAIT;
+                GOTO(out, ELDLM_OK);
         }
 
         incompat = ldlm_lock_compat(lock);
         if (incompat) {
                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
-                GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
+                *flags |= LDLM_FL_BLOCK_GRANTED;
+                GOTO(out, ELDLM_OK);
         }
 
         ldlm_grant_lock(res, lock);
-        GOTO(out, rc = ELDLM_OK);
-
+        EXIT;
  out:
         spin_unlock(&res->lr_lock);
-        return rc;
+        return ELDLM_OK;
 }
 
 static int ldlm_reprocess_queue(struct ldlm_resource *res,
index 4f384f1..7995590 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
-#include <linux/obd_class.h>
 #include <linux/lustre_dlm.h>
-#include <linux/lustre_net.h>
 
 extern kmem_cache_t *ldlm_resource_slab;
 extern kmem_cache_t *ldlm_lock_slab;
 
-static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
-                                void *data, __u32 data_len)
+static int _ldlm_namespace_new(struct obd_device *obddev,
+                               struct ptlrpc_request *req)
 {
-        LBUG();
-        return 0;
+        struct ldlm_request *dlm_req;
+        struct ldlm_namespace *ns;
+        int rc;
+        ldlm_error_t err;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
+                                 &ns);
+        req->rq_status = err;
+
+        CERROR("err = %d\n", err);
+
+        RETURN(0);
 }
 
-static int ldlm_enqueue(struct ptlrpc_request *req)
+static int _ldlm_enqueue(struct ptlrpc_request *req)
 {
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
+        int rc, size = sizeof(*dlm_rep);
         ldlm_error_t err;
-        int rc, bufsize = sizeof(*dlm_rep);
-        
-        rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen,
-                             &req->rq_repbuf);
+        ENTRY;
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
+        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
         if (rc) {
                 CERROR("out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
-        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
+        memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
+               sizeof(dlm_rep->lock_extent));
+
         err = ldlm_local_lock_enqueue(req->rq_obd,
-                                      dlm_req->ns_id,
-                                      &dlm_req->parent_lock_handle,
-                                      dlm_req->res_id,
-                                      dlm_req->res_type,
-                                      &dlm_req->lock_extent,
-                                      dlm_req->mode,
+                                      dlm_req->lock_desc.l_resource.lr_ns_id,
+                                      &dlm_req->lock_handle2,
+                                      dlm_req->lock_desc.l_resource.lr_name,
+                                      dlm_req->lock_desc.l_resource.lr_type,
+                                      &dlm_rep->lock_extent,
+                                      dlm_req->lock_desc.l_req_mode,
                                       &dlm_req->flags,
-                                      ldlm_client_callback,
-                                      ldlm_client_callback,
+                                      //ldlm_cli_callback,
+                                      NULL,
+                                      ldlm_cli_callback,
                                       lustre_msg_buf(req->rq_reqmsg, 1),
                                       req->rq_reqmsg->buflens[1],
                                       &dlm_rep->lock_handle);
+        if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
+                struct ldlm_lock *lock;
+                lock = ldlm_handle2object(&dlm_rep->lock_handle);
+                memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
+                memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
+                       sizeof(lock->l_remote_handle));
+        }
         req->rq_status = err;
 
-        CERROR("local_lock_enqueue: %d\n", err);
+        CERROR("err = %d\n", err);
+
+        RETURN(0);
+}
+
+static int _ldlm_convert(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        req->rq_status =
+                ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
+                                        dlm_req->lock_desc.l_req_mode,
+                                        &dlm_req->flags);
+        RETURN(0);
+}
+
+static int _ldlm_cancel(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        req->rq_status =
+                ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
+        RETURN(0);
+}
+
+static int _ldlm_callback(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_lock *lock;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
+        req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
+        if (rc) {
+                CERROR("out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
+        dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+
+        lock = ldlm_handle2object(&dlm_req->lock_handle1);
+        ldlm_lock_dump(lock);
+
+        req->rq_status = 0;
 
         RETURN(0);
 }
@@ -90,75 +185,59 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
         }
 
         switch (req->rq_reqmsg->opc) {
+        case LDLM_NAMESPACE_NEW:
+                CDEBUG(D_INODE, "namespace_new\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
+                rc = _ldlm_namespace_new(dev, req);
+                break;
+
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
-                rc = ldlm_enqueue(req);
+                rc = _ldlm_enqueue(req);
                 break;
-#if 0
+
         case LDLM_CONVERT:
                 CDEBUG(D_INODE, "convert\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
-                rc = ldlm_convert(req);
+                rc = _ldlm_convert(req);
                 break;
 
         case LDLM_CANCEL:
                 CDEBUG(D_INODE, "cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
-                rc = ldlm_cancel(req);
+                rc = _ldlm_cancel(req);
                 break;
 
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
-                rc = ldlm_callback(req);
+                rc = _ldlm_callback(req);
                 break;
-#endif
 
         default:
-                rc = ptlrpc_error(dev, svc, req);
+                rc = ptlrpc_error(svc, req);
                 RETURN(rc);
         }
 
-        EXIT;
 out:
-        if (rc) {
-                CERROR("no header\n");
-                return 0;
-        }
-
-        if( req->rq_status) {
-                ptlrpc_error(dev, svc, req);
-        } else {
-                CDEBUG(D_NET, "sending reply\n");
-                ptlrpc_reply(dev, svc, req);
-        }
-
-        return 0;
+        if (rc)
+                RETURN(ptlrpc_error(svc, req));
+        else
+                RETURN(ptlrpc_reply(svc, req));
 }
 
 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
                           void *uarg)
 {
         struct obd_device *obddev = conn->oc_dev;
-        struct ptlrpc_client cl;
         int err;
-
         ENTRY;
 
-        if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
-             _IOC_NR(cmd) < IOC_LDLM_MIN_NR  ||
-             _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) {
+        if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
+            _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
-                                _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
-                EXIT;
-                return -EINVAL;
-        }
-
-        ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl);
-        err = ptlrpc_connect_client(-1, "ldlm", &cl);
-        if (err) {
-                CERROR("cannot create client\n");
+                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
                 RETURN(-EINVAL);
         }
 
@@ -166,15 +245,13 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
         case IOC_LDLM_TEST: {
                 err = ldlm_test(obddev);
                 CERROR("-- done err %d\n", err);
-                EXIT;
-                break;
+                GOTO(out, err);
         }
         default:
-                err = -EINVAL;
-                EXIT;
-                break;
+                GOTO(out, err = -EINVAL);
         }
 
+ out:
         return err;
 }
 
@@ -195,8 +272,23 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
                 LBUG();
 
         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
-        if (err)
+        if (err) {
                 CERROR("cannot start thread\n");
+                LBUG();
+        }
+
+        OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
+        if (ldlm->ldlm_client == NULL)
+                LBUG();
+
+        ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           ldlm->ldlm_client);
+        err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
+                                    &ldlm->ldlm_server_peer);
+        if (err) {
+                CERROR("cannot create client\n");
+                LBUG();
+        }
 
         MOD_INC_USE_COUNT;
         RETURN(0);
@@ -257,7 +349,7 @@ static int ldlm_free_all(struct obd_device *obddev)
 
         ldlm_lock(obddev);
 
-        list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { 
+        list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
                 struct ldlm_namespace *ns;
                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
 
@@ -282,6 +374,7 @@ static int ldlm_cleanup(struct obd_device *obddev)
                 CERROR("Request list not empty!\n");
         }
 
+        OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
 
         if (ldlm_free_all(obddev)) {
@@ -334,7 +427,7 @@ static void __exit ldlm_exit(void)
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
-MODULE_LICENSE("GPL"); 
+MODULE_LICENSE("GPL");
 
 module_init(ldlm_init);
 module_exit(ldlm_exit);
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
new file mode 100644 (file)
index 0000000..3e796ac
--- /dev/null
@@ -0,0 +1,143 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ *
+ * by Cluster File Systems, Inc.
+ */
+
+#define EXPORT_SYMTAB
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#include <linux/lustre_dlm.h>
+
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer,
+                     __u32 ns_id,
+                     struct ldlm_handle *parent_lock_handle,
+                     __u64 *res_id,
+                     __u32 type,
+                     struct ldlm_extent *req_ex,
+                     ldlm_mode_t mode,
+                     int *flags,
+                     void *data,
+                     __u32 data_len,
+                     struct ldlm_handle *lockh,
+                     struct ptlrpc_request **request)
+{
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        struct ptlrpc_request *req;
+        char *bufs[2] = {NULL, data};
+        int rc, size[2] = {sizeof(*body), data_len};
+
+#if 0
+        ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type,
+                                req_ex, mode, flags);
+#endif                           
+
+        /* FIXME: if this is a local lock, stop here. */
+
+        req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        /* Dump all of this data into the request buffer */
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body->lock_desc.l_resource.lr_ns_id = ns_id;
+        body->lock_desc.l_resource.lr_type = type;
+        memcpy(body->lock_desc.l_resource.lr_name, res_id,
+               sizeof(body->lock_desc.l_resource.lr_name));
+
+        body->lock_desc.l_req_mode = mode;
+        if (req_ex)
+                memcpy(&body->lock_desc.l_extent, req_ex,
+                       sizeof(body->lock_desc.l_extent));
+        body->flags = *flags;
+
+        /* FIXME: lock_handle1 will be the shadow handle */
+
+        if (parent_lock_handle)
+                memcpy(&body->lock_handle2, parent_lock_handle,
+                       sizeof(body->lock_handle2));
+
+        /* Continue as normal. */
+        size[0] = sizeof(*reply);
+        req->rq_replen = lustre_msg_size(1, size);
+
+        rc = ptlrpc_queue_wait(cl, req);
+        rc = ptlrpc_check_status(req, rc);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0);
+        CERROR("remote handle: %p\n",
+               (void *)(unsigned long)reply->lock_handle.addr);
+        CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
+               reply->lock_extent.end);
+
+        EXIT;
+ out:
+        *request = req;
+        return rc;
+}
+
+int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer,
+                           __u32 ns_id, struct ptlrpc_request **request)
+{
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        int rc, size = sizeof(*body);
+
+        req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        body->lock_desc.l_resource.lr_ns_id = ns_id;
+
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(cl, req);
+        rc = ptlrpc_check_status(req, rc);
+
+        EXIT;
+ out:
+        *request = req;
+        return rc;
+}
+
+int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
+                      void *data, __u32 data_len)
+{
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
+        struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
+        int rc, size[2] = {sizeof(*body), data_len};
+        char *bufs[2] = {NULL, data};
+
+        req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+
+        if (new != NULL)
+                ldlm_lock2desc(new, &body->lock_desc);
+
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(cl, req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
+
+        EXIT;
+ out:
+        return rc;
+}
index 4d8ed88..88f82f2 100644 (file)
@@ -66,18 +66,20 @@ static void res_hash_init(struct ldlm_namespace *ns)
         ns->ns_hash = res_hash;
 }
 
-struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id)
+ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id,
+                                struct ldlm_namespace **ns_out)
 {
         struct ldlm_namespace *ns;
 
         if (ldlm_namespace_find(obddev, id))
-                LBUG();
+                return -ELDLM_NAMESPACE_EXISTS;
 
         OBD_ALLOC(ns, sizeof(*ns));
         if (!ns)
                 LBUG();
 
         ns->ns_id = id;
+        ns->ns_obddev = obddev;
         INIT_LIST_HEAD(&ns->ns_root_list);
 
         list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces);
@@ -85,7 +87,8 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id)
         res_hash_init(ns); 
         atomic_set(&ns->ns_refcount, 0);
 
-        return ns;
+        *ns_out = ns;
+        return ELDLM_OK;
 }
 
 int ldlm_namespace_free(struct ldlm_namespace *ns)
@@ -249,6 +252,14 @@ int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
         return 0;
 }
 
+void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
+{
+        desc->lr_ns_id = res->lr_namespace->ns_id;
+        desc->lr_type = res->lr_type;
+        memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name));
+        memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version));
+}
+
 void ldlm_resource_dump(struct ldlm_resource *res)
 {
         struct list_head *tmp;
index 17bbd76..39f4bd9 100644 (file)
@@ -41,8 +41,8 @@ int ldlm_test_basics(struct obd_device *obddev)
 
         ldlm_lock(obddev);
 
-        ns = ldlm_namespace_new(obddev, 1);
-        if (ns == NULL)
+        err = ldlm_namespace_new(obddev, 1, &ns);
+        if (err != ELDLM_OK)
                 LBUG();
 
         res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1);
@@ -50,7 +50,6 @@ int ldlm_test_basics(struct obd_device *obddev)
                 LBUG();
 
         /* Get a couple of read locks */
-        flags = LDLM_FL_BLOCKING_AST;
         err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
                                       NULL, LCK_CR, &flags, NULL,
                                       ldlm_test_callback, NULL, 0, &lockh_1);
@@ -60,7 +59,9 @@ int ldlm_test_basics(struct obd_device *obddev)
         err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN,
                                       NULL, LCK_EX, &flags, NULL,
                                       ldlm_test_callback, NULL, 0, &lockh_2);
-        if (err != -ELDLM_BLOCK_GRANTED)
+        if (err != ELDLM_OK)
+                LBUG();
+        if (!(flags & LDLM_FL_BLOCK_GRANTED))
                 LBUG();
 
         ldlm_resource_dump(res);
@@ -88,8 +89,8 @@ int ldlm_test_extents(struct obd_device *obddev)
 
         ldlm_lock(obddev);
 
-        ns = ldlm_namespace_new(obddev, 2);
-        if (ns == NULL)
+        err = ldlm_namespace_new(obddev, 2, &ns);
+        if (err != ELDLM_OK)
                 LBUG();
 
         flags = 0;
@@ -114,7 +115,9 @@ int ldlm_test_extents(struct obd_device *obddev)
         err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT,
                                       &ext3, LCK_EX, &flags, NULL, NULL, NULL,
                                       0, &ext3_h);
-        if (err != -ELDLM_BLOCK_GRANTED)
+        if (err != ELDLM_OK)
+                LBUG();
+        if (!(flags & LDLM_FL_BLOCK_GRANTED))
                 LBUG();
         if (flags & LDLM_FL_LOCK_CHANGED)
                 LBUG();
@@ -141,13 +144,53 @@ int ldlm_test_extents(struct obd_device *obddev)
         return 0;
 }
 
+static int ldlm_test_network(struct obd_device *obddev)
+{
+        struct ldlm_obd *ldlm = &obddev->u.ldlm;
+        struct ptlrpc_request *request;
+
+        __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
+        struct ldlm_extent ext = {4, 6};
+        struct ldlm_handle lockh1, lockh2;
+        int flags = 0;
+        ldlm_error_t err;
+
+        err = ldlm_cli_namespace_new(ldlm->ldlm_client, &ldlm->ldlm_server_peer,
+                                     3, &request);
+        ptlrpc_free_req(request);
+        CERROR("ldlm_cli_namespace_new: %d\n", err);
+        if (err != ELDLM_OK)
+                GOTO(out, err);
+
+        err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+                               NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags,
+                               NULL, 0, &lockh1, &request);
+        ptlrpc_free_req(request);
+        CERROR("ldlm_cli_enqueue: %d\n", err);
+
+        flags = 0;
+        err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3,
+                               NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags,
+                               NULL, 0, &lockh2, &request);
+        ptlrpc_free_req(request);
+        CERROR("ldlm_cli_enqueue: %d\n", err);
+
+        EXIT;
+ out:
+        return err;
+}
+
 int ldlm_test(struct obd_device *obddev)
 {
-        int rc; 
+        int rc;
         rc = ldlm_test_basics(obddev);
-        if (rc) 
+        if (rc)
                 RETURN(rc);
 
         rc = ldlm_test_extents(obddev);
-        RETURN(rc); 
+        if (rc)
+                RETURN(rc);
+
+        rc = ldlm_test_network(obddev);
+        RETURN(rc);
 }
index 93d17d7..2a645fd 100644 (file)
@@ -80,8 +80,8 @@ static int ll_dir_readpage(struct file *file, struct page *page)
 
         offset = page->index << PAGE_SHIFT; 
         buf = kmap(page);
-        rc = mdc_readpage(&sbi->ll_mds_client, inode->i_ino, S_IFDIR, offset, 
-                          buf, &request);
+        rc = mdc_readpage(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+                          S_IFDIR, offset, buf, &request);
         kunmap(page); 
         ptlrpc_free_req(request);
         EXIT;
index 2eb2829..25f1444 100644 (file)
@@ -62,8 +62,8 @@ static int ll_file_open(struct inode *inode, struct file *file)
                 GOTO(out, rc = -ENOMEM);
         memset(fd, 0, sizeof(*fd));
 
-        rc = mdc_open(&sbi->ll_mds_client, inode->i_ino, S_IFREG,
-                      file->f_flags, &fd->fd_mdshandle, &req); 
+        rc = mdc_open(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+                      S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); 
         if (!fd->fd_mdshandle)
                 CERROR("mdc_open didn't assign fd_mdshandle\n");
 
@@ -134,8 +134,8 @@ static int ll_file_release(struct inode *inode, struct file *file)
                 rc = -EIO;
         }
 
-        rc = mdc_close(&sbi->ll_mds_client, inode->i_ino, S_IFREG, 
-                      fd->fd_mdshandle, &req); 
+        rc = mdc_close(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino,
+                       S_IFREG, fd->fd_mdshandle, &req); 
         ptlrpc_free_req(req);
         if (rc) { 
                 if (rc > 0) 
index ce2cdaa..5d6fb60 100644 (file)
@@ -107,7 +107,7 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
         if (!ino)
                 goto negative;
 
-        err = mdc_getattr(&sbi->ll_mds_client, ino, type,
+        err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, ino, type,
                           OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
         if (err) {
                 CERROR("failure %d inode %ld\n", err, ino);
@@ -141,8 +141,9 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
 
         ENTRY;
 
-        err = mdc_create(&sbi->ll_mds_client, dir, name, namelen, tgt, tgtlen,
-                         mode, id,  current->uid, current->gid, time, &request);
+        err = mdc_create(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, name,
+                         namelen, tgt, tgtlen, mode, id,  current->uid,
+                         current->gid, time, &request);
         if (err) { 
                 inode = ERR_PTR(err);
                 GOTO(out, err);
@@ -190,7 +191,8 @@ int ll_mdc_unlink(struct inode *dir, struct inode *child,
 
         ENTRY;
 
-        err = mdc_unlink(&sbi->ll_mds_client, dir, child, name, len, &request);
+        err = mdc_unlink(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, child,
+                         name, len, &request);
         ptlrpc_free_req(request);
 
         EXIT;
@@ -206,7 +208,8 @@ int ll_mdc_link(struct dentry *src, struct inode *dir,
 
         ENTRY;
 
-        err = mdc_link(&sbi->ll_mds_client, src, dir, name, len, &request);
+        err = mdc_link(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, dir, name,
+                       len, &request);
         ptlrpc_free_req(request);
 
         EXIT;
@@ -222,7 +225,7 @@ int ll_mdc_rename(struct inode *src, struct inode *tgt,
 
         ENTRY;
 
-        err = mdc_rename(&sbi->ll_mds_client, src, tgt, 
+        err = mdc_rename(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, tgt, 
                          old->d_name.name, old->d_name.len, 
                          new->d_name.name, new->d_name.len, &request);
         ptlrpc_free_req(request);
index 3e5a65d..50bb08f 100644 (file)
@@ -136,7 +136,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
         /* the first parameter should become an mds device no */
         ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
                            &sbi->ll_mds_client);
-        err = ptlrpc_connect_client(-1, "mds", &sbi->ll_mds_client);
+        err = ptlrpc_connect_client("mds", &sbi->ll_mds_client,
+                                    &sbi->ll_mds_peer);
         if (err) {
                 CERROR("cannot find MDS\n");
                 GOTO(out_disc, sb = NULL);
@@ -152,7 +153,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
         sb->s_op = &ll_super_operations;
 
         /* make root inode */
-        err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_rootino, S_IFDIR,
+        err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer,
+                          sbi->ll_rootino, S_IFDIR,
                           OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request);
         if (err) {
                 CERROR("mdc_getattr failed for root %d\n", err);
@@ -260,7 +262,8 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
         /* change incore inode */
         ll_attr2inode(inode, attr, do_trunc);
 
-        err = mdc_setattr(&sbi->ll_mds_client, inode, attr, &request);
+        err = mdc_setattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode, attr,
+                          &request);
         if (err)
                 CERROR("mdc_setattr fails (%d)\n", err);
 
index e2e062e..58eb55a 100644 (file)
@@ -43,15 +43,16 @@ static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request)
         return rc;
 }
 
-int mdc_setattr(struct ptlrpc_client *cl, struct inode *inode,
-                struct iattr *iattr, struct ptlrpc_request **request)
+int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer,
+                struct inode *inode, struct iattr *iattr,
+                struct ptlrpc_request **request)
 {
         struct mds_rec_setattr *rec;
         struct ptlrpc_request *req;
         int rc, size = sizeof(*rec);
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_REINT, 1, &size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_REINT, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -67,10 +68,10 @@ int mdc_setattr(struct ptlrpc_client *cl, struct inode *inode,
         RETURN(rc);
 }
 
-int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name,
-               int namelen, const char *tgt, int tgtlen, int mode, __u64 id,
-               __u32 uid, __u32 gid, __u64 time,
-               struct ptlrpc_request **request)
+int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer,
+               struct inode *dir, const char *name, int namelen,
+               const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid,
+               __u32 gid, __u64 time, struct ptlrpc_request **request)
 {
         struct mds_rec_create *rec;
         struct ptlrpc_request *req;
@@ -78,7 +79,7 @@ int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name,
         char *tmp;
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -102,9 +103,9 @@ int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name,
         RETURN(rc);
 }
 
-int mdc_unlink(struct ptlrpc_client *cl,  struct inode *dir,
-               struct inode *child, const char *name, int namelen,
-               struct ptlrpc_request **request)
+int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer,
+               struct inode *dir, struct inode *child, const char *name,
+               int namelen, struct ptlrpc_request **request)
 {
         struct mds_rec_unlink *rec;
         struct ptlrpc_request *req;
@@ -112,7 +113,7 @@ int mdc_unlink(struct ptlrpc_client *cl,  struct inode *dir,
         char *tmp;
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -131,8 +132,9 @@ int mdc_unlink(struct ptlrpc_client *cl,  struct inode *dir,
         RETURN(rc);
 }
 
-int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir,
-             const char *name, int namelen, struct ptlrpc_request **request)
+int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer,
+             struct dentry *src, struct inode *dir, const char *name,
+             int namelen, struct ptlrpc_request **request)
 {
         struct mds_rec_link *rec;
         struct ptlrpc_request *req;
@@ -140,7 +142,7 @@ int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir,
         char *tmp;
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -159,8 +161,9 @@ int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir,
         RETURN(rc);
 }
 
-int mdc_rename(struct ptlrpc_client *cl, struct inode *src, struct inode *tgt,
-               const char *old, int oldlen, const char *new, int newlen,
+int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer,
+               struct inode *src, struct inode *tgt, const char *old,
+               int oldlen, const char *new, int newlen,
                struct ptlrpc_request **request)
 {
         struct mds_rec_rename *rec;
@@ -169,7 +172,7 @@ int mdc_rename(struct ptlrpc_client *cl, struct inode *src, struct inode *tgt,
         char *tmp;
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index ec567e8..b2ee0ce 100644 (file)
 
 #define EXPORT_SYMTAB
 
-#include <linux/config.h>
 #include <linux/module.h>
-#include <linux/kernel.h>
-#include <linux/mm.h>
-#include <linux/string.h>
-#include <linux/stat.h>
-#include <linux/errno.h>
-#include <linux/locks.h>
-#include <linux/unistd.h>
-
-#include <asm/system.h>
-#include <asm/uaccess.h>
-#include <linux/module.h>
-
-#include <linux/fs.h>
-#include <linux/stat.h>
-#include <asm/uaccess.h>
-#include <asm/segment.h>
 #include <linux/miscdevice.h>
 
 #define DEBUG_SUBSYSTEM S_MDC
 
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
 #include <linux/lustre_mds.h>
 
 #define REQUEST_MINOR 244
 
 extern int mds_queue_req(struct ptlrpc_request *);
 
-int mdc_connect(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
-                struct ptlrpc_request **request)
+int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+                int type, int valid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -88,15 +67,15 @@ int mdc_connect(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
 }
 
 
-int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
-                struct ptlrpc_request **request)
+int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+                int type, int valid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -121,14 +100,14 @@ int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid,
         return rc;
 }
 
-int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags,
-             __u64 *fh, struct ptlrpc_request **request)
+int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+             int type, int flags, __u64 *fh, struct ptlrpc_request **request)
 {
         struct mds_body *body;
         int rc, size = sizeof(*body);
         struct ptlrpc_request *req;
 
-        req = ptlrpc_prep_req(cl, MDS_OPEN, 1, &size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_OPEN, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -153,14 +132,14 @@ int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags,
         return rc;
 }
 
-int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh,
-              struct ptlrpc_request **request)
+int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+              int type, __u64 fh, struct ptlrpc_request **request)
 {
         struct mds_body *body;
         int rc, size = sizeof(*body);
         struct ptlrpc_request *req;
 
-        req = ptlrpc_prep_req(cl, MDS_CLOSE, 1, &size, NULL);
+        req = ptlrpc_prep_req(cl, peer, MDS_CLOSE, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -179,8 +158,9 @@ int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh,
         return rc;
 }
 
-int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset,
-                 char *addr, struct ptlrpc_request **request)
+int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino,
+                 int type, __u64 offset, char *addr,
+                 struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *bulk = NULL;
@@ -193,14 +173,14 @@ int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset,
 
         CDEBUG(D_INODE, "inode: %ld\n", ino);
 
-        bulk = ptlrpc_prep_bulk(&cl->cli_server);
+        bulk = ptlrpc_prep_bulk(peer);
         if (bulk == NULL) {
                 CERROR("%s: cannot init bulk desc\n", __FUNCTION__);
                 rc = -ENOMEM;
                 goto out;
         }
 
-        req = ptlrpc_prep_req(cl, MDS_READPAGE, 2, size, bufs);
+        req = ptlrpc_prep_req(cl, peer, MDS_READPAGE, 2, size, bufs);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -244,6 +224,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
 {
         int err;
         struct ptlrpc_client cl;
+        struct lustre_peer peer;
         struct ptlrpc_request *request;
 
         ENTRY;
@@ -260,7 +241,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
         }
 
         ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
-        err = ptlrpc_connect_client(-1, "mds", &cl);
+        err = ptlrpc_connect_client("mds", &cl, &peer);
         if (err) {
                 CERROR("cannot create client\n");
                 RETURN(-EINVAL);
@@ -269,7 +250,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
         switch (cmd) {
         case IOC_REQUEST_GETATTR: {
                 CERROR("-- getting attr for ino %lu\n", arg);
-                err = mdc_getattr(&cl, arg, S_IFDIR, ~0, &request);
+                err = mdc_getattr(&cl, &peer, arg, S_IFDIR, ~0, &request);
                 CERROR("-- done err %d\n", err);
 
                 GOTO(out, err);
@@ -283,7 +264,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                         break;
                 }
                 CERROR("-- readpage 0 for ino %lu\n", arg);
-                err = mdc_readpage(&cl, arg, S_IFDIR, 0, buf, &request);
+                err = mdc_readpage(&cl, &peer, arg, S_IFDIR, 0, buf, &request);
                 CERROR("-- done err %d\n", err);
                 OBD_FREE(buf, PAGE_SIZE);
 
@@ -300,7 +281,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 iattr.ia_atime = 0;
                 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
 
-                err = mdc_setattr(&cl, &inode, &iattr, &request);
+                err = mdc_setattr(&cl, &peer, &inode, &iattr, &request);
                 CERROR("-- done err %d\n", err);
 
                 GOTO(out, err);
@@ -316,7 +297,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 iattr.ia_atime = 0;
                 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
 
-                err = mdc_create(&cl, &inode,
+                err = mdc_create(&cl, &peer, &inode,
                                  "foofile", strlen("foofile"),
                                  NULL, 0, 0100707, 47114711,
                                  11, 47, 0, &request);
@@ -329,7 +310,8 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 __u64 fh, ino;
                 copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
                 CERROR("-- opening ino %llu\n", ino);
-                err = mdc_open(&cl, ino, S_IFDIR, O_RDONLY, &fh, &request);
+                err = mdc_open(&cl, &peer, ino, S_IFDIR, O_RDONLY, &fh,
+                               &request);
                 copy_to_user((__u64 *)arg, &fh, sizeof(fh));
                 CERROR("-- done err %d (fh=%Lu)\n", err, fh);
 
@@ -338,7 +320,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
 
         case IOC_REQUEST_CLOSE: {
                 CERROR("-- closing ino 2, filehandle %lu\n", arg);
-                err = mdc_close(&cl, 2, S_IFDIR, arg, &request);
+                err = mdc_close(&cl, &peer, 2, S_IFDIR, arg, &request);
                 CERROR("-- done err %d\n", err);
 
                 GOTO(out, err);
index 695a63f..14d670e 100644 (file)
@@ -403,17 +403,17 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 break;
 
         default:
-                rc = ptlrpc_error(dev, svc, req);
+                rc = ptlrpc_error(svc, req);
                 RETURN(rc);
         }
 
         EXIT;
 out:
         if (rc) {
-                ptlrpc_error(dev, svc, req);
+                ptlrpc_error(svc, req);
         } else {
                 CDEBUG(D_NET, "sending reply\n");
-                ptlrpc_reply(dev, svc, req);
+                ptlrpc_reply(svc, req);
         }
 
         return 0;
index fb2c6da..2928d3c 100644 (file)
 #include <linux/lustre_net.h>
 #include <linux/obd_ost.h>
 
-struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
+static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
+                       struct lustre_peer **peer)
 {
         struct osc_obd *osc = &conn->oc_dev->u.osc;
-        return osc->osc_client;
+        *cl = osc->osc_client;
+        *peer = &osc->osc_peer;
 }
 
 static int osc_connect(struct obd_conn *conn)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -81,12 +85,14 @@ static int osc_connect(struct obd_conn *conn)
 static int osc_disconnect(struct obd_conn *conn)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -105,12 +111,14 @@ static int osc_disconnect(struct obd_conn *conn)
 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -139,12 +147,14 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
 static int osc_open(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -174,12 +184,14 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa)
 static int osc_close(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -207,12 +219,14 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa)
 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -233,7 +247,8 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
 static int osc_create(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -242,7 +257,8 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa)
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -270,7 +286,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
                      obd_off offset)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -279,7 +296,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -308,7 +326,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -317,7 +336,8 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -344,7 +364,10 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                  struct niobuf *dst, struct niobuf *src)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
+
+        osc_con2cl(conn, &cl, &peer);
 
         if (cl->cli_obd) {
                 /* local sendpage */
@@ -354,7 +377,7 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
                 struct ptlrpc_bulk_desc *bulk;
                 int rc;
 
-                bulk = ptlrpc_prep_bulk(&cl->cli_server);
+                bulk = ptlrpc_prep_bulk(peer);
                 if (bulk == NULL)
                         return -ENOMEM;
 
@@ -386,7 +409,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                  obd_count *oa_bufs, struct page **buf, obd_size *count,
                  obd_off *offset, obd_flag *flags)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ptlrpc_request *request;
         struct ost_body *body;
         struct obd_ioobj ioo;
@@ -406,7 +430,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         if (bulk == NULL)
                 RETURN(-ENOMEM);
 
-        request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
@@ -418,7 +443,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         for (pages = 0, i = 0; i < num_oa; i++) {
                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server);
+                        bulk[pages] = ptlrpc_prep_bulk(peer);
                         if (bulk[pages] == NULL)
                                 GOTO(out, rc = -ENOMEM);
 
@@ -465,7 +490,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
                   obd_count *oa_bufs, struct page **buf, obd_size *count,
                   obd_off *offset, obd_flag *flags)
 {
-        struct ptlrpc_client *cl = osc_con2cl(conn);
+        struct ptlrpc_client *cl;
+        struct lustre_peer *peer;
         struct ptlrpc_request *request;
         struct obd_ioobj ioo;
         struct ost_body *body;
@@ -484,7 +510,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
         if (!src)
                 RETURN(-ENOMEM);
 
-        request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
+        osc_con2cl(conn, &cl, &peer);
+        request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
         body = lustre_msg_buf(request->rq_reqmsg, 0);
@@ -565,7 +592,7 @@ static int osc_setup(struct obd_device *obddev, obd_count len,
 
         ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
                                    osc->osc_client);
-        rc = ptlrpc_connect_client(dev, "ost", osc->osc_client);
+        rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer);
 
         if (rc == 0)
                 MOD_INC_USE_COUNT;
index b5f57ce..f543343 100644 (file)
@@ -563,7 +563,7 @@ static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc,
                 break;
         default:
                 req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(obddev, svc, req);
+                rc = ptlrpc_error(svc, req);
                 RETURN(rc);
         }
 
@@ -572,10 +572,10 @@ out:
         //req->rq_status = rc;
         if (rc) {
                 CERROR("ost: processing error %d\n", rc);
-                ptlrpc_error(obddev, svc, req);
+                ptlrpc_error(svc, req);
         } else {
                 CDEBUG(D_INODE, "sending reply\n");
-                ptlrpc_reply(obddev, svc, req);
+                ptlrpc_reply(svc, req);
         }
 
         return 0;
index a1322ac..73d1324 100644 (file)
@@ -34,7 +34,7 @@
 #include <linux/lustre_net.h>
 
 void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
-                          struct ptlrpc_client *cl)
+                        struct ptlrpc_client *cl)
 {
         memset(cl, 0, sizeof(*cl));
         spin_lock_init(&cl->cli_lock);
@@ -50,12 +50,13 @@ void ptlrpc_init_client(int dev, int req_portal, int rep_portal,
         sema_init(&cl->cli_rpc_sem, 32);
 }
 
-int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl)
+int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl,
+                          struct lustre_peer *peer)
 {
         int err;
 
         cl->cli_epoch++;
-        err = kportal_uuid_to_peer(uuid, &cl->cli_server);
+        err = kportal_uuid_to_peer(uuid, peer);
         if (err != 0)
                 CERROR("cannot find peer %s!\n", uuid);
 
@@ -75,7 +76,8 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer)
         return bulk;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       struct lustre_peer *peer, int opcode,
                                        int count, int *lengths, char **bufs)
 {
         struct ptlrpc_request *request;
@@ -99,6 +101,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
                 RETURN(NULL);
         }
         request->rq_type = PTL_RPC_REQUEST;
+        memcpy(&request->rq_peer, peer, sizeof(*peer));
         request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf;
         request->rq_reqmsg->opc = HTON__u32(opcode);
         request->rq_reqmsg->xid = HTON__u32(request->rq_xid);
index 7f68332..90c7f51 100644 (file)
@@ -209,43 +209,17 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk)
         return rc;
 }
 
-int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
-                 struct ptlrpc_request *req)
+int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
-        struct ptlrpc_request *clnt_req = req->rq_reply_handle;
-        ENTRY;
-
-        if (req->rq_reply_handle == NULL) {
-                /* This is a request that came from the network via portals. */
-
-                /* FIXME: we need to increment the count of handled events */
-                req->rq_type = PTL_RPC_REPLY;
-                req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid);
-                req->rq_repmsg->status = HTON__u32(req->rq_status);
-                req->rq_reqmsg->type = HTON__u32(req->rq_type);
-                ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
-        } else {
-                /* This is a local request that came from another thread. */
-
-                /* move the reply to the client */ 
-                clnt_req->rq_replen = req->rq_replen;
-                clnt_req->rq_repbuf = req->rq_repbuf;
-                req->rq_repbuf = NULL;
-                req->rq_replen = 0;
-
-                /* free the request buffer */
-                OBD_FREE(req->rq_reqbuf, req->rq_reqlen);
-                req->rq_reqbuf = NULL;
-
-                /* wake up the client */ 
-                wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
-        }
-
-        RETURN(0);
+        /* FIXME: we need to increment the count of handled events */
+        req->rq_type = PTL_RPC_REPLY;
+        req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid);
+        req->rq_repmsg->status = HTON__u32(req->rq_status);
+        req->rq_reqmsg->type = HTON__u32(req->rq_type);
+        return ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
 }
 
-int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
-                 struct ptlrpc_request *req)
+int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
         int rc;
         ENTRY;
@@ -262,7 +236,7 @@ int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
 
         req->rq_repmsg->type = HTON__u32(PTL_RPC_ERR);
 
-        rc = ptlrpc_reply(obddev, svc, req);
+        rc = ptlrpc_reply(svc, req);
         RETURN(rc);
 }
 
@@ -296,7 +270,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl)
 
         down(&cl->cli_rpc_sem);
 
-        rc = PtlMEAttach(cl->cli_server.peer_ni, request->rq_reply_portal,
+        rc = PtlMEAttach(request->rq_peer.peer_ni, request->rq_reply_portal,
                          local_id, request->rq_xid, 0, PTL_UNLINK,
                          PTL_INS_AFTER, &request->rq_reply_me_h);
         if (rc != PTL_OK) {
@@ -325,7 +299,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl)
                request->rq_replen, request->rq_xid, request->rq_reply_portal);
 
         list_add(&request->rq_list, &cl->cli_sending_head);
-        rc = ptl_send_buf(request, &cl->cli_server, request->rq_req_portal);
+        rc = ptl_send_buf(request, &request->rq_peer, request->rq_req_portal);
         RETURN(rc);
 
  cleanup2:
index 9c398a1..fd4a529 100644 (file)
@@ -134,6 +134,7 @@ setup_portals() {
        add_uuid self
        add_uuid mds
        add_uuid ost
+       add_uuid ldlm
        quit
        EOF
 }
@@ -167,4 +168,6 @@ setup_ldlm() {
        insmod $LUSTRE/ldlm/ldlm.o || exit -1
 
        list_mods
+        echo "Press Enter to continue"
+        read
 }
index a7a62cb..196d274 100755 (executable)
@@ -41,6 +41,7 @@ disconnect
 del_uuid self
 del_uuid mds
 del_uuid ost
+del_uuid ldlm
 quit
 EOF