Whamcloud - gitweb
Land b1_2_2766 onto b1_2 (20040518_2110)
authoradilger <adilger>
Wed, 19 May 2004 07:13:13 +0000 (07:13 +0000)
committeradilger <adilger>
Wed, 19 May 2004 07:13:13 +0000 (07:13 +0000)
- clear page cache after eviction (2766)
- resynchronize MDS->OST in background (2824)

Running test 5 in replay-ost-single.sh (still disabled) still fails in UML,
I'm not sure if this is bug 2766-related or something else.

27 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/lov/lov_obd.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lov.c
lustre/obdclass/genops.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c
lustre/ptlrpc/recover.c
lustre/tests/insanity.sh
lustre/tests/recovery-small.sh
lustre/tests/replay-ost-single.sh
lustre/tests/replay-single.sh

index afe8799..f0f0e20 100644 (file)
@@ -33,6 +33,8 @@ tbd  Cluster File Systems, Inc. <info@clusterfs.com>
        - update iopen-2.6 patch with fixes from 2399,2517,2904 (3301)
        - don't leak open file on MDS after open resend (3325)
        - serialize filter_precreate and filter_destroy_precreated (3329)
+       - clear page cache after eviction (2766)
+       - resynchronize MDS->OST in background (2824)
        * miscellania
        - allow default OST striping configuration per directory (1414)
        - fix compilation for qswnal for 2.6 kernels (3125)
index d5b1f76..9c2aa53 100644 (file)
@@ -60,7 +60,7 @@ typedef enum {
 #define LDLM_FL_LOCAL_ONLY     0x000400 /* see ldlm_cli_cancel_unused */
 
 /* don't run the cancel callback under ldlm_cli_cancel_unused */
-#define LDLM_FL_NO_CALLBACK    0x000800
+#define LDLM_FL_FAILED         0x000800
 
 #define LDLM_FL_HAS_INTENT     0x001000 /* lock request has intent */
 #define LDLM_FL_CANCELING      0x002000 /* lock cancel has already been sent */
index 808ff44..fe83b7d 100644 (file)
@@ -19,7 +19,8 @@ void ptlrpc_free_committed(struct obd_import *imp);
 void ptlrpc_wake_delayed(struct obd_import *imp);
 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid);
 int ptlrpc_set_import_active(struct obd_import *imp, int active);
-void ptlrpc_invalidate_import(struct obd_import *imp);
+void ptlrpc_deactivate_import(struct obd_import *imp);
+void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc);
 void ptlrpc_fail_import(struct obd_import *imp, int generation);
 void ptlrpc_fail_export(struct obd_export *exp);
 
index 14943f8..d2af141 100644 (file)
@@ -41,8 +41,9 @@ static inline char * ptlrpc_import_state_name(enum lustre_imp_state state)
 
 enum obd_import_event {
         IMP_EVENT_DISCON     = 0x808001,
-        IMP_EVENT_INVALIDATE = 0x808002,
-        IMP_EVENT_ACTIVE     = 0x808003,
+        IMP_EVENT_INACTIVE   = 0x808002,
+        IMP_EVENT_INVALIDATE = 0x808003,
+        IMP_EVENT_ACTIVE     = 0x808004,
 };
 
 struct obd_import {
@@ -64,6 +65,7 @@ struct obd_import {
         struct obd_device        *imp_obd;
         wait_queue_head_t         imp_recovery_waitq;
         __u64                     imp_last_replay_transno;
+        atomic_t                  imp_inflight;
         atomic_t                  imp_replay_inflight;
         enum lustre_imp_state     imp_state;
         int                       imp_generation;
index f7e7b43..9b33c3e 100644 (file)
@@ -326,6 +326,7 @@ struct ptlrpc_request {
         struct ptlrpc_request_set *rq_set;
         void *rq_interpret_reply;               /* Async completion handler */
         union ptlrpc_async_args rq_async_args;  /* Async completion context */
+        void * rq_ptlrpcd_data;
 };
 
 
@@ -626,6 +627,8 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                         char *name);
 int ptlrpc_unregister_service(struct ptlrpc_service *service);
 int liblustre_check_services (void *arg);
+void ptlrpc_daemonize(void);
+
 
 struct ptlrpc_svc_data {
         char *name;
@@ -669,7 +672,7 @@ int ptlrpc_pinger_add_import(struct obd_import *imp);
 int ptlrpc_pinger_del_import(struct obd_import *imp);
 
 /* ptlrpc/ptlrpcd.c */
-void ptlrpcd_wake(void);
+void ptlrpcd_wake(struct ptlrpc_request *req);
 void ptlrpcd_add_req(struct ptlrpc_request *req);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
index f9a0bab..fdd3ebb 100644 (file)
@@ -53,6 +53,7 @@ struct lov_oinfo {                 /* per-stripe data structure */
         /* used by the osc to keep track of what objects to build into rpcs */
         struct loi_oap_pages loi_read_lop;
         struct loi_oap_pages loi_write_lop;
+        struct list_head loi_read_item;
         /* _cli_ is poorly named, it should be _ready_ */
         struct list_head loi_cli_item;
         struct list_head loi_write_item;
@@ -74,6 +75,7 @@ static inline void loi_init(struct lov_oinfo *loi)
         INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
         INIT_LIST_HEAD(&loi->loi_cli_item);
         INIT_LIST_HEAD(&loi->loi_write_item);
+        INIT_LIST_HEAD(&loi->loi_read_item);
 }
 
 struct lov_stripe_md {
@@ -244,6 +246,7 @@ struct client_obd {
         spinlock_t               cl_loi_list_lock;
         struct list_head         cl_loi_ready_list;
         struct list_head         cl_loi_write_list;
+        struct list_head         cl_loi_read_list;
         int                      cl_brw_in_flight;
         /* just a sum of the loi/lop pending numbers to be exported by /proc */
         int                      cl_pending_w_pages;
index 8165ce0..906090b 100644 (file)
@@ -102,6 +102,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         INIT_LIST_HEAD(&cli->cl_cache_waiters);
         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
         INIT_LIST_HEAD(&cli->cl_loi_write_list);
+        INIT_LIST_HEAD(&cli->cl_loi_read_list);
         spin_lock_init(&cli->cl_loi_list_lock);
         cli->cl_brw_in_flight = 0;
         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
@@ -313,7 +314,7 @@ int client_disconnect_export(struct obd_export *exp, int failover)
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
         if (obd->obd_no_recov)
-                ptlrpc_invalidate_import(imp);
+                ptlrpc_invalidate_import(imp, 0);
         else
                 rc = ptlrpc_disconnect_import(imp);
 
index bb1b32e..9cdad73 100644 (file)
@@ -117,9 +117,9 @@ noreproc:
         /* Go to sleep until the lock is granted or cancelled. */
         rc = l_wait_event(lock->l_waitq,
                           ((lock->l_req_mode == lock->l_granted_mode) ||
-                           (lock->l_flags & LDLM_FL_CANCEL)), &lwi);
+                           (lock->l_flags & LDLM_FL_FAILED)), &lwi);
 
-        if (lock->l_destroyed) {
+        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                 RETURN(-EIO);
         }
@@ -680,11 +680,6 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
                 w->w_lock = LDLM_LOCK_GET(lock);
 
-                /* Prevent the cancel callback from being called by setting
-                 * LDLM_FL_CANCEL in the lock.  Very sneaky. -p */
-                if (flags & LDLM_FL_NO_CALLBACK)
-                        w->w_lock->l_flags |= LDLM_FL_CANCEL;
-
                 list_add(&w->w_list, &list);
         }
         l_unlock(&ns->ns_lock);
@@ -717,7 +712,6 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
  *
  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
  * to notify the server.
- * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
  * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
                            struct ldlm_res_id *res_id, int flags, void *opaque)
index 53b9d46..7118858 100644 (file)
@@ -289,7 +289,10 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                 LDLM_LOCK_GET(lock);
 
-                lock->l_flags |= LDLM_FL_CANCEL;
+                /* Set CBPENDING so nothing in the cancellation path
+                 * can match this lock */
+                lock->l_flags |= LDLM_FL_CBPENDING;
+                lock->l_flags |= LDLM_FL_FAILED;
                 lock->l_flags |= flags;
 
                 if (local_only && (lock->l_readers || lock->l_writers)) {
@@ -297,7 +300,6 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                          * alternative: pretend that we got a blocking AST from
                          * the server, so that when the lock is decref'd, it
                          * will go away ... */
-                        lock->l_flags |= LDLM_FL_CBPENDING;
                         /* ... without sending a CANCEL message. */
                         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
index 82018b1..87376fb 100644 (file)
@@ -338,24 +338,20 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
         }
         uuid = &watched->u.cli.cl_import->imp_target_uuid;
 
-        /*
-         * Must notify (MDS) before we mark the OSC as active, so that
-         * the orphan deletion happens without interference from racing
-         * creates.
+        /* Set OSC as active before notifying the observer, so the
+         * observer can use the OSC normally.  
          */
-        if (obd->obd_observer) {
-                /* Pass the notification up the chain. */
-                rc = obd_notify(obd->obd_observer, watched, active);
-                if (rc)
-                        RETURN(rc);
-        }
-
         rc = lov_set_osc_active(&obd->u.lov, uuid, active);
-
         if (rc) {
                 CERROR("%sactivation of %s failed: %d\n",
                        active ? "" : "de", uuid->uuid, rc);
+                RETURN(rc);
         }
+
+        if (obd->obd_observer)
+                /* Pass the notification up the chain. */
+                rc = obd_notify(obd->obd_observer, watched, active);
+
         RETURN(rc);
 }
 
index 4405c08..99e5699 100644 (file)
@@ -1527,27 +1527,53 @@ err_llog:
         RETURN(rc);
 }
 
-static int mds_postrecov(struct obd_device *obd) 
-
+int mds_postrecov(struct obd_device *obd) 
 {
-        int rc, rc2;
+        struct mds_obd *mds = &obd->u.mds;
+        int rc;
 
         LASSERT(!obd->obd_recovering);
         LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
 
+        /* set nextid first, so we are sure it happens */
+        rc = mds_lov_set_nextid(obd);
+        if (rc) {
+                CERROR ("%s: mds_lov_set_nextid failed\n",
+                        obd->obd_name);
+                GOTO(out, rc);
+        }
+
+        /* clean PENDING dir */
+        rc = mds_cleanup_orphans(obd);
+        if (rc) {
+                GOTO(out, rc);
+        }
+
         rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
                           obd->u.mds.mds_lov_desc.ld_tgt_count,
                           NULL, NULL, NULL);
-        if (rc != 0) {
-                CERROR("faild at llog_origin_connect: %d\n", rc);
+        if (rc) {
+                CERROR("%s: failed at llog_origin_connect: %d\n", 
+                       obd->obd_name, rc);
+                GOTO(out, rc);
         }
 
-        rc = mds_cleanup_orphans(obd);
+        /* remove the orphaned precreated objects */
+        rc = mds_lov_clearorphans(mds, NULL /* all OSTs */);
+        if (rc) {
+                GOTO(err_llog, rc);
+        }
 
-        rc2 = mds_lov_set_nextid(obd);
-        if (rc2 == 0)
-                rc2 = rc;
-        RETURN(rc2);
+out:
+        RETURN(rc);
+
+err_llog:
+        /* cleanup all llogging subsystems */
+        rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
+        if (rc)
+                CERROR("%s: failed to cleanup llogging subsystems\n",
+                        obd->obd_name);
+        goto out;
 }
 
 int mds_lov_clean(struct obd_device *obd)
index 9c5ae2e..2829812 100644 (file)
@@ -68,6 +68,7 @@ int mds_lov_write_objids(struct obd_device *obd);
 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
 int mds_lov_set_growth(struct mds_obd *mds, int count);
 int mds_lov_set_nextid(struct obd_device *obd);
+int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
 int mds_post_mds_lovconf(struct obd_device *obd);
 int mds_notify(struct obd_device *obd, struct obd_device *watched, int active);
 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
@@ -98,6 +99,7 @@ extern struct lvfs_callback_ops mds_lvfs_ops;
 int mds_lov_clean(struct obd_device *obd);
 extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg);
+int mds_postrecov(struct obd_device *obd);
 #ifdef __KERNEL__
 int mds_get_md(struct obd_device *, struct inode *, void *md, int *size, 
                int lock);
index 2d0bed0..d93ce0e 100644 (file)
@@ -117,7 +117,7 @@ int mds_lov_write_objids(struct obd_device *obd)
         RETURN(rc);
 }
 
-static int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+int mds_lov_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
         int rc;
         struct obdo oa;
@@ -155,12 +155,6 @@ int mds_lov_set_nextid(struct obd_device *obd)
 
         rc = obd_set_info(mds->mds_osc_exp, strlen("next_id"), "next_id",
                           mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids);
-        if (rc < 0)
-                GOTO(out, rc);
-
-        rc = mds_lov_clearorphans(mds, NULL /* all OSTs */);
-
-out:
         RETURN(rc);
 }
 
@@ -263,28 +257,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          * set_nextid().  The class driver can help us here, because
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
-        if (!obd->obd_recovering) {
-                rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                                  obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
-                                  NULL, NULL);
-                if (rc != 0)
-                        CERROR("faild at llog_origin_connect: %d\n", rc);
-
-                rc = mds_cleanup_orphans(obd);
-                if (rc > 0)
-                        CERROR("Cleanup %d orphans while MDS isn't recovering\n", rc);
-
-                rc = mds_lov_set_nextid(obd);
-                if (rc)
-                        GOTO(err_llog, rc);
-        }
+        if (!obd->obd_recovering)
+                rc = mds_postrecov(obd);
         RETURN(rc);
 
-err_llog:
-        /* cleanup all llogging subsystems */
-        rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-        if (rc)
-                CERROR("failed to cleanup llogging subsystems\n");
 err_reg:
         obd_register_observer(mds->mds_osc_obd, NULL);
 err_discon:
@@ -502,6 +478,89 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 RETURN(-EINVAL);
         }
         RETURN(0);
+
+}
+
+struct mds_lov_sync_info {
+        struct obd_device *mlsi_obd; /* the lov device to sync */
+        struct obd_uuid   *mlsi_uuid;  /* target to sync */
+};
+
+int mds_lov_synchronize(void *data)
+{
+        struct mds_lov_sync_info *mlsi = data;
+        struct obd_device *obd;
+        struct obd_uuid *uuid;
+        unsigned long flags;
+        int rc;
+
+        lock_kernel();
+        ptlrpc_daemonize();
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        obd = mlsi->mlsi_obd;
+        uuid = mlsi->mlsi_uuid;
+
+        OBD_FREE(mlsi, sizeof(*mlsi));
+
+        LASSERT(obd != NULL);
+        LASSERT(uuid != NULL);
+
+        rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), 
+                          "mds_conn", 0, uuid);
+        if (rc != 0)
+                RETURN(rc);
+        
+        rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
+                          obd->u.mds.mds_lov_desc.ld_tgt_count,
+                          NULL, NULL, uuid);
+        if (rc != 0) {
+                CERROR("%s: failed at llog_origin_connect: %d\n", 
+                       obd->obd_name, rc);
+                RETURN(rc);
+        }
+        
+        CWARN("MDS %s: %s now active, resetting orphans\n",
+              obd->obd_name, uuid->uuid);
+        rc = mds_lov_clearorphans(&obd->u.mds, uuid);
+        if (rc != 0) {
+                CERROR("%s: failed at mds_lov_clearorphans: %d\n", 
+                       obd->obd_name, rc);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+
+int mds_lov_start_synchronize(struct obd_device *obd, struct obd_uuid *uuid)
+{
+        struct mds_lov_sync_info *mlsi;
+        int rc;
+        
+        ENTRY;
+
+        OBD_ALLOC(mlsi, sizeof(*mlsi));
+        if (mlsi == NULL)
+                RETURN(-ENOMEM);
+
+        mlsi->mlsi_obd = obd;
+        mlsi->mlsi_uuid = uuid;
+
+        rc = kernel_thread(mds_lov_synchronize, mlsi, CLONE_VM | CLONE_FILES);
+        if (rc < 0)
+                CERROR("%s: error starting mds_lov_synchronize: %d\n", 
+                       obd->obd_name, rc);
+        else {
+                CDEBUG(D_HA, "%s: mds_lov_synchronize thread: %d\n", 
+                       obd->obd_name, rc);
+                rc = 0;
+        }
+
+        RETURN(rc);
 }
 
 int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
@@ -526,22 +585,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
         } else {
                 LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
 
-                rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn",
-                                  0, uuid);
-                if (rc != 0)
-                        RETURN(rc);
-
-                rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                                  obd->u.mds.mds_lov_desc.ld_tgt_count,
-                                  NULL, NULL, uuid);
-                if (rc != 0) {
-                        CERROR("faild at llog_origin_connect: %d\n", rc);
-                        RETURN(rc);
-                }
-
-                CWARN("MDS %s: %s now active, resetting orphans\n",
-                      obd->obd_name, uuid->uuid);
-                rc = mds_lov_clearorphans(&obd->u.mds, uuid);
+                rc = mds_lov_start_synchronize(obd, uuid);
         }
         RETURN(rc);
 }
index 98ae3b5..aee57a7 100644 (file)
@@ -526,6 +526,7 @@ struct obd_import *class_new_import(void)
         init_waitqueue_head(&imp->imp_recovery_waitq);
 
         atomic_set(&imp->imp_refcount, 2);
+        atomic_set(&imp->imp_inflight, 0);
         atomic_set(&imp->imp_replay_inflight, 0);
         INIT_LIST_HEAD(&imp->imp_handle.h_link);
         class_handle_hash(&imp->imp_handle, import_handle_addref);
index 407b0bf..26d923e 100644 (file)
@@ -231,26 +231,35 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
        /* this is the special case where create removes orphans */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
-                CDEBUG(D_HA, "%p: oscc recovery started\n", oscc);
+                CDEBUG(D_HA, "%s: oscc recovery started\n", 
+                        exp->exp_obd->obd_name);
+                LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING);
+
                 /* delete from next_id on up */
                 oa->o_valid |= OBD_MD_FLID;
                 oa->o_id = oscc->oscc_next_id - 1;
 
                 CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n", 
-                       oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, 
-                       oa->o_id);
+                       exp->exp_obd->obd_name, oa->o_id);
 
                 rc = osc_real_create(exp, oa, ea, NULL);
 
                 spin_lock(&oscc->oscc_lock);
-                if (rc == -ENOSPC)
-                        oscc->oscc_flags |= OSCC_FLAG_NOSPC;
-                oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-                oscc->oscc_last_id = oa->o_id;
-                wake_up(&oscc->oscc_waitq);
+                if (rc == 0 || rc == -ENOSPC) {
+                        if (rc == -ENOSPC)
+                                oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+                        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+                        oscc->oscc_last_id = oa->o_id;
+                        CDEBUG(D_HA, "%s: oscc recovery finished: %d\n", 
+                               exp->exp_obd->obd_name, rc);
+                        wake_up(&oscc->oscc_waitq);
+                        
+                } else {
+                        CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n", 
+                               exp->exp_obd->obd_name, rc);
+                }
                 spin_unlock(&oscc->oscc_lock);
 
-                CDEBUG(D_HA, "%p: oscc recovery finished\n", oscc);
 
                RETURN(rc);
        }
index bc1312b..3e06638 100644 (file)
@@ -311,6 +311,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                         oa->o_flags == OBD_FL_DELORPHAN);
                 DEBUG_REQ(D_HA, request,
                           "delorphan from OST integration");
+                /* Don't resend the delorphan request */
+                request->rq_no_resend = request->rq_no_delay = 1;
         }
 
         rc = ptlrpc_queue_wait(request);
@@ -1139,7 +1141,7 @@ static void osc_occ_interrupted(struct oig_callback_context *occ)
         /* ok, it's been put in an rpc. */
         if (oap->oap_request != NULL) {
                 ptlrpc_mark_interrupted(oap->oap_request);
-                ptlrpcd_wake();
+                ptlrpcd_wake(oap->oap_request);
                 GOTO(unlock, 0);
         }
 
@@ -1200,17 +1202,23 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
         struct list_head *pos, *n;
         ENTRY;
 
-        CDEBUG(D_INODE, "request %p aa %p\n", request, aa);
 
         rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                   aa->aa_nio_count, aa->aa_page_count,
                                   aa->aa_pga, rc);
 
+        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
+
         cli = aa->aa_cli;
         /* in failout recovery we ignore writeback failure and want
          * to just tell llite to unlock the page and continue */
-        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+        if (request->rq_reqmsg->opc == OST_WRITE && 
+            (cli->cl_import == NULL || cli->cl_import->imp_invalid)) {
+                CDEBUG(D_INODE, "flipping to rc 0 imp %p inv %d\n", 
+                       cli->cl_import, 
+                       cli->cl_import ? cli->cl_import->imp_invalid : -1);
                 rc = 0;
+        }
 
         spin_lock(&cli->cl_loi_list_lock);
 
@@ -1502,6 +1510,13 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         if (lop->lop_num_pending == 0)
                 RETURN(0);
 
+        /* if we have an invalid import we want to drain the queued pages
+         * by forcing them through rpcs that immediately fail and complete
+         * the pages.  recovery relies on this to empty the queued pages
+         * before canceling the locks and evicting down the llite pages */
+        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
+                RETURN(1);
+
         /* stream rpcs in queue order as long as as there is an urgent page
          * queued.  this is our cheap solution for good batching in the case
          * where writepage marks some random page in the middle of the file as
@@ -1549,6 +1564,9 @@ static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
+
+        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
+                loi->loi_read_lop.lop_num_pending);
 }
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
@@ -1577,6 +1595,17 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
             !list_empty(&cli->cl_loi_write_list))
                 RETURN(list_entry(cli->cl_loi_write_list.next,
                                   struct lov_oinfo, loi_write_item));
+
+        /* then return all queued objects when we have an invalid import
+         * so that they get flushed */
+        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
+                if (!list_empty(&cli->cl_loi_write_list))
+                        RETURN(list_entry(cli->cl_loi_write_list.next,
+                                          struct lov_oinfo, loi_write_item));
+                if (!list_empty(&cli->cl_loi_read_list))
+                        RETURN(list_entry(cli->cl_loi_read_list.next,
+                                          struct lov_oinfo, loi_read_item));
+        }
         RETURN(NULL);
 }
 
@@ -1626,6 +1655,8 @@ static void osc_check_rpcs(struct client_obd *cli)
                         list_del_init(&loi->loi_cli_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
+                if (!list_empty(&loi->loi_read_item))
+                        list_del_init(&loi->loi_read_item);
 
                 loi_list_maint(cli, loi);
 
@@ -2858,25 +2889,25 @@ static int osc_import_event(struct obd_device *obd,
                 }
                 break;
         }
+        case IMP_EVENT_INACTIVE: {
+                if (obd->obd_observer)
+                        rc = obd_notify(obd->obd_observer, obd, 0);
+                break;
+        }
         case IMP_EVENT_INVALIDATE: {
                 struct ldlm_namespace *ns = obd->obd_namespace;
 
-                /* this used to try and tear down queued pages, but it was
-                 * not correctly implemented.  We'll have to do it again once
-                 * we call obd_invalidate_import() agian */
-                /* XXX And we still need to do this */
-
-                /* Reset grants, too */
+                /* Reset grants */
                 cli = &obd->u.cli;
                 spin_lock(&cli->cl_loi_list_lock);
                 cli->cl_avail_grant = 0;
                 cli->cl_lost_grant = 0;
+                /* all pages go to failing rpcs due to the invalid import */
+                osc_check_rpcs(cli);
                 spin_unlock(&cli->cl_loi_list_lock);
                 
                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
 
-                if (obd->obd_observer)
-                        rc = obd_notify(obd->obd_observer, obd, 0);
                 break;
         }
         case IMP_EVENT_ACTIVE: {
index 1887589..7ef963d 100644 (file)
@@ -327,6 +327,7 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         set->set_remaining++;
+        atomic_inc(&req->rq_import->imp_inflight);
 }
 
 /* lock so many callers can add things, the context that owns the set
@@ -370,6 +371,11 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                 *status = -EIO;
         }
+        /* allow CONNECT even if import is invalid */
+        else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
+                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
+                ;
+        }
         /*
          * If the import has been invalidated (such as by an OST failure), the
          * request must fail with -EIO.  
@@ -540,13 +546,6 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         imp = req->rq_import;
         spin_lock_irqsave(&imp->imp_lock, flags);
 
-        if (imp->imp_invalid) {
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                req->rq_status = -EIO;
-                req->rq_phase = RQ_PHASE_INTERPRET;
-                RETURN(-EIO);
-        }
-
         req->rq_import_generation = imp->imp_generation;
 
         if (ptlrpc_import_delay_req(imp, req, &rc)) {
@@ -807,6 +806,9 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        req->rq_reqmsg->opc);
 
                 set->set_remaining--;
+
+                atomic_dec(&imp->imp_inflight);
+                wake_up(&imp->imp_recovery_waitq);
         }
 
         /* If we hit an error, we want to recover promptly. */
@@ -1323,6 +1325,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         LASSERT(req->rq_set == NULL);
         LASSERT(!req->rq_receiving_reply);
+        atomic_inc(&imp->imp_inflight);
 
         /* for distributed debugging */
         req->rq_reqmsg->status = current->pid;
@@ -1497,6 +1500,9 @@ restart:
 
         LASSERT(!req->rq_receiving_reply);
         req->rq_phase = RQ_PHASE_INTERPRET;
+
+        atomic_dec(&imp->imp_inflight);
+        wake_up(&imp->imp_recovery_waitq);
         RETURN(rc);
 }
 
index f46aa03..f2d034f 100644 (file)
@@ -42,7 +42,6 @@
 struct ptlrpc_connect_async_args {
          __u64 pcaa_peer_committed;
         int pcaa_initial_connect;
-        int pcaa_was_invalid;
 };
 
 /* A CLOSED import should remain so. */
@@ -115,29 +114,67 @@ int ptlrpc_set_import_discon(struct obd_import *imp)
         return rc;
 }
 
-void ptlrpc_invalidate_import(struct obd_import *imp)
+/*
+ * This acts as a barrier; all existing requests are rejected, and
+ * no new requests will be accepted until the import is valid again.
+ */
+void ptlrpc_deactivate_import(struct obd_import *imp)
 {
-        struct obd_device *obd = imp->imp_obd;
         unsigned long flags;
         ENTRY;
 
         spin_lock_irqsave(&imp->imp_lock, flags);
-        /* This is a bit of a hack, but invalidating replayable
-         * imports makes a temporary reconnect failure into a much more
-         * ugly -- and hard to remedy -- situation. */
-        if (!imp->imp_replayable) {
-                CDEBUG(D_HA, "setting import %s INVALID\n",
-                       imp->imp_target_uuid.uuid);
-                imp->imp_invalid = 1;
-        }
+        CDEBUG(D_HA, "setting import %s INVALID\n",
+               imp->imp_target_uuid.uuid);
+        imp->imp_invalid = 1;
         imp->imp_generation++;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         ptlrpc_abort_inflight(imp);
-        obd_import_event(obd, imp, IMP_EVENT_INVALIDATE);
+        obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
+}
+
+/*
+ * This function will invalidate the import, if necessary, then block
+ * for all the RPC completions, and finally notify the obd to
+ * invalidate its state (ie cancel locks, clear pending requests,
+ * etc).
+ *
+ * in_rpc: true if this is called while processing an rpc, like
+ *    CONNECT. It will allow for one RPC to be inflight while
+ *    waiting for requests to complete. Ugly, yes, but I don't see an
+ *    cleaner way right now.
+ */
+void ptlrpc_invalidate_import(struct obd_import *imp, int in_rpc)
+{
+        struct l_wait_info lwi;
+        int inflight = 0;
+        int rc;
+
+        if (!imp->imp_invalid)
+                ptlrpc_deactivate_import(imp);
+
+        LASSERT(imp->imp_invalid);
+
+        if (in_rpc)
+                inflight = 1;
+        /* wait for all requests to error out and call completion
+           callbacks */
+        lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), NULL,
+                               NULL, NULL);
+        rc = l_wait_event(imp->imp_recovery_waitq,
+                          (atomic_read(&imp->imp_inflight) == inflight),
+                          &lwi);
+
+        if (rc)
+                CERROR("%s: rc = %d waiting for callback (%d != %d)\n",
+                       imp->imp_target_uuid.uuid, rc,
+                       atomic_read(&imp->imp_inflight), inflight);
+
+        obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
 }
 
-void ptlrpc_validate_import(struct obd_import *imp)
+static void ptlrpc_activate_import(struct obd_import *imp)
 {
         struct obd_device *obd = imp->imp_obd;
         unsigned long flags;
@@ -164,7 +201,7 @@ void ptlrpc_fail_import(struct obd_import *imp, int generation)
                                imp->imp_target_uuid.uuid,
                                imp->imp_connection->c_remote_uuid.uuid,
                                imp->imp_obd->obd_name);
-                        ptlrpc_invalidate_import(imp);
+                        ptlrpc_deactivate_import(imp);
                 }
 
                 CDEBUG(D_HA, "%s: waking up pinger\n",
@@ -185,7 +222,6 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
         int initial_connect = 0;
         int rc;
         __u64 committed_before_reconnect = 0;
-        int was_invalid = 0;
         struct ptlrpc_request *request;
         int size[] = {sizeof(imp->imp_target_uuid),
                                  sizeof(obd->obd_uuid),
@@ -223,10 +259,6 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
 
         }
 
-        if (imp->imp_invalid) {
-                imp->imp_invalid = 0;
-                was_invalid = 1;
-        }
 
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
@@ -283,7 +315,6 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
 
         aa->pcaa_peer_committed = committed_before_reconnect;
         aa->pcaa_initial_connect = initial_connect;
-        aa->pcaa_was_invalid = was_invalid;
 
         if (aa->pcaa_initial_connect)
                 imp->imp_replayable = 1;
@@ -359,11 +390,14 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                imp->imp_target_uuid.uuid,
                                imp->imp_connection->c_remote_uuid.uuid);
                 }
-                IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
+
+                if (imp->imp_invalid)
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
+                else
+                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
         } 
-        else if (MSG_CONNECT_RECOVERING & msg_flags) {
+        else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
                 LASSERT(imp->imp_replayable);
-                imp->imp_state = LUSTRE_IMP_RECOVER;
                 imp->imp_remote_handle = request->rq_repmsg->handle;
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
         } 
@@ -391,9 +425,6 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
 finish:
         rc = ptlrpc_import_recovery_state_machine(imp);
         if (rc != 0) {
-                if (aa->pcaa_was_invalid)
-                        ptlrpc_invalidate_import(imp);
-
                 if (rc == -ENOTCONN) {
                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
                                "invalidating and reconnecting\n",
@@ -407,7 +438,7 @@ finish:
         if (rc != 0) {
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
-                        ptlrpc_invalidate_import(imp);
+                        ptlrpc_deactivate_import(imp);
                 }
                 CDEBUG(D_ERROR, "recovery of %s on %s failed (%d)\n",
                        imp->imp_target_uuid.uuid,
@@ -448,7 +479,6 @@ static int signal_completed_replay(struct obd_import *imp)
         RETURN(0);
 }
 
-
 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
 {
         int rc = 0;
@@ -458,7 +488,9 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
                        imp->imp_target_uuid.uuid,
                        imp->imp_connection->c_remote_uuid.uuid);
-                ptlrpc_invalidate_import(imp);
+
+                ptlrpc_invalidate_import(imp, 1);
+
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
         }
 
@@ -501,7 +533,7 @@ int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
                 if (rc)
                         GOTO(out, rc);
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
-                ptlrpc_validate_import(imp);
+                ptlrpc_activate_import(imp);
         } 
 
         if (imp->imp_state == LUSTRE_IMP_FULL) {
index 4eff5c5..6c7c9a3 100644 (file)
@@ -102,6 +102,9 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         __u64               xid;
         ENTRY;
 
+        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) 
+                RETURN(0);
+
         /* NB no locking required until desc is on the network */
         LASSERT (!desc->bd_network_rw);
         LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
@@ -215,6 +218,9 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
         ptl_md_t         md;
         ENTRY;
 
+        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET)) 
+                RETURN(0);
+
         /* NB no locking required until desc is on the network */
         LASSERT (desc->bd_nob > 0);
         LASSERT (!desc->bd_network_rw);
index 021fb0f..89c6cc0 100644 (file)
@@ -32,8 +32,6 @@ struct obd_import;
 struct ldlm_res_id;
 struct ptlrpc_request_set;
 
-void ptlrpc_daemonize(void);
-
 void ptlrpc_request_handle_notconn(struct ptlrpc_request *);
 void lustre_assert_wire_constants(void);
 int ptlrpc_import_in_recovery(struct obd_import *imp);
index dfb9635..ff19d6a 100644 (file)
@@ -134,6 +134,7 @@ EXPORT_SYMBOL(ptlrpc_stop_all_threads);
 EXPORT_SYMBOL(ptlrpc_start_n_threads);
 EXPORT_SYMBOL(ptlrpc_start_thread);
 EXPORT_SYMBOL(ptlrpc_unregister_service);
+EXPORT_SYMBOL(ptlrpc_daemonize);
 
 /* pack_generic.c */
 EXPORT_SYMBOL(lustre_msg_swabbed);
@@ -181,6 +182,7 @@ EXPORT_SYMBOL(ptlrpc_disconnect_import);
 EXPORT_SYMBOL(ptlrpc_resend);
 EXPORT_SYMBOL(ptlrpc_wake_delayed);
 EXPORT_SYMBOL(ptlrpc_set_import_active);
+EXPORT_SYMBOL(ptlrpc_deactivate_import);
 EXPORT_SYMBOL(ptlrpc_invalidate_import);
 EXPORT_SYMBOL(ptlrpc_fail_import);
 EXPORT_SYMBOL(ptlrpc_fail_export);
index 4e688a8..687f588 100644 (file)
@@ -54,7 +54,7 @@
 #include <linux/lprocfs_status.h>
 
 #define LIOD_STOP 0
-static struct ptlrpcd_ctl {
+struct ptlrpcd_ctl {
         unsigned long             pc_flags;
         spinlock_t                pc_lock;
         struct completion         pc_starting;
@@ -62,23 +62,40 @@ static struct ptlrpcd_ctl {
         struct list_head          pc_req_list;
         wait_queue_head_t         pc_waitq;
         struct ptlrpc_request_set *pc_set;
-} ptlrpcd_pc;
+#ifndef __KERNEL__
+        int                       pc_recurred;
+        void                     *pc_callback;
+#endif
+};
+
+static struct ptlrpcd_ctl ptlrpcd_pc;
+static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
 
 static DECLARE_MUTEX(ptlrpcd_sem);
 static int ptlrpcd_users = 0;
 
-void ptlrpcd_wake(void)
+void ptlrpcd_wake(struct ptlrpc_request *req)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+        struct ptlrpcd_ctl *pc = req->rq_ptlrpcd_data;
+
+        LASSERT(pc != NULL);
+
         wake_up(&pc->pc_waitq);
 }
 
 void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+        struct ptlrpcd_ctl *pc;
+
+        if (req->rq_send_state == LUSTRE_IMP_FULL)
+                pc = &ptlrpcd_pc;
+        else 
+                pc = &ptlrpcd_recovery_pc;
 
         ptlrpc_set_add_new_req(pc->pc_set, req);
-        ptlrpcd_wake();
+        req->rq_ptlrpcd_data = pc;
+                
+        ptlrpcd_wake(req);
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
@@ -169,8 +186,6 @@ static int ptlrpcd(void *arg)
         return 0;
 }
 #else
-static int ptlrpcd_recurred = 0;
-static void *ptlrpcd_callback;
 
 int ptlrpcd_check_async_rpcs(void *arg)
 {
@@ -178,25 +193,19 @@ int ptlrpcd_check_async_rpcs(void *arg)
         int                  rc = 0;
 
         /* single threaded!! */
-        ptlrpcd_recurred++;
+        pc->pc_recurred++;
 
-        if (ptlrpcd_recurred == 1)
+        if (pc->pc_recurred == 1)
                 rc = ptlrpcd_check(pc);
 
-        ptlrpcd_recurred--;
+        pc->pc_recurred--;
         return rc;
 }
 #endif
 
-int ptlrpcd_addref(void)
+static int ptlrpcd_start(struct ptlrpcd_ctl *pc)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
         int rc = 0;
-        ENTRY;
-
-        down(&ptlrpcd_sem);
-        if (++ptlrpcd_users != 1)
-                GOTO(out, rc);
 
         memset(pc, 0, sizeof(*pc));
         init_completion(&pc->pc_starting);
@@ -218,28 +227,57 @@ int ptlrpcd_addref(void)
 
         wait_for_completion(&pc->pc_starting);
 #else
-        ptlrpcd_callback =
+        pc->pc_callback =
                 liblustre_register_wait_callback(&ptlrpcd_check_async_rpcs, pc);
 #endif
 out:
+        RETURN(rc);
+}
+
+static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
+{
+        set_bit(LIOD_STOP, &pc->pc_flags);
+        wake_up(&pc->pc_waitq);
+#ifdef __KERNEL__
+        wait_for_completion(&pc->pc_finishing);
+#else
+        liblustre_deregister_wait_callback(pc->pc_callback);
+#endif
+        ptlrpc_set_destroy(pc->pc_set);
+}
+
+int ptlrpcd_addref(void)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&ptlrpcd_sem);
+        if (++ptlrpcd_users != 1)
+                GOTO(out, rc);
+
+        rc = ptlrpcd_start(&ptlrpcd_pc);
+        if (rc) {
+                --ptlrpcd_users;
+                GOTO(out, rc);
+        }
+
+        rc = ptlrpcd_start(&ptlrpcd_recovery_pc);
+        if (rc) {
+                ptlrpcd_stop(&ptlrpcd_pc);
+                --ptlrpcd_users;
+                GOTO(out, rc);
+        }
+out:
         up(&ptlrpcd_sem);
         RETURN(rc);
 }
 
 void ptlrpcd_decref(void)
 {
-        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
-
         down(&ptlrpcd_sem);
         if (--ptlrpcd_users == 0) {
-                set_bit(LIOD_STOP, &pc->pc_flags);
-                wake_up(&pc->pc_waitq);
-#ifdef __KERNEL__
-                wait_for_completion(&pc->pc_finishing);
-#else
-                liblustre_deregister_wait_callback(ptlrpcd_callback);
-#endif
-                ptlrpc_set_destroy(pc->pc_set);
+                ptlrpcd_stop(&ptlrpcd_pc);
+                ptlrpcd_stop(&ptlrpcd_recovery_pc);
         }
         up(&ptlrpcd_sem);
 }
index c3ab04d..18bc6f4 100644 (file)
@@ -258,9 +258,19 @@ void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
                imp->imp_target_uuid.uuid,
                imp->imp_connection->c_remote_uuid.uuid);
         
-        ptlrpc_set_import_discon(imp);
+        if (ptlrpc_set_import_discon(imp)) {
+                if (!imp->imp_replayable) {
+                        CDEBUG(D_HA, "import %s@%s for %s not replayable, "
+                               "auto-deactivating\n",
+                               imp->imp_target_uuid.uuid,
+                               imp->imp_connection->c_remote_uuid.uuid,
+                               imp->imp_obd->obd_name);
+                        ptlrpc_deactivate_import(imp);
+                }
+
+                rc = ptlrpc_connect_import(imp, NULL);
+        }
 
-        rc = ptlrpc_connect_import(imp, NULL);
         
         /* Wait for recovery to complete and resend. If evicted, then
            this request will be errored out later.*/
@@ -276,10 +286,9 @@ void ptlrpc_request_handle_notconn(struct ptlrpc_request *failed_req)
  * This should only be called by the ioctl interface, currently
  * with the lctl deactivate and activate commands.
  */
-int ptlrpc_set_import_active(struct obd_import *imp, int active)
+int  ptlrpc_set_import_active(struct obd_import *imp, int active)
 {
         struct obd_device *obd = imp->imp_obd;
-        unsigned long flags;
         int rc = 0;
 
         LASSERT(obd);
@@ -287,23 +296,14 @@ int ptlrpc_set_import_active(struct obd_import *imp, int active)
         /* When deactivating, mark import invalid, and abort in-flight
          * requests. */
         if (!active) {
-                ptlrpc_invalidate_import(imp);
+                ptlrpc_invalidate_import(imp, 0);
         } 
 
         /* When activating, mark import valid, and attempt recovery */
         if (active) {
                 CDEBUG(D_HA, "setting import %s VALID\n",
                        imp->imp_target_uuid.uuid);
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                imp->imp_invalid = 0;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-
                 rc = ptlrpc_recover_import(imp, NULL);
-                if (rc) {
-                        spin_lock_irqsave(&imp->imp_lock, flags);
-                        imp->imp_invalid = 1;
-                        spin_unlock_irqrestore(&imp->imp_lock, flags);
-                }
         }
 
         RETURN(rc);
index 6200733..68d0ff9 100755 (executable)
@@ -10,7 +10,7 @@ init_test_env $@
 
 . ${CONFIG:=$LUSTRE/tests/cfg/insanity-local.sh}
 
-ALWAYS_EXCEPT="10"
+ALWAYS_EXCEPT="10"
 
 build_test_filter
 
index b79d32d..882c716 100755 (executable)
@@ -2,8 +2,8 @@
 
 set -e
 
-#         bug 2732 2986 2762 2766
-ALWAYS_EXCEPT="17   20b  16   18"
+#         bug  2986 
+ALWAYS_EXCEPT="20b"
 
 
 LUSTRE=${LUSTRE:-`dirname $0`/..}
@@ -70,6 +70,12 @@ if [ "$ONLY" == "cleanup" ]; then
     exit
 fi
 
+if [ "$ONLY" == "cleanup" ]; then
+    sysctl -w portals.debug=0 || true
+    cleanup
+    exit
+fi
+
 REFORMAT=--reformat $SETUP
 unset REFORMAT
 
@@ -207,9 +213,22 @@ test_15() {
 }
 run_test 15 "failed open (-ENOMEM)"
 
+stop_read_ahead() {
+   for f in /proc/fs/lustre/llite/*/read_ahead; do 
+      echo 0 > $f
+   done
+}
+
+start_read_ahead() {
+   for f in /proc/fs/lustre/llite/*/read_ahead; do 
+      echo 1 > $f
+   done
+}
+
 test_16() {
     do_facet client cp /etc/termcap $MOUNT
     sync
+    stop_read_ahead
 
 #define OBD_FAIL_PTLRPC_BULK_PUT_NET 0x504 | OBD_FAIL_ONCE
     sysctl -w lustre.fail_loc=0x80000504
@@ -218,20 +237,24 @@ test_16() {
     do_facet client "cmp /etc/termcap $MOUNT/termcap"  && return 1
     sysctl -w lustre.fail_loc=0
     # give recovery a chance to finish (shouldn't take long)
-    sleep 1
+    sleep $TIMEOUT
     do_facet client "cmp /etc/termcap $MOUNT/termcap"  || return 2
+    start_read_ahead
 }
 run_test 16 "timeout bulk put, evict client (2732)"
 
 test_17() {
-#define OBD_FAIL_PTLRPC_BULK_GET_NET 0x0503 | OBD_FAIL_ONCE
-    # will get evicted here
+    # OBD_FAIL_PTLRPC_BULK_GET_NET 0x0503 | OBD_FAIL_ONCE
+    # client will get evicted here
     sysctl -w lustre.fail_loc=0x80000503
-    do_facet client cp /etc/termcap $MOUNT && return 1
-
-    do_facet client "cmp /etc/termcap $MOUNT/termcap"  && return 1
+    do_facet client cp /etc/termcap $DIR/$tfile
     sysctl -w lustre.fail_loc=0
-    do_facet client "cmp /etc/termcap $MOUNT/termcap"  || return 2
+
+    sleep $TIMEOUT
+    # expect cmp to fail
+    do_facet client "cmp /etc/termcap $DIR/$tfile"  && return 1
+    do_facet client "rm $DIR/$tfile" || return 2
+    return 0
 }
 run_test 17 "timeout bulk get, evict client (2732)"
 
index f1523bb..d11007a 100755 (executable)
@@ -12,8 +12,8 @@ init_test_env $@
 ostfailover_HOST=${ostfailover_HOST:-$ost_HOST}
 
 # Skip these tests
+# BUG NUMBER: 2766?
 ALWAYS_EXCEPT="5"
-# test 5 needs a larger fs than what local normally has
 
 gen_config() {
     rm -f $XMLCONFIG
@@ -120,11 +120,15 @@ test_4() {
 run_test 4 "Fail OST during read, with verification"
 
 test_5() {
-    IOZONE_OPTS="-i 0 -i 1 -i 2 -+d -r 64 -s 1g"
+    FREE=`df -h $DIR | tail -n 1 | awk '{ print $3 }'`
+    case $FREE in
+    *T|*G) FREE=1G;;
+    esac
+    IOZONE_OPTS="-i 0 -i 1 -i 2 -+d -r 4 -s $FREE"
     iozone $IOZONE_OPTS -f $DIR/$tfile &
     PID=$!
     
-    sleep 10
+    sleep 8
     fail ost
     wait $PID || return 1
     rm -f $DIR/$tfile
index 5786fde..6fce04c 100755 (executable)
@@ -834,25 +834,19 @@ run_test 41 "read from a valid osc while other oscs are invalid"
 
 # test MDS recovery after ost failure
 test_42() {
-    blocks=`df $MOUNT | tail -1 | awk '{ print $1 }'`
+    blocks=`df $MOUNT | tail -1 | awk '{ print $1 }'`
     createmany -o $DIR/$tfile-%d 800
     replay_barrier ost
     unlinkmany $DIR/$tfile-%d 0 400
     facet_failover ost
     
     # osc is evicted, fs is smaller
-    set -vx
-    blocks_after=`df $MOUNT | tail -1 | awk '{ print $1 }'`
-    if [ "$blocks_after" = "Filesystem" ]; then
-       echo "df failed, assuming caused by OST failout"
-    else
-        [ $blocks_after -lt $blocks ] || return 1
-    fi
+    blocks_after=`df $MOUNT | tail -n 1 | awk '{ print $1 }'`
+    [ $blocks_after -lt $blocks ] || return 1
     echo wait for MDS to timeout and recover
     sleep $((TIMEOUT * 2))
     unlinkmany $DIR/$tfile-%d 400 400
     $CHECKSTAT -t file $DIR/$tfile-* && return 2 || true
-    set +vx
 }
 run_test 42 "recovery after ost failure"
 
@@ -920,6 +914,31 @@ test_46() {
 }
 run_test 46 "Don't leak file handle after open resend (3325)"
 
+# b=2824
+test_47() {
+
+    # create some files to make sure precreate has been done on all 
+    # OSTs. (just in case this test is run independently)
+    createmany -o $DIR/$tfile 20  || return 1
+
+    # OBD_FAIL_OST_CREATE_NET 0x204
+    fail ost
+    do_facet ost "sysctl -w lustre.fail_loc=0x80000204"
+    df $MOUNT || return 2
+
+    # let the MDS discover the OST failure, attempt to recover, fail
+    # and recover again.  
+    sleep $((3 * TIMEOUT))
+
+    # Without 2824, this createmany would hang 
+    createmany -o $DIR/$tfile 20 || return 3
+    unlinkmany $DIR/$tfile 20 || return 4
+
+    do_facet ost "sysctl -w lustre.fail_loc=0"
+    return 0
+}
+run_test 47 "MDS->OSC failure during precreate cleanup (2824)"
+
 equals_msg test complete, cleaning up
 $CLEANUP