Whamcloud - gitweb
WARNING: we currently crash on unmount after the last phase of runtests.
authorshaver <shaver>
Tue, 3 Sep 2002 04:06:19 +0000 (04:06 +0000)
committershaver <shaver>
Tue, 3 Sep 2002 04:06:19 +0000 (04:06 +0000)
Phil and I are going to debug this more tomorrow, but this changeset needed
to land.  I feel bad about this, but not as bad as I'd feel sitting with a
thousand lines of changes in my tree another day! =)

- introduced new-style obd_import structure and spread it thickly across the
  ptlrpc infrastructure.
- merged ptlrpc_prep_req and prep_req2 into one import-taking prep_req function.
- added import for ldlm to use when calling out to client
- send local-export handle info in client_obd_connect, so that said import can
  talk to a proper export during ASTs.
- new unified recovd architecture for client- and server-side recovery.
- much header frottage to hand the export -> ldlm_export_data -> import
  structure nesting.
- commented out cli_sem usage, because clients aren't the right place for that
  any more, and we probably don't need them anyway.
- connection-sharing now requires a matching (nid, uuid) pair, to avoid bogus
  sharing on the one-host, server+client case.
- chain exports on the connection for doing per-connection recovery (soon, I
  swear).
- introduced dozens, if not thousands, of bugs, no doubt.

35 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h [new file with mode: 0644]
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h [new file with mode: 0644]
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/lib/l_net.c
lustre/lib/lov_pack.c
lustre/lib/page.c
lustre/lib/simple.c
lustre/llite/recover.c
lustre/llite/super.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/proc_lustre.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/rpc.c
lustre/ptlrpc/service.c

index a2576f2..0482b9c 100644 (file)
@@ -8,10 +8,11 @@
 #ifdef __KERNEL__
 
 #include <linux/proc_fs.h>
+#include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
+#include <linux/lustre_import.h>
 
 struct obd_ops;
-struct obd_export;
 struct obd_device;
 
 #define OBD_LDLM_DEVICENAME  "ldlm"
@@ -85,7 +86,6 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
 
 struct ldlm_namespace {
         char                  *ns_name;
-        struct ptlrpc_client   ns_rpc_client;/* used for revocation callbacks */
         __u32                  ns_client; /* is this a client-side lock tree? */
         struct list_head      *ns_hash; /* hash table for ns */
         __u32                  ns_refcount; /* count of resources in the hash */
@@ -132,10 +132,9 @@ struct ldlm_lock {
         ldlm_blocking_callback    l_blocking_ast;
 
         struct obd_export    *l_export;
-        struct ptlrpc_client *l_client;
         struct lustre_handle *l_connh;
         __u32                 l_flags;
-        struct lustre_handle    l_remote_handle;
+        struct lustre_handle   l_remote_handle;
         void                 *l_data;
         __u32                 l_data_len;
         void                 *l_cookie;
@@ -199,7 +198,7 @@ struct ldlm_ast_work {
 /* Per-export ldlm state. */
 struct ldlm_export_data {
         struct list_head        led_held_locks;
-        struct ptlrpc_client    led_client;     /* cached client for locks  */
+        struct obd_import       led_import;
 };
         
 static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h
new file mode 100644 (file)
index 0000000..6b00876
--- /dev/null
@@ -0,0 +1,40 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001  Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#ifndef __EXPORT_H
+#define __EXPORT_H
+
+#ifdef __KERNEL__
+
+#include <linux/lustre_idl.h>
+#include <linux/lustre_dlm.h>
+
+struct obd_export {
+        __u64                     exp_cookie;
+        struct lustre_handle      exp_impconnh;
+        struct list_head          exp_obd_chain;
+        struct list_head          exp_conn_chain;
+        struct obd_device        *exp_obd;
+        struct ptlrpc_connection *exp_connection;
+        struct mds_export_data    exp_mds_data;
+        struct ldlm_export_data   exp_ldlm_data;
+#if NOTYET && 0
+        struct ost_export_data    exp_ost_data;
+#endif
+        void                     *exp_data; /* device specific data */
+        int                       exp_desclen;
+        char                     *exp_desc;
+        uuid_t                    exp_uuid;
+};
+
+extern struct obd_export *class_conn2export(struct lustre_handle *conn);
+extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
+#endif /* __KERNEL__ */
+
+#endif /* __EXPORT_H */
index 4f0fa0c..8095804 100644 (file)
@@ -7,18 +7,41 @@
 
 #define LUSTRE_HA_NAME "ptlrpc"
 
-struct recovd_data {
-        struct list_head rd_managed_chain;
-        int (*rd_recover)(struct recovd_data *);
-};
-
+struct recovd_data;
 struct recovd_obd;
 struct ptlrpc_connection;
 
+/* recovd_phase values */
+#define RECOVD_IDLE              0
+#define RECOVD_PREPARING         1
+#define RECOVD_PREPARED          2
+#define RECOVD_RECOVERING        3
+#define RECOVD_RECOVERED         4
+
+/* recovd_flags bits */
+#define RECOVD_STOPPING          1  /* how cleanup tells recovd to quit */
+#define RECOVD_STOPPED           2  /* after recovd has stopped */
+#define RECOVD_FAILED            4  /* the current recovery has failed */
+
+#define PTLRPC_RECOVD_PHASE_PREPARE  1
+#define PTLRPC_RECOVD_PHASE_RECOVER  2
+#define PTLRPC_RECOVD_PHASE_FAILURE  3
+
+typedef int (*ptlrpc_recovery_cb_t)(struct recovd_data *, int);
+
+struct recovd_data {
+        struct list_head     rd_managed_chain;
+        ptlrpc_recovery_cb_t rd_recover;
+        struct recovd_obd   *rd_recovd;
+};
+
 void recovd_conn_fail(struct ptlrpc_connection *conn);
-void recovd_conn_manage(struct recovd_obd *mgr, struct ptlrpc_connection *conn);
+void recovd_conn_manage(struct ptlrpc_connection *conn, struct recovd_obd *mgr,
+                        ptlrpc_recovery_cb_t recover);
 void recovd_conn_fixed(struct ptlrpc_connection *conn);
 int recovd_setup(struct recovd_obd *mgr);
 int recovd_cleanup(struct recovd_obd *mgr);
 
+extern struct recovd_obd *ptlrpc_recovd;
+
 #endif
index 2bb996f..2ed215e 100644 (file)
@@ -80,14 +80,6 @@ typedef __u8 uuid_t[37];
 #define SVC_STOPPING            16
 #define SVC_STOPPED             32
 
-#define RECOVD_STOPPING          1  /* how cleanup tells recovd to quit */
-#define RECOVD_IDLE              2  /* normal state */
-#define RECOVD_STOPPED           4  /* after recovd has stopped */
-#define RECOVD_FAIL              8  /* RPC timeout: wakeup recovd, sets flag */
-#define RECOVD_TIMEOUT          16  /* set when recovd detects a timeout */
-#define RECOVD_UPCALL_WAIT      32  /* an upcall has been placed */
-#define RECOVD_UPCALL_ANSWER    64  /* an upcall has been answered */
-
 #define LUSTRE_CONN_NEW          1
 #define LUSTRE_CONN_CON          2
 #define LUSTRE_CONN_RECOVD       3
@@ -109,6 +101,11 @@ struct lustre_handle {
         __u64 cookie;
 };
 
+static inline void ptlrpc_invalidate_handle(struct lustre_handle *hdl)
+{
+        hdl->addr = hdl->cookie = 0; /* XXX invalid enough? */
+}
+
 /* we depend on this structure to be 8-byte aligned */
 struct lustre_msg {
         __u64 addr;
diff --git a/lustre/include/linux/lustre_import.h b/lustre/include/linux/lustre_import.h
new file mode 100644 (file)
index 0000000..e6418cf
--- /dev/null
@@ -0,0 +1,27 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001  Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#ifndef __IMPORT_H
+#define __IMPORT_H
+
+#ifdef __KERNEL__
+
+#include <linux/lustre_idl.h>
+struct obd_import {
+        struct ptlrpc_connection *imp_connection;
+        struct ptlrpc_client     *imp_client;
+        struct lustre_handle      imp_handle;
+};
+
+extern struct obd_import *class_conn2cliimp(struct lustre_handle *);
+extern struct obd_import *class_conn2ldlmimp(struct lustre_handle *);
+
+#endif /* __KERNEL__ */
+
+#endif /* __IMPORT_H */
index 62edf8c..a166ee9 100644 (file)
@@ -29,6 +29,7 @@
 # include <string.h>
 #else
 # include <asm/semaphore.h>
+#include <linux/kp30.h> /* XXX just for LASSERT! */
 #endif
 #include <linux/portals_lib.h>
 #include <linux/lustre_idl.h>
@@ -37,6 +38,8 @@
 /* l_net.c */
 struct ptlrpc_request;
 struct obd_device;
+struct recovd_data;
+
 int target_handle_connect(struct ptlrpc_request *req);
 int target_handle_disconnect(struct ptlrpc_request *req);
 int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
@@ -46,6 +49,8 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf);
 int client_obd_cleanup(struct obd_device * obddev);
 struct client_obd *client_conn2cli(struct lustre_handle *conn); 
 
+int target_revoke_connection(struct recovd_data *rd, int phase);
+
 /* l_lock.c */
 struct lustre_lock { 
         int l_depth;
index 08ea7ea..ec6ecb6 100644 (file)
 
 #ifdef __KERNEL__
 
+#include <linux/fs.h>
 #include <linux/lustre_idl.h>
-#include <linux/lustre_net.h>
 
 struct ldlm_lock_desc;
 struct lov_stripe_md;
+struct mds_obd;
+struct ptlrpc_connection;
+struct ptlrpc_client;
+struct obd_export;
+struct ptlrpc_request;
+struct obd_device;
 
 #define LUSTRE_MDS_NAME "mds"
 #define LUSTRE_MDC_NAME "mdc"
@@ -140,10 +146,6 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
 int mds_reint(int offset, struct ptlrpc_request *req);
 
 /* mdc/mdc_request.c */
-extern int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                      struct ptlrpc_connection **connection,
-                      struct lustre_handle **rconn);
-
 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 struct lookup_intent *it, int lock_mode, struct inode *dir,
                 struct dentry *de, struct lustre_handle *lockh, char *tgt,
@@ -212,77 +214,6 @@ extern void mds_unregister_fs_type(const char *name);
 extern int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt);
 extern void mds_fs_cleanup(struct obd_device *obddev);
 
-static inline void *mds_fs_start(struct mds_obd *mds, struct inode *inode,
-                                 int op)
-{
-        return mds->mds_fsops->fs_start(inode, op);
-}
-
-static inline int mds_fs_commit(struct mds_obd *mds, struct inode *inode,
-                                void *handle)
-{
-        return mds->mds_fsops->fs_commit(inode, handle);
-}
-
-static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
-                                 void *handle, struct iattr *iattr)
-{
-        int rc;
-        /*
-         * NOTE: we probably don't need to take i_sem here when changing
-         *       ATTR_SIZE because the MDS never needs to truncate a file.
-         *       The ext2/ext3 code never truncates a directory, and files
-         *       stored on the MDS are entirely sparse (no data blocks).
-         *       If we do need to get it, we can do it here.
-         */
-        lock_kernel();
-        rc = mds->mds_fsops->fs_setattr(dentry, handle, iattr);
-        unlock_kernel();
-
-        return rc;
-}
-
-static inline int mds_fs_set_md(struct mds_obd *mds, struct inode *inode,
-                                  void *handle, struct lov_mds_md *md)
-{
-        return mds->mds_fsops->fs_set_md(inode, handle, md);
-}
-
-static inline int mds_fs_get_md(struct mds_obd *mds, struct inode *inode,
-                                  struct lov_mds_md *md)
-{
-        return mds->mds_fsops->fs_get_md(inode, md);
-}
-
-static inline ssize_t mds_fs_readpage(struct mds_obd *mds, struct file *file,
-                                      char *buf, size_t count, loff_t *offset)
-{
-        return mds->mds_fsops->fs_readpage(file, buf, count, offset);
-}
-
-/* Set up callback to update mds->mds_last_committed with the current
- * value of mds->mds_last_recieved when this transaction is on disk.
- */
-static inline int mds_fs_set_last_rcvd(struct mds_obd *mds, void *handle)
-{
-        return mds->mds_fsops->fs_set_last_rcvd(mds, handle);
-}
-
-/* Enable data journaling on the given file */
-static inline ssize_t mds_fs_journal_data(struct mds_obd *mds,
-                                          struct file *file)
-{
-        return mds->mds_fsops->fs_journal_data(file);
-}
-
-static inline int mds_fs_statfs(struct mds_obd *mds, struct statfs *sfs)
-{
-        if (mds->mds_fsops->fs_statfs)
-                return mds->mds_fsops->fs_statfs(mds->mds_sb, sfs);
-
-        return vfs_statfs(mds->mds_sb, sfs);
-}
-
 #define MDS_FSOP_UNLINK         1
 #define MDS_FSOP_RMDIR          2
 #define MDS_FSOP_RENAME         3
index 6264d13..4d5f4b8 100644 (file)
 
 #include <linux/tqueue.h>
 #include <linux/kp30.h>
-#include <linux/obd.h>
+// #include <linux/obd.h>
 #include <portals/p30.h>
 #include <linux/lustre_idl.h>
 #include <linux/lustre_ha.h>
+#include <linux/lustre_import.h>
 
 /* default rpc ring length */
 #define RPC_RING_LENGTH    10
@@ -63,23 +64,17 @@ struct ptlrpc_connection {
         struct list_head        c_clients; /* XXXshaver will be c_imports */
         struct list_head        c_exports;
 
-        /* should this be in recovd_data? */
-        struct recovd_obd      *c_recovd;
 };
 
 struct ptlrpc_client {
-        struct obd_device        *cli_obd;
         __u32                     cli_request_portal;
         __u32                     cli_reply_portal;
 
         __u32                     cli_target_devno;
 
-        struct ptlrpc_connection *cli_connection;
-
         void                     *cli_data;
-        struct semaphore          cli_rpc_sem; /* limits outstanding requests */
+        // struct semaphore          cli_rpc_sem; /* limits outstanding requests */
 
-        struct list_head          cli_client_chain;
         char                     *cli_name;
 };
 
@@ -137,7 +132,7 @@ struct ptlrpc_request {
         struct lustre_peer rq_peer; /* XXX see service.c can this be factored away? */
         struct obd_export *rq_export;
         struct ptlrpc_connection *rq_connection;
-        struct ptlrpc_client *rq_client;
+        struct obd_import *rq_import;
         struct ptlrpc_service *rq_svc;
 };
 
@@ -218,9 +213,6 @@ static inline void ptlrpc_hdl2req(struct ptlrpc_request *req, struct lustre_hand
         req->rq_reqmsg->addr = h->addr;
         req->rq_reqmsg->cookie = h->cookie;
 }
-struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, 
-                                        int opcode, int count, int *lengths,
-                                        char **bufs);
 
 typedef void (*bulk_callback_t)(struct ptlrpc_bulk_desc *, void *);
 
@@ -228,7 +220,8 @@ typedef int (*svc_handler_t)(struct ptlrpc_request *req);
 
 /* rpc/connection.c */
 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid);
-struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer);
+struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer,
+                                                char *uuid);
 int ptlrpc_put_connection(struct ptlrpc_connection *c);
 struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
 void ptlrpc_init_connection(void);
@@ -247,9 +240,9 @@ int ptl_send_rpc(struct ptlrpc_request *request);
 void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
 
 /* rpc/client.c */
-void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *,
-                        struct ptlrpc_connection *);
-void ptlrpc_cleanup_client(struct ptlrpc_client *cli);
+void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
+                        struct ptlrpc_client *);
+void ptlrpc_cleanup_client(struct obd_import *imp);
 __u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid);
 
@@ -258,7 +251,7 @@ void ptlrpc_continue_req(struct ptlrpc_request *req);
 int ptlrpc_replay_req(struct ptlrpc_request *req);
 void ptlrpc_restart_req(struct ptlrpc_request *req);
 
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode,
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                        int count, int *lengths, char **bufs);
 void ptlrpc_free_req(struct ptlrpc_request *request);
 void ptlrpc_req_finished(struct ptlrpc_request *request);
index 05294a4..c115d18 100644 (file)
@@ -16,6 +16,8 @@
 
 #include <linux/lustre_lib.h>
 #include <linux/lustre_idl.h>
+#include <linux/lustre_mds.h>
+#include <linux/lustre_export.h>
 
 struct obd_type {
         struct list_head typ_chain;
@@ -99,14 +101,11 @@ struct filter_obd {
 struct mds_server_data;
 
 struct client_obd {
-        struct ptlrpc_client *cl_client;
-        struct ptlrpc_client *cl_ldlm_client;
-        struct ptlrpc_connection *cl_conn;
-        struct lustre_handle cl_exporth;
-        struct semaphore cl_sem;
-        int cl_conn_count;
-        __u8 cl_target_uuid[37];
-        int cl_max_mdsize;
+        struct obd_import    cl_import;
+        struct semaphore     cl_sem;
+        int                  cl_conn_count;
+        __u8                 cl_target_uuid[37]; /* XXX -> lustre_name */
+        int                  cl_max_mdsize;
 };
 
 struct mds_obd {
@@ -150,17 +149,17 @@ struct echo_obd {
 };
 
 struct recovd_obd {
-        time_t                recovd_waketime;
-        time_t                recovd_timeout;
+        __u32                 recovd_phase;
+        __u32                 recovd_next_phase;
         __u32                 recovd_flags; 
-        __u32                 recovd_wakeup_flag; 
+        struct recovd_data   *recovd_current_rd;
         spinlock_t            recovd_lock;
         struct list_head      recovd_managed_items; /* items managed  */
         struct list_head      recovd_troubled_items; /* items in trouble */
         wait_queue_head_t     recovd_recovery_waitq;
         wait_queue_head_t     recovd_ctl_waitq;
         wait_queue_head_t     recovd_waitq;
-        struct task_struct    *recovd_thread;
+        struct task_struct   *recovd_thread;
 };
 
 struct trace_obd {
@@ -217,9 +216,10 @@ struct obd_device {
         int obd_minor;
         int obd_flags;
         struct proc_dir_entry *obd_proc_entry;
-        struct list_head obd_exports;
-        struct list_head obd_imports;
+        struct list_head       obd_exports;
+        struct list_head       obd_imports;
         struct ldlm_namespace *obd_namespace;
+        struct ptlrpc_client   obd_ldlm_client; /* XXX OST/MDS only */
         /* a spinlock is OK for what we do now, may need a semaphore later */
         spinlock_t obd_dev_lock;
         union {
@@ -316,4 +316,75 @@ struct obd_ops {
 #define LPX64 "%Lx"
 #endif
 
+static inline void *mds_fs_start(struct mds_obd *mds, struct inode *inode,
+                                 int op)
+{
+        return mds->mds_fsops->fs_start(inode, op);
+}
+
+static inline int mds_fs_commit(struct mds_obd *mds, struct inode *inode,
+                                void *handle)
+{
+        return mds->mds_fsops->fs_commit(inode, handle);
+}
+
+static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
+                                 void *handle, struct iattr *iattr)
+{
+        int rc;
+        /*
+         * NOTE: we probably don't need to take i_sem here when changing
+         *       ATTR_SIZE because the MDS never needs to truncate a file.
+         *       The ext2/ext3 code never truncates a directory, and files
+         *       stored on the MDS are entirely sparse (no data blocks).
+         *       If we do need to get it, we can do it here.
+         */
+        lock_kernel();
+        rc = mds->mds_fsops->fs_setattr(dentry, handle, iattr);
+        unlock_kernel();
+
+        return rc;
+}
+
+static inline int mds_fs_set_md(struct mds_obd *mds, struct inode *inode,
+                                  void *handle, struct lov_mds_md *md)
+{
+        return mds->mds_fsops->fs_set_md(inode, handle, md);
+}
+
+static inline int mds_fs_get_md(struct mds_obd *mds, struct inode *inode,
+                                  struct lov_mds_md *md)
+{
+        return mds->mds_fsops->fs_get_md(inode, md);
+}
+
+static inline ssize_t mds_fs_readpage(struct mds_obd *mds, struct file *file,
+                                      char *buf, size_t count, loff_t *offset)
+{
+        return mds->mds_fsops->fs_readpage(file, buf, count, offset);
+}
+
+/* Set up callback to update mds->mds_last_committed with the current
+ * value of mds->mds_last_recieved when this transaction is on disk.
+ */
+static inline int mds_fs_set_last_rcvd(struct mds_obd *mds, void *handle)
+{
+        return mds->mds_fsops->fs_set_last_rcvd(mds, handle);
+}
+
+/* Enable data journaling on the given file */
+static inline ssize_t mds_fs_journal_data(struct mds_obd *mds,
+                                          struct file *file)
+{
+        return mds->mds_fsops->fs_journal_data(file);
+}
+
+static inline int mds_fs_statfs(struct mds_obd *mds, struct statfs *sfs)
+{
+        if (mds->mds_fsops->fs_statfs)
+                return mds->mds_fsops->fs_statfs(mds->mds_sb, sfs);
+
+        return vfs_statfs(mds->mds_sb, sfs);
+}
+
 #endif
index 55a29fc..f554ab7 100644 (file)
@@ -64,38 +64,6 @@ extern void proc_lustre_remove_obd_entry(const char* name,
  */
 
 #ifdef __KERNEL__
-struct obd_export {
-        __u64 exp_cookie;
-        struct lustre_handle      exp_rconnh;     /* remote connection handle */
-        struct lustre_handle      exp_impconnh;
-        struct list_head          exp_chain;
-        struct obd_device        *exp_obd;
-        struct ptlrpc_connection *exp_connection;
-        struct mds_export_data    exp_mds_data;
-        struct ldlm_export_data   exp_ldlm_data;
-#if NOTYET && 0
-        struct ost_export_data    exp_ost_data;
-#endif
-        void                     *exp_data; /* device specific data */
-        int                       exp_desclen;
-        char                     *exp_desc;
-        uuid_t                    exp_uuid;
-};
-
-extern struct obd_export *class_conn2export(struct lustre_handle *conn);
-extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
-extern int class_rconn2export(struct lustre_handle *conn,
-                              struct lustre_handle *rconn);
-
-struct obd_import {
-        __u64 imp_cookie;
-        struct lustre_handle imp_expconnh;
-        struct list_head imp_chain;
-        struct obd_device *imp_obd;
-        unsigned int imp_id;
-        void *imp_data; /* device specific data */
-};
-
 static inline int obd_check_conn(struct lustre_handle *conn) 
 {
         struct obd_device *obd;
@@ -732,6 +700,12 @@ extern int (*mds_destroy_export)(struct obd_export *exp);
 /* == ldlm_client_free if(?) DLM running here */
 extern int (*ldlm_destroy_export)(struct obd_export *exp);
 
+static inline struct ptlrpc_connection *class_rd2conn(struct recovd_data *rd)
+{
+        /* reuse list_entry's member-pointer offset stuff */
+        return list_entry(rd, struct ptlrpc_connection, c_recovd_data);
+}
+
 #endif
 
 /* sysctl.c */
index 22d7470..b8d2293 100644 (file)
@@ -122,7 +122,7 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         int rc = 0, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_client,
+        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -155,7 +155,7 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
                 RETURN(-EINVAL);
         }
 
-        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_client,
+        req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
                               LDLM_CP_CALLBACK, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -517,8 +517,10 @@ static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
         if (!connection)
                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
 
-        ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           obddev->u.ldlm.ldlm_client, connection);
+        /* XXX
+           ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+           obddev->u.ldlm.ldlm_client, connection);
+        */
 
         switch (cmd) {
         case IOC_LDLM_TEST:
index 897e1ad..3731f7d 100644 (file)
@@ -12,6 +12,7 @@
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #include <linux/lustre_dlm.h>
+#include <linux/obd.h>
 
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
 {
@@ -130,7 +131,8 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         ldlm_lock2handle(lock, lockh);
 
         if (req == NULL) {
-                req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
+                                      &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
                 req_passed_in = 0;
@@ -158,7 +160,6 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
         }
         lock->l_connh = connh;
         lock->l_export = NULL;
-        lock->l_client = client_conn2cli(connh)->cl_client;
 
         rc = ptlrpc_queue_wait(req);
         /* FIXME: status check here? */
@@ -303,7 +304,8 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -363,8 +365,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 lock->l_flags |= LDLM_FL_CBPENDING;
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-                req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size,
-                                       NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
+                                      LDLM_CANCEL, 1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
 
index a198bf3..ac53d84 100644 (file)
@@ -177,7 +177,7 @@ int ldlm_namespace_free(struct ldlm_namespace *ns)
 int ldlm_client_free(struct obd_export *exp)
 {
         struct ldlm_export_data *led = &exp->exp_ldlm_data;
-        ptlrpc_cleanup_client(&led->led_client);
+        ptlrpc_cleanup_client(&led->led_import);
         RETURN(0);
 }
 
index f2c0b20..2676301 100644 (file)
@@ -28,6 +28,7 @@
 #include <linux/random.h>
 
 #include <linux/lustre_dlm.h>
+#include <linux/obd.h>
 
 struct ldlm_test_thread {
         struct obd_device *obddev;
index 27c96e6..e0a3dc7 100644 (file)
@@ -49,13 +49,22 @@ struct client_obd *client_conn2cli(struct lustre_handle *conn)
 int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct obd_ioctl_data* data = buf;
-        int rq_portal = (obddev->obd_type->typ_ops->o_brw) ? OST_REQUEST_PORTAL : MDS_REQUEST_PORTAL;
-        int rp_portal = (obddev->obd_type->typ_ops->o_brw) ? OSC_REPLY_PORTAL : MDC_REPLY_PORTAL;
+        int rq_portal, rp_portal;
+        char *name;
         struct client_obd *mdc = &obddev->u.cli;
         char server_uuid[37];
-        int rc;
         ENTRY;
 
+        if (obddev->obd_type->typ_ops->o_brw) {
+                rq_portal = OST_REQUEST_PORTAL;
+                rp_portal = OSC_REPLY_PORTAL;
+                name = "osc";
+        } else {
+                rq_portal = MDS_REQUEST_PORTAL;
+                rp_portal = MDC_REPLY_PORTAL;
+                name = "mdc";
+        }
+
         if (data->ioc_inllen1 < 1) {
                 CERROR("requires a TARGET UUID\n");
                 RETURN(-EINVAL);
@@ -82,44 +91,26 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
                                                    sizeof(server_uuid)));
 
-        mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid);
-        if (!mdc->cl_conn)
+        mdc->cl_import.imp_connection = ptlrpc_uuid_to_connection(server_uuid);
+        if (!mdc->cl_import.imp_connection)
                 RETURN(-ENOENT);
 
-        OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client));
-        if (mdc->cl_client == NULL)
-                GOTO(out_conn, rc = -ENOMEM);
-
-        OBD_ALLOC(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
-        if (mdc->cl_ldlm_client == NULL)
-                GOTO(out_client, rc = -ENOMEM);
+        ptlrpc_init_client(rq_portal, rp_portal, name,
+                           &obddev->obd_ldlm_client);
+        mdc->cl_import.imp_client = &obddev->obd_ldlm_client;
 
-        ptlrpc_init_client(rq_portal, rp_portal, mdc->cl_client, mdc->cl_conn);
-        ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           mdc->cl_ldlm_client, mdc->cl_conn);
-        mdc->cl_client->cli_name = "mdc";
-        mdc->cl_ldlm_client->cli_name = "ldlm";
         mdc->cl_max_mdsize = sizeof(struct lov_mds_md);
 
         MOD_INC_USE_COUNT;
         RETURN(0);
-
- out_client:
-        OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
- out_conn:
-        ptlrpc_put_connection(mdc->cl_conn);
-        return rc;
 }
 
 int client_obd_cleanup(struct obd_device * obddev)
 {
         struct client_obd *mdc = &obddev->u.cli;
 
-        ptlrpc_cleanup_client(mdc->cl_client);
-        OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
-        ptlrpc_cleanup_client(mdc->cl_ldlm_client);
-        OBD_FREE(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
-        ptlrpc_put_connection(mdc->cl_conn);
+        ptlrpc_cleanup_client(&mdc->cl_import);
+        ptlrpc_put_connection(mdc->cl_import.imp_connection);
 
         MOD_DEC_USE_COUNT;
         return 0;
@@ -143,7 +134,6 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                 MOD_DEC_USE_COUNT;
                 GOTO(out_sem, rc);
         }
-
         cli->cl_conn_count++;
         if (cli->cl_conn_count > 1)
                 GOTO(out_sem, rc);
@@ -153,7 +143,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (obd->obd_namespace == NULL)
                 GOTO(out_disco, rc = -ENOMEM);
 
-        request = ptlrpc_prep_req(cli->cl_client, rq_opc, 2, size, tmp);
+        request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 2, size, tmp);
         if (!request)
                 GOTO(out_ldlm, rc = -ENOMEM);
 
@@ -161,8 +151,9 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         request->rq_replen = lustre_msg_size(0, NULL);
         //   This handle may be important if a callback needs
         //   to find the mdc/osc
-        //        request->rq_reqmsg->addr = conn->addr;
-        //        request->rq_reqmsg->cookie = conn->cookie;
+        request->rq_reqmsg->addr = conn->addr;
+        request->rq_reqmsg->cookie = conn->cookie;
+        class_conn2export(conn)->exp_connection = request->rq_connection;
 
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
@@ -170,7 +161,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                 GOTO(out_req, rc);
 
         request->rq_connection->c_level = LUSTRE_CONN_FULL;
-        cli->cl_exporth = *(struct lustre_handle *)request->rq_repmsg;
+        cli->cl_import.imp_handle = *(struct lustre_handle *)request->rq_repmsg;
 
         EXIT;
 out_req:
@@ -210,7 +201,7 @@ int client_obd_disconnect(struct lustre_handle *conn)
 
         ldlm_namespace_free(obd->obd_namespace);
         obd->obd_namespace = NULL;
-        request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL);
+        request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL, NULL);
         if (!request)
                 GOTO(out_disco, rc = -ENOMEM);
 
@@ -238,6 +229,7 @@ int target_handle_connect(struct ptlrpc_request *req)
 {
         struct obd_device *target;
         struct obd_export *export;
+        struct obd_import *dlmimp;
         struct lustre_handle conn;
         char *tgtuuid, *cluuid;
         int rc, i;
@@ -282,11 +274,21 @@ int target_handle_connect(struct ptlrpc_request *req)
         LASSERT(export);
 
         req->rq_export = export;
-        export->exp_connection = req->rq_connection;
-        ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           &export->exp_ldlm_data.led_client,
-                           export->exp_connection);
-
+        export->exp_connection = ptlrpc_get_connection(&req->rq_peer, cluuid);
+        req->rq_connection = export->exp_connection;
+
+        spin_lock(&export->exp_connection->c_lock);
+        list_add(&export->exp_conn_chain, &export->exp_connection->c_exports);
+        spin_unlock(&export->exp_connection->c_lock);
+
+        recovd_conn_manage(export->exp_connection, ptlrpc_recovd,
+                           target_revoke_connection);
+        dlmimp = &export->exp_ldlm_data.led_import;
+        dlmimp->imp_connection = req->rq_connection;
+        dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
+        dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
+        dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
+        
 #warning Peter: is this the right place to upgrade the server connection level?
         req->rq_connection->c_level = LUSTRE_CONN_FULL;
 out:
@@ -307,3 +309,46 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         req->rq_status = obd_disconnect(conn);
         RETURN(0);
 }
+
+static int target_revoke_client_resources(struct ptlrpc_connection *conn)
+{
+        struct list_head *tmp, *pos;
+
+        ENTRY;
+
+        /* Cancel outstanding locks. */
+        list_for_each_safe(tmp, pos, &conn->c_exports) {
+        }
+
+        RETURN(0);
+}
+
+static int target_fence_failed_connection(struct ptlrpc_connection *conn)
+{
+        ENTRY;
+
+        conn->c_level = LUSTRE_CONN_RECOVD;
+
+        RETURN(0);
+}
+
+int target_revoke_connection(struct recovd_data *rd, int phase)
+{
+        struct ptlrpc_connection *conn = class_rd2conn(rd);
+        
+        LASSERT(conn);
+        ENTRY;
+
+        switch (phase) {
+            case PTLRPC_RECOVD_PHASE_PREPARE:
+                RETURN(target_fence_failed_connection(conn));
+            case PTLRPC_RECOVD_PHASE_RECOVER:
+                RETURN(target_revoke_client_resources(conn));
+            case PTLRPC_RECOVD_PHASE_FAILURE:
+                LBUG();
+                RETURN(0);
+        }
+
+        LBUG();
+        RETURN(-ENOSYS);
+}
index f4a9fe2..626ac76 100644 (file)
@@ -25,6 +25,7 @@
 #define DEBUG_SUBSYSTEM S_LLITE
 
 #include <linux/lustre_net.h>
+#include <linux/obd.h>
 #include <linux/obd_support.h>
 
 void lov_packdesc(struct lov_desc *ld)
index 5134508..56fd7ae 100644 (file)
@@ -58,8 +58,7 @@ static int sync_io_timeout(void *data)
         ENTRY;
         desc->b_connection->c_level = LUSTRE_CONN_RECOVD;
         desc->b_flags |= PTL_RPC_FL_TIMEOUT;
-        if (desc->b_connection && desc->b_connection->c_recovd &&
-            class_signal_connection_failure) {
+        if (desc->b_connection && class_signal_connection_failure) {
                 /* XXXshaver Do we need a resend strategy, or do we just
                  * XXXshaver return -ERESTARTSYS and punt it?
                  */
index d38b3f2..4e4d7f5 100644 (file)
@@ -20,6 +20,7 @@
 
 #define DEBUG_SUBSYSTEM S_FILTER
 
+#include <linux/obd.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
index 1785d29..368a893 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <linux/config.h>
 #include <linux/module.h>
+#include <linux/kmod.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
@@ -29,10 +30,11 @@ static int ll_reconnect(struct ll_sb_info *sbi)
         __u64 last_xid;
         int err;
         struct ptlrpc_request *request; 
+        struct ptlrpc_connection *conn = sbi2mdc(sbi)->cl_import.imp_connection;
 
-        ptlrpc_readdress_connection(sbi2mdc(sbi)->cl_conn, "mds");
+        ptlrpc_readdress_connection(conn, "mds");
 
-        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_CON;
+        conn->c_level = LUSTRE_CONN_CON;
 
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
@@ -41,14 +43,33 @@ static int ll_reconnect(struct ll_sb_info *sbi)
                 CERROR("cannot mds_connect: rc = %d\n", err);
                 GOTO(out_disc, err = -ENOTCONN);
         }
-        sbi2mdc(sbi)->cl_conn->c_last_xid = last_xid;
-        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
+        conn->c_last_xid = last_xid;
+        conn->c_level = LUSTRE_CONN_RECOVD;
 
  out_disc:
         return err;
 }
 
-int ll_recover(struct ptlrpc_client *cli)
+static int ll_recover_upcall(struct ptlrpc_connection *conn)
+{
+        char *argv[3];
+        char *envp[3];
+
+        ENTRY;
+        conn->c_level = LUSTRE_CONN_RECOVD;
+
+        argv[0] = obd_recovery_upcall;
+        argv[1] = conn->c_remote_uuid;
+        argv[2] = NULL;
+
+        envp[0] = "HOME=/";
+        envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
+        envp[2] = NULL;
+
+        RETURN(call_usermodehelper(argv[0], argv, envp));
+}
+
+static int ll_recover_reconnect(struct ptlrpc_connection *conn)
 {
         RETURN(-ENOSYS);
 #if 0
@@ -143,3 +164,24 @@ int ll_recover(struct ptlrpc_client *cli)
         return rc;
 #endif
 }
+
+int ll_recover(struct recovd_data *rd, int phase)
+{
+        struct ptlrpc_connection *conn = class_rd2conn(rd);
+
+        LASSERT(conn);
+        ENTRY;
+
+        switch (phase) {
+            case PTLRPC_RECOVD_PHASE_PREPARE:
+                RETURN(ll_recover_upcall(conn));
+            case PTLRPC_RECOVD_PHASE_RECOVER:
+                RETURN(ll_recover_reconnect(conn));
+            case PTLRPC_RECOVD_PHASE_FAILURE:
+                fixme();
+                RETURN(0);
+        }
+
+        LBUG();
+        RETURN(-ENOSYS);
+}
index 0d7b872..1777fe4 100644 (file)
@@ -24,7 +24,7 @@ extern struct address_space_operations ll_aops;
 extern struct address_space_operations ll_dir_aops;
 struct super_operations ll_super_operations;
 
-extern int ll_recover(struct ptlrpc_client *);
+extern int ll_recover(struct recovd_data *, int);
 extern int ll_commitcbd_setup(struct ll_sb_info *);
 extern int ll_commitcbd_cleanup(struct ll_sb_info *);
 
@@ -131,7 +131,11 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out_free, sb = NULL);
         }
-        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
+        recovd_conn_manage(obd->u.cli.cl_import.imp_connection,
+                           ptlrpc_recovd, ll_recover);
+        
+#warning Peter: is this the right place to raise the connection level?
+        sbi2mdc(sbi)->cl_import.imp_connection->c_level = LUSTRE_CONN_FULL;
 
         obd = class_uuid2obd(osc);
         if (!obd) {
@@ -143,6 +147,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, sb = NULL);
         }
+        recovd_conn_manage(obd->u.cli.cl_import.imp_connection,
+                           ptlrpc_recovd, ll_recover);
 
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
index b35ed6a..90ef7e5 100644 (file)
@@ -55,7 +55,8 @@ int mdc_setattr(struct lustre_handle *conn,
         int rc, size = sizeof(*rec);
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_REINT, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_REINT, 1, &size,
+                              NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -98,7 +99,8 @@ int mdc_create(struct lustre_handle *conn,
                 bufcount = 3;
         }
 
-        req = ptlrpc_prep_req2(conn, MDS_REINT, bufcount, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_REINT, bufcount, size,
+                              NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -141,7 +143,7 @@ int mdc_unlink(struct lustre_handle *conn, struct inode *dir,
         int rc, size[2] = {sizeof(struct mds_rec_unlink), namelen + 1};
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -166,7 +168,7 @@ int mdc_link(struct lustre_handle *conn,
         int rc, size[2] = {sizeof(struct mds_rec_link), namelen + 1};
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -193,7 +195,7 @@ int mdc_rename(struct lustre_handle *conn,
                            newlen + 1};
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_REINT, 3, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_REINT, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index 3f2dcc9..caabca4 100644 (file)
 
 extern int mds_queue_req(struct ptlrpc_request *);
 
-int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-               struct ptlrpc_connection **connection,
-               struct lustre_handle **rconn)
-{
-        struct obd_export *export;
-        struct client_obd *mdc;
-
-        export = class_conn2export(conn);
-        if (!export)
-                return -ENOTCONN;
-
-        mdc = &export->exp_obd->u.cli;
-
-        *cl = mdc->cl_client;
-        *connection = mdc->cl_conn;
-        *rconn = &export->exp_rconnh;
-
-        return 0;
-}
-
 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
                   __u64 *last_committed, __u64 *last_xid,
                   struct ptlrpc_request **request)
@@ -64,7 +44,8 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_GETSTATUS, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETSTATUS, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -106,7 +87,8 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
         int rc, size[2] = {sizeof(*streq)};
         ENTRY;
 
-        req = ptlrpc_prep_req2(mdc_connh, MDS_GETLOVINFO, 1, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(mdc_connh), MDS_GETLOVINFO, 1,
+                              size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -147,7 +129,8 @@ int mdc_getattr(struct lustre_handle *conn,
         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR, 1, size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -251,7 +234,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[2] = sizeof(struct mds_rec_create);
                 size[3] = de->d_name.len + 1;
                 size[4] = tgtlen + 1;
-                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
+                                      size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -270,7 +254,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[2] = sizeof(struct mds_rec_rename);
                 size[3] = old_de->d_name.len + 1;
                 size[4] = de->d_name.len + 1;
-                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
+                                      size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -286,7 +271,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
                 size[2] = sizeof(struct mds_rec_unlink);
                 size[3] = de->d_name.len + 1;
-                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
+                                      size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -306,7 +292,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[2] = sizeof(struct mds_body);
                 size[3] = de->d_name.len + 1;
 
-                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
+                                      size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -320,7 +307,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 /* get ready for the reply */
                 req->rq_replen = lustre_msg_size(3, repsize);
         } else if (it->it_op == IT_READDIR) {
-                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 1, size, NULL);
+                req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 1,
+                                      size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -361,9 +349,6 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
              struct lov_stripe_md *smd, __u64 cookie, __u64 *fh,
              struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct mds_body *body;
         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
         struct ptlrpc_request *req;
@@ -374,8 +359,8 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
                 size[1] = smd->lmd_easize;
         }
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(conn, MDS_OPEN, bufcount, size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_OPEN, bufcount, size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -412,7 +397,8 @@ int mdc_close(struct lustre_handle *conn,
         int rc, size = sizeof(*body);
         struct ptlrpc_request *req;
 
-        req = ptlrpc_prep_req2(conn, MDS_CLOSE, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_CLOSE, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -434,7 +420,8 @@ int mdc_close(struct lustre_handle *conn,
 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
                  char *addr, struct ptlrpc_request **request)
 {
-        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
+        struct ptlrpc_connection *connection = 
+                client_conn2cli(conn)->cl_import.imp_connection;
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ptlrpc_bulk_page *bulk = NULL;
@@ -448,7 +435,8 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        req = ptlrpc_prep_req2(conn, MDS_READPAGE, 1, &size, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_READPAGE, 1, &size,
+                              NULL);
         if (!req)
                 GOTO(out2, rc = -ENOMEM);
 
@@ -496,7 +484,8 @@ int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs,
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        req = ptlrpc_prep_req2(conn, MDS_STATFS, 0, NULL, NULL);
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_STATFS, 0, NULL,
+                              NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
         req->rq_replen = lustre_msg_size(1, &size);
index f266985..76f5101 100644 (file)
@@ -276,13 +276,12 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         spin_lock(&obd->obd_dev_lock);
         list_for_each(p, &obd->obd_exports) {
-                exp = list_entry(p, struct obd_export, exp_chain);
+                exp = list_entry(p, struct obd_export, exp_obd_chain);
                 mcd = exp->exp_mds_data.med_mcd;
                 if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) {
                         LASSERT(exp->exp_obd == obd);
 
-                        exp->exp_rconnh.addr = conn->addr;
-                        exp->exp_rconnh.cookie = conn->cookie;
+                        LASSERT(list_empty(&exp->exp_conn_chain));
                         conn->addr = (__u64) (unsigned long)exp;
                         conn->cookie = exp->exp_cookie;
                         spin_unlock(&obd->obd_dev_lock);
@@ -1119,6 +1118,9 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         
         mds_destroy_export = mds_client_free;
 
+        ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           "mds_ldlm_client", &obddev->obd_ldlm_client);
+
         RETURN(0);
 
 err_thread:
index 757f69b..42b838e 100644 (file)
 
 #include <linux/fs.h>
 #include <linux/jbd.h>
+#include <linux/slab.h>
 #include <linux/extN_fs.h>
 #include <linux/extN_jbd.h>
 #include <linux/extN_xattr.h>
+#include <linux/kp30.h>
 #include <linux/lustre_mds.h>
+#include <linux/obd.h>
 #include <linux/module.h>
 #include <linux/obd_lov.h>
 
index b38ab6f..04bb9af 100644 (file)
@@ -626,8 +626,9 @@ EXPORT_SYMBOL(class_new_export);
 EXPORT_SYMBOL(class_destroy_export);
 EXPORT_SYMBOL(class_connect);
 EXPORT_SYMBOL(class_conn2export);
-EXPORT_SYMBOL(class_rconn2export);
 EXPORT_SYMBOL(class_conn2obd);
+EXPORT_SYMBOL(class_conn2cliimp);
+EXPORT_SYMBOL(class_conn2ldlmimp);
 EXPORT_SYMBOL(class_disconnect);
 EXPORT_SYMBOL(class_disconnect_all);
 //EXPORT_SYMBOL(class_uuid_parse);
index be6ff06..6d366b9 100644 (file)
@@ -268,6 +268,16 @@ struct obd_device *class_conn2obd(struct lustre_handle *conn)
         return NULL;
 }
 
+struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
+{
+        return &class_conn2obd(conn)->u.cli.cl_import;
+}
+
+struct obd_import *class_conn2ldlmimp(struct lustre_handle *conn)
+{
+        return &class_conn2export(conn)->exp_ldlm_data.led_import;
+}
+
 struct obd_export *class_new_export(struct obd_device *obddev)
 {
         struct obd_export * export;
@@ -284,8 +294,9 @@ struct obd_export *class_new_export(struct obd_device *obddev)
         /* XXX should these be in MDS and LDLM init functions? */
         INIT_LIST_HEAD(&export->exp_mds_data.med_open_head);
         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
+        INIT_LIST_HEAD(&export->exp_conn_chain);
         spin_lock(&obddev->obd_dev_lock);
-        list_add(&export->exp_chain, &export->exp_obd->obd_exports);
+        list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
         spin_unlock(&obddev->obd_dev_lock);
         return export;
 }
@@ -296,9 +307,13 @@ void class_destroy_export(struct obd_export *exp)
         ENTRY;
 
         spin_lock(&exp->exp_obd->obd_dev_lock);
-        list_del(&exp->exp_chain);
+        list_del(&exp->exp_obd_chain);
         spin_unlock(&exp->exp_obd->obd_dev_lock);
 
+        spin_lock(&exp->exp_connection->c_lock);
+        list_del(&exp->exp_conn_chain);
+        spin_unlock(&exp->exp_connection->c_lock);
+        
         /* XXXshaver these bits want to be hung off the export, instead of
          * XXXshaver hard-coded here.
          */
@@ -337,29 +352,15 @@ int class_connect (struct lustre_handle *conn, struct obd_device *obd,
         if (!export)
                 return -ENOMEM;
 
-        export->exp_rconnh.addr = conn->addr;
-        export->exp_rconnh.cookie = conn->cookie;
 
         conn->addr = (__u64) (unsigned long)export;
         conn->cookie = export->exp_cookie;
+        
         CDEBUG(D_IOCTL, "connect: addr %Lx cookie %Lx\n",
                (long long)conn->addr, (long long)conn->cookie);
         return 0;
 }
 
-int class_rconn2export(struct lustre_handle *conn, struct lustre_handle *rconn)
-{
-        struct obd_export *export = class_conn2export(conn);
-
-        if (!export)
-                return -EINVAL;
-
-        export->exp_rconnh.addr = rconn->addr;
-        export->exp_rconnh.cookie = rconn->cookie;
-
-        return 0;
-}
-
 int class_disconnect(struct lustre_handle *conn)
 {
         struct obd_export *export;
@@ -391,7 +392,8 @@ void class_disconnect_all(struct obd_device *obddev)
                         struct lustre_handle conn;
                         int rc;
 
-                        export = list_entry(tmp, struct obd_export, exp_chain);
+                        export = list_entry(tmp, struct obd_export, 
+                                            exp_obd_chain);
                         conn.addr = (__u64)(unsigned long)export;
                         conn.cookie = export->exp_cookie;
                         spin_unlock(&obddev->obd_dev_lock);
index 6224cfc..5591815 100644 (file)
@@ -94,7 +94,8 @@ static int read_lustre_status(char *page, char **start, off_t offset,
                 while ((lh = lh->next) != &obddev->obd_exports) {
                         p += sprintf(&page[p],
                                   ((export == NULL) ? ", connections(" : ",") );
-                        export = list_entry(lh, struct obd_export, exp_chain);
+                        export = list_entry(lh, struct obd_export, 
+                                            exp_obd_chain);
                         p += sprintf(&page[p], "%p", export);
                 }
                 if (export != 0) { /* there was at least one export */
index cfbccec..2c60f97 100644 (file)
@@ -38,7 +38,8 @@ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -74,7 +75,8 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -108,7 +110,8 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -142,7 +145,8 @@ static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -184,7 +188,8 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                 (*ea)->lmd_easize = oa->o_easize;
         }
 
-        request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -223,7 +228,8 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -265,7 +271,8 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -345,7 +352,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct brw_page *pga,
                         brw_callback_t callback, struct io_cb_data *data)
 {
-        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
+        struct ptlrpc_connection *connection =
+                client_conn2cli(conn)->cl_import.imp_connection;
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
@@ -359,7 +367,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(struct niobuf_remote);
 
-        request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -459,7 +468,8 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
                          obd_count page_count, struct brw_page *pga,
                          brw_callback_t callback, struct io_cb_data *data)
 {
-        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
+        struct ptlrpc_connection *connection =
+                client_conn2cli(conn)->cl_import.imp_connection;
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
@@ -474,7 +484,8 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*remote);
 
-        request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -669,7 +680,8 @@ static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
index 78f8fcf..22a98a9 100644 (file)
@@ -620,6 +620,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 }
         }
 
+        ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           "ost_ldlm_client", &obddev->obd_ldlm_client);
+
         RETURN(0);
 
 error_disc:
index bcdcc4a..aadfa8b 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_ha.h>
+#include <linux/lustre_import.h>
 
-void ptlrpc_init_client(int req_portal, int rep_portal, struct ptlrpc_client *cl,
-                        struct ptlrpc_connection *conn)
+void ptlrpc_init_client(int req_portal, int rep_portal, char *name, 
+                        struct ptlrpc_client *cl)
 {
-        memset(cl, 0, sizeof(*cl));
-        /* Some things, like the LDLM, can call us without a connection.
-         * I don't like it one bit.
-         */
-        if (conn) {
-                cl->cli_connection = conn;
-                list_add(&cl->cli_client_chain, &conn->c_clients);
-        }
-        cl->cli_obd = NULL;
         cl->cli_request_portal = req_portal;
-        cl->cli_reply_portal = rep_portal;
-        sema_init(&cl->cli_rpc_sem, 32);
+        cl->cli_reply_portal   = rep_portal;
+        cl->cli_name           = name;
 }
 
 __u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
@@ -61,12 +53,14 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
                 return NULL;
         }
 
-        c = ptlrpc_get_connection(&peer);
+        c = ptlrpc_get_connection(&peer, uuid);
         if (c) { 
                 memcpy(c->c_remote_uuid, uuid, sizeof(c->c_remote_uuid));
                 c->c_epoch++;
         }
 
+        CDEBUG(D_INFO, "%s -> %p\n", uuid, c);
+
         return c;
 }
 
@@ -150,12 +144,11 @@ void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
         EXIT;
 }
 
-struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
-                                       int opcode, int count, int *lengths,
-                                       char **bufs)
+struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
+                                       int count, int *lengths, char **bufs)
 {
+        struct ptlrpc_connection *conn = imp->imp_connection;
         struct ptlrpc_request *request;
-        struct ptlrpc_connection *conn = cl->cli_connection;
         int rc;
         ENTRY;
 
@@ -175,7 +168,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
 
         request->rq_level = LUSTRE_CONN_FULL;
         request->rq_type = PTL_RPC_TYPE_REQUEST;
-        request->rq_client = cl;
+        request->rq_import = imp;
         request->rq_connection = ptlrpc_connection_addref(conn);
 
         INIT_LIST_HEAD(&request->rq_list);
@@ -192,28 +185,9 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
         request->rq_reqmsg->opc = HTON__u32(opcode);
         request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
 
+        ptlrpc_hdl2req(request, &imp->imp_handle);
         RETURN(request);
 }
-struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, 
-                                       int opcode, int count, int *lengths,
-                                       char **bufs)
-{
-        struct client_obd *clobd; 
-        struct ptlrpc_request *req;
-        struct obd_export *export;
-
-        export = class_conn2export(conn);
-        if (!export) { 
-                LBUG();
-                CERROR("NOT connected\n"); 
-                return NULL;
-        }
-
-        clobd = &export->exp_obd->u.cli;
-        req = ptlrpc_prep_req(clobd->cl_client, opcode, count, lengths, bufs);
-        ptlrpc_hdl2req(req, &clobd->cl_exporth);
-        return req;
-}
 
 void ptlrpc_req_finished(struct ptlrpc_request *request)
 {
@@ -356,7 +330,7 @@ restart:
                        (long long)req->rq_xid, (long long)req->rq_transno,
                        (long long)conn->c_last_committed);
                 if (atomic_dec_and_test(&req->rq_refcount)) {
-                        req->rq_client = NULL;
+                        req->rq_import = NULL;
 
                         /* We do this to prevent free_req deadlock.  Restarting
                          * after each removal is not so bad, as we are almost
@@ -376,27 +350,24 @@ restart:
         return;
 }
 
-void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
+void ptlrpc_cleanup_client(struct obd_import *imp)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
-        struct ptlrpc_connection *conn = cli->cli_connection;
+        struct ptlrpc_connection *conn = imp->imp_connection;
         ENTRY;
 
-        if (!conn) {
-                EXIT;
-                return;
-        }
+        LASSERT(conn);
 
 restart1:
         spin_lock(&conn->c_lock);
         list_for_each_safe(tmp, saved, &conn->c_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                if (req->rq_client != cli)
+                if (req->rq_import != imp)
                         continue;
                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                 list_del_init(&req->rq_list);
-                req->rq_client = NULL;
+                req->rq_import = NULL;
                 spin_unlock(&conn->c_lock);
                 ptlrpc_free_req(req);
                 goto restart1;
@@ -404,11 +375,11 @@ restart1:
 restart2:
         list_for_each_safe(tmp, saved, &conn->c_dying_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                if (req->rq_client != cli)
+                if (req->rq_import != imp)
                         continue;
                 CERROR("Request %p is on the dying list at cleanup!\n", req);
                 list_del_init(&req->rq_list);
-                req->rq_client = NULL;
+                req->rq_import = NULL;
                 spin_unlock(&conn->c_lock);
                 ptlrpc_free_req(req); 
                 spin_lock(&conn->c_lock);
@@ -465,9 +436,7 @@ static int expired_request(void *data)
         req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
         req->rq_flags |= PTL_RPC_FL_TIMEOUT;
         /* Activate the recovd for this client, if there is one. */
-        if (req->rq_client && req->rq_client->cli_connection &&
-            req->rq_client->cli_connection->c_recovd)
-                recovd_conn_fail(req->rq_client->cli_connection);
+        recovd_conn_fail(req->rq_import->imp_connection);
 
         /* If this request is for recovery or other primordial tasks,
          * don't go back to sleep.
@@ -489,8 +458,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
         struct l_wait_info lwi;
-        struct ptlrpc_client *cli = req->rq_client;
-        struct ptlrpc_connection *conn = cli->cli_connection;
+        struct ptlrpc_client *cli = req->rq_import->imp_client;
+        struct ptlrpc_connection *conn = req->rq_import->imp_connection;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
@@ -530,7 +499,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 if ( rc > 0 ) 
                         rc = -rc;
                 ptlrpc_cleanup_request_buf(req);
-                up(&cli->cli_rpc_sem);
+                // up(&cli->cli_rpc_sem);
                 RETURN(-rc);
         }
 
@@ -552,7 +521,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 goto resend;
         }
 
-        up(&cli->cli_rpc_sem);
+        // up(&cli->cli_rpc_sem);
         if (req->rq_flags & PTL_RPC_FL_INTR) {
                 if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
                         LBUG(); /* should only be interrupted if we timed out. */
@@ -591,7 +560,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 int ptlrpc_replay_req(struct ptlrpc_request *req)
 {
         int rc = 0;
-        struct ptlrpc_client *cli = req->rq_client;
+        // struct ptlrpc_client *cli = req->rq_import->imp_client;
         struct l_wait_info lwi;
         ENTRY;
 
@@ -606,7 +575,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                 ptlrpc_cleanup_request_buf(req);
-                up(&cli->cli_rpc_sem);
+                // up(&cli->cli_rpc_sem);
                 RETURN(-rc);
         }
 
@@ -615,7 +584,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         CDEBUG(D_OTHER, "-- done\n");
 
-        up(&cli->cli_rpc_sem);
+        // up(&cli->cli_rpc_sem);
 
         if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
                 CERROR("Unknown reason for wakeup\n");
index 8acb504..31640d8 100644 (file)
@@ -29,16 +29,21 @@ static spinlock_t conn_lock;
 static struct list_head conn_list;
 static struct list_head conn_unused_list;
 
-struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer)
+struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer,
+                                                char *uuid)
 {
         struct list_head *tmp, *pos;
         struct ptlrpc_connection *c;
         ENTRY;
 
+        CDEBUG(D_INFO, "peer is %08x %08lx %08lx\n",
+               peer->peer_nid, peer->peer_ni.nal_idx, peer->peer_ni.handle_idx);
+
         spin_lock(&conn_lock);
         list_for_each(tmp, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) {
+                if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0 &&
+                    (!uuid || strcmp(c->c_remote_uuid, uuid) == 0)) {
                         ptlrpc_connection_addref(c);
                         GOTO(out, c);
                 }
@@ -46,7 +51,8 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer)
 
         list_for_each_safe(tmp, pos, &conn_unused_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0) {
+                if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0 &&
+                    (!uuid || strcmp(c->c_remote_uuid, uuid) == 0)) {
                         ptlrpc_connection_addref(c);
                         list_del(&c->c_link);
                         list_add(&c->c_link, &conn_list);
@@ -66,6 +72,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer)
         c->c_generation = 1;
         c->c_epoch = 1;
         c->c_bootcount = 0;
+        strcpy(c->c_remote_uuid, uuid);
         INIT_LIST_HEAD(&c->c_delayed_head);
         INIT_LIST_HEAD(&c->c_sending_head);
         INIT_LIST_HEAD(&c->c_dying_head);
index 3933160..ee90f5d 100644 (file)
@@ -330,10 +330,10 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 RETURN(ENOMEM);
         }
 
-        down(&request->rq_client->cli_rpc_sem);
+        // down(&request->rq_client->cli_rpc_sem);
 
         rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni,
-                         request->rq_client->cli_reply_portal,
+                         request->rq_import->imp_client->cli_reply_portal,
                          local_id, request->rq_xid, 0, PTL_UNLINK,
                          PTL_INS_AFTER, &request->rq_reply_me_h);
         if (rc != PTL_OK) {
@@ -360,17 +360,17 @@ int ptl_send_rpc(struct ptlrpc_request *request)
 
         CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %Lu, portal %u\n",
                request->rq_replen, request->rq_xid,
-               request->rq_client->cli_reply_portal);
+               request->rq_import->imp_client->cli_reply_portal);
 
         rc = ptl_send_buf(request, request->rq_connection,
-                          request->rq_client->cli_request_portal);
+                          request->rq_import->imp_client->cli_request_portal);
         RETURN(rc);
 
  cleanup2:
         PtlMEUnlink(request->rq_reply_me_h);
  cleanup:
         OBD_FREE(repbuf, request->rq_replen);
-        up(&request->rq_client->cli_rpc_sem);
+        // up(&request->rq_client->cli_rpc_sem);
 
         return rc;
 }
index 62e70f1..476c285 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/kmod.h>
 #include <linux/lustre_lite.h>
 #include <linux/lustre_ha.h>
 #include <linux/obd_support.h>
 
-void recovd_conn_manage(struct recovd_obd *recovd,
-                        struct ptlrpc_connection *conn)
+void recovd_conn_manage(struct ptlrpc_connection *conn,
+                        struct recovd_obd *recovd, ptlrpc_recovery_cb_t recover)
 {
+        struct recovd_data *rd = &conn->c_recovd_data;
         ENTRY;
-        conn->c_recovd = recovd;
+
+        rd->rd_recovd = recovd;
+        rd->rd_recover = recover;
+
         spin_lock(&recovd->recovd_lock);
-        list_add(&conn->c_recovd_data.rd_managed_chain,
-                 &recovd->recovd_managed_items);
+        list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
         spin_unlock(&recovd->recovd_lock);
+
         EXIT;
 }
 
 void recovd_conn_fail(struct ptlrpc_connection *conn)
 {
+        struct recovd_data *rd = &conn->c_recovd_data;
+        struct recovd_obd *recovd = rd->rd_recovd;
         ENTRY;
-        spin_lock(&conn->c_recovd->recovd_lock);
-        conn->c_recovd->recovd_flags |= RECOVD_FAIL;
-        conn->c_recovd->recovd_wakeup_flag = 1;
-        list_del(&conn->c_recovd_data.rd_managed_chain);
-        list_add(&conn->c_recovd_data.rd_managed_chain, 
-                 &conn->c_recovd->recovd_troubled_items);
-        spin_unlock(&conn->c_recovd->recovd_lock);
-        wake_up(&conn->c_recovd->recovd_waitq);
+
+        spin_lock(&recovd->recovd_lock);
+        list_del(&rd->rd_managed_chain);
+        list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
+        spin_unlock(&recovd->recovd_lock);
+
+        wake_up(&recovd->recovd_waitq);
+
         EXIT;
 }
 
 /* this function must be called with conn->c_lock held */
 void recovd_conn_fixed(struct ptlrpc_connection *conn)
 {
+        struct recovd_data *rd = &conn->c_recovd_data;
         ENTRY;
-        list_del(&conn->c_recovd_data.rd_managed_chain);
-        list_add(&conn->c_recovd_data.rd_managed_chain,
-                 &conn->c_recovd->recovd_managed_items);
-        EXIT;
-}
-
-
-static int recovd_upcall(void)
-{
-        char *argv[2];
-        char *envp[3];
-
-        argv[0] = obd_recovery_upcall;
-        argv[1] = NULL;
 
-        envp [0] = "HOME=/";
-        envp [1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
-        envp [2] = NULL;
+        list_del(&rd->rd_managed_chain);
+        list_add(&rd->rd_managed_chain, &rd->rd_recovd->recovd_managed_items);
 
-        return call_usermodehelper(argv[0], argv, envp);
+        EXIT;
 }
 
+
 static int recovd_check_event(struct recovd_obd *recovd)
 {
         int rc = 0;
@@ -80,72 +72,110 @@ static int recovd_check_event(struct recovd_obd *recovd)
 
         spin_lock(&recovd->recovd_lock);
 
-        recovd->recovd_waketime = CURRENT_TIME;
-        if (recovd->recovd_timeout) 
-                schedule_timeout(recovd->recovd_timeout);
-
-        if (recovd->recovd_wakeup_flag) {
-                CERROR("service woken\n"); 
+        if (recovd->recovd_phase == RECOVD_IDLE &&
+            !list_empty(&recovd->recovd_troubled_items)) {
                 GOTO(out, rc = 1);
         }
 
-        if (recovd->recovd_timeout && 
-            CURRENT_TIME > recovd->recovd_waketime + recovd->recovd_timeout) {
-                recovd->recovd_flags |= RECOVD_TIMEOUT;
-                CERROR("timeout\n");
+        if (recovd->recovd_flags & RECOVD_STOPPING)
                 GOTO(out, rc = 1);
-        }
 
-        if (recovd->recovd_flags & RECOVD_STOPPING) {
-                CERROR("recovd stopping\n");
-                rc = 1;
+        if (recovd->recovd_flags & RECOVD_FAILED) {
+                LASSERT(recovd->recovd_phase != RECOVD_IDLE && 
+                        recovd->recovd_current_rd);
+                GOTO(out, rc = 1);
         }
 
+        if (recovd->recovd_phase == recovd->recovd_next_phase)
+                GOTO(out, rc = 1);
+
  out:
-        recovd->recovd_wakeup_flag = 0;
         spin_unlock(&recovd->recovd_lock);
         RETURN(rc);
 }
 
 static int recovd_handle_event(struct recovd_obd *recovd)
 {
+        struct recovd_data *rd;
+        int rc;
         ENTRY;
 
-        if (!(recovd->recovd_flags & RECOVD_UPCALL_WAIT) &&
-            recovd->recovd_flags & RECOVD_FAIL) { 
+        if (recovd->recovd_flags & RECOVD_FAILED) {
 
-                CERROR("client in trouble: flags -> UPCALL_WAITING\n");
-                recovd->recovd_flags |= RECOVD_UPCALL_WAIT;
+                LASSERT(recovd->recovd_phase != RECOVD_IDLE && 
+                        recovd->recovd_current_rd);
 
-                recovd_upcall();
-                recovd->recovd_waketime = CURRENT_TIME;
-                recovd->recovd_timeout = 10 * HZ;
-                schedule_timeout(recovd->recovd_timeout);
-        }
+                rd = recovd->recovd_current_rd;
+        cb_failed:
+                CERROR("recovery FAILED for rd %p (conn %p), recovering\n",
+                       rd, class_rd2conn(rd));
 
-        if (recovd->recovd_flags & RECOVD_TIMEOUT) { 
-                CERROR("timeout - no news from upcall?\n");
-                recovd->recovd_flags &= ~RECOVD_TIMEOUT;
+                list_add(&rd->rd_managed_chain, &recovd->recovd_managed_items);
+                spin_unlock(&recovd->recovd_lock);
+                rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_FAILURE);
+                spin_lock(&recovd->recovd_lock);
+                recovd->recovd_phase = RECOVD_IDLE;
+                recovd->recovd_next_phase = RECOVD_PREPARING;
+                
+                recovd->recovd_flags &= ~RECOVD_FAILED;
+
+                RETURN(1);
         }
 
-        if (recovd->recovd_flags & RECOVD_UPCALL_ANSWER) { 
-                CERROR("UPCALL_WAITING: upcall answer\n");
+        switch (recovd->recovd_phase) {
+            case RECOVD_IDLE:
+                if (recovd->recovd_current_rd ||
+                    list_empty(&recovd->recovd_troubled_items))
+                        break;
+                rd = list_entry(recovd->recovd_troubled_items.next,
+                                struct recovd_data, rd_managed_chain);
+                
+                list_del(&rd->rd_managed_chain);
+                if (!rd->rd_recover)
+                        LBUG();
+
+                CERROR("starting recovery for rd %p (conn %p)\n",
+                       rd, class_rd2conn(rd));
+                recovd->recovd_current_rd = rd;
+                recovd->recovd_flags &= ~RECOVD_FAILED;
+                recovd->recovd_phase = RECOVD_PREPARING;
+
+                spin_unlock(&recovd->recovd_lock);
+                rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_PREPARE);
+                spin_lock(&recovd->recovd_lock);
+                if (rc)
+                        goto cb_failed;
+                
+                recovd->recovd_next_phase = RECOVD_PREPARED;
+                break;
 
-                while (!list_empty(&recovd->recovd_troubled_items)) {
-                        struct recovd_data *rd =
-                                list_entry(recovd->recovd_troubled_items.next,
-                                           struct recovd_data, rd_managed_chain);
+            case RECOVD_PREPARED:
+                rd = recovd->recovd_current_rd;
+                recovd->recovd_phase = RECOVD_RECOVERING;
 
-                        list_del(&rd->rd_managed_chain);
-                        if (rd->rd_recover) {
-                                spin_unlock(&recovd->recovd_lock);
-                                rd->rd_recover(rd);
-                                spin_lock(&recovd->recovd_lock);
-                        }
-                }
+                CERROR("recovery prepared for rd %p (conn %p), recovering\n",
+                       rd, class_rd2conn(rd));
 
-                recovd->recovd_timeout = 0;
-                recovd->recovd_flags = RECOVD_IDLE; 
+                spin_unlock(&recovd->recovd_lock);
+                rc = rd->rd_recover(rd, PTLRPC_RECOVD_PHASE_RECOVER);
+                spin_lock(&recovd->recovd_lock);
+                if (rc)
+                        goto cb_failed;
+                
+                recovd->recovd_next_phase = RECOVD_RECOVERED;
+                break;
+
+            case RECOVD_RECOVERED:
+                rd = recovd->recovd_current_rd;
+                recovd->recovd_phase = RECOVD_IDLE;
+                recovd->recovd_next_phase = RECOVD_PREPARING;
+
+                CERROR("recovery complete for rd %p (conn %p), recovering\n",
+                       rd, class_rd2conn(rd));
+                break;
+
+            default:
+                break;
         }
 
         RETURN(0);
@@ -177,6 +207,7 @@ static int recovd_main(void *arg)
                 wait_event(recovd->recovd_waitq, recovd_check_event(recovd));
 
                 spin_lock(&recovd->recovd_lock);
+
                 if (recovd->recovd_flags & RECOVD_STOPPING) {
                         spin_unlock(&recovd->recovd_lock);
                         CERROR("lustre_recovd stopping\n");
@@ -211,16 +242,20 @@ int recovd_setup(struct recovd_obd *recovd)
         init_waitqueue_head(&recovd->recovd_recovery_waitq);
         init_waitqueue_head(&recovd->recovd_ctl_waitq);
 
+        recovd->recovd_next_phase = RECOVD_PREPARING;
+        
         rc = kernel_thread(recovd_main, (void *)recovd,
                            CLONE_VM | CLONE_FS | CLONE_FILES);
         if (rc < 0) {
                 CERROR("cannot start thread\n");
                 RETURN(-EINVAL);
         }
-        wait_event(recovd->recovd_ctl_waitq, recovd->recovd_flags & RECOVD_IDLE);
+        wait_event(recovd->recovd_ctl_waitq,
+                   recovd->recovd_phase == RECOVD_IDLE);
 
         /* exported and called by obdclass timeout handlers */
         class_signal_connection_failure = recovd_conn_fail;
+        ptlrpc_recovd = recovd;
 
         RETURN(0);
 }
@@ -236,3 +271,5 @@ int recovd_cleanup(struct recovd_obd *recovd)
                    (recovd->recovd_flags & RECOVD_STOPPED));
         RETURN(0);
 }
+
+struct recovd_obd *ptlrpc_recovd;
index df72d83..dad8744 100644 (file)
@@ -64,6 +64,7 @@ int connmgr_cleanup(struct obd_device *dev)
         RETURN(0);
 }
 
+/* should this be in llite? */
 
 int connmgr_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg,
                       void *uarg)
@@ -74,8 +75,7 @@ int connmgr_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg,
         ENTRY;
         if (cmd == OBD_IOC_RECOVD_NEWCONN) { 
                 spin_lock(&recovd->recovd_lock);
-                recovd->recovd_flags |= RECOVD_UPCALL_ANSWER;
-                recovd->recovd_wakeup_flag = 1;
+                /* XXX shaver flag upcall answer */
                 wake_up(&recovd->recovd_waitq);
                 spin_unlock(&recovd->recovd_lock);
                 EXIT;
@@ -109,7 +109,8 @@ static void __exit ptlrpc_exit(void)
         ptlrpc_cleanup_connection();
 }
 
-/* connmgr.c */
+/* recovd.c */
+EXPORT_SYMBOL(ptlrpc_recovd);
 EXPORT_SYMBOL(recovd_conn_fail);
 EXPORT_SYMBOL(recovd_conn_manage);
 EXPORT_SYMBOL(recovd_conn_fixed);
@@ -145,7 +146,6 @@ EXPORT_SYMBOL(ptlrpc_replay_req);
 EXPORT_SYMBOL(ptlrpc_restart_req);
 EXPORT_SYMBOL(ptlrpc_prep_req);
 EXPORT_SYMBOL(ptlrpc_free_req);
-EXPORT_SYMBOL(ptlrpc_prep_req2);
 EXPORT_SYMBOL(ptlrpc_req_finished);
 EXPORT_SYMBOL(ptlrpc_prep_bulk);
 EXPORT_SYMBOL(ptlrpc_free_bulk);
index 36cd279..7fb4407 100644 (file)
@@ -175,18 +175,16 @@ static int handle_incoming_request(struct obd_device *obddev,
 
         CDEBUG(D_NET, "got req %Ld\n", request.rq_xid);
 
-        peer.peer_nid = event->initiator.nid;
+        request.rq_peer.peer_nid = event->initiator.nid;
         /* FIXME: this NI should be the incoming NI.
          * We don't know how to find that from here. */
-        peer.peer_ni = svc->srv_self.peer_ni;
+        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
 
         request.rq_export = class_conn2export((struct lustre_handle *) request.rq_reqmsg);
 
         if (request.rq_export) {
                 request.rq_connection = request.rq_export->exp_connection;
                 ptlrpc_connection_addref(request.rq_connection);
-        } else {
-                request.rq_connection = ptlrpc_get_connection(&peer);
         }
 
         spin_unlock(&svc->srv_lock);