Whamcloud - gitweb
Landing b_recovery
authorrread <rread>
Sat, 7 Feb 2004 21:59:04 +0000 (21:59 +0000)
committerrread <rread>
Sat, 7 Feb 2004 21:59:04 +0000 (21:59 +0000)
The principal change here is ptlrpc_connect_import and recovery are
merged and are now asynchronous and are run on ptlrpcd.

This branch also includes fixes for

  2477 (this was the initial bug)

  2355 recover for initial connections

  1934 (b_1934 was merged directly to b_recovery)

  1901 block all file creations until orphan recovery completes

  2423 client remove rq_connection from request struct

  2640 conf-sanity test_5, and proper cleanup in umount when
       cleanup log is not availale.

  2670 recovery timer race

  2532 mdc_close recovey bug

And few others that werent' filed.

52 files changed:
lnet/libcfs/debug.c
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_net.h
lustre/include/linux/lvfs.h
lustre/include/linux/lvfs_linux.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/llite/llite_lib.c
lustre/lvfs/lvfs_linux.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_open.c
lustre/mds/mds_unlink_open.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/llog_cat.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_config.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/portals/libcfs/debug.c
lustre/ptlbd/client.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/service.c
lustre/scripts/merge1.sh [new file with mode: 0755]
lustre/scripts/merge2.sh [new file with mode: 0755]
lustre/tests/cfg/insanity-adev.sh [new file with mode: 0644]
lustre/tests/cfg/insanity-local.sh
lustre/tests/cfg/insanity-mdev.sh
lustre/tests/cfg/mdev.sh
lustre/tests/conf-sanity.sh
lustre/tests/insanity.sh
lustre/tests/recovery-small.sh
lustre/tests/replay-ost-single.sh
lustre/tests/replay-single-upcall.sh
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh
lustre/tests/writeme.c

index b503cec..09db54e 100644 (file)
@@ -805,8 +805,6 @@ portals_debug_msg(int subsys, int mask, char *file, const char *fn,
                               subsys, mask, smp_processor_id(),
                               tv.tv_sec, tv.tv_usec, stack, current->pid);
         max_nob -= prefix_nob;
-        if(*(format + strlen(format) - 1) != '\n')
-                *(format + strlen(format)) = '\n';
 
 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
         msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
index 0d62e90..5f9ac77 100644 (file)
@@ -99,7 +99,7 @@ static inline void *fsfilt_start(struct obd_device *obd, struct inode *inode,
         unsigned long now = jiffies;
         void *parent_handle = oti ? oti->oti_handle : NULL;
         void *handle = obd->obd_fsops->fs_start(inode, op, parent_handle);
-        CDEBUG(D_HA, "started handle %p (%p)\n", handle, parent_handle);
+        CDEBUG(D_INFO, "started handle %p (%p)\n", handle, parent_handle);
 
         if (oti != NULL) {
                 if (parent_handle == NULL) {
@@ -147,7 +147,7 @@ static inline int fsfilt_commit(struct obd_device *obd, struct inode *inode,
 {
         unsigned long now = jiffies;
         int rc = obd->obd_fsops->fs_commit(inode, handle, force_sync);
-        CDEBUG(D_HA, "committing handle %p\n", handle);
+        CDEBUG(D_INFO, "committing handle %p\n", handle);
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("long journal start time %lus\n", (jiffies - now) / HZ);
         return rc;
index 59c2196..c940ac1 100644 (file)
 #include <linux/lustre_idl.h>
 
 enum lustre_imp_state {
-//        LUSTRE_IMP_INVALID    = 1,
+        LUSTRE_IMP_CLOSED     = 1,
         LUSTRE_IMP_NEW        = 2,
         LUSTRE_IMP_DISCON     = 3,
         LUSTRE_IMP_CONNECTING = 4,
         LUSTRE_IMP_REPLAY     = 5,
-        LUSTRE_IMP_RECOVER    = 6,
-        LUSTRE_IMP_FULL       = 7,
-        LUSTRE_IMP_EVICTED    = 8,
+        LUSTRE_IMP_REPLAY_LOCKS = 6,
+        LUSTRE_IMP_REPLAY_WAIT  = 7,
+        LUSTRE_IMP_RECOVER    = 8,
+        LUSTRE_IMP_FULL       = 9,
+        LUSTRE_IMP_EVICTED    = 10,
 };
 
+static inline char * ptlrpc_import_state_name(enum lustre_imp_state state)
+{
+        
+        static char* import_state_names[] = {
+                "<UNKNOWN>", "CLOSED",  "NEW", "DISCONN", 
+                "CONNECTING", "REPLAY", "REPLAY_LOCKS", "REPLAY_WAIT", 
+                "RECOVER", "FULL", "EVICTED",
+        };
+
+        LASSERT (state <= LUSTRE_IMP_EVICTED);
+        return import_state_names[state];
+}
+
 
 struct obd_import {
         struct portals_handle     imp_handle;
@@ -42,7 +57,9 @@ struct obd_import {
         struct list_head          imp_delayed_list;
 
         struct obd_device        *imp_obd;
-        struct semaphore          imp_recovery_sem;
+        wait_queue_head_t         imp_recovery_waitq;
+        __u64                     imp_last_replay_transno;
+        atomic_t                  imp_replay_inflight;
         enum lustre_imp_state     imp_state;
         int                       imp_generation;
         __u32                     imp_conn_cnt;
@@ -57,7 +74,8 @@ struct obd_import {
 
         /* flags */
         int                       imp_invalid:1, imp_replayable:1,
-                                  imp_dlm_fake:1, imp_server_timeout:1;
+                                  imp_dlm_fake:1, imp_server_timeout:1,
+                                  imp_initial_recov:1;
         __u32                     imp_connect_op;
 };
 
index 2b62378..1ea4740 100644 (file)
@@ -220,9 +220,9 @@ static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
 static inline int llog_obd2ops(struct llog_ctxt *ctxt,
                                struct llog_operations **lop)
 {
-        if (ctxt == NULL)
+       if (ctxt == NULL)
                 return -ENOTCONN;
-
+        
         *lop = ctxt->loc_logops;
         if (*lop == NULL)
                 return -EOPNOTSUPP;
index c98a831..c44c33e 100644 (file)
@@ -221,6 +221,7 @@ struct ptlrpc_request {
         struct lustre_msg *rq_repmsg;
         __u64 rq_transno;
         __u64 rq_xid;
+        struct list_head rq_replay_list;
 
 #if SWAB_PARANOIA
         __u32 rq_req_swab_mask;
@@ -240,7 +241,6 @@ struct ptlrpc_request {
 
         struct ptlrpc_peer rq_peer; /* XXX see service.c can this be factored away? */
         struct obd_export *rq_export;
-        struct ptlrpc_connection *rq_connection;
         struct obd_import *rq_import;
         struct ptlrpc_service *rq_svc;
 
@@ -274,6 +274,8 @@ struct ptlrpc_request {
 /* Spare the preprocessor, spoil the bugs. */
 #define FLAG(field, str) (field ? str : "")
 
+#define PTLRPC_REQUEST_COMPLETE(req) ((req)->rq_phase > RQ_PHASE_RPC)
+
 #define DEBUG_REQ_FLAGS(req)                                                   \
         ((req->rq_phase == RQ_PHASE_NEW) ? "New" :                             \
          (req->rq_phase == RQ_PHASE_RPC) ? "RPC" :                             \
@@ -297,8 +299,8 @@ CDEBUG(level, "@@@ " fmt                                                       \
        req->rq_transno,                                                        \
        req->rq_reqmsg ? req->rq_reqmsg->opc : -1,                              \
        req->rq_import ? (char *)req->rq_import->imp_target_uuid.uuid : "<?>",  \
-       req->rq_connection ?                                                    \
-          (char *)req->rq_connection->c_remote_uuid.uuid : "<?>",              \
+       req->rq_import ?                                                        \
+          (char *)req->rq_import->imp_connection->c_remote_uuid.uuid : "<?>",  \
        (req->rq_import && req->rq_import->imp_client) ?                        \
            req->rq_import->imp_client->cli_request_portal : -1,                \
        req->rq_reqlen, req->rq_replen,                                         \
@@ -464,7 +466,6 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd);
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                         struct ptlrpc_client *);
 void ptlrpc_cleanup_client(struct obd_import *imp);
-struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
@@ -523,7 +524,8 @@ struct ptlrpc_svc_data {
 };
 
 /* ptlrpc/import.c */
-int ptlrpc_connect_import(struct obd_import *imp);
+int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid);
+int ptlrpc_init_import(struct obd_import *imp);
 int ptlrpc_disconnect_import(struct obd_import *imp);
 
 /* ptlrpc/pack_generic.c */
index 6f48bcc..bf27a40 100644 (file)
@@ -3,6 +3,8 @@
 
 #include <linux/kp30.h>
 
+#define LL_FID_NAMELEN (16 + 1 + 8 + 1)
+
 #if defined __KERNEL__
 #include <linux/lvfs_linux.h>
 #endif 
@@ -59,7 +61,7 @@ struct dentry *simple_mknod(struct dentry *dir, char *name, int mode);
 int lustre_fread(struct file *file, void *buf, int len, loff_t *off);
 int lustre_fwrite(struct file *file, const void *buf, int len, loff_t *off);
 int lustre_fsync(struct file *file);
-long l_readdir(struct file * file, void * dirent, unsigned int count);
+long l_readdir(struct file * file, struct list_head *dentry_list);
 
 static inline void l_dput(struct dentry *de)
 {
@@ -96,7 +98,6 @@ static inline void ll_sleep(int t)
 }
 #endif
 
-#define LL_FID_NAMELEN         (16 + 1 + 8 + 1)
 static inline int ll_fid2str(char *str, __u64 id, __u32 generation)
 {
         return sprintf(str, "%llx:%08x", (unsigned long long)id, generation);
index b38d6f0..71fc431 100644 (file)
@@ -1,3 +1,6 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */ 
 #ifndef __LVFS_LINUX_H__
 #define __LVFS_LINUX_H__
 
@@ -22,17 +25,14 @@ struct l_file *l_dentry_open(struct obd_run_ctxt *, struct l_dentry *,
                              int flags);
 
 struct l_linux_dirent {
-        ino_t           d_ino;
-        unsigned long   d_off;
-        unsigned short  d_reclen;
-        char            d_name[1]; 
+        struct list_head lld_list;
+        ino_t           lld_ino;
+        unsigned long   lld_off;
+        char            lld_name[LL_FID_NAMELEN];
 };
-
 struct l_readdir_callback {
-        struct l_linux_dirent *current_dir;
-        struct l_linux_dirent *previous;
-        int count;
-        int error;
+        struct l_linux_dirent *lrc_dirent;
+        struct list_head      *lrc_list;
 };
 
 #endif
index d946942..44b1809 100644 (file)
@@ -524,7 +524,6 @@ struct obd_ops {
         int (*o_attach)(struct obd_device *dev, obd_count len, void *data);
         int (*o_detach)(struct obd_device *dev);
         int (*o_setup) (struct obd_device *dev, obd_count len, void *data);
-        int (*o_postsetup) (struct obd_device *dev);
         int (*o_precleanup)(struct obd_device *dev, int flags);
         int (*o_cleanup)(struct obd_device *dev, int flags);
         int (*o_postrecov)(struct obd_device *dev);
index de61f92..660f588 100644 (file)
@@ -65,6 +65,8 @@ struct obd_device *class_uuid2obd(struct obd_uuid *uuid);
 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, 
                                           char * typ_name,
                                           struct obd_uuid *grp_uuid);
+struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, 
+                                           int *next);
 
 void osic_init(struct obd_sync_io_container **osic);
 void osic_add_one(struct obd_sync_io_container *osic,
@@ -100,6 +102,7 @@ struct lustre_profile {
 };
 
 struct lustre_profile *class_get_profile(char * prof);
+void class_del_profile(char *prof);
 
 #define class_export_get(exp)                                                  \
 ({                                                                             \
@@ -369,19 +372,6 @@ static inline int obd_setup(struct obd_device *obd, int datalen, void *data)
         RETURN(rc);
 }
 
-static inline int obd_postsetup(struct obd_device *obd)
-{
-        int rc;
-        ENTRY;
-
-        OBD_CHECK_DEV_ACTIVE(obd);
-        OBD_CHECK_OP(obd, postsetup, 0);
-        OBD_COUNTER_INCREMENT(obd, postsetup);
-
-        rc = OBP(obd, postsetup)(obd);
-        RETURN(rc);
-}
-
 static inline int obd_precleanup(struct obd_device *obd, int flags)
 {
         int rc;
index f4c0fc5..0629db2 100644 (file)
@@ -91,6 +91,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
+
         sema_init(&cli->cl_sem, 1);
         cli->cl_conn_count = 0;
         memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
@@ -136,6 +137,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         imp->imp_obd = obddev;
         imp->imp_connect_op = connect_op;
         imp->imp_generation = 0;
+        imp->imp_initial_recov = 1;
         INIT_LIST_HEAD(&imp->imp_pinger_chain);
         memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1,
               lcfg->lcfg_inllen1);
@@ -245,27 +247,17 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 GOTO(out_disco, rc = -ENOMEM);
 
         imp->imp_dlm_handle = *dlm_handle;
-        imp->imp_state = LUSTRE_IMP_DISCON;
+        rc = ptlrpc_init_import(imp);
+        if (rc != 0) 
+                GOTO(out_ldlm, rc);
 
-        rc = ptlrpc_connect_import(imp);
+        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
+        rc = ptlrpc_connect_import(imp, NULL);
         if (rc != 0) {
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
                 GOTO(out_ldlm, rc);
         }
 
-        LASSERT (imp->imp_state == LUSTRE_IMP_FULL);
-
-        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
-
-        if (imp->imp_replayable) {
-                CDEBUG(D_HA, "connected to replayable target: %s\n",
-                       imp->imp_target_uuid.uuid);
-                ptlrpc_pinger_add_import(imp);
-        }
-
-        CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
-               imp->imp_remote_handle.cookie);
-
         EXIT;
 
         if (rc) {
@@ -327,8 +319,6 @@ int client_disconnect_export(struct obd_export *exp, int failover)
         else
                 rc = ptlrpc_disconnect_import(imp);
 
-        imp->imp_state = LUSTRE_IMP_NEW;
-
         EXIT;
  out_no_disconnect:
         err = class_disconnect(exp, 0);
@@ -509,13 +499,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         export = req->rq_export = class_conn2export(&conn);
         LASSERT(export != NULL);
 
-        if (req->rq_connection != NULL)
-                ptlrpc_put_connection(req->rq_connection);
         if (export->exp_connection != NULL)
                 ptlrpc_put_connection(export->exp_connection);
         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
                                                        &remote_uuid);
-        req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
 
         LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
@@ -532,7 +519,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (export->exp_imp_reverse != NULL)
                 class_destroy_import(export->exp_imp_reverse);
         revimp = export->exp_imp_reverse = class_new_import();
-        revimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
+        revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
         revimp->imp_remote_handle = conn;
         revimp->imp_obd = target;
@@ -547,6 +534,7 @@ out:
 
 int target_handle_disconnect(struct ptlrpc_request *req)
 {
+        struct obd_export *exp;
         int rc;
         ENTRY;
 
@@ -554,8 +542,9 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        req->rq_status = obd_disconnect(req->rq_export, 0);
-        req->rq_export = NULL;
+        /* keep the rq_export around so we can send the reply */
+        exp = class_export_get(req->rq_export);
+        req->rq_status = obd_disconnect(exp, 0);
         RETURN(0);
 }
 
@@ -575,11 +564,6 @@ void target_destroy_export(struct obd_export *exp)
  * Recovery functions
  */
 
-void target_cancel_recovery_timer(struct obd_device *obd)
-{
-        del_timer(&obd->obd_recovery_timer);
-}
-
 static void abort_delayed_replies(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
@@ -590,6 +574,7 @@ static void abort_delayed_replies(struct obd_device *obd)
                 req->rq_status = -ENOTCONN;
                 req->rq_type = PTL_RPC_MSG_ERR;
                 ptlrpc_reply(req);
+                class_export_put(req->rq_export);
                 list_del(&req->rq_list);
                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
                 OBD_FREE(req, sizeof *req);
@@ -666,18 +651,25 @@ static void target_recovery_expired(unsigned long castmeharder)
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
-static void reset_recovery_timer(struct obd_device *obd)
+
+/* obd_processing_task_lock should be held */
+void target_cancel_recovery_timer(struct obd_device *obd)
 {
-        int recovering;
-        spin_lock(&obd->obd_dev_lock);
-        recovering = obd->obd_recovering;
-        spin_unlock(&obd->obd_dev_lock);
+        CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
+        del_timer(&obd->obd_recovery_timer);
+}
 
-        if (!recovering)
+static void reset_recovery_timer(struct obd_device *obd)
+{
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!obd->obd_recovering) {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
+        }                
         CDEBUG(D_HA, "timer will expire in %u seconds\n",
                OBD_RECOVERY_TIMEOUT / HZ);
         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
 
@@ -916,6 +908,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
         req = saved_req;
         req->rq_reqmsg = reqmsg;
+        class_export_get(req->rq_export);
         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -928,7 +921,10 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
                 CWARN("%s: all clients recovered, sending delayed replies\n",
                        obd->obd_name);
+                spin_lock_bh(&obd->obd_processing_task_lock);
                 obd->obd_recovering = 0;
+                target_cancel_recovery_timer(obd);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
 
                 /* when recovery finished, cleanup orphans on mds and ost */
                 if (OBT(obd) && OBP(obd, postrecov)) {
@@ -944,11 +940,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
                         DEBUG_REQ(D_ERROR, req, "delayed:");
                         ptlrpc_reply(req);
+                        class_export_put(req->rq_export);
                         list_del(&req->rq_list);
                         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
                         OBD_FREE(req, sizeof *req);
                 }
-                target_cancel_recovery_timer(obd);
+                ptlrpc_run_recovery_over_upcall(obd);
         } else {
                 CWARN("%s: %d recoverable clients remain\n",
                        obd->obd_name, obd->obd_recoverable_clients);
@@ -1125,6 +1122,8 @@ void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
         for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
                 if (req->rq_ack_locks[i].mode)
                         continue;
+                CDEBUG(D_HA, "saving lock "LPX64" in req %p ack_lock[%d]\n",
+                       lock->cookie, req, i);
                 memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
                 req->rq_ack_locks[i].mode = mode;
                 return &req->rq_ack_locks[i];
index 042a383..85c98c4 100644 (file)
@@ -366,7 +366,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                  */
                 CERROR("BLOCKING AST to client (nid "LPU64") timeout, "
                        "simply cancel lock 0x%p\n",
-                       req->rq_connection->c_peer.peer_nid, lock);
+                       req->rq_peer.peer_nid, lock);
                 ldlm_lock_cancel(lock);
                 rc = -ERESTART;
 #endif
@@ -374,18 +374,18 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 if (rc == -EINVAL)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p--normal race\n",
-                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_peer.peer_nid,
                                req->rq_repmsg->status, lock);
                 else if (rc == -ENOTCONN)
                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p--this client was "
                                "probably rebooted while it held a lock, nothing"
-                               " serious\n",req->rq_connection->c_peer.peer_nid,
+                               " serious\n",req->rq_peer.peer_nid,
                                req->rq_repmsg->status, lock);
                 else
                         CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
                                "from blocking AST for lock %p\n",
-                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_peer.peer_nid,
                                (req->rq_repmsg != NULL)?
                                req->rq_repmsg->status : 0,
                                lock);
@@ -653,9 +653,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (!lock) {
                 CERROR("received cancel for unknown lock cookie "LPX64
                        " from nid "LPX64" (%s)\n", dlm_req->lock_handle1.cookie,
-                       req->rq_connection->c_peer.peer_nid,
-                       portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                       req->rq_connection->c_peer.peer_nid, str));
+                       req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str));
                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                   "(cookie "LPU64")",
                                   dlm_req->lock_handle1.cookie);
@@ -817,9 +817,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
                        "export cookie "LPX64" (ptl req %d/rep %d); this is "
                        "normal if this node rebooted with a lock held\n",
-                       req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
-                       portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                       req->rq_connection->c_peer.peer_nid, str),
+                       req->rq_reqmsg->opc, req->rq_peer.peer_nid,
+                       portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                       req->rq_peer.peer_nid, str),
                        req->rq_reqmsg->handle.cookie,
                        req->rq_request_portal, req->rq_reply_portal);
 
index 7e79cd8..ae9b202 100644 (file)
@@ -871,12 +871,40 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
         return LDLM_ITER_CONTINUE;
 }
 
+static int replay_lock_interpret(struct ptlrpc_request *req,
+                                    void * data, int rc)
+{
+        struct ldlm_lock *lock;
+        struct ldlm_reply *reply;
+
+        atomic_dec(&req->rq_import->imp_replay_inflight);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        lock = req->rq_async_args.pointer_arg[0];
+        LASSERT(lock != NULL);
+
+        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                   lustre_swab_ldlm_reply);
+        if (reply == NULL) {
+                CERROR("Can't unpack ldlm_reply\n");
+                GOTO (out, rc = -EPROTO);
+        }
+
+        memcpy(&lock->l_remote_handle, &reply->lock_handle,
+               sizeof(lock->l_remote_handle));
+        LDLM_DEBUG(lock, "replayed lock:");
+        ptlrpc_import_recovery_state_machine(req->rq_import);
+ out:
+        RETURN(rc);
+}
+
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        int rc, size;
+        int size;
         int flags;
 
         /*
@@ -908,7 +936,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
-        req->rq_send_state = LUSTRE_IMP_REPLAY;
+        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);
@@ -919,23 +947,13 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         req->rq_replen = lustre_msg_size(1, &size);
 
         LDLM_DEBUG(lock, "replaying lock:");
-        rc = ptlrpc_queue_wait(req);
-        if (rc != ELDLM_OK)
-                GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        atomic_inc(&req->rq_import->imp_replay_inflight);
+        req->rq_async_args.pointer_arg[0] = lock;
+        req->rq_interpret_reply = replay_lock_interpret;
+        ptlrpcd_add_req(req);
 
-        memcpy(&lock->l_remote_handle, &reply->lock_handle,
-               sizeof(lock->l_remote_handle));
-        LDLM_DEBUG(lock, "replayed lock:");
- out:
-        ptlrpc_req_finished(req);
-        RETURN(rc);
+        RETURN(0);
 }
 
 int ldlm_replay_locks(struct obd_import *imp)
@@ -948,6 +966,11 @@ int ldlm_replay_locks(struct obd_import *imp)
         ENTRY;
         INIT_LIST_HEAD(&list);
 
+        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+
+        /* ensure this doesn't fall to 0 before all have been queued */
+        atomic_inc(&imp->imp_replay_inflight);
+
         l_lock(&ns->ns_lock);
         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
 
@@ -958,5 +981,8 @@ int ldlm_replay_locks(struct obd_import *imp)
                         break; /* or try to do the rest? */
         }
         l_unlock(&ns->ns_lock);
+
+        atomic_dec(&imp->imp_replay_inflight);
+
         RETURN(rc);
 }
index 267c61c..f0c778c 100644 (file)
@@ -369,7 +369,7 @@ void ll_put_super(struct super_block *sb)
 } /* ll_put_super */
 
 int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
-                       struct config_llog_instance *cfg)
+                       struct config_llog_instance *cfg, int allow_recov)
 {
         struct lustre_cfg lcfg;
         struct portals_cfg pcfg;
@@ -441,6 +441,13 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
         if (obd == NULL)
                 GOTO(out_cleanup, err = -EINVAL);
 
+        /* Disable initial recovery on this import */
+        err = obd_set_info(obd->obd_self_export, 
+                           strlen("initial_recov"), "initial_recov", 
+                           sizeof(allow_recov), &allow_recov);
+        if (err)
+                GOTO(out_cleanup, err);
+
         err = obd_connect(&mdc_conn, obd, &mdc_uuid);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, err);
@@ -535,7 +542,7 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent)
                 cfg.cfg_instance = sbi->ll_instance;
                 cfg.cfg_uuid = sbi->ll_sb_uuid;
                 cfg.cfg_local_nid = lmd->lmd_local_nid;
-                err = lustre_process_log(lmd, lmd->lmd_profile, &cfg);
+                err = lustre_process_log(lmd, lmd->lmd_profile, &cfg, 1);
                 if (err < 0) {
                         CERROR("Unable to process log: %s\n", lmd->lmd_profile);
 
@@ -598,7 +605,8 @@ out_free:
                         OBD_ALLOC(cln_prof, len);
                         sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile);
 
-                        err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg);
+                        err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, 
+                                                 0);
                         if (err < 0) 
                                 CERROR("Unable to process log: %s\n", cln_prof);
                         OBD_FREE(cln_prof, len);
@@ -611,6 +619,35 @@ out_free:
         goto out_dev;
 } /* lustre_fill_super */
 
+static void lustre_manual_cleanup(struct ll_sb_info *sbi) 
+{
+        struct lustre_cfg lcfg;
+        struct obd_device *obd;
+        int next = 0; 
+
+        while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL)
+        {
+                int err;
+
+                LCFG_INIT(lcfg, LCFG_CLEANUP, obd->obd_name);
+                err = class_process_config(&lcfg);
+                if (err) {
+                        CERROR("cleanup failed: %s\n", obd->obd_name);
+                        //continue;
+                }
+
+                LCFG_INIT(lcfg, LCFG_DETACH, obd->obd_name);
+                err = class_process_config(&lcfg);
+                if (err) {
+                        CERROR("detach failed: %s\n", obd->obd_name);
+                        //continue;
+                }
+        }
+
+        if (sbi->ll_lmd != NULL) 
+                class_del_profile(sbi->ll_lmd->lmd_profile);
+}
+
 void lustre_put_super(struct super_block *sb)
 {
         struct ll_sb_info *sbi = ll_s2sbi(sb);
@@ -632,9 +669,12 @@ void lustre_put_super(struct super_block *sb)
                 OBD_ALLOC(cln_prof, len);
                 sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile);
 
-                err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg);
-                if (err < 0)
-                        CERROR("Unable to process log: %s\n", cln_prof);
+                err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, 0);
+                if (err < 0) {
+                        CERROR("Unable to process log: %s, doing manual cleanup"
+                               "\n", cln_prof);
+                        lustre_manual_cleanup(sbi);
+                }
 
                 OBD_FREE(cln_prof, len);
                 OBD_FREE(sbi->ll_lmd, sizeof(*sbi->ll_lmd));
index 61cd57c..935548e 100644 (file)
 #include <linux/obd.h>
 #include <linux/lustre_lib.h>
 
+atomic_t obd_memory;
+int obd_memmax;
+
+
 /* Debugging check only needed during development */
 #ifdef OBD_CTXT_DEBUG
 # define ASSERT_CTXT_MAGIC(magic) LASSERT((magic) == OBD_RUN_CTXT_MAGIC)
@@ -308,49 +312,45 @@ static int l_filldir(void *__buf, const char *name, int namlen, loff_t offset,
 {
         struct l_linux_dirent *dirent;
         struct l_readdir_callback *buf = (struct l_readdir_callback *)__buf;
-        int reclen = size_round(offsetof(struct l_linux_dirent, d_name) + namlen + 1);
         
-        buf->error = -EINVAL;
-        if (reclen > buf->count)
-                return -EINVAL;
-        dirent = buf->previous;
+        dirent = buf->lrc_dirent;
         if (dirent)
-               dirent->d_off = offset; 
-        dirent = buf->current_dir;
-        buf->previous = dirent;
-        dirent->d_ino = ino;
-        dirent->d_reclen = reclen;
-        memcpy(dirent->d_name, name, namlen);
-        ((char *)dirent) += reclen;
-        buf->current_dir = dirent;
-        buf->count -= reclen; 
+               dirent->lld_off = offset; 
+
+        OBD_ALLOC(dirent, sizeof(*dirent));
+
+        list_add_tail(&dirent->lld_list, buf->lrc_list);
+
+        buf->lrc_dirent = dirent;
+        dirent->lld_ino = ino;
+        LASSERT(sizeof(dirent->lld_name) >= namlen + 1);
+        memcpy(dirent->lld_name, name, namlen);
+
         return 0;
 }
 
-long l_readdir(struct file * file, void * dirent, unsigned int count)
+long l_readdir(struct file *file, struct list_head *dentry_list)
 {
-        struct l_linux_dirent * lastdirent;
+        struct l_linux_dirent *lastdirent;
         struct l_readdir_callback buf;
         int error;
 
-        buf.current_dir = (struct l_linux_dirent *)dirent;
-        buf.previous = NULL;
-        buf.count = count;
-        buf.error = 0;
+        buf.lrc_dirent = NULL;
+        buf.lrc_list = dentry_list; 
 
         error = vfs_readdir(file, l_filldir, &buf);
         if (error < 0)
                 return error;
-        error = buf.error;
-        lastdirent = buf.previous;
 
-        if (lastdirent) {
-                lastdirent->d_off = file->f_pos;
-                error = count - buf.count;        
-        }
-        return error
+        lastdirent = buf.lrc_dirent;
+        if (lastdirent)
+                lastdirent->lld_off = file->f_pos;
+
+        return 0
 }
 EXPORT_SYMBOL(l_readdir);
+EXPORT_SYMBOL(obd_memory);
+EXPORT_SYMBOL(obd_memmax);
 
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 
@@ -361,6 +361,12 @@ static int __init lvfs_linux_init(void)
 
 static void __exit lvfs_linux_exit(void)
 {
+        int leaked;
+        ENTRY;
+
+        leaked = atomic_read(&obd_memory);
+        CDEBUG(leaked ? D_ERROR : D_INFO,
+               "obd mem max: %d leaked: %d\n", obd_memmax, leaked);
 
         return;
 }
index ef6861c..bcac2e3 100644 (file)
@@ -41,6 +41,8 @@
 
 #define REQUEST_MINOR 244
 
+static int mdc_cleanup(struct obd_device *obd, int flags);
+
 extern int mds_queue_req(struct ptlrpc_request *);
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
@@ -403,6 +405,7 @@ static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         union ptlrpc_async_args *aa = data;
         struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
+        
         mdc_put_rpc_lock(rpc_lock, NULL);
         wake_up(&req->rq_reply_waitq);
         RETURN(rc);
@@ -416,8 +419,9 @@ static int mdc_close_check_reply(struct ptlrpc_request *req)
         unsigned long flags;
 
         spin_lock_irqsave(&req->rq_lock, flags);
-        if (req->rq_replied || req->rq_err)
+        if (PTLRPC_REQUEST_COMPLETE(req)) {
                 rc = 1;
+        }
         spin_unlock_irqrestore (&req->rq_lock, flags);
         return rc;
 }
@@ -483,16 +487,15 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
         rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req),
                           &lwi);
         
-        if (mod == NULL && rc == 0)
-                CERROR("Unexpected: can't find mdc_open_data, but the close "
-                       "succeeded.  Please tell CFS.\n");
-
-        if (rc == 0) {
+         if (rc == 0) {
                 rc = req->rq_repmsg->status;
                 if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
-                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
+                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err = %d", rc);
                         if (rc > 0)
                                 rc = -rc;
+                } else if (mod == NULL) {
+                        CERROR("Unexpected: can't find mdc_open_data, but the close "
+                               "succeeded.  Please tell CFS.\n");
                 }
         }
 
@@ -587,18 +590,21 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         int rc;
         ENTRY;
         
+        MOD_INC_USE_COUNT;
+
         switch (cmd) {
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
                 if (rc < 0)
-                        RETURN(rc);
-                RETURN(0);
+                        GOTO(out, rc);
+                GOTO(out, rc = 0);
         case IOC_OSC_SET_ACTIVE:
-                RETURN(ptlrpc_set_import_active(imp, data->ioc_offset));
+                rc = ptlrpc_set_import_active(imp, data->ioc_offset);
+                GOTO(out, rc);
         case OBD_IOC_PARSE: {
                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
-                RETURN(rc);
+                GOTO(out, rc);
         }
 #ifdef __KERNEL__
         case OBD_IOC_LLOG_INFO:
@@ -606,13 +612,36 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
                 rc = llog_ioctl(ctxt, cmd, data);
                 
-                RETURN(rc);
+                GOTO(out, rc);
         }
 #endif
         default:
                 CERROR("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
-                RETURN(-ENOTTY);
+                GOTO(out, rc = -ENOTTY);
         }
+out:
+        MOD_DEC_USE_COUNT;
+        return rc;
+}
+
+int mdc_set_info(struct obd_export *exp, obd_count keylen,
+                 void *key, obd_count vallen, void *val)
+{
+        int rc = -EINVAL;
+
+        if (keylen == strlen("initial_recov") &&
+            memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
+                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+                if (vallen != sizeof(int))
+                        RETURN(-EINVAL);
+                imp->imp_initial_recov = *(int *)val;
+                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
+                       exp->exp_obd->obd_name,
+                       imp->imp_initial_recov);
+                RETURN(0);
+        }
+        
+        RETURN(rc);
 }
 
 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
@@ -803,17 +832,12 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
                 OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         }
 
-        RETURN(rc);
-}
-
-
-int mdc_postsetup(struct obd_device *obd) 
-{
-        int rc;
         rc = obd_llog_init(obd, obd, 0, NULL);
         if (rc) {
+                mdc_cleanup(obd, 0);
                 CERROR("failed to setup llogging subsystems\n");
         }
+
         RETURN(rc);
 }
 
@@ -907,12 +931,12 @@ struct obd_ops mdc_obd_ops = {
         o_attach:      mdc_attach,
         o_detach:      mdc_detach,
         o_setup:       mdc_setup,
-        o_postsetup:   mdc_postsetup,
         o_precleanup:  mdc_precleanup,
         o_cleanup:     mdc_cleanup,
         o_connect:     client_connect_import,
         o_disconnect:  client_disconnect_export,
         o_iocontrol:   mdc_iocontrol,
+        o_set_info:    mdc_set_info,
         o_statfs:      mdc_statfs,
         o_pin:         mdc_pin,
         o_unpin:       mdc_unpin,
index b8ade94..06dd213 100644 (file)
@@ -56,6 +56,7 @@
 
 #include "mds_internal.h"
 
+static int mds_postsetup(struct obd_device *obd);
 static int mds_cleanup(struct obd_device *obd, int flags);
 
 static int mds_bulk_timeout(void *data)
@@ -1090,8 +1091,8 @@ int mds_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
                 rc = mds_readpage(req);
 
-                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
-                        return 0;
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
+
                 break;
 
         case MDS_REINT: {
@@ -1368,6 +1369,9 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                            "mds_ldlm_client", &obd->obd_ldlm_client);
         obd->obd_replayable = 1;
 
+        rc = mds_postsetup(obd);
+        if (rc)
+                GOTO(err_fs, rc);
         RETURN(0);
 
 err_fs:
@@ -1437,6 +1441,7 @@ static int mds_postrecov(struct obd_device *obd)
         int rc, rc2;
 
         LASSERT(!obd->obd_recovering);
+        LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
 
         rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
                           obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
@@ -1520,8 +1525,13 @@ static int mds_cleanup(struct obd_device *obd, int flags)
 
         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
 
-        if (obd->obd_recovering)
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_recovering) {
                 target_cancel_recovery_timer(obd);
+                obd->obd_recovering = 0;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
         lock_kernel();
         dev_clear_rdonly(2);
         fsfilt_put_ops(obd->obd_fsops);
@@ -1871,7 +1881,6 @@ static struct obd_ops mds_obd_ops = {
         o_destroy_export:  mds_destroy_export,
         o_disconnect:  mds_disconnect,
         o_setup:       mds_setup,
-        o_postsetup:   mds_postsetup,
         o_precleanup:  mds_precleanup,
         o_cleanup:     mds_cleanup,
         o_postrecov:   mds_postrecov,
index ae4117c..a71ee06 100644 (file)
@@ -739,7 +739,7 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
         else if (child_lockh == &lockh)
                 ldlm_lock_decref(child_lockh, LCK_EX);
 
-        return rc;
+        RETURN(rc);
 }
 
 int mds_open(struct mds_update_record *rec, int offset,
index 415b133..84a4090 100644 (file)
@@ -235,87 +235,95 @@ int mds_cleanup_orphans(struct obd_device *obd)
         struct mds_obd *mds = &obd->u.mds;
         struct obd_run_ctxt saved;
         struct file *file;
-        struct dentry *dchild;
+        struct dentry *dchild, *dentry;
+        struct vfsmount *mnt;
         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
-        struct l_linux_dirent *dirent, *ptr;
-        unsigned int count = pending_dir->i_size;
-        int rc = 0, rc2 = 0, item = 0;
+        struct l_linux_dirent *dirent, *n;
+        struct list_head dentry_list;
+        char d_name[LL_FID_NAMELEN];
+        __u64 i = 0;
+        int rc = 0, item = 0, namlen;
         ENTRY;
 
         push_ctxt(&saved, &obd->obd_ctxt, NULL);
-        dget(mds->mds_pending_dir);
-        mntget(mds->mds_vfsmnt);
+        dentry = dget(mds->mds_pending_dir);
+        if (IS_ERR(dentry))
+                GOTO(err_pop, rc = PTR_ERR(dentry));
+        mnt = mntget(mds->mds_vfsmnt);
+        if (IS_ERR(mnt))
+                GOTO(err_mntget, rc = PTR_ERR(mnt));
+
         file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
                            O_RDONLY | O_LARGEFILE);
         if (IS_ERR(file))
-                GOTO(err_open, rc2 = PTR_ERR(file));
-
-        OBD_ALLOC(dirent, count);
-        if (dirent == NULL)
-                GOTO(err_alloc_dirent, rc2 = -ENOMEM);
+                GOTO(err_pop, rc = PTR_ERR(file));
 
-        rc = l_readdir(file, dirent, count);
+        INIT_LIST_HEAD(&dentry_list);
+        rc = l_readdir(file, &dentry_list);
         filp_close(file, 0);
         if (rc < 0)
-                GOTO(err_out, rc2 = rc);
-
-        for (ptr = dirent; (char *)ptr < (char *)dirent + rc;
-                        (char *)ptr += ptr->d_reclen) {
-                int namlen = strlen(ptr->d_name);
-
-                if (((namlen == 1) && !strcmp(ptr->d_name, ".")) ||
-                    ((namlen == 2) && !strcmp(ptr->d_name, "..")))
+                GOTO(err_out, rc);
+
+        list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
+                i ++;
+                list_del(&dirent->lld_list);
+
+                namlen = strlen(dirent->lld_name);
+                LASSERT(sizeof(d_name) >= namlen + 1);
+                strcpy(d_name, dirent->lld_name);
+                OBD_FREE(dirent, sizeof(*dirent));
+
+                CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
+                       i, d_name);
+                
+                if (((namlen == 1) && !strcmp(d_name, ".")) ||
+                    ((namlen == 2) && !strcmp(d_name, ".."))) {
                         continue;
+                }
 
                 down(&pending_dir->i_sem);
-                dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir,
-                                        namlen);
+                dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
                 if (IS_ERR(dchild)) {
                         up(&pending_dir->i_sem);
-                        GOTO(err_out, rc2 = PTR_ERR(dchild));
+                        GOTO(err_out, rc = PTR_ERR(dchild));
                 }
                 if (!dchild->d_inode) {
-                        CDEBUG(D_ERROR, "orphan %s has been removed\n",
-                               ptr->d_name);
-                        GOTO(next, rc2 = 0);
+                        CERROR("orphan %s has been removed\n", d_name);
+                        GOTO(next, rc = 0);
                 }
 
                 child_inode = dchild->d_inode;
                 if (mds_inode_is_orphan(child_inode) &&
                     mds_open_orphan_count(child_inode)) {
-                        CWARN("orphan %s was re-opened during recovery\n", 
-                              ptr->d_name);
-                        GOTO(next, rc2 = 0);
+                        CWARN("orphan %s was re-opened during recovery\n", d_name);
+                        GOTO(next, rc = 0);
                 }
 
-                rc2 = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
-                if (rc2 == 0) {
+                rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
+                if (rc == 0) {
                         item ++;
-                        CWARN("removed orphan %s from MDS and OST\n",
-                               ptr->d_name);
+                        CWARN("removed orphan %s from MDS and OST\n", d_name);
                 } else {
-                        l_dput(dchild); 
-                        up(&pending_dir->i_sem);
-                        GOTO(err_out, rc2);
+                        CERROR("removed orphan %s from MDS and OST failed,"
+                               " rc = %d\n", d_name, rc);
+                        rc = 0;
                 }
 next:
                 l_dput(dchild);
                 up(&pending_dir->i_sem);
         }
 err_out:
-        OBD_FREE(dirent, count);
+        list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
+                list_del(&dirent->lld_list);
+                OBD_FREE(dirent, sizeof(*dirent));
+        }
 err_pop:
         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
-        if (rc2 == 0)
-                rc2 = item;
-
-        RETURN(rc2);
+        if (rc == 0)
+                rc = item;
+        RETURN(rc);
 
-err_open:
-        mntput(mds->mds_vfsmnt);
+err_mntget:
         l_dput(mds->mds_pending_dir);
         goto err_pop;
-err_alloc_dirent:
-        filp_close(file, 0);
-        goto err_pop;
 }
index 01cf385..4bc5828 100644 (file)
 struct semaphore obd_conf_sem;   /* serialize configuration commands */
 struct obd_device obd_dev[MAX_OBD_DEVICES];
 struct list_head obd_types;
+#ifndef __KERNEL__
 atomic_t obd_memory;
 int obd_memmax;
+#endif
 
 int proc_version;
 
@@ -417,8 +419,6 @@ void *obd_psdev = NULL;
 
 EXPORT_SYMBOL(obd_dev);
 EXPORT_SYMBOL(obdo_cachep);
-EXPORT_SYMBOL(obd_memory);
-EXPORT_SYMBOL(obd_memmax);
 EXPORT_SYMBOL(obd_fail_loc);
 EXPORT_SYMBOL(obd_timeout);
 EXPORT_SYMBOL(obd_lustre_upcall);
@@ -436,6 +436,7 @@ EXPORT_SYMBOL(class_name2obd);
 EXPORT_SYMBOL(class_uuid2dev);
 EXPORT_SYMBOL(class_uuid2obd);
 EXPORT_SYMBOL(class_find_client_obd);
+EXPORT_SYMBOL(class_devices_in_group);
 EXPORT_SYMBOL(__class_export_put);
 EXPORT_SYMBOL(class_new_export);
 EXPORT_SYMBOL(class_unlink_export);
@@ -468,6 +469,7 @@ EXPORT_SYMBOL(class_handle2object);
 
 /* config.c */
 EXPORT_SYMBOL(class_get_profile);
+EXPORT_SYMBOL(class_del_profile);
 EXPORT_SYMBOL(class_process_config);
 EXPORT_SYMBOL(class_config_parse_llog);
 EXPORT_SYMBOL(class_config_dump_llog);
@@ -645,7 +647,11 @@ static void /*__exit*/ cleanup_obdclass(void)
 static void cleanup_obdclass(void)
 #endif
 {
+#ifdef __KERNEL__
+        int i;
+#else
         int i, leaked;
+#endif
         ENTRY;
 
         misc_deregister(&obd_psdev);
@@ -672,9 +678,11 @@ static void cleanup_obdclass(void)
         class_handle_cleanup();
         class_exit_uuidlist();
 
+#ifndef __KERNEL__
         leaked = atomic_read(&obd_memory);
         CDEBUG(leaked ? D_ERROR : D_INFO,
                "obd mem max: %d leaked: %d\n", obd_memmax, leaked);
+#endif
 
         EXIT;
 }
index 0c86eac..dbd805e 100644 (file)
@@ -266,6 +266,35 @@ struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
         return NULL;
 }
 
+/* Iterate the obd_device list looking devices have grp_uuid. Start
+   searching at *next, and if a device is found, the next index to look
+   it is saved in *next. If next is NULL, then the first matching device
+   will always be returned. */
+struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
+{
+        int i;
+        if (next == NULL) 
+                i = 0;
+        else if (*next >= 0 && *next < MAX_OBD_DEVICES)
+                i = *next;
+        else 
+                return NULL;
+                
+        for (; i < MAX_OBD_DEVICES; i++) {
+                struct obd_device *obd = &obd_dev[i];
+                if (obd->obd_type == NULL)
+                        continue;
+                if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
+                        if (next != NULL)
+                                *next = i+1;
+                        return obd;
+                }
+        }
+
+        return NULL;
+}
+
+
 void obd_cleanup_caches(void)
 {
         int rc;
@@ -493,9 +522,10 @@ struct obd_import *class_new_import(void)
         imp->imp_max_transno = 0;
         imp->imp_peer_committed_transno = 0;
         imp->imp_state = LUSTRE_IMP_NEW;
-        sema_init(&imp->imp_recovery_sem, 1);
+        init_waitqueue_head(&imp->imp_recovery_waitq);
 
         atomic_set(&imp->imp_refcount, 2);
+        atomic_set(&imp->imp_replay_inflight, 0);
         INIT_LIST_HEAD(&imp->imp_handle.h_link);
         class_handle_hash(&imp->imp_handle, import_handle_addref);
 
@@ -589,7 +619,7 @@ void class_disconnect_exports(struct obd_device *obd, int flags)
         list_del_init(&obd->obd_exports);
         spin_unlock(&obd->obd_dev_lock);
 
-        CDEBUG(D_IOCTL, "OBD device %d (%p) has exports, "
+        CDEBUG(D_HA, "OBD device %d (%p) has exports, "
                "disconnecting them\n", obd->obd_minor, obd);
         list_for_each_safe(tmp, n, &work_list) {
                 exp = list_entry(tmp, struct obd_export, exp_obd_chain);
@@ -597,7 +627,7 @@ void class_disconnect_exports(struct obd_device *obd, int flags)
                 
                 if (obd_uuid_equals(&exp->exp_client_uuid, 
                                     &exp->exp_obd->obd_uuid)) {
-                        CDEBUG(D_IOCTL
+                        CDEBUG(D_HA
                                "exp %p export uuid == obd uuid, don't discon\n",
                                exp);
                         class_export_put(exp);
@@ -613,10 +643,10 @@ void class_disconnect_exports(struct obd_device *obd, int flags)
                 rc = obd_disconnect(fake_exp, flags);
                 class_export_put(exp);
                 if (rc) {
-                        CDEBUG(D_IOCTL, "disconnecting export %p failed: %d\n",
+                        CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
                                exp, rc);
                 } else {
-                        CDEBUG(D_IOCTL, "export %p disconnected\n", exp);
+                        CDEBUG(D_HA, "export %p disconnected\n", exp);
                 }
         }
         EXIT;
index d09cad1..b0e82fe 100644 (file)
@@ -239,7 +239,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
 
         CDEBUG(D_INODE, "creating new log\n");
         loghandle = llog_cat_new_log(cathandle);
-        if (loghandle)
+        if (!IS_ERR(loghandle))
                 down_write(&loghandle->lgh_lock);
         up_write(&cathandle->lgh_lock);
         RETURN(loghandle);
index eb98251..8fd0175 100644 (file)
@@ -302,16 +302,11 @@ int lprocfs_rd_server_uuid(char *page, char **start, off_t off, int count,
 {
         struct obd_device *obd = (struct obd_device *)data;
         struct obd_import *imp;
-        static char* import_state_names[] = {
-                "<UNKNOWN 0>", "INVALID", "NEW", "DISCONN", "CONNECTING",
-                "REPLAY", "RECOVER", "FULL", "EVICTED",
-        };
         char *imp_state_name = NULL;
         
         LASSERT(obd != NULL);
         imp = obd->u.cli.cl_import;
-        LASSERT(imp->imp_state <= LUSTRE_IMP_EVICTED);
-        imp_state_name = import_state_names[imp->imp_state];
+        imp_state_name = ptlrpc_import_state_name(imp->imp_state);
         *eof = 1;
         return snprintf(page, count, "%s\t%s\n",
                         imp->imp_target_uuid.uuid, imp_state_name);
@@ -585,7 +580,6 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, attach);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, detach);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setup);
-        LPROCFS_OBD_OP_INIT(num_private_stats, stats, postsetup);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, precleanup);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, cleanup);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, postrecov);
index 931d5d3..5bf82b3 100644 (file)
@@ -192,40 +192,27 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         }
 
         atomic_set(&obd->obd_refcount, 0);
-
-        err = obd_setup(obd, sizeof(*lcfg), lcfg);
-        if (err) {
-                RETURN(err);
-        }
-        
-        obd->obd_type->typ_refcnt++;
-        obd->obd_set_up = 1;
-
         exp = class_new_export(obd);
-        if (exp == NULL) {
-                GOTO(err_cleanup, err = -ENOMEM);
-        }
+        if (exp == NULL) 
+                RETURN(err);
         memcpy(&exp->exp_client_uuid, &obd->obd_uuid, 
                sizeof(exp->exp_client_uuid));
         obd->obd_self_export = exp;
         class_export_put(exp);
 
-        if (OBT(obd) && OBP(obd, postsetup)) {
-                err = obd_postsetup(obd);
-                if (err) 
-                        GOTO(err_exp, err);
-        } 
+        err = obd_setup(obd, sizeof(*lcfg), lcfg);
+        if (err) 
+                GOTO(err_exp, err);
+        
+        obd->obd_type->typ_refcnt++;
+        obd->obd_set_up = 1;
 
         RETURN(err);
 
 err_exp:
         class_unlink_export(obd->obd_self_export);
         obd->obd_self_export = NULL;
-err_cleanup:
-        obd->obd_stopping = 1;
-        obd_cleanup(obd, 0);
-        obd->obd_set_up = obd->obd_stopping = 0;
-        obd->obd_type->typ_refcnt--;
         RETURN(err);
 }
 
index a32f2be..36beb06 100644 (file)
@@ -90,7 +90,7 @@ static int echo_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
-static __u64 echo_next_id(struct obd_device *obddev)
+ static __u64 echo_next_id(struct obd_device *obddev)
 {
         obd_id id;
 
index a9b09fd..f8b2ed1 100644 (file)
@@ -1133,6 +1133,12 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
+        rc = llog_cat_initialize(obd, 1);
+        if (rc) {
+                CERROR("failed to setup llogging subsystems\n");
+                GOTO(err_post, rc);
+        }
+
         RETURN(0);
 
 err_post:
@@ -1175,18 +1181,6 @@ static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
         return rc;
 }
 
-static int filter_postsetup(struct obd_device *obd)
-{
-        int rc = 0;
-        ENTRY;
-
-        // XXX add a storage location for the logid for size changes
-        rc = llog_cat_initialize(obd, 1);
-        if (rc)
-                CERROR("failed to setup llogging subsystems\n");
-        RETURN(rc);
-}
-
 static int filter_cleanup(struct obd_device *obd, int flags)
 {
         struct filter_obd *filter = &obd->u.filter;
@@ -1760,7 +1754,8 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         if (dchild->d_inode == NULL) {
-                CERROR("destroying non-existent object "LPU64"\n", oa->o_id);
+                CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n", 
+                       oa->o_id);
                 GOTO(cleanup, rc = -ENOENT);
         }
 
@@ -2105,7 +2100,6 @@ static struct obd_ops filter_obd_ops = {
         o_get_info:       filter_get_info,
         o_set_info:       filter_set_info,
         o_setup:          filter_setup,
-        o_postsetup:      filter_postsetup,
         o_precleanup:     filter_precleanup,
         o_cleanup:        filter_cleanup,
         o_connect:        filter_connect,
index 686fd30..eb3df7a 100644 (file)
@@ -158,13 +158,13 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh,
         rc = obd_destroy(exp, oa, NULL, NULL);
         obdo_free(oa);
         if (rc == -ENOENT) {
-                CWARN("object already removed, send cookie\n");
+                CDEBUG(D_HA, "object already removed, send cookie\n");
                 llog_cancel(ctxt, NULL, 1, &cookie, 0);
                 RETURN(0);
         }
 
         if (rc == 0)
-                CWARN("object: "LPU64" in record is destroyed\n", oid);
+                CDEBUG(D_HA, "object: "LPU64" in record is destroyed\n", oid);
 
         RETURN(rc);
 }
index 04970cc..149ff44 100644 (file)
@@ -82,6 +82,7 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data,
                 DEBUG_REQ(D_ERROR, req,
                           "unknown rc %d from async create: failing oscc",
                           rc);
+                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
         }
         oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
@@ -191,6 +192,17 @@ static int oscc_precreate(struct osc_creator *oscc, int wait)
         RETURN(rc);
 }
 
+int oscc_recovering(struct osc_creator *oscc) 
+{
+        int recov = 0;
+
+        spin_lock(&oscc->oscc_lock);
+        recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+        spin_unlock(&oscc->oscc_lock);
+
+        return recov;
+}
+
 int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
@@ -214,6 +226,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
        /* this is the special case where create removes orphans */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
+                CDEBUG(D_HA, "%p: oscc recovery started\n", oscc);
                 /* delete from next_id on up */
                 oa->o_valid |= OBD_MD_FLID;
                 oa->o_id = oscc->oscc_next_id - 1;
@@ -223,12 +236,33 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 spin_lock(&oscc->oscc_lock);
                 if (rc == -ENOSPC)
                         oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+                oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
                 oscc->oscc_last_id = oa->o_id;
+                wake_up(&oscc->oscc_waitq);
                 spin_unlock(&oscc->oscc_lock);
 
+                CDEBUG(D_HA, "%p: oscc recovery finished\n", oscc);
+
                RETURN(rc);
        }
 
+        /* If orphans are being recovered, then we must wait until it is 
+           finished before we can continue with create. */
+        if (oscc_recovering(oscc)) {
+                struct l_wait_info lwi;
+
+                CDEBUG(D_HA, "%p: oscc recovery in progress, waiting\n", oscc);
+
+                lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
+                rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
+                                  &lwi);
+                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                if (rc == -ETIMEDOUT)
+                        RETURN(rc);
+                CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
+        }
+        
+        
         while (try_again) {
                 spin_lock(&oscc->oscc_lock);
                 if (oscc->oscc_last_id >= oscc->oscc_next_id) {
@@ -275,6 +309,7 @@ void oscc_init(struct obd_export *exp)
 
         oed->oed_oscc.oscc_next_id = 2;
         oed->oed_oscc.oscc_last_id = 1;
+        oed->oed_oscc.oscc_flags |= OSCC_FLAG_RECOVERING;
         /* XXX the export handle should give the oscc the last object */
         /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
index d33939e..54de594 100644 (file)
@@ -2514,6 +2514,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         struct obd_ioctl_data *data = karg;
         int err = 0;
         ENTRY;
+        
+        MOD_INC_USE_COUNT;
 
         switch (cmd) {
         case OBD_IOC_LOV_GET_CONFIG: {
@@ -2578,6 +2580,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 GOTO(out, err = -ENOTTY);
         }
 out:
+        MOD_DEC_USE_COUNT;
         return err;
 }
 
@@ -2662,6 +2665,19 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
+
+        if (keylen == strlen("initial_recov") &&
+            memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
+                struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
+                if (vallen != sizeof(int))
+                        RETURN(-EINVAL);
+                imp->imp_initial_recov = *(int *)val;
+                CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
+                       exp->exp_obd->obd_name,
+                       imp->imp_initial_recov);
+                RETURN(0);
+        }
+
         if (keylen < strlen("mds_conn") ||
             memcmp(key, "mds_conn", strlen("mds_conn")) != 0)
                 RETURN(-EINVAL);
index 67120d7..6edebc8 100644 (file)
@@ -512,20 +512,20 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         CERROR("bulk IO comms error: "
                                "evicting %s@%s nid "LPX64" (%s)\n",
                                req->rq_export->exp_client_uuid.uuid,
-                               req->rq_connection->c_remote_uuid.uuid,
-                               req->rq_connection->c_peer.peer_nid,
-                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_export->exp_connection->c_remote_uuid.uuid,
+                               req->rq_peer.peer_nid,
+                               portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                               req->rq_peer.peer_nid,
                                                str));
                         ptlrpc_fail_export(req->rq_export);
                 } else {
                         CERROR("ignoring bulk IO comms error: "
                                "client reconnected %s@%s nid "LPX64" (%s)\n",  
                                req->rq_export->exp_client_uuid.uuid,
-                               req->rq_connection->c_remote_uuid.uuid,
-                               req->rq_connection->c_peer.peer_nid,
-                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_export->exp_connection->c_remote_uuid.uuid,
+                               req->rq_peer.peer_nid,
+                               portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                               req->rq_peer.peer_nid,
                                                str));
                 }
         }
@@ -723,20 +723,20 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         CERROR("bulk IO comms error: "
                                "evicting %s@%s nid "LPX64" (%s)\n",
                                req->rq_export->exp_client_uuid.uuid,
-                               req->rq_connection->c_remote_uuid.uuid,
-                               req->rq_connection->c_peer.peer_nid,
-                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_export->exp_connection->c_remote_uuid.uuid,
+                               req->rq_peer.peer_nid,
+                               portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                               req->rq_peer.peer_nid,
                                                str));
                         ptlrpc_fail_export(req->rq_export);
                 } else {
                         CERROR("ignoring bulk IO comms error: "
                                "client reconnected %s@%s nid "LPX64" (%s)\n",
                                req->rq_export->exp_client_uuid.uuid,
-                               req->rq_connection->c_remote_uuid.uuid,
-                               req->rq_connection->c_peer.peer_nid,
-                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
-                                               req->rq_connection->c_peer.peer_nid,
+                               req->rq_export->exp_connection->c_remote_uuid.uuid,
+                               req->rq_peer.peer_nid,
+                               portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                               req->rq_peer.peer_nid,
                                                str));
                 }        
         }
@@ -1171,8 +1171,12 @@ static int ost_cleanup(struct obd_device *obddev, int flags)
         int err = 0;
         ENTRY;
 
-        if (obddev->obd_recovering)
+        spin_lock_bh(&obddev->obd_processing_task_lock);
+        if (obddev->obd_recovering) {
                 target_cancel_recovery_timer(obddev);
+                obddev->obd_recovering = 0;
+        }
+        spin_unlock_bh(&obddev->obd_processing_task_lock);
 
         ptlrpc_stop_all_threads(ost->ost_service);
         ptlrpc_unregister_service(ost->ost_service);
index b503cec..09db54e 100644 (file)
@@ -805,8 +805,6 @@ portals_debug_msg(int subsys, int mask, char *file, const char *fn,
                               subsys, mask, smp_processor_id(),
                               tv.tv_sec, tv.tv_usec, stack, current->pid);
         max_nob -= prefix_nob;
-        if(*(format + strlen(format) - 1) != '\n')
-                *(format + strlen(format)) = '\n';
 
 #if defined(__arch_um__) && (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,20))
         msg_nob = snprintf(debug_buf + debug_off + prefix_nob, max_nob,
index 1acac79..b51d09e 100644 (file)
@@ -136,7 +136,7 @@ int ptlbd_cl_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (rc)
                 GOTO(out_req, rc);
 
-        exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
+        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
 
         imp->imp_state = LUSTRE_IMP_FULL;
         imp->imp_remote_handle = request->rq_repmsg->handle;
index 6f3ae1b..12cf867 100644 (file)
@@ -43,11 +43,6 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
         cl->cli_name           = name;
 }
 
-struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
-{
-        return &req->rq_connection->c_remote_uuid;
-}
-
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 {
         struct ptlrpc_connection *c;
@@ -246,10 +241,9 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 
-        request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
-
         spin_lock_init(&request->rq_lock);
         INIT_LIST_HEAD(&request->rq_list);
+        INIT_LIST_HEAD(&request->rq_replay_list);
         init_waitqueue_head(&request->rq_reply_waitq);
         request->rq_xid = ptlrpc_next_xid();
         atomic_set(&request->rq_refcount, 1);
@@ -372,11 +366,14 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
         LASSERT (status != NULL);
         *status = 0;
 
-        /* A new import, or one that has been cleaned up.
-         */
         if (imp->imp_state == LUSTRE_IMP_NEW) {
                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                 *status = -EIO;
+                LBUG();
+        }
+        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
+                *status = -EIO;
         }
         /*
          * If the import has been invalidated (such as by an OST failure), the
@@ -442,7 +439,8 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
 
         err = req->rq_repmsg->status;
         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
-                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
+                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
+                          err);
                 RETURN(err < 0 ? err : -EINVAL);
         }
 
@@ -456,7 +454,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
         RETURN(err);
 }
 
-static int after_reply(struct ptlrpc_request *req, int *restartp)
+static int after_reply(struct ptlrpc_request *req)
 {
         unsigned long flags;
         struct obd_import *imp = req->rq_import;
@@ -466,9 +464,6 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
         LASSERT(!req->rq_receiving_reply);
         LASSERT(req->rq_replied);
 
-        if (restartp != NULL)
-                *restartp = 0;
-
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
 
@@ -505,26 +500,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
 
                 ptlrpc_request_handle_notconn(req);
 
-                if (req->rq_err)
-                        RETURN(-EIO);
-
-                if (req->rq_no_resend)
-                        RETURN(rc); /* -ENOTCONN */
-
-                if (req->rq_resend) {
-                        if (restartp == NULL)
-                                LBUG(); /* async resend not supported yet */
-                        spin_lock_irqsave (&req->rq_lock, flags);
-                        req->rq_resend = 0;
-                        spin_unlock_irqrestore (&req->rq_lock, flags);
-                        *restartp = 1;
-                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-                        DEBUG_REQ(D_HA, req, "resending: ");
-                        RETURN(0);
-                }
-
-                CERROR("request should be err or resend: %p\n", req);
-                LBUG();
+                RETURN(rc);
         }
 
         if (req->rq_import->imp_replayable) {
@@ -555,7 +531,6 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        LASSERT(req->rq_send_state == LUSTRE_IMP_FULL);
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
         req->rq_phase = RQ_PHASE_RPC;
 
@@ -681,7 +656,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
 
                 if (req->rq_phase == RQ_PHASE_RPC) {
-                        int do_restart = 0;
                         if (req->rq_waiting || req->rq_resend) {
                                 int status;
                                 spin_lock_irqsave(&imp->imp_lock, flags);
@@ -709,11 +683,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                 if (req->rq_resend) {
                                         lustre_msg_add_flags(req->rq_reqmsg,
                                                              MSG_RESENT);
-                                        spin_lock_irqsave(&req->rq_lock, flags);
-                                        req->rq_resend = 0;
-                                        spin_unlock_irqrestore(&req->rq_lock,
-                                                               flags);
-
                                         ptlrpc_unregister_reply(req);
                                         if (req->rq_bulk) {
                                                 __u64 old_xid = req->rq_xid;
@@ -750,11 +719,15 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         list_del_init(&req->rq_list);
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-                        req->rq_status = after_reply(req, &do_restart);
-                        if (do_restart) {
+                        req->rq_status = after_reply(req);
+                        if (req->rq_resend) {
+                                /* Add this req to the delayed list so
+                                   it can be errored if the import is
+                                   evicted after recovery. */
                                 spin_lock_irqsave (&req->rq_lock, flags);
-                                req->rq_resend = 1; /* ugh */
-                                spin_unlock_irqrestore (&req->rq_lock, flags);
+                                list_add_tail(&req->rq_list, 
+                                              &imp->imp_delayed_list);
+                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                 continue;
                         }
 
@@ -785,6 +758,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
 
+                req->rq_phase = RQ_PHASE_COMPLETE;
+
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                 req->rq_interpret_reply;
@@ -800,7 +775,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        imp->imp_connection->c_peer.peer_nid,
                        req->rq_reqmsg->opc);
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
                 set->set_remaining--;
         }
 
@@ -832,9 +806,15 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
                 RETURN(1);
 
         /* If this request is for recovery or other primordial tasks,
-         * don't go back to sleep, and don't start recovery again.. */
-        if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov)
+         * then error it out here. */
+        if (req->rq_send_state != LUSTRE_IMP_FULL || 
+            imp->imp_obd->obd_no_recov) {
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_status = -ETIMEDOUT;
+                req->rq_err = 1;
+                spin_unlock_irqrestore (&req->rq_lock, flags);
                 RETURN(1);
+        }
 
         ptlrpc_fail_import(imp, req->rq_import_generation);
 
@@ -856,7 +836,8 @@ int ptlrpc_expired_set(void *data)
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
+                       && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
@@ -1007,7 +988,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
                 unsigned long flags = 0;
                 if (!locked)
                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
-                list_del_init(&request->rq_list);
+                list_del_init(&request->rq_replay_list);
                 if (!locked)
                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                                flags);
@@ -1038,7 +1019,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_bulk != NULL)
                 ptlrpc_free_bulk(request->rq_bulk);
 
-        ptlrpc_put_connection(request->rq_connection);
         OBD_FREE(request, sizeof(*request));
         EXIT;
 }
@@ -1086,13 +1066,6 @@ void ptlrpc_req_finished(struct ptlrpc_request *request)
         __ptlrpc_req_finished(request, 0);
 }
 
-static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
-{
-        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
-        request->rq_reqmsg = NULL;
-        request->rq_reqlen = 0;
-}
-
 /* Disengage the client's reply buffer from the network
  * NB does _NOT_ unregister any client-side bulk.
  * IDEMPOTENT, but _not_ safe against concurrent callers.
@@ -1181,7 +1154,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
 
         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 
                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                 LASSERT(req != last_req);
@@ -1208,7 +1181,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 free_req:
                 if (req->rq_commit_cb != NULL)
                         req->rq_commit_cb(req);
-                list_del_init(&req->rq_list);
+                list_del_init(&req->rq_replay_list);
                 __ptlrpc_req_finished(req, 1);
         }
 
@@ -1227,11 +1200,8 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
         unsigned long flags;
 
-        DEBUG_REQ(D_HA, req, "resending");
+        DEBUG_REQ(D_HA, req, "going to resend");
         req->rq_reqmsg->handle.cookie = 0;
-        ptlrpc_put_connection(req->rq_connection);
-        req->rq_connection =
-                ptlrpc_connection_addref(req->rq_import->imp_connection);
         req->rq_status = -EAGAIN;
 
         spin_lock_irqsave (&req->rq_lock, flags);
@@ -1297,12 +1267,16 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
         LASSERT(spin_is_locked(&imp->imp_lock));
 #endif
 
+        /* don't re-add requests that have been replayed */
+        if (!list_empty(&req->rq_replay_list))
+                return;
+
         LASSERT(imp->imp_replayable);
         /* Balanced in ptlrpc_free_committed, usually. */
         ptlrpc_request_addref(req);
         list_for_each_prev(tmp, &imp->imp_replay_list) {
                 struct ptlrpc_request *iter =
-                        list_entry(tmp, struct ptlrpc_request, rq_list);
+                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);
 
                 /* We may have duplicate transnos if we create and then
                  * open a file, or for closes retained if to match creating
@@ -1319,11 +1293,11 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                 continue;
                 }
 
-                list_add(&req->rq_list, &iter->rq_list);
+                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                 return;
         }
 
-        list_add_tail(&req->rq_list, &imp->imp_replay_list);
+        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
 }
 
 int ptlrpc_queue_wait(struct ptlrpc_request *req)
@@ -1333,7 +1307,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         struct l_wait_info lwi;
         struct obd_import *imp = req->rq_import;
         unsigned long flags;
-        int do_restart = 0;
         int timeout = 0;
         ENTRY;
 
@@ -1363,15 +1336,19 @@ restart:
                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
-                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
-                          current->comm, req->rq_send_state, imp->imp_state);
+                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
+                          current->comm, 
+                          ptlrpc_import_state_name(req->rq_send_state), 
+                          ptlrpc_import_state_name(imp->imp_state));
                 lwi = LWI_INTR(interrupted_request, req);
                 rc = l_wait_event(req->rq_reply_waitq,
                                   (req->rq_send_state == imp->imp_state ||
                                    req->rq_err),
                                   &lwi);
-                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d or %d == 1)",
-                          current->comm, imp->imp_state, req->rq_send_state,
+                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
+                          current->comm, 
+                          ptlrpc_import_state_name(imp->imp_state), 
+                          ptlrpc_import_state_name(req->rq_send_state),
                           req->rq_err);
 
                 spin_lock_irqsave(&imp->imp_lock, flags);
@@ -1395,6 +1372,15 @@ restart:
                 GOTO(out, rc);
         }
 
+        if (req->rq_resend) {
+                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
+
+                if (req->rq_bulk != NULL)
+                        ptlrpc_unregister_bulk (req);
+
+                DEBUG_REQ(D_HA, req, "resending: ");
+        }
+
         /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
@@ -1438,15 +1424,6 @@ restart:
                 /* ...unless we were specifically told otherwise. */
                 if (req->rq_no_resend)
                         GOTO(out, rc = -ETIMEDOUT);
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_resend = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
-
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
-
-                DEBUG_REQ(D_HA, req, "resending: ");
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
@@ -1470,12 +1447,9 @@ restart:
                 GOTO(out, rc = req->rq_status);
         }
 
-        rc = after_reply (req, &do_restart);
+        rc = after_reply (req);
         /* NB may return +ve success rc */
-        if (do_restart) {
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
-                DEBUG_REQ(D_HA, req, "resending: ");
+        if (req->rq_resend) {
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
@@ -1502,68 +1476,19 @@ restart:
         RETURN(rc);
 }
 
-int ptlrpc_replay_req(struct ptlrpc_request *req)
-{
-        int rc = 0, old_state, old_status = 0;
-        // struct ptlrpc_client *cli = req->rq_import->imp_client;
-        struct l_wait_info lwi;
-        ENTRY;
-
-        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-
-        /* I don't touch rq_phase here, so the debug log can show what
-         * state it was left in */
-
-        /* Not handling automatic bulk replay yet (or ever?) */
-        LASSERT(req->rq_bulk == NULL);
-
-        DEBUG_REQ(D_NET, req, "about to replay");
-
-        /* Update request's state, since we might have a new connection. */
-        ptlrpc_put_connection(req->rq_connection);
-        req->rq_connection =
-                ptlrpc_connection_addref(req->rq_import->imp_connection);
-
-        /* temporarily set request to REPLAY level---not strictly
-         * necessary since ptl_send_rpc doesn't check state, but let's
-         * be consistent.*/
-        old_state = req->rq_send_state;
-
-        /*
-         * Q: "How can a req get on the replay list if it wasn't replied?"
-         * A: "If we failed during the replay of this request, it will still
-         *     be on the list, but rq_replied will have been reset to 0."
-         */
-        if (req->rq_replied)
-                old_status = req->rq_repmsg->status;
-        req->rq_send_state = LUSTRE_IMP_REPLAY;
-        rc = ptl_send_rpc(req);
-        if (rc) {
-                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
-                ptlrpc_cleanup_request_buf(req);
-                // up(&cli->cli_rpc_sem);
-                GOTO(out, rc = -rc);
-        }
-
-        CDEBUG(D_OTHER, "-- sleeping\n");
-        lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
-        l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
-        CDEBUG(D_OTHER, "-- done\n");
-
-        // up(&cli->cli_rpc_sem);
+struct ptlrpc_replay_async_args {
+        int praa_old_state;
+        int praa_old_status;
+};
 
-        /* If the reply was received normally, this just grabs the spinlock
-         * (ensuring the reply callback has returned), sees that
-         * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
+static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
+                                    void * data, int rc)
+{
+        struct ptlrpc_replay_async_args *aa = data;
+        struct obd_import *imp = req->rq_import;
+        unsigned long flags;
 
-        if (!req->rq_replied) {
-                CERROR("Unknown reason for wakeup\n");
-                /* XXX Phil - I end up here when I kill obdctl */
-                /* ...that's because signals aren't all masked in
-                 * l_wait_event() -eeb */
-                GOTO(out, rc = -EINTR);
-        }
+        atomic_dec(&imp->imp_replay_inflight);
 
 #if SWAB_PARANOIA
         /* Clear reply swab mask; this is a new reply in sender's byte order */
@@ -1574,15 +1499,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
                 CERROR("unpack_rep failed: %d\n", rc);
                 GOTO(out, rc = -EPROTO);
         }
-#if 0
-        /* FIXME: Enable when BlueArc makes new release */
-        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
-            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
-                CERROR("invalid packet type received (type=%u)\n",
-                       req->rq_repmsg->type);
-                GOTO(out, rc = -EPROTO);
-        }
-#endif
 
         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
             req->rq_repmsg->status == -ENOTCONN) 
@@ -1591,25 +1507,76 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         /* The transno had better not change over replay. */
         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
 
-        CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
+        DEBUG_REQ(D_HA, req, "got rep");
 
         /* let the callback do fixups, possibly including in the request */
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied && req->rq_repmsg->status != old_status) {
+        if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
-                          req->rq_repmsg->status, old_status);
+                          req->rq_repmsg->status, aa->praa_old_status);
         } else {
                 /* Put it back for re-replay. */
-                req->rq_status = old_status;
+                req->rq_repmsg->status = aa->praa_old_status;
         }
 
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        imp->imp_last_replay_transno = req->rq_transno;
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        /* continue with recovery */
+        rc = ptlrpc_import_recovery_state_machine(imp);
  out:
-        req->rq_send_state = old_state;
+        req->rq_send_state = aa->praa_old_state;
+        
+        if (rc != 0)
+                /* this replay failed, so restart recovery */
+                ptlrpc_connect_import(imp, NULL);
+
         RETURN(rc);
 }
 
+
+int ptlrpc_replay_req(struct ptlrpc_request *req)
+{
+        struct ptlrpc_replay_async_args *aa;
+        ENTRY;
+
+        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
+
+        /* Not handling automatic bulk replay yet (or ever?) */
+        LASSERT(req->rq_bulk == NULL);
+
+        DEBUG_REQ(D_HA, req, "REPLAY");
+
+        LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
+        aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
+        memset(aa, 0, sizeof *aa);
+
+        /* Prepare request to be resent with ptlrpcd */
+        aa->praa_old_state = req->rq_send_state;
+        req->rq_send_state = LUSTRE_IMP_REPLAY;
+        req->rq_phase = RQ_PHASE_NEW;
+        /*
+         * Q: "How can a req get on the replay list if it wasn't replied?"
+         * A: "If we failed during the replay of this request, it will still
+         *     be on the list, but rq_replied will have been reset to 0."
+         */
+        if (req->rq_replied) {
+                aa->praa_old_status = req->rq_repmsg->status;
+                req->rq_status = 0;
+                req->rq_replied = 0;
+        }
+
+        req->rq_interpret_reply = ptlrpc_replay_interpret;
+        atomic_inc(&req->rq_import->imp_replay_inflight);
+        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
+
+        ptlrpcd_add_req(req);
+        RETURN(0);
+}
+
 void ptlrpc_abort_inflight(struct obd_import *imp)
 {
         unsigned long flags;
index 80742c8..63fd22b 100644 (file)
 
 #include "ptlrpc_internal.h"
 
-/* should this take an imp_sem to ensure connect is single threaded? */
-int ptlrpc_connect_import(struct obd_import *imp)
+struct ptlrpc_connect_async_args {
+         __u64 pcaa_peer_committed;
+        int pcaa_initial_connect;
+        int pcaa_was_invalid;
+};
+
+/* A CLOSED import should remain so. */
+#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
+do {                                                                           \
+        if (imp->imp_state != LUSTRE_IMP_CLOSED) {                             \
+               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
+                      imp, imp->imp_target_uuid.uuid,                          \
+                      ptlrpc_import_state_name(imp->imp_state),                \
+                      ptlrpc_import_state_name(state));                        \
+               imp->imp_state = state;                                         \
+        }                                                                      \
+} while(0)
+
+#define IMPORT_SET_STATE(imp, state)                    \
+do {                                                    \
+        unsigned long flags;                            \
+                                                        \
+        spin_lock_irqsave(&imp->imp_lock, flags);       \
+        IMPORT_SET_STATE_NOLOCK(imp, state);            \
+        spin_unlock_irqrestore(&imp->imp_lock, flags);  \
+} while(0)
+
+
+static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
+                                    void * data, int rc);
+int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
+
+/* Only this function is allowed to change the import state when it is
+ * CLOSED. I would rather refcount the import and free it after
+ * disconnection like we do with exports. To do that, the client_obd
+ * will need to save the peer info somewhere other than in the import,
+ * though. */
+int ptlrpc_init_import(struct obd_import *imp)
+{
+        unsigned long flags;
+        
+        spin_lock_irqsave(&imp->imp_lock, flags);
+
+        imp->imp_generation++;
+        imp->imp_state =  LUSTRE_IMP_NEW;
+
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        return 0;
+}
+
+/* Returns true if import was FULL, false if import was already not
+ * connected.
+ */
+int ptlrpc_set_import_discon(struct obd_import *imp)
+{
+        unsigned long flags;
+        int rc = 0;
+        
+        spin_lock_irqsave(&imp->imp_lock, flags);
+
+        if (imp->imp_state == LUSTRE_IMP_FULL) {
+                IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
+                rc = 1;
+        } else {
+                CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
+                       imp,imp->imp_client->cli_name, 
+                       ptlrpc_import_state_name(imp->imp_state));
+        }
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        return rc;
+}
+
+void ptlrpc_fail_import(struct obd_import *imp, int generation)
+{
+        ENTRY;
+
+        LASSERT (!imp->imp_dlm_fake);
+
+        if (ptlrpc_set_import_discon(imp))
+                ptlrpc_handle_failed_import(imp);
+
+        EXIT;
+}
+
+int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
 {
         struct obd_device *obd = imp->imp_obd;
-        int msg_flags;
         int initial_connect = 0;
         int rc;
         __u64 committed_before_reconnect = 0;
+        int was_invalid = 0;
         struct ptlrpc_request *request;
-        struct lustre_handle old_hdl;
         int size[] = {sizeof(imp->imp_target_uuid),
                                  sizeof(obd->obd_uuid),
                                  sizeof(imp->imp_dlm_handle)};
         char *tmp[] = {imp->imp_target_uuid.uuid,
                        obd->obd_uuid.uuid,
                        (char *)&imp->imp_dlm_handle};
+        struct ptlrpc_connect_async_args *aa;
         unsigned long flags;
 
         spin_lock_irqsave(&imp->imp_lock, flags);
-        if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
+        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
+                CERROR("can't connect to a closed import\n");
+                RETURN(-EINVAL);
+        } else if (imp->imp_state == LUSTRE_IMP_FULL) {
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                CERROR("already connected\n");
+                RETURN(0);
+        } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                CERROR("already connecting\n");
                 RETURN(-EALREADY);
-        } else {
-                LASSERT(imp->imp_state == LUSTRE_IMP_DISCON);
         }
-        CDEBUG(D_HA, "%s: new state: CONNECTING\n", 
-               imp->imp_client->cli_name);
-        imp->imp_state = LUSTRE_IMP_CONNECTING;
+
+        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
+
         imp->imp_conn_cnt++; 
+        imp->imp_last_replay_transno = 0;
+
         if (imp->imp_remote_handle.cookie == 0) {
                 initial_connect = 1;
         } else {
-                committed_before_reconnect = imp->imp_peer_committed_transno;
+                committed_before_reconnect = imp->imp_peer_committed_transno;;
+
+        }
+
+        if (imp->imp_invalid) {
+                imp->imp_invalid = 0;
+                was_invalid = 1;
         }
+
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
+        if (new_uuid) {
+                struct ptlrpc_connection *conn;
+                struct obd_uuid uuid;
+                struct obd_export *dlmexp;
+
+                obd_str2uuid(&uuid, new_uuid);
+
+                conn = ptlrpc_uuid_to_connection(&uuid);
+                if (!conn)
+                        GOTO(out, rc = -ENOENT);
+
+                CDEBUG(D_HA, "switching import %s/%s from %s to %s\n",
+                       imp->imp_target_uuid.uuid, imp->imp_obd->obd_name,
+                       imp->imp_connection->c_remote_uuid.uuid,
+                       conn->c_remote_uuid.uuid);
+
+                /* Switch the import's connection and the DLM export's
+                 * connection (which are almost certainly the same, but we
+                 * keep distinct refs just to make things clearer. I think. */
+                if (imp->imp_connection)
+                        ptlrpc_put_connection(imp->imp_connection);
+                /* We hand off the ref from ptlrpc_get_connection. */
+                imp->imp_connection = conn;
+
+                dlmexp = class_conn2export(&imp->imp_dlm_handle);
+                
+                LASSERT(dlmexp != NULL);
+
+                if (dlmexp->exp_connection)
+                        ptlrpc_put_connection(dlmexp->exp_connection);
+                dlmexp->exp_connection = ptlrpc_connection_addref(conn);
+                class_export_put(dlmexp);
+
+        }
+
         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
         if (!request)
                 GOTO(out, rc = -ENOMEM);
 
         request->rq_send_state = LUSTRE_IMP_CONNECTING;
         request->rq_replen = lustre_msg_size(0, NULL);
+        request->rq_interpret_reply = ptlrpc_connect_interpret;
+
+        LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
+        aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
+        memset(aa, 0, sizeof *aa);
 
-        // lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
+        aa->pcaa_peer_committed = committed_before_reconnect;
+        aa->pcaa_initial_connect = initial_connect;
+        aa->pcaa_was_invalid = was_invalid;
 
-        rc = ptlrpc_queue_wait(request);
-        if (rc) {
-                GOTO(free_req, rc);
+        if (aa->pcaa_initial_connect)
+                imp->imp_replayable = 1;
+        ptlrpcd_add_req(request);
+        rc = 0;
+out:
+        if (rc != 0) {
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
+        }
+
+        RETURN(rc);
+}
+
+static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
+                                    void * data, int rc)
+{
+        struct ptlrpc_connect_async_args *aa = data;
+        struct obd_import *imp = request->rq_import;
+        struct lustre_handle old_hdl;
+        unsigned long flags;
+        int msg_flags;
+        ENTRY;
+        
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                RETURN(0);
         }
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+        if (rc)
+                GOTO(out, rc);
 
         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
 
-        if (initial_connect) {
-                CDEBUG(D_HA, "%s: new state: FULL\n", 
-                       imp->imp_client->cli_name);
-                if (msg_flags & MSG_CONNECT_REPLAYABLE)
+        if (aa->pcaa_initial_connect) {
+                if (msg_flags & MSG_CONNECT_REPLAYABLE) {
+                        CDEBUG(D_HA, "connected to replayable target: %s\n",
+                               imp->imp_target_uuid.uuid);
                         imp->imp_replayable = 1;
+                        ptlrpc_pinger_add_import(imp);
+                } else {
+                        imp->imp_replayable = 0;
+                }
                 imp->imp_remote_handle = request->rq_repmsg->handle;
-                imp->imp_state = LUSTRE_IMP_FULL;
-                GOTO(free_req, rc = 0);
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
+                GOTO(finish, rc = 0);
         }
 
         /* Determine what recovery state to move the import to. */
@@ -110,7 +284,7 @@ int ptlrpc_connect_import(struct obd_import *imp)
                                ", failed\n", imp->imp_target_uuid.uuid,
                                imp->imp_connection->c_remote_uuid.uuid,
                                imp->imp_dlm_handle.cookie);
-                        GOTO(free_req, rc = -ENOTCONN);
+                        GOTO(out, rc = -ENOTCONN);
                 }
 
                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
@@ -127,23 +301,17 @@ int ptlrpc_connect_import(struct obd_import *imp)
                                imp->imp_target_uuid.uuid, 
                                imp->imp_connection->c_remote_uuid.uuid);
                 }
-                CDEBUG(D_HA, "%s: new state: RECOVER\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_RECOVER;
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
         } 
         else if (MSG_CONNECT_RECOVERING & msg_flags) {
-                CDEBUG(D_HA, "%s: new state: REPLAY\n", 
-                       imp->imp_client->cli_name);
                 LASSERT(imp->imp_replayable);
                 imp->imp_state = LUSTRE_IMP_RECOVER;
                 imp->imp_remote_handle = request->rq_repmsg->handle;
-                imp->imp_state = LUSTRE_IMP_REPLAY;
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
         } 
         else {
-                CDEBUG(D_HA, "%s: new state: EVICTED\n", 
-                       imp->imp_client->cli_name);
                 imp->imp_remote_handle = request->rq_repmsg->handle;
-                imp->imp_state = LUSTRE_IMP_EVICTED;
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
         }
         
         /* Sanity checks for a reconnected import. */
@@ -153,31 +321,150 @@ int ptlrpc_connect_import(struct obd_import *imp)
                        "after reconnect. We should LBUG right here.\n");
         }
 
-        if (request->rq_repmsg->last_committed < committed_before_reconnect) {
+        if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
                 CERROR("%s went back in time (transno "LPD64
                        " was previously committed, server now claims "LPD64
                        ")! is shared storage not coherent?\n",
                        imp->imp_target_uuid.uuid,
-                       committed_before_reconnect,
+                       aa->pcaa_peer_committed,
                        request->rq_repmsg->last_committed);
         }
 
- free_req:
-        ptlrpc_req_finished(request);
+finish:
+        rc = ptlrpc_import_recovery_state_machine(imp);
+        if (rc != 0) {
+                if (aa->pcaa_was_invalid) {
+                        ptlrpc_set_import_active(imp, 0);
+                }                
 
+                if (rc == -ENOTCONN) {
+                        CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
+                               "invalidating and reconnecting\n",
+                               imp->imp_target_uuid.uuid,
+                               imp->imp_connection->c_remote_uuid.uuid);
+                        ptlrpc_connect_import(imp, NULL);
+                        RETURN(0);
+                } 
+        }
  out:
-        if (rc != 0)
-                imp->imp_state = LUSTRE_IMP_DISCON;
+        if (rc != 0) {
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
+                if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
+                        GOTO(norecov, rc);
+                CDEBUG(D_ERROR, 
+                       "recovery of %s on %s failed (%d); restarting\n",
+                       imp->imp_target_uuid.uuid,
+                       (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
+                ptlrpc_handle_failed_import(imp);
+        }
+
+norecov:
+        wake_up(&imp->imp_recovery_waitq);
         RETURN(rc);
 }
 
+static int completed_replay_interpret(struct ptlrpc_request *req,
+                                    void * data, int rc)
+{
+        atomic_dec(&req->rq_import->imp_replay_inflight);
+        ptlrpc_import_recovery_state_machine(req->rq_import);
+        RETURN(0);
+}
+
+static int signal_completed_replay(struct obd_import *imp)
+ {
+        struct ptlrpc_request *req;
+        ENTRY;
+
+        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+        atomic_inc(&imp->imp_replay_inflight);
+
+        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        req->rq_replen = lustre_msg_size(0, NULL);
+        req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
+        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+        req->rq_timeout *= 3; 
+        req->rq_interpret_reply = completed_replay_interpret;
+
+        ptlrpcd_add_req(req);
+        RETURN(0);
+}
+
+
+int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
+{
+        int rc = 0;
+
+        if (imp->imp_state == LUSTRE_IMP_EVICTED) {
+                CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
+                       imp->imp_target_uuid.uuid,
+                       imp->imp_connection->c_remote_uuid.uuid);
+                ptlrpc_set_import_active(imp, 0);
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
+        } 
+        
+        if (imp->imp_state == LUSTRE_IMP_REPLAY) {
+                CDEBUG(D_HA, "replay requested by %s\n",
+                       imp->imp_target_uuid.uuid);
+                rc = ptlrpc_replay_next(imp);
+                if (rc == 0 && atomic_read(&imp->imp_replay_inflight) == 0) {
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
+                        rc = ldlm_replay_locks(imp);
+                        if (rc)
+                                GOTO(out, rc);
+                }
+                rc = 0;
+        }
+
+        if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
+                if (atomic_read(&imp->imp_replay_inflight) == 0) {
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
+                        rc = signal_completed_replay(imp);
+                        if (rc)
+                                GOTO(out, rc);
+                }
 
+        }
+
+        if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
+                if (atomic_read(&imp->imp_replay_inflight) == 0) {
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
+                }
+        }
+
+        if (imp->imp_state == LUSTRE_IMP_RECOVER) {
+                CDEBUG(D_HA, "reconnected to %s@%s\n",
+                       imp->imp_target_uuid.uuid,
+                       imp->imp_connection->c_remote_uuid.uuid);
+
+                ptlrpc_set_import_active(imp, 1);
+                ptlrpc_resend(imp);
+                IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
+        } 
+
+        if (imp->imp_state == LUSTRE_IMP_FULL) {
+                wake_up(&imp->imp_recovery_waitq);
+                ptlrpc_wake_delayed(imp);
+        }
+
+ out:
+        RETURN(rc);
+}
+
+static int back_to_sleep(void *unused) 
+{
+       return 0;
+}
 
 int ptlrpc_disconnect_import(struct obd_import *imp)
 {
         struct ptlrpc_request *request;
         int rq_opc;
         int rc = 0;
+        unsigned long flags;
         ENTRY;
 
         switch (imp->imp_connect_op) {
@@ -190,12 +477,28 @@ int ptlrpc_disconnect_import(struct obd_import *imp)
                 RETURN(-EINVAL);
         }
 
+
+        if (ptlrpc_import_in_recovery(imp)) {
+                struct l_wait_info lwi;
+                lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep, 
+                                       NULL, NULL);
+                rc = l_wait_event(imp->imp_recovery_waitq, 
+                                  !ptlrpc_import_in_recovery(imp), &lwi);
+
+        }
+
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (imp->imp_state != LUSTRE_IMP_FULL) {
+                GOTO(out, 0);
+        }
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
         if (request) {
                 /* For non-replayable connections, don't attempt
                    reconnect if this fails */
-                if (!imp->imp_obd->obd_replayable) {
-                        imp->imp_state = LUSTRE_IMP_DISCON;
+                if (!imp->imp_replayable) {
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
                         request->rq_send_state =  LUSTRE_IMP_DISCON;
                 }
                 request->rq_replen = lustre_msg_size(0, NULL);
@@ -203,8 +506,12 @@ int ptlrpc_disconnect_import(struct obd_import *imp)
                 ptlrpc_req_finished(request);
         }
 
-        imp->imp_state = LUSTRE_IMP_DISCON;
+        spin_lock_irqsave(&imp->imp_lock, flags);
+out:
+        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+
         RETURN(rc);
 }
 
index fd523a4..ab6684a 100644 (file)
@@ -548,6 +548,7 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
 
 int ptlrpc_reply(struct ptlrpc_request *req)
 {
+        struct ptlrpc_connection *conn;
         unsigned long flags;
         int rc;
 
@@ -565,8 +566,14 @@ int ptlrpc_reply(struct ptlrpc_request *req)
         req->rq_repmsg->status = req->rq_status;
         req->rq_repmsg->opc = req->rq_reqmsg->opc;
 
+        if (req->rq_export == NULL) 
+                conn = ptlrpc_get_connection(&req->rq_peer, NULL);
+        else
+                conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
+
         init_waitqueue_head(&req->rq_reply_waitq);
-        rc = ptl_send_buf(req, req->rq_connection, req->rq_svc->srv_rep_portal);
+        rc = ptl_send_buf(req, conn, 
+                          req->rq_svc->srv_rep_portal);
         if (rc != 0) {
                 /* Do what the callback handler would have done */
                 OBD_FREE (req->rq_repmsg, req->rq_replen);
@@ -575,6 +582,7 @@ int ptlrpc_reply(struct ptlrpc_request *req)
                 req->rq_want_ack = 0;
                 spin_unlock_irqrestore (&req->rq_lock, flags);
         }
+        ptlrpc_put_connection(conn);
         return rc;
 }
 
@@ -600,6 +608,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
 {
         int rc;
         int rc2;
+        struct ptlrpc_connection *connection;
         unsigned long flags;
         ptl_process_id_t source_id;
         ptl_handle_me_t  reply_me_h;
@@ -611,6 +620,8 @@ int ptl_send_rpc(struct ptlrpc_request *request)
          * cleanly from the previous attempt */
         LASSERT (!request->rq_receiving_reply);
 
+        connection = request->rq_import->imp_connection;
+
         if (request->rq_bulk != NULL) {
                 rc = ptlrpc_register_bulk (request);
                 if (rc != 0)
@@ -620,7 +631,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
         request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
         request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
 
-        source_id.nid = request->rq_connection->c_peer.peer_nid;
+        source_id.nid = connection->c_peer.peer_nid;
         source_id.pid = PTL_PID_ANY;
 
         LASSERT (request->rq_replen != 0);
@@ -631,7 +642,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                 RETURN(-ENOMEM);
         }
 
-        rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
+        rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
                          request->rq_reply_portal, /* XXX FIXME bug 249 */
                          source_id, request->rq_xid, 0, PTL_UNLINK,
                          PTL_INS_AFTER, &reply_me_h);
@@ -647,8 +658,8 @@ int ptl_send_rpc(struct ptlrpc_request *request)
         request->rq_reply_md.threshold = 1;
         request->rq_reply_md.options = PTL_MD_OP_PUT;
         request->rq_reply_md.user_ptr = request;
-        request->rq_reply_md.eventq =
-                request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;
+        request->rq_reply_md.eventq = 
+                connection->c_peer.peer_ni->pni_reply_in_eq_h;
 
         rc = PtlMDAttach(reply_me_h, request->rq_reply_md,
                          PTL_UNLINK, &request->rq_reply_md_h);
@@ -663,7 +674,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
                ", portal %u on %s\n",
                request->rq_replen, request->rq_xid,
                request->rq_reply_portal,
-               request->rq_connection->c_peer.peer_ni->pni_name);
+               connection->c_peer.peer_ni->pni_name);
 
         ptlrpc_request_addref(request);        /* 1 ref for the SENT callback */
 
@@ -679,8 +690,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
 
         request->rq_sent = LTIME_S(CURRENT_TIME);
         ptlrpc_pinger_sending_on_import(request->rq_import);
-        rc = ptl_send_buf(request, request->rq_connection,
-                          request->rq_request_portal);
+        rc = ptl_send_buf(request, connection, request->rq_request_portal);
         if (rc == 0) {
                 ptlrpc_lprocfs_rpc_sent(request);
                 RETURN(rc);
index 7ec654c..f8adbd1 100644 (file)
@@ -36,6 +36,12 @@ void ptlrpc_daemonize(void);
 
 void ptlrpc_request_handle_notconn(struct ptlrpc_request *);
 void lustre_assert_wire_constants(void);
+int ptlrpc_import_in_recovery(struct obd_import *imp);
+int ptlrpc_set_import_discon(struct obd_import *imp);
+void ptlrpc_handle_failed_import(struct obd_import *imp);
+int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
+int ptlrpc_replay_next(struct obd_import *imp);
+
 
 #ifdef __KERNEL__
 void ptlrpc_lprocfs_register_service(struct proc_dir_entry *proc_entry,
index 95750b2..bfe525c 100644 (file)
@@ -97,7 +97,6 @@ EXPORT_SYMBOL(ptlrpc_link_svc_me);
 /* client.c */
 EXPORT_SYMBOL(ptlrpc_init_client);
 EXPORT_SYMBOL(ptlrpc_cleanup_client);
-EXPORT_SYMBOL(ptlrpc_req_to_uuid);
 EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
 EXPORT_SYMBOL(ptlrpc_queue_wait);
 EXPORT_SYMBOL(ptlrpc_replay_req);
@@ -174,9 +173,7 @@ EXPORT_SYMBOL(lustre_swab_ptlbd_rsp);
 /* recover.c */
 EXPORT_SYMBOL(ptlrpc_run_recovery_over_upcall);
 EXPORT_SYMBOL(ptlrpc_run_failed_import_upcall);
-EXPORT_SYMBOL(ptlrpc_connect_import);
 EXPORT_SYMBOL(ptlrpc_disconnect_import);
-EXPORT_SYMBOL(ptlrpc_replay);
 EXPORT_SYMBOL(ptlrpc_resend);
 EXPORT_SYMBOL(ptlrpc_wake_delayed);
 EXPORT_SYMBOL(ptlrpc_set_import_active);
index 9341403..a569ab7 100644 (file)
@@ -50,13 +50,13 @@ void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
         char *argv[4];
         char *envp[3];
         int rc;
-
         ENTRY;
+
         argv[0] = obd_lustre_upcall;
         argv[1] = "RECOVERY_OVER";
         argv[2] = obd->obd_uuid.uuid;
         argv[3] = NULL;
-
+        
         envp[0] = "HOME=/";
         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
         envp[2] = NULL;
@@ -68,7 +68,7 @@ void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
                        argv[0], argv[1], argv[2], rc);
 
         } else {
-                CERROR("Invoked upcall %s %s %s",
+                CERROR("Invoked upcall %s %s %s\n",
                        argv[0], argv[1], argv[2]);
         }
 }
@@ -76,11 +76,20 @@ void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
 void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
 {
 #ifdef __KERNEL__
+        unsigned long flags;
         char *argv[7];
         char *envp[3];
         int rc;
-
         ENTRY;
+
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                EXIT;
+                return;
+        }
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        
         argv[0] = obd_lustre_upcall;
         argv[1] = "FAILED_IMPORT";
         argv[2] = imp->imp_target_uuid.uuid;
@@ -108,12 +117,14 @@ void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
 #endif
 }
 
-int ptlrpc_replay(struct obd_import *imp)
+int ptlrpc_replay_next(struct obd_import *imp)
 {
         int rc = 0;
         struct list_head *tmp, *pos;
         struct ptlrpc_request *req;
         unsigned long flags;
+        __u64 last_transno;
+        int sent_req = 0;
         ENTRY;
 
         /* It might have committed some after we last spoke, so make sure we
@@ -121,16 +132,11 @@ int ptlrpc_replay(struct obd_import *imp)
          */
         spin_lock_irqsave(&imp->imp_lock, flags);
         ptlrpc_free_committed(imp);
+        last_transno = imp->imp_last_replay_transno;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
                imp, imp->imp_target_uuid.uuid, imp->imp_peer_committed_transno);
-
-        list_for_each(tmp, &imp->imp_replay_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                DEBUG_REQ(D_HA, req, "RETAINED: ");
-        }
-
         /* Do I need to hold a lock across this iteration?  We shouldn't be
          * racing with any additions to the list, because we're in recovery
          * and are therefore not processing additional requests to add.  Calls
@@ -147,20 +153,27 @@ int ptlrpc_replay(struct obd_import *imp)
          * just a little race...
          */
         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-
-                DEBUG_REQ(D_HA, req, "REPLAY:");
-
-                rc = ptlrpc_replay_req(req);
-
-                if (rc) {
-                        CERROR("recovery replay error %d for req "LPD64"\n",
-                               rc, req->rq_xid);
-                        RETURN(rc);
+                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
+                if (req->rq_transno > last_transno) {
+                        /* remove from list so ptlrpcd can send the
+                           req, it should be reinserted after it is
+                           sent and replied.  Perhaps better solution
+                           would be to add req->rq_replay_list so the
+                           req can be saved for replay and still go
+                           through the normal send thread. */
+                        rc = ptlrpc_replay_req(req);
+                        if (rc) {
+                                CERROR("recovery replay error %d for req "LPD64"\n",
+                                       rc, req->rq_xid);
+                                RETURN(rc);
+                        }
+                        sent_req = 1;
+                        break;
                 }
+
         }
 
-        RETURN(0);
+        RETURN(sent_req);
 }
 
 int ptlrpc_resend(struct obd_import *imp)
@@ -199,10 +212,6 @@ void ptlrpc_wake_delayed(struct obd_import *imp)
         list_for_each_safe(tmp, pos, &imp->imp_delayed_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                ptlrpc_put_connection(req->rq_connection);
-                req->rq_connection =
-                       ptlrpc_connection_addref(req->rq_import->imp_connection);
-
                 if (req->rq_set) {
                         DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
                         wake_up(&req->rq_set->set_waitq);
@@ -231,6 +240,7 @@ inline void ptlrpc_invalidate_import_state(struct obd_import *imp)
 void ptlrpc_handle_failed_import(struct obd_import *imp)
 {
         ENTRY;
+
         if (!imp->imp_replayable) {
                 CDEBUG(D_HA,
                        "import %s@%s for %s not replayable, deactivating\n",
@@ -255,20 +265,18 @@ void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
                imp->imp_obd->obd_name,
                imp->imp_target_uuid.uuid,
                imp->imp_connection->c_remote_uuid.uuid);
+        
+        ptlrpc_set_import_discon(imp);
 
-        rc = ptlrpc_recover_import_no_retry(imp, NULL);
-
-        if (failed_req->rq_import_generation != imp->imp_generation) {
-                spin_lock_irqsave (&failed_req->rq_lock, flags);
-                failed_req->rq_err = 1;
-                spin_unlock_irqrestore (&failed_req->rq_lock, flags);
-        }
-        else {
-                ptlrpc_resend_req(failed_req);
-                if (rc && rc != -EALREADY)
-                        ptlrpc_handle_failed_import(imp);
-                        
-        }
+        rc = ptlrpc_connect_import(imp, NULL);
+        
+        /* Wait for recovery to complete and resend. If evicted, then
+           this request will be errored out later.*/
+        spin_lock_irqsave(&failed_req->rq_lock, flags);
+        if (!failed_req->rq_no_resend)
+                failed_req->rq_resend = 1;
+        spin_unlock_irqrestore(&failed_req->rq_lock, flags);
+        
         EXIT;
 }
 
@@ -311,222 +319,66 @@ int ptlrpc_set_import_active(struct obd_import *imp, int active)
         RETURN(0);
 }
 
-void ptlrpc_fail_import(struct obd_import *imp, int generation)
-{
-        unsigned long flags;
-        int in_recovery = 0;
-        ENTRY;
-
-        LASSERT (!imp->imp_dlm_fake);
-
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        if (imp->imp_state != LUSTRE_IMP_FULL) {
-                in_recovery = 1;
-        } else {
-                CDEBUG(D_HA, "%s: new state: DISCON\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_DISCON;
-        }
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-
-        if (in_recovery) {
-                EXIT;
-                return;
-        }
-
-        ptlrpc_handle_failed_import(imp);
-        EXIT;
-}
-
-static int signal_completed_replay(struct obd_import *imp)
-{
-        struct ptlrpc_request *req;
-        int rc;
-        ENTRY;
-
-        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
-        if (!req)
-                RETURN(-ENOMEM);
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-        req->rq_send_state = LUSTRE_IMP_REPLAY;
-        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
-        req->rq_timeout *= 3; 
-
-        rc = ptlrpc_queue_wait(req);
-
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
 {
         int rc;
         ENTRY;
         
+        /* force import to be disconnected. */
+        ptlrpc_set_import_discon(imp);
+        
         rc = ptlrpc_recover_import_no_retry(imp, new_uuid);
 
-        if (rc && rc != -EALREADY) {
-                unsigned long flags;
-                CDEBUG(D_HA, "recovery of %s on %s failed (%d); restarting\n",
-                       imp->imp_target_uuid.uuid,
-                       new_uuid ? new_uuid :
-                       (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                imp->imp_state = LUSTRE_IMP_FULL;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                ptlrpc_fail_import(imp, imp->imp_generation);
-        }
         RETURN(rc);
 }
 
+int ptlrpc_import_in_recovery(struct obd_import *imp)
+{
+        unsigned long flags;
+        int in_recovery = 1;
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (imp->imp_state == LUSTRE_IMP_FULL ||
+            imp->imp_state == LUSTRE_IMP_CLOSED ||
+            imp->imp_state == LUSTRE_IMP_DISCON)
+                in_recovery = 0;
+        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        return in_recovery;
+}
+
 static int ptlrpc_recover_import_no_retry(struct obd_import *imp,
                                           char *new_uuid)
 {
         int rc;
         unsigned long flags;
         int in_recovery = 0;
-        int was_invalid = 0;
+        struct l_wait_info lwi;
         ENTRY;
 
         spin_lock_irqsave(&imp->imp_lock, flags);
-        if (imp->imp_state == LUSTRE_IMP_FULL) {
-                CDEBUG(D_HA, "%s: new state: DISCON\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_DISCON;
-        } 
-        
         if (imp->imp_state != LUSTRE_IMP_DISCON) {
                 in_recovery = 1;
-        } else if (imp->imp_invalid) {
-                imp->imp_invalid = 0;
-                was_invalid = 1;
-        }
-
+        } 
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         if (in_recovery == 1)
                 RETURN(-EALREADY);
 
-        down(&imp->imp_recovery_sem);
-        /* If recovery happened while we waited, we're done. */
-        if (imp->imp_state == LUSTRE_IMP_FULL)
-                GOTO(out, rc = 0);
-
-        LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
-
-        if (new_uuid) {
-                struct ptlrpc_connection *conn;
-                struct obd_uuid uuid;
-                struct ptlrpc_peer peer;
-                struct obd_export *dlmexp;
-
-                obd_str2uuid(&uuid, new_uuid);
-                if (ptlrpc_uuid_to_peer(&uuid, &peer)) {
-                        CERROR("no connection found for UUID %s\n", new_uuid);
-                        GOTO(out, rc = -EINVAL);
-                }
-
-                conn = ptlrpc_get_connection(&peer, &uuid);
-                if (!conn)
-                        GOTO(out, rc = -ENOMEM);
-
-                CDEBUG(D_HA, "switching import %s/%s from %s to %s\n",
-                       imp->imp_target_uuid.uuid, imp->imp_obd->obd_name,
-                       imp->imp_connection->c_remote_uuid.uuid,
-                       conn->c_remote_uuid.uuid);
-
-                /* Switch the import's connection and the DLM export's
-                 * connection (which are almost certainly the same, but we
-                 * keep distinct refs just to make things clearer. I think. */
-                if (imp->imp_connection)
-                        ptlrpc_put_connection(imp->imp_connection);
-                /* We hand off the ref from ptlrpc_get_connection. */
-                imp->imp_connection = conn;
-
-                dlmexp = class_conn2export(&imp->imp_dlm_handle);
-                if (dlmexp->exp_connection)
-                        ptlrpc_put_connection(dlmexp->exp_connection);
-                dlmexp->exp_connection = ptlrpc_connection_addref(conn);
-                class_export_put(dlmexp);
-
-        }
-
- connect:
-        rc = ptlrpc_connect_import(imp);
-
-        if (rc < 0) {
-                CERROR("failed to reconnect to %s@%s: %d\n",
-                       imp->imp_target_uuid.uuid,
-                       imp->imp_connection->c_remote_uuid.uuid, rc);
-                GOTO(out, rc);
-        } 
-
-        if (imp->imp_state == LUSTRE_IMP_EVICTED) {
-                CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
-                       imp->imp_target_uuid.uuid,
-                       imp->imp_connection->c_remote_uuid.uuid);
-                ptlrpc_set_import_active(imp, 0);
-                CDEBUG(D_HA, "%s: new state: RECOVER\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_RECOVER;
-        } 
         
-        if (imp->imp_state == LUSTRE_IMP_REPLAY) {
-                CDEBUG(D_HA, "replay requested by %s\n",
-                       imp->imp_target_uuid.uuid);
-                rc = ptlrpc_replay(imp);
-                if (rc)
-                        GOTO(out, rc);
-
-                rc = ldlm_replay_locks(imp);
-                if (rc)
-                        GOTO(out, rc);
-
-                rc = signal_completed_replay(imp);
-                if (rc)
-                        GOTO(out, rc);
-                CDEBUG(D_HA, "%s: new state: RECOVER\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_RECOVER;
-        } 
-
-        if (imp->imp_state == LUSTRE_IMP_RECOVER) {
-                CDEBUG(D_HA, "reconnected to %s@%s\n",
-                       imp->imp_target_uuid.uuid,
-                       imp->imp_connection->c_remote_uuid.uuid);
-
-                ptlrpc_set_import_active(imp, 1);
-                ptlrpc_resend(imp);
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                CDEBUG(D_HA, "%s: new state: FULL\n", 
-                       imp->imp_client->cli_name);
-                imp->imp_state = LUSTRE_IMP_FULL;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                ptlrpc_wake_delayed(imp);
-        } 
+        rc = ptlrpc_connect_import(imp, new_uuid);
+        if (rc)
+                RETURN(rc);
 
+        CDEBUG(D_ERROR, "%s: recovery started, waiting\n", 
+               imp->imp_client->cli_name);
 
-        LASSERT(imp->imp_state == LUSTRE_IMP_FULL);
+        lwi = LWI_TIMEOUT(MAX(obd_timeout * HZ, 1), NULL, NULL);
+        rc = l_wait_event(imp->imp_recovery_waitq, 
+                          !ptlrpc_import_in_recovery(imp), &lwi);
+        CDEBUG(D_ERROR, "%s: recovery finished\n", 
+               imp->imp_client->cli_name);
 
- out:
-        if (rc != 0) {
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                imp->imp_state = LUSTRE_IMP_DISCON;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                
-                if (rc == -ENOTCONN) {
-                        CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
-                               "invalidating and reconnecting\n",
-                               imp->imp_target_uuid.uuid,
-                               imp->imp_connection->c_remote_uuid.uuid);
-                        GOTO(connect, -ENOTCONN);
-                } else if (was_invalid) {
-                        ptlrpc_set_import_active(imp, 0);
-                }
-        }
-        up(&imp->imp_recovery_sem);
         RETURN(rc);
+        
 }
 
 void ptlrpc_fail_export(struct obd_export *exp)
index 9d3ff82..979355c 100644 (file)
@@ -237,8 +237,6 @@ static int handle_incoming_request(struct obd_device *obddev,
         request->rq_export = class_conn2export(&request->rq_reqmsg->handle);
 
         if (request->rq_export) {
-                request->rq_connection = request->rq_export->exp_connection;
-                ptlrpc_connection_addref(request->rq_connection);
                 if (request->rq_reqmsg->conn_cnt < 
                     request->rq_export->exp_conn_cnt) {
                         DEBUG_REQ(D_ERROR, request,
@@ -250,13 +248,7 @@ static int handle_incoming_request(struct obd_device *obddev,
 
                 request->rq_export->exp_last_request_time =
                         LTIME_S(CURRENT_TIME);
-        } else {
-                /* create a (hopefully temporary) connection that will be used
-                 * to send the reply if this call doesn't create an export.
-                 * XXX revisit this when we revamp ptlrpc */
-                request->rq_connection =
-                        ptlrpc_get_connection(&request->rq_peer, NULL);
-        }
+        } 
 
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
                "%s:%s+%d:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
@@ -280,7 +272,6 @@ static int handle_incoming_request(struct obd_device *obddev,
                request->rq_reqmsg->opc);
 
 put_conn:
-        ptlrpc_put_connection(request->rq_connection);
         if (request->rq_export != NULL)
                 class_export_put(request->rq_export);
 
diff --git a/lustre/scripts/merge1.sh b/lustre/scripts/merge1.sh
new file mode 100755 (executable)
index 0000000..362ddd4
--- /dev/null
@@ -0,0 +1,61 @@
+#!/bin/sh -e 
+
+CVS=cvs
+
+if [ -f .mergeinfo ] ; then
+    echo ".mergeinfo exists - clean up first"
+    exit 
+fi
+
+if [ -f merge-conflicts ] ; then
+    echo "cvs-merge-conflicts exists - clean up first"
+    exit 
+fi
+
+if [ $# != 2 ]; then
+    echo "This is phase 1 of merging branches. Usage: $0 parent child"
+    exit
+fi
+
+parent=$1
+PARENT=`echo $parent | tr '[a-z]' '[A-Z]'`
+child=$2
+CHILD=`echo $child | tr '[a-z]' '[A-Z]'`
+date=`date +%Y%m%d_%H%M`
+module=lustre
+
+if [ $parent != "HEAD" ]; then
+  parent="b_$parent"
+fi
+if [ $child != "HEAD" ]; then
+  child="b_$child"
+fi
+
+cat << EOF > .mergeinfo
+parent=$parent
+PARENT=$PARENT
+child=$child
+CHILD=$CHILD
+date=$date
+module=$module
+EOF
+
+echo PARENT $PARENT parent $parent CHILD $CHILD child $child date $date
+
+echo -n "tagging $parent as ${PARENT}_${CHILD}_UPDATE_PARENT_$date ...."
+$CVS rtag -r $parent ${PARENT}_${CHILD}_UPDATE_PARENT_$date $module
+echo "done"
+echo -n "tagging $child as ${PARENT}_${CHILD}_UPDATE_CHILD_$date ...."
+$CVS rtag -r $child ${PARENT}_${CHILD}_UPDATE_CHILD_$date $module
+echo "done"
+echo "Updating: -j ${CHILD}_BASE -j ${PARENT}_${CHILD}_UPDATE_PARENT_$date ...."
+$CVS update -j ${CHILD}_BASE -j ${PARENT}_${CHILD}_UPDATE_PARENT_$date -dP
+echo "done"
+echo -n "Recording conflicts in cvs-merge-conflicts ..."
+if $CVS update | grep '^C' > cvs-merge-conflicts; then
+    echo "Conflicts found, fix before committing."
+    cat cvs-merge-conflicts
+else 
+    echo "No conflicts found"
+fi
+echo "Test, commit and then run merge2.sh (no arguments)"
diff --git a/lustre/scripts/merge2.sh b/lustre/scripts/merge2.sh
new file mode 100755 (executable)
index 0000000..e6ba077
--- /dev/null
@@ -0,0 +1,21 @@
+#!/bin/sh -e 
+
+if [ ! -f .mergeinfo ] ; then
+    echo ".mergeinfo doesn't exist - exit"
+    exit 
+fi
+
+. .mergeinfo
+
+echo -n "Tagging ${PARENT}_${CHILD}_UPDATE_PARENT_$date as ${CHILD}_BASE_$date ..."
+cvs rtag -r ${PARENT}_${CHILD}_UPDATE_PARENT_$date ${CHILD}_BASE_$date $module
+echo  "done"
+echo -n "Tagging ${CHILD}_BASE as ${CHILD}_BASE_PREV ...."
+cvs rtag -F -r ${CHILD}_BASE ${CHILD}_BASE_PREV $module
+echo  "done"
+echo "${CHILD}_BASE_$date as ${CHILD}_BASE ..."
+cvs rtag -F -r ${CHILD}_BASE_$date ${CHILD}_BASE $module
+
+echo "saving .mergeinfo as .mergeinfo-$date"
+mv .mergeinfo .mergeinfo-$date
+echo  "done"
diff --git a/lustre/tests/cfg/insanity-adev.sh b/lustre/tests/cfg/insanity-adev.sh
new file mode 100644 (file)
index 0000000..b7cf033
--- /dev/null
@@ -0,0 +1,37 @@
+mds_HOST=${mds_HOST:-adev2}
+mdsfailover_HOST=${mdsfailover_HOST:-adev2}
+ost1_HOST=${ost1_HOST:-adev3}
+ost2_HOST=${ost2_HOST:-adev4}
+EXTRA_OSTS=${EXTRA_OSTS:-adev7}
+client_HOST=client
+LIVE_CLIENT=${LIVE_CLIENT:-adev8}
+# This should always be a list, not a regexp
+#FAIL_CLIENTS=${FAIL_CLIENTS:-mdev7}
+FAIL_CLIENTS=${FAIL_CLIENTS:-"adev9 adev10 adev11 adev12"}
+EXTRA_CLIENTS=${EXTRA_CLIENTS:-"adev[13-15]"}
+
+NETTYPE=${NETTYPE:-tcp}
+
+TIMEOUT=${TIMEOUT:-30}
+PTLDEBUG=${PTLDEBUG:-0}
+SUBSYSTEM=${SUBSYSTEM:-0}
+MOUNT=${MOUNT:-"/mnt/lustre"}
+UPCALL=${CLIENT_UPCALL:-`pwd`/replay-single-upcall.sh}
+
+MDSDEV=${MDSDEV:-/dev/sdc}
+MDSSIZE=${MDSSIZE:-50000}
+MDSJOURNALSIZE=${MDSJOURNALSIZE:-0}
+
+OSTDEV=${OSTDEV:-/tmp/ost-`hostname`}
+OSTSIZE=${OSTSIZE:=50000}
+OSTJOURNALSIZE=${OSTJOURNALSIZE:-0}
+
+FSTYPE=${FSTYPE:-ext3}
+STRIPE_BYTES=${STRIPE_BYTES:-1048576} 
+STRIPES_PER_OBJ=${STRIPES_PER_OBJ:-0}
+
+FAILURE_MODE=${FAILURE_MODE:-HARD} # or HARD
+POWER_DOWN=${POWER_DOWN:-"powerman --off"}
+POWER_UP=${POWER_UP:-"powerman --on"}
+
+PDSH="pdsh -S -w "
index 0c99215..d0df708 100644 (file)
@@ -2,6 +2,7 @@ mds_HOST=${mds_HOST:-`hostname`}
 mdsfailover_HOST=${mdsfailover_HOST:-""}
 ost1_HOST=${ost1_HOST:-"`hostname`"}
 ost2_HOST=${ost2_HOST:-"`hostname`"}
+EXTRA_OSTS=${EXTRA_OSTS:-"`hostname`"}
 client_HOST="'*'"
 LIVE_CLIENT=${LIVE_CLIENT:-"`hostname`"}
 # This should always be a list, not a regexp
@@ -17,9 +18,12 @@ UPCALL=${CLIENT_UPCALL:-`pwd`/replay-single-upcall.sh}
 
 MDSDEV=${MDSDEV:-$ROOT/tmp/mds-`hostname`}
 MDSSIZE=${MDSSIZE:-10000} #50000000
+MDSJOURNALSIZE=${MDSJOURNALSIZE:-0}
 
-OSTDEV=${OSTDEV:-$ROOT/tmp/ost-`hostname`}
+OSTDEV=${OSTDEV:-"$ROOT/tmp/ost-`hostname`-%d"}
 OSTSIZE=${OSTSIZE:=10000} #50000000
+OSTJOURNALSIZE=${OSTJOURNALSIZE:-0}
+
 FSTYPE=${FSTYPE:-ext3}
 STRIPE_BYTES=${STRIPE_BYTES:-65536} #1048576
 STRIPES_PER_OBJ=${STRIPES_PER_OBJ:-0}
index 5e69356..ff34d6d 100644 (file)
@@ -2,16 +2,18 @@ mds_HOST=${mds_HOST:-mdev4}
 mdsfailover_HOST=${mdsfailover_HOST:-mdev5}
 ost1_HOST=${ost1_HOST:-mdev2}
 ost2_HOST=${ost2_HOST:-mdev3}
+EXTRA_OSTS=${EXTRA_OSTS:-mdev7}
 client_HOST=client
 LIVE_CLIENT=${LIVE_CLIENT:-mdev6}
 # This should always be a list, not a regexp
-FAIL_CLIENTS=${FAIL_CLIENTS:-mdev7}
+#FAIL_CLIENTS=${FAIL_CLIENTS:-mdev7}
+FAIL_CLIENTS=${FAIL_CLIENTS:-""}
 
 NETTYPE=${NETTYPE:-tcp}
 
 TIMEOUT=${TIMEOUT:-30}
-#PTLDEBUG=${PTLDEBUG:-'"ha|info|ioctl|malloc"'}
 PTLDEBUG=${PTLDEBUG:-0}
+SUBSYSTEM=${SUBSYSTEM:-0}
 MOUNT=${MOUNT:-"/mnt/lustre"}
 UPCALL=${CLIENT_UPCALL:-`pwd`/replay-single-upcall.sh}
 
index ec8edf2..dd373ba 100644 (file)
@@ -11,7 +11,8 @@ MOUNT1=${MOUNT1:-$MOUNT}
 MOUNT2=${MOUNT2:-"/mnt/lustre2"}
 DIR=${DIR:-$MOUNT}
 DIR2=${DIR2:-$MOUNT1}
-PTLDEBUG=${PTLDEBUG:-0}
+PTLDEBUG=${PTLDEBUG:-0x3f0400}
+SUBSYSTEM=${SUBSYSTEM:- 0xffb7e3ff}
 PDSH=${PDSH:-pdsh -S -w}
 
 MDSDEV=${MDSDEV:-/dev/sda1}
index 39e92a9..0f58491 100644 (file)
@@ -2,6 +2,11 @@
 # requirement:
 #      add uml1 uml2 uml3 in your /etc/hosts
 
+# FIXME - there is no reason to use all of these different
+#   return codes, espcially when most of them are mapped to something
+#   else anyway.  The combination of test number and return code
+#   figure out what failed.
+
 set -e
 
 SRCDIR=`dirname $0`
@@ -66,7 +71,7 @@ stop_ost() {
 mount_client() {
        local MOUNTPATH=$1
        echo "mount lustre on ${MOUNTPATH}....."
-       zconf_mount $MOUNTPATH $CMDVERBOSE || return 96
+       zconf_mount `hostname`  $MOUNTPATH $CMDVERBOSE || return 96
 }
 
 umount_client() {
@@ -178,14 +183,27 @@ run_test 4 "force cleanup ost, then cleanup"
 
 test_5() {
        setup
-       touch $DIR/$tfile || return 86
-       stop_mds ${FORCE} || return 98
-       cleanup 
-       eno=$?
-       # ok for mds to fail shutdown
-       if [ 201 -ne $eno ]; then
-               return $eno;
-       fi
+       touch $DIR/$tfile || return 1
+       stop_mds ${FORCE} || return 2
+
+       # cleanup may return an error from the failed 
+       # disconnects; for now I'll consider this successful 
+       # if all the modules have unloaded.
+       umount $MOUNT &
+       UMOUNT_PID=$!
+       sleep $TIMEOUT
+       echo "killing umount"
+       kill -TERM $UMOUNT_PID
+       wait $UMOUNT_PID 
+
+       # cleanup client modules
+       $LCONF --cleanup --nosetup --node client_facet $XMLCONFIG > /dev/null 
+       
+       # stop_mds is a no-op here, and should not fail
+       stop_mds  || return 4
+       stop_ost || return 5
+
+       lsmod | grep -q portals && return 6
        return 0
 }
 run_test 5 "force cleanup mds, then cleanup"
index 1838cea..7ad2c1c 100755 (executable)
@@ -16,10 +16,19 @@ build_test_filter
 
 assert_env mds_HOST ost1_HOST ost2_HOST client_HOST LIVE_CLIENT 
 
-# This can be a regexp, to allow more clients
-CLIENTS=${CLIENTS:-"`comma_list $LIVE_CLIENT $FAIL_CLIENTS`"}
+####
+# Initialize all the ostN_HOST 
+NUMOST=2
+if [ "$EXTRA_OSTS" ]; then
+    for host in $EXTRA_OSTS; do
+       NUMOST=$((NUMOST + 1))
+       OST=ost$NUMOST
+       eval ${OST}_HOST=$host
+    done
+fi
 
-CLIENTLIST="$LIVE_CLIENT $FAIL_CLIENTS"
+# This can be a regexp, to allow more clients
+CLIENTS=${CLIENTS:-"`comma_list $LIVE_CLIENT $FAIL_CLIENTS $EXTRA_CLIENTS`"}
 
 DIR=${DIR:-$MOUNT}
 
@@ -30,21 +39,26 @@ DIR=${DIR:-$MOUNT}
 FAIL_LIST=($FAIL_CLIENTS)
 FAIL_NUM=${#FAIL_LIST[*]}
 FAIL_NEXT=0
+typeset -i  FAIL_NEXT
 DOWN_NUM=0   # number of nodes currently down
 
-# return next client to fail
-fail_client() {
-    ret=${FAIL_LIST[$FAIL_NEXT]}
+# set next client to fail
+set_fail_client() {
+    FAIL_CLIENT=${FAIL_LIST[$FAIL_NEXT]}
     FAIL_NEXT=$(( (FAIL_NEXT+1) % FAIL_NUM ))
-    echo $ret
+    echo "fail $FAIL_CLIENT, next is $FAIL_NEXT"
 }
 
 shutdown_client() {
     client=$1
     if [ "$FAILURE_MODE" = HARD ]; then
        $POWER_DOWN $client
+       while ping -w 3 -c 1 $client > /dev/null 2>&1; do 
+          echo "waiting for node $client to fail"
+          sleep 1
+       done  
     elif [ "$FAILURE_MODE" = SOFT ]; then
-       $PDSH $client $LCONF --clenaup --force --nomod $XMLCONFIG
+       zconf_umount $client $MOUNT -f
     fi
 }
 
@@ -65,24 +79,29 @@ fail_clients() {
         return
     fi
 
+    client_mkdirs
+
     for i in `seq $num`; do
-       client=`fail_client`
+       set_fail_client
+       client=$FAIL_CLIENT
        DOWN_CLIENTS="$DOWN_CLIENTS $client"
-       client_mkdirs
        shutdown_client $client
     done
 
+    echo "down clients: $DOWN_CLIENTS"
+
     for client in $DOWN_CLIENTS; do
        reboot_node $client
     done
     DOWN_NUM=`echo $DOWN_CLIENTS | wc -w`
-    $PDSH $LIVE_CLIENT "cd $MOUNT && rmdir $CLIENTLIST"
+    client_rmdirs
 }
 
 reintegrate_clients() {
     for client in $DOWN_CLIENTS; do
        wait_for_host $client
-       $PDSH $client "$LCONF --node client --select mds_svc=`facet_active mds` $CLIENTOPTS $XMLCONFIG"
+       echo "Restarting $client"
+       zconf_mount $client $MOUNT || return 1
     done
     DOWN_CLIENTS=""
     DOWN_NUM=0
@@ -90,7 +109,7 @@ reintegrate_clients() {
 
 gen_config() {
     rm -f $XMLCONFIG
-    add_mds mds --dev $MDSDEV --size $MDSSIZE
+    add_mds mds --dev $MDSDEV --size $MDSSIZE --journal-size $MDSJOURNALSIZE
 
     if [ ! -z "$mdsfailover_HOST" ]; then
         add_mdsfailover mds --dev $MDSDEV --size $MDSSIZE
@@ -98,58 +117,107 @@ gen_config() {
 
     add_lov lov1 mds --stripe_sz $STRIPE_BYTES\
        --stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0
-    add_ost ost1 --lov lov1 --dev $OSTDEV --size $OSTSIZE
-    add_ost ost2 --lov lov1 --dev ${OSTDEV}-2 --size $OSTSIZE
+    for i in `seq $NUMOST`; do
+       dev=`printf $OSTDEV $i`
+       add_ost ost$i --lov lov1 --dev $dev --size $OSTSIZE \
+           --journal-size $OSTJOURNALSIZE
+    done
+     
+
     add_client client mds --lov lov1 --path $MOUNT
 }
 
 setup() {
-    wait_for ost1
-    start ost1 ${REFORMAT} $OSTLCONFARGS 
-    wait_for ost2
-    start ost2 ${REFORMAT} $OSTLCONFARGS 
+    rm -rf logs/*
+    for i in `seq $NUMOST`; do
+       wait_for ost$i
+       start ost$i ${REFORMAT} $OSTLCONFARGS 
+    done
     [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
     wait_for mds
     start mds $MDSLCONFARGS ${REFORMAT}
-    while ! do_node $HOST "$CHECKSTAT -t dir $LUSTRE"; do sleep 5; done
-    do_node $CLIENTS lconf --node client_facet \
-       --select mds_service=$ACTIVEMDS $XMLCONFIG
+    while ! do_node $CLIENTS "ls -d $LUSTRE" > /dev/null; do sleep 5; done
+    zconf_mount $CLIENTS $MOUNT
+
 }
 
 cleanup() {
-    # make sure we are using the primary MDS, so the config log will
-    # be able to clean up properly.
-    activemds=`facet_active mds`
-#    if [ $activemds != "mds" ]; then
-#        fail mds
-#    fi
-    for node in $CLIENTS; do
-       do_node $node lconf ${FORCE} --select mds_svc=${activemds}_facet --cleanup --node client_facet $XMLCONFIG || true
-    done
+    zconf_umount $CLIENTS $MOUNT
 
-    stop mds ${FORCE} $MDSLCONFARGS
-    stop ost1 ${FORCE}
-    stop ost2 ${FORCE} --dump cleanup.log
+    stop mds ${FORCE} $MDSLCONFARGS || :
+    for i in `seq $NUMOST`; do
+       stop ost$i ${REFORMAT} ${FORCE} $OSTLCONFARGS  || :
+    done
 }
 
 trap exit INT
 
+client_touch() {
+    file=$1
+    for c in $LIVE_CLIENT $FAIL_CLIENTS;  do
+       if echo $DOWN_CLIENTS | grep -q $c; then continue; fi
+       $PDSH $c touch $MOUNT/${c}_$file
+    done
+}
+
+client_rm() {
+    file=$1
+    for c in $LIVE_CLIENT $FAIL_CLIENTS;  do
+       $PDSH $c rm $MOUNT/${c}_$file
+    done
+}
+
 client_mkdirs() {
-   $PDSH $CLIENTS "mkdir $MOUNT/\`hostname\`; ls $MOUNT/\`hostname\` > /dev/null"
+    for c in $LIVE_CLIENT $FAIL_CLIENTS;  do
+       echo "$c mkdir $MOUNT/$c"
+       $PDSH $c "mkdir $MOUNT/$c"
+       $PDSH $c "ls -l $MOUNT/$c" 
+    done
+}
+
+client_rmdirs() {
+    for c in $LIVE_CLIENT $FAIL_CLIENTS;  do
+       echo "rmdir $MOUNT/$c"
+       $PDSH $LIVE_CLIENT "rmdir $MOUNT/$c"
+    done
 }
 
 clients_recover_osts() {
     facet=$1
-    $PDSH $CLIENTS "$LCTL "'--device %OSC_`hostname`_'"${facet}_svc_MNT_client_facet recover"
+#    do_node $CLIENTS "$LCTL "'--device %OSC_`hostname`_'"${facet}_svc_MNT_client_facet recover"
+}
+
+node_to_ost() {
+    node=$1
+    retvar=$2
+    for i in `seq $NUMOST`; do
+       ostvar="ost${i}_HOST"
+       if [ "${!ostvar}" == $node ]; then
+           eval $retvar=ost${i}
+           return 0
+       fi
+    done
+    echo "No ost found for node; $node"
+    return 1
+    
 }
 
+
+
 if [ "$ONLY" == "cleanup" ]; then
     cleanup
     exit
 fi
 
-gen_config
-setup
+if [ -z "$NOSETUP" ]; then
+    gen_config
+    setup
+fi
+
+if [ ! -z "$EVAL" ]; then
+    eval "$EVAL"
+    exit $?
+fi
 
 if [ "$ONLY" == "setup" ]; then
     exit 0
@@ -161,14 +229,17 @@ echo "Starting Test 17 at `date`"
 test_0() {
     echo "Failover MDS"
     facet_failover mds
+    echo "Waiting for df pid: $DFPID"
     wait $DFPID || return 1
 
     echo "Failing OST1"
     facet_failover ost1
+    echo "Waiting for df pid: $DFPID"
     wait $DFPID || return 2
 
     echo "Failing OST2"
     facet_failover ost2
+    echo "Waiting for df pid: $DFPID"
     wait $DFPID || return 3
     return 0
 }
@@ -178,7 +249,6 @@ run_test 0 "Fail all nodes, independently"
 test_1() {
 echo "Don't do a MDS - MDS Failure Case"
 echo "This makes no sense"
-# FIXME every test makes sense
 }
 run_test 1 "MDS/MDS failure"
 ###################################################
@@ -246,9 +316,9 @@ test_3() {
     
     #Reintegration
     echo "Reintegrating CLIENTS"
-    reintegrate_clients
+    reintegrate_clients || return 1
 
-    client_df || return 1
+    client_df || return 3
 }
 run_test 3  "Thirdb Failure Mode: MDS/CLIENT `date`"
 ###################################################
@@ -326,14 +396,15 @@ test_5() {
     #Reintegration
     echo "Reintegrating OSTs"
     wait_for ost1
-    wait_for ost1
     start ost1
+    wait_for ost2
     start ost2
     
     clients_recover_osts ost1
     clients_recover_osts ost2
-    sleep 5
-    client_df || return 1
+    sleep $TIMEOUT
+
+    client_df || return 2
 }
 run_test 5 "Fifth Failure Mode: OST/OST `date`"
 ###################################################
@@ -345,7 +416,7 @@ test_6() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df || return 1
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile" || return 2
+    client_touch testfile || return 2
        
     #OST Portion
     echo "Failing OST"
@@ -385,7 +456,7 @@ test_7() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile"
+    client_touch testfile  || return 1
 
     #CLIENT Portion
     echo "Part 1: Failing CLIENT"
@@ -404,7 +475,7 @@ test_7() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile"
+    client_rm testfile
 
     #MDS Portion
     echo "Failing MDS"
@@ -412,14 +483,14 @@ test_7() {
 
     #Check FS
     echo "Test Lustre stability after MDS failover"
-    client_df
+    wait $DFPID || echo "df on down clients fails " || return 1
     $PDSH $LIVE_CLIENT "ls -l $MOUNT"
     $PDSH $LIVE_CLIENT "rm -f $MOUNT/*_testfile"
 
     #Reintegration
     echo "Reintegrating CLIENTs"
     reintegrate_clients
-    client_df || return 1
+    client_df || return 2
     
     #Sleep
     echo "wait 1 minutes"
@@ -436,7 +507,7 @@ test_8() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile"
+    client_touch testfile
        
     #CLIENT Portion
     echo "Failing CLIENTs"
@@ -455,7 +526,8 @@ test_8() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile"
+    client_touch testfile
+
 
     #OST Portion
     echo "Failing OST"
@@ -471,9 +543,10 @@ test_8() {
     #Reintegration
     echo "Reintegrating CLIENTs/OST"
     reintegrate_clients
+    wait_for ost1
     start ost1
     client_df || return 1
-    $PDSH $CLIENTS "/bin/touch $MOUNT/CLIENT_OST_2\`hostname\`_testfile" || return 2
+    client_touch testfile2 || return 2
 
     #Sleep
     echo "Wait 1 minutes"
@@ -490,7 +563,7 @@ test_9() {
     #Create files
     echo "Verify Lustre filesystem is up and running"
     client_df
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile"
+    client_touch testfile || return 1
        
     #CLIENT Portion
     echo "Failing CLIENTs"
@@ -508,8 +581,8 @@ test_9() {
 
     #Create files
     echo "Verify Lustre filesystem is up and running"
-    client_df || return 3
-    $PDSH $CLIENTS "/bin/touch $MOUNT/\`hostname\`_testfile" || return 4
+    $PDSH $LIVE_CLIENT df $MOUNT || return 3
+    client_touch testfile || return 4
 
     #CLIENT Portion
     echo "Failing CLIENTs"
@@ -535,7 +608,9 @@ run_test 9 "Ninth Failure Mode: CLIENT/CLIENT `date`"
 
 test_10() {
     #Run availability after all failures
-    ./availability.sh  21600
+    DURATION=${DURATION:-$((2 * 60 * 60))} # 6 hours default
+    LOADTEST=${LOADTEST:-metadata-load.py}
+    $PWD/availability.sh $CONFIG $DURATION $CLIENTS || return 1
 }
 run_test 10 "Running Availability for 6 hours..."
 
index d5dae90..18a93ea 100755 (executable)
@@ -152,21 +152,12 @@ test_11(){
 }
 run_test 11 "wake up a thead waiting for completion after eviction (b=2460)"
 
-clear_failloc() {
-    facet=$1
-    pause=$2
-    sleep $pause
-    echo "clearing fail_loc on $facet"
-    do_facet $facet "sysctl -w lustre.fail_loc=0"
-}
-
 #b=2494
 test_12(){
     $LCTL mark multiop $MOUNT/$tfile OS_c 
     multiop $MOUNT/$tfile OS_c  &
     PID=$!
 #define OBD_FAIL_MDS_CLOSE_NET           0x115
-    DDPID=$!
     do_facet mds "sysctl -w lustre.fail_loc=0x115"
     clear_failloc mds $((TIMEOUT * 2)) &
     kill -USR1 $PID
@@ -176,5 +167,27 @@ test_12(){
 }
 run_test 12 "recover from timed out resend in ptlrpcd (b=2494)"
 
+# Bug 113, check that readdir lost recv timeout works.
+test_13() {
+    mkdir /mnt/lustre/readdir
+    touch /mnt/lustre/readdir/newentry
+# OBD_FAIL_MDS_READPAGE_NET|OBD_FAIL_ONCE
+    do_facet mds "sysctl -w lustre.fail_loc=0x80000104"
+    ls /mnt/lustre/readdir || return 1
+    do_facet mds "sysctl -w lustre.fail_loc=0"
+    rm -rf /mnt/lustre/readdir
+}
+run_test 13 "mdc_readpage restart test (bug 1138)"
+
+# Bug 113, check that readdir lost send timeout works.
+test_14() {
+    mkdir /mnt/lustre/readdir
+    touch /mnt/lustre/readdir/newentry
+# OBD_FAIL_MDS_SENDPAGE|OBD_FAIL_ONCE
+    do_facet mds "sysctl -w lustre.fail_loc=0x80000106"
+    ls /mnt/lustre/readdir || return 1
+    do_facet mds "sysctl -w lustre.fail_loc=0"
+}
+run_test 14 "mdc_readpage resend test (bug 1138)"
+
 $CLEANUP
-    
index 56e6faa..d31b348 100755 (executable)
@@ -12,7 +12,8 @@ init_test_env $@
 ostfailover_HOST=${ostfailover_HOST:-$ost_HOST}
 
 # Skip these tests
-ALWAYS_EXCEPT=""
+ALWAYS_EXCEPT="5"
+# test 5 needs a larger fs than what local normally has
 
 gen_config() {
     rm -f $XMLCONFIG
@@ -51,13 +52,7 @@ rm -f ostactive
 gen_config
 
 start ost --reformat $OSTLCONFARGS
-PINGER=`cat /proc/fs/lustre/pinger`
 
-if [ "$PINGER" != "on" ]; then
-    echo "ERROR: Lustre must be built with --enable-pinger for this test."
-    stop ost
-    exit 1
-fi
 [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
 start mds --reformat $MDSLCONFARGS
 zconf_mount $MOUNT
@@ -65,7 +60,6 @@ zconf_mount $MOUNT
 mkdir -p $DIR
 
 test_0() {
-    replay_barrier ost
     fail ost
     cp /etc/profile  $DIR/$tfile
     sync
@@ -74,7 +68,6 @@ test_0() {
 run_test 0 "empty replay"
 
 test_1() {
-    replay_barrier ost
     date > $DIR/$tfile
     fail ost
     $CHECKSTAT -t file $DIR/$tfile || return 1
@@ -82,7 +75,6 @@ test_1() {
 run_test 1 "touch"
 
 test_2() {
-    replay_barrier ost
     for i in `seq 10`; do
         echo "tag-$i" > $DIR/$tfile-$i
     done 
@@ -120,5 +112,16 @@ test_4() {
 }
 run_test 4 "Fail OST during read, with verification"
 
+test_5() {
+    IOZONE_OPTS="-i 0 -i 1 -i 2 -+d -r 64 -s 1g"
+    iozone $IOZONE_OPTS -f $DIR/$tfile &
+    PID=$!
+    
+    sleep 10
+    fail ost
+    wait $PID || return 1
+}
+run_test 5 "Fail OST during iozone"
+
 equals_msg test complete, cleaning up
 cleanup
index 17e04c9..59c1371 100755 (executable)
@@ -9,6 +9,10 @@ mkdir -p $TESTDIR/logs
 exec >> $TESTDIR/logs/recovery-`hostname`.log
 exec 2>&1
 
+echo ==========================================
+echo "start upcall: `date`"
+echo "command line: $0 $*"
+
 set -xv
 
 failed_import() {
index bed8b61..8fdcb68 100755 (executable)
@@ -662,9 +662,12 @@ run_test 34 "abort recovery before client does replay (test mds_cleanup_orphans)
 test_35() {
     touch $DIR/$tfile
 
-    echo 0x80000119 > /proc/sys/lustre/fail_loc
+#define OBD_FAIL_MDS_REINT_NET_REP       0x119
+    do_facet mds "sysctl -w lustre.fail_loc=0x80000119"
     rm -f $DIR/$tfile &
     sleep 1
+    sync
+    sleep 1
     # give a chance to remove from MDS
     fail_abort mds
     $CHECKSTAT -t file $DIR/$tfile && return 1 || true
@@ -708,40 +711,67 @@ test_37() {
 run_test 37 "abort recovery before client does replay (test mds_cleanup_orphans for directories)"
 
 test_38() {
-    for i in `seq 1 800`; do
-       touch $DIR/$tfile-$i
-    done
-    for i in `seq 1 400`; do
-       rm $DIR/$tfile-$i
-    done
-
+    createmany -o $DIR/$tfile-%d 800
+    unlinkmany $DIR/$tfile-%d 0 400
     replay_barrier mds
     fail mds
-    for i in `seq 401 800`; do
-       rm $DIR/$tfile-$i
-    done
+    unlinkmany $DIR/$tfile-%d 400 400
     sleep 2
     $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
 }
 run_test 38 "test recovery from unlink llog (test llog_gen_rec) "
 
 test_39() {
-    for i in `seq 1 800`; do
-       touch $DIR/$tfile-$i
-    done
-
+    createmany -o $DIR/$tfile-%d 800
     replay_barrier mds
-    for i in `seq 1 400`; do
-       rm $DIR/$tfile-$i
-    done
+    unlinkmany $DIR/$tfile-%d 0 400
     fail mds
-    for i in `seq 401 800`; do
-       rm $DIR/$tfile-$i
-    done
+    unlinkmany $DIR/$tfile-%d 400 400
     sleep 2
     $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
 }
 run_test 39 "test recovery from unlink llog (test llog_gen_rec) "
 
+count_ost_writes() {
+        cat /proc/fs/lustre/osc/*/stats |
+            awk -vwrites=0 '/ost_write/ { writes += $2 } END { print writes; }'
+}
+
+#b=2477,2532
+test_40(){
+    $LCTL mark multiop $MOUNT/$tfile OS_c 
+    multiop $MOUNT/$tfile OS_c  &
+    PID=$!
+    writeme -s $MOUNT/${tfile}-2 &
+    WRITE_PID=$!
+    sleep 1
+    facet_failover mds
+#define OBD_FAIL_MDS_CONNECT_NET         0x117
+    do_facet mds "sysctl -w lustre.fail_loc=0x80000117"
+    kill -USR1 $PID
+    stat1=`count_ost_writes`
+    sleep $TIMEOUT
+    stat2=`count_ost_writes`
+    echo "$stat1, $stat2"
+    if [ $stat1 -lt $stat2 ]; then 
+       echo "writes continuing during recovery"
+       RC=0
+    else
+       echo "writes not continuing during recovery, bug 2477"
+       RC=4
+    fi
+    echo "waiting for writeme $WRITE_PID"
+    kill $WRITE_PID
+    wait $WRITE_PID 
+
+    echo "waiting for multiop $PID"
+    wait $PID || return 2
+    do_facet client munlink $MOUNT/$tfile  || return 3
+    do_facet client munlink $MOUNT/${tfile}-2  || return 3
+    return $RC
+}
+run_test 40 "cause recovery in ptlrpc, ensure IO continues"
+
 equals_msg test complete, cleaning up
 $CLEANUP
+
index 53ded76..17c4167 100644 (file)
@@ -33,9 +33,10 @@ init_test_env() {
     [ -d /r ] && export ROOT=/r
 
     export PATH=:$PATH:$LUSTRE/utils:$LUSTRE/tests
+    export LLMOUNT=${LLMOUNT:-"llmount"}
     export LCONF=${LCONF:-"lconf"}
     export LMC=${LMC:-"lmc"}
-    export LCTL=${LCTL:-"lctl"}
+    export LCTL=${LCTL:-"$LUSTRE/utils/lctl"}
     export CHECKSTAT="${CHECKSTAT:-checkstat} "
 
     # Paths on remote nodes, if different 
@@ -55,6 +56,7 @@ init_test_env() {
     
     # save the name of the config file for the upcall
     echo "XMLCONFIG=$LUSTRE/tests/$XMLCONFIG"  > $LUSTRE/tests/XMLCONFIG
+#    echo "CONFIG=`canonical_path $CONFIG`"  > $LUSTRE/tests/CONFIG
 }
 
 # Facet functions
@@ -77,18 +79,18 @@ stop() {
 }
 
 zconf_mount() {
-    mnt=$1
+    client=$1
+    mnt=$2
+
+    do_node $client mkdir $mnt 2> /dev/null || :
 
-    [ -d $mnt ] || mkdir $mnt
-    
     if [ -x /sbin/mount.lustre ] ; then
-       mount -t lustre -o nettype=$NETTYPE \
-           `facet_host mds`:/mds_svc/client_facet $mnt
+       do_node $client mount -t lustre -o nettype=$NETTYPE `facet_active_host mds`:/mds_svc/client_facet $mnt || return 1
     else
        # this is so cheating
+       do_node $client $LCONF --nosetup --node client_facet $XMLCONFIG  > /dev/null || return 2
        $LCONF --nosetup --node client_facet $XMLCONFIG
-       $LUSTRE/utils/llmount `facet_host mds`:/mds_svc/client_facet $mnt \
-            -o nettype=$NETTYPE || return $?
+       do_node $client $LLMOUNT `facet_active_host mds`:/mds_svc/client_facet $mnt -o nettype=$NETTYPE|| return 4
     fi
 
     [ -d /r ] && $LCTL modules > /r/tmp/ogdb-`hostname`
@@ -96,9 +98,11 @@ zconf_mount() {
 }
 
 zconf_umount() {
-    mnt=$1
-    umount  $mnt || :
-    $LCONF --cleanup --nosetup --node client_facet $XMLCONFIG || :
+    client=$1
+    mnt=$2
+    [ "$3" ] && force=-f
+    do_node $client umount $force  $mnt || :
+    do_node $client $LCONF --cleanup --nosetup --node client_facet $XMLCONFIG > /dev/null || :
 }
 
 shutdown_facet() {
@@ -122,6 +126,7 @@ wait_for_host() {
    HOST=$1
    check_network  $HOST 900
    while ! do_node $HOST "$CHECKSTAT -t dir $LUSTRE"; do sleep 5; done
+   while ! do_node $HOST "ls -d $LUSTRE " > /dev/null; do sleep 5; done
 }
 
 wait_for() {
@@ -144,9 +149,10 @@ facet_failover() {
     reboot_facet $facet
     client_df &
     DFPID=$!
+    echo "df pid is $DFPID"
     change_active $facet
     TO=`facet_active_host $facet`
-    echo "Failover MDS to $TO"
+    echo "Failover $facet to $TO"
     wait_for $facet
     start $facet
 }
@@ -186,6 +192,12 @@ do_lmc() {
     $LMC -m ${XMLCONFIG} $@
 }
 
+h2gm () {
+   if [ "$1" = "client" ]; then echo \'*\'; else
+       $PDSH $1 $GMNALNID -l | cut -d\  -f2
+   fi
+}
+
 h2tcp() {
    if [ "$1" = "client" ]; then echo \'*\'; else
    echo $1 
@@ -230,7 +242,11 @@ facet_active() {
 facet_active_host() {
     local facet=$1
     local active=`facet_active $facet`
-    echo `facet_host $active`
+    if [ "$facet" == client ]; then
+       hostname
+    else
+       echo `facet_host $active`
+    fi
 }
 
 change_active() {
@@ -255,7 +271,7 @@ do_node() {
 
     if $VERBOSE; then
        echo "CMD: $HOST $@"
-       $PDSH $HOST $LCTL mark "$@" || :
+       $PDSH $HOST $LCTL mark "$@" > /dev/null 2>&1 || :
     fi
     $PDSH $HOST "(PATH=\$PATH:$RLUSTRE/utils:$RLUSTRE/tests; cd $RPWD; sh -c \"$@\")"
 }
@@ -416,6 +432,14 @@ drop_bl_callback() {
     return $RC
 }
 
+clear_failloc() {
+    facet=$1
+    pause=$2
+    sleep $pause
+    echo "clearing fail_loc on $facet"
+    do_facet $facet "sysctl -w lustre.fail_loc=0"
+}
+
 cancel_lru_locks() {
     $LCTL mark cancel_lru_locks
     for d in /proc/fs/lustre/ldlm/namespaces/$1*; do
@@ -501,3 +525,8 @@ run_one() {
 
     test_${testnum} || error "test_$testnum failed with $?"
 }
+
+canonical_path() {
+   (cd `dirname $1`; echo $PWD/`basename $1`)
+}
+
index a376063..bf5c971 100644 (file)
@@ -4,20 +4,32 @@
 #include <stdio.h>
 #include <string.h>
 
+void usage(char *prog)
+{
+        printf("usage: %s [-s] filename\n", prog);
+}
+
 int main(int argc, char **argv)
 {
         int fd, rc;
+       int do_sync = 0;
         int i = 0;
+       int file_arg = 1;
         char buf[4096];
 
         memset(buf, 0, 4096);
 
-        if (argc != 2) {
-                printf("Usage: %s <filename>\n", argv[0]);
+        if (argc < 2 || argc > 3) {
+               usage(argv[0]);
                 exit(1);
         }
 
-        fd = open(argv[1], O_RDWR | O_CREAT, 0600);
+        if (strcmp(argv[1], "-s") == 0) {
+                do_sync = 1;
+               file_arg++;
+        }
+
+        fd = open(argv[file_arg], O_RDWR | O_CREAT, 0600);
         if (fd == -1) {
                 printf("Error opening %s\n", argv[1]);
                 exit(1);
@@ -26,6 +38,8 @@ int main(int argc, char **argv)
         while (1) {
                 sprintf(buf, "write %d\n", i);
                 rc = write(fd, buf, sizeof(buf));
+               if (do_sync)
+                       sync();
                 sleep(1);
         }
         return 0;