Whamcloud - gitweb
Landing of b_recovery (at last).
authorshaver <shaver>
Thu, 28 Nov 2002 20:21:24 +0000 (20:21 +0000)
committershaver <shaver>
Thu, 28 Nov 2002 20:21:24 +0000 (20:21 +0000)
Highlights:
  - b=324: MDS recovery must replay transactions in strict transno sequence
  - b=325: getattr after OST failure returns -EIO
  - b=326: unlink after OST failure returns -EIO
  - b=400: new client can't join cluster after OST failure
  - b=403: multi-client access failure when OST fails
  - b=410: After an OST failure, lfind incorrectly displays file information
  - b=417: Freeing unreplayable requests twice (aed's fix from b_md)
  - b=402: (partial) give error for lstripe request that exceeds configured OSTs
  - much better support for reconnecting to MDS after network partition
    (still some lock-repeating issues to be resolved for some requests)
  - better support for connecting to multiple MDSes on one host (xid and
    transno and request_list are all per-import now)
  - track disconnecting clients in last_rcvd, for more reliable recovery
  - also, sync last_rcvd after connect/disconnect
  - reduced syslog/CERROR output for recovery (hi, Terry!)
  - server (DLM) timeout is half the system-wide timeout, to avoid cascading
    failure in the face of a dead client
  - don't wait for recovery to finish in order to send disconnect messages
  - removal of c_dying_head
  - don't wait for timeout to trigger recovery after ptl_send_rpc error
  - strict MDS transno ordering via mds_transno_sem (non-optimal, but correct)
  - many !handle -> IS_ERR(handle) fixes around mds_fs_start callers.
  - turn on client-eviction for bulk-timeouts in OST and MDS

31 files changed:
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/lib/client.c
lustre/lib/target.c
lustre/llite/file.c
lustre/llite/recover.c
lustre/llite/super.c
lustre/lov/lov_obd.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/rpc.c
lustre/utils/lconf

index 38551ac..dc2c0b5 100644 (file)
@@ -22,21 +22,16 @@ struct lov_export_data {
 
 struct obd_export {
         __u64                     exp_cookie;
-        struct lustre_handle      exp_impconnh;
         struct list_head          exp_obd_chain;
         struct list_head          exp_conn_chain;
         struct obd_device        *exp_obd;
         struct ptlrpc_connection *exp_connection;
-        struct ldlm_export_data   exp_ldlm_data;  /* can this go inside u? */
+        struct ldlm_export_data   exp_ldlm_data;
         union {
                 struct mds_export_data    eu_mds_data;
                 struct filter_export_data eu_filter_data;
                 struct lov_export_data    eu_lov_data;
         } u;
-        void                     *exp_data; /* device specific data */
-        int                       exp_desclen;
-        char                     *exp_desc;
-        obd_uuid_t                exp_uuid;
 };
 
 #define exp_mds_data    u.eu_mds_data
index f989e6d..8611e88 100644 (file)
@@ -54,6 +54,6 @@ extern struct recovd_obd *ptlrpc_recovd;
 
 int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
 int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc);
-int ptlrpc_replay(struct ptlrpc_connection *conn);
+int ptlrpc_replay(struct obd_import *imp, int unreplied_only);
 
 #endif
index 2e01bde..ea75f08 100644 (file)
@@ -127,6 +127,7 @@ struct lustre_msg {
 
 /* Flags that apply to all requests are in the bottom 16 bits */
 #define MSG_GEN_FLAG_MASK  0x0000ffff
+#define MSG_LAST_REPLAY    1
 
 static inline int lustre_msg_get_flags(struct lustre_msg *msg)
 {
index 13b39b7..893fd0a 100644 (file)
@@ -21,9 +21,17 @@ struct obd_import {
         struct ptlrpc_client     *imp_client;
         struct lustre_handle      imp_handle;
         struct list_head          imp_chain;
+        struct list_head          imp_request_list;
         struct obd_device        *imp_obd;
         int                       imp_flags;
-        /* XXX need a UUID here, I think, unless we just use the OBD's UUID */
+        int                       imp_level;
+        __u64                     imp_last_xid;
+        __u64                     imp_max_transno;
+        __u64                     imp_peer_last_xid;
+        __u64                     imp_peer_committed_transno;
+
+        /* Protects flags, level, *_xid, request_list */
+        spinlock_t                imp_lock;
 };
 
 extern struct obd_import *class_conn2cliimp(struct lustre_handle *);
index a1e325b..1c6e0fd 100644 (file)
@@ -548,7 +548,6 @@ do {                                                                           \
             if (condition)                                                     \
                     break;                                                     \
             if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) {\
-                CERROR("lwe: interrupt\n");                                    \
                 if (info->lwi_on_signal)                                       \
                         info->lwi_on_signal(info->lwi_cb_data);                \
                 ret = -EINTR;                                                  \
@@ -556,7 +555,6 @@ do {                                                                           \
             }                                                                  \
             if (info->lwi_timeout && !__timed_out) {                           \
                 if (schedule_timeout(info->lwi_timeout) == 0) {                \
-                    CERROR("lwe: timeout\n");                                  \
                     __timed_out = 1;                                           \
                     if (!info->lwi_on_timeout ||                               \
                         info->lwi_on_timeout(info->lwi_cb_data)) {             \
@@ -568,7 +566,6 @@ do {                                                                           \
                         __state = TASK_INTERRUPTIBLE;                          \
                         /* Check for a pending interrupt. */                   \
                         if (info->lwi_signals && l_killable_pending(current)) {\
-                            CERROR("lwe: pending interrupt\n");                \
                             if (info->lwi_on_signal)                           \
                                 info->lwi_on_signal(info->lwi_cb_data);        \
                             ret = -EINTR;                                      \
index 67d8542..936ce99 100644 (file)
@@ -194,8 +194,9 @@ int mdc_create_client(obd_uuid_t uuid, struct ptlrpc_client *cl);
 void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
                                 int repoff);
 
-extern int mds_client_add(struct mds_export_data *med, int cl_off);
-extern int mds_client_free(struct obd_export *exp);
+int mds_client_add(struct mds_obd *mds, struct mds_export_data *med,
+                   int cl_off);
+int mds_client_free(struct obd_export *exp);
 
 /* mds/mds_fs.c */
 struct mds_fs_operations {
index f1a0870..d3aa4fd 100644 (file)
@@ -78,19 +78,13 @@ struct ptlrpc_connection {
         __u32                   c_bootcount;   /* peer's boot count */
 
         spinlock_t              c_lock;        /* also protects req->rq_list */
-        __u32                   c_xid_in;
-        __u32                   c_xid_out;
 
         atomic_t                c_refcount;
         __u64                   c_token;
         __u64                   c_remote_conn;
         __u64                   c_remote_token;
 
-        __u64                   c_last_xid;    /* protected by c_lock */
-        __u64                   c_last_committed;/* protected by c_lock */
-        struct list_head        c_delayed_head;/* delayed until post-recovery */
-        struct list_head        c_sending_head;/* protected by c_lock */
-        struct list_head        c_dying_head;  /* protected by c_lock */
+        struct list_head        c_delayed_head;/* delayed until post-recovery XXX imp? */
         struct recovd_data      c_recovd_data;
 
         struct list_head        c_imports;
@@ -120,7 +114,7 @@ struct ptlrpc_client {
 #define PTL_RPC_FL_ERR       (1 << 5)
 #define PTL_RPC_FL_TIMEOUT   (1 << 6)
 #define PTL_RPC_FL_RESEND    (1 << 7)
-#define PTL_RPC_FL_RECOVERY  (1 << 8)  /* retransmission for recovery */
+#define PTL_RPC_FL_RESTART   (1 << 8)  /* operation must be restarted */
 #define PTL_RPC_FL_FINISHED  (1 << 9)
 #define PTL_RPC_FL_RETAIN    (1 << 10) /* retain for replay after reply */
 #define PTL_RPC_FL_REPLAY    (1 << 11) /* replay upon recovery */
@@ -169,13 +163,13 @@ struct ptlrpc_request {
 #define DEBUG_REQ(level, req, fmt, args...)                                    \
 do {                                                                           \
 CDEBUG(level,                                                                  \
-       "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d fl "           \
+       "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d ref %d fl "    \
        "%x\n" ,  ## args, req->rq_xid, req->rq_transno,                        \
        req->rq_reqmsg ? req->rq_reqmsg->opc : -1,                              \
        req->rq_connection ? (char *)req->rq_connection->c_remote_uuid : "<?>", \
        (req->rq_import && req->rq_import->imp_client) ?                        \
            req->rq_import->imp_client->cli_request_portal : -1,                \
-       req->rq_reqlen, req->rq_replen, req->rq_flags);                         \
+       req->rq_reqlen, req->rq_replen, req->rq_refcount, req->rq_flags);       \
 } while (0)
 
 struct ptlrpc_bulk_page {
@@ -326,7 +320,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *);
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
 struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc);
 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
-int ptlrpc_check_status(struct ptlrpc_request *req, int err);
 
 /* rpc/service.c */
 struct ptlrpc_service *
index 70e1369..2235a1e 100644 (file)
@@ -119,25 +119,34 @@ struct client_obd {
 #define IOC_OSC_MAX_NR       50
 
 struct mds_obd {
-        struct ptlrpc_service *mds_service;
-
-        char *mds_fstype;
-        struct super_block *mds_sb;
-        struct super_operations *mds_sop;
-        struct vfsmount *mds_vfsmnt;
-        struct obd_run_ctxt mds_ctxt;
-        struct file_operations *mds_fop;
-        struct inode_operations *mds_iop;
+        struct ptlrpc_service           *mds_service;
+
+        char                            *mds_fstype;
+        struct super_block              *mds_sb;
+        struct super_operations         *mds_sop;
+        struct vfsmount                 *mds_vfsmnt;
+        struct obd_run_ctxt              mds_ctxt;
+        struct file_operations          *mds_fop;
+        struct inode_operations         *mds_iop;
         struct address_space_operations *mds_aops;
-        struct mds_fs_operations *mds_fsops;
-        int mds_max_mdsize;
-        struct file *mds_rcvd_filp;
-        spinlock_t mds_last_lock;
-        __u64 mds_last_committed;
-        __u64 mds_last_rcvd;
-        __u64 mds_mount_count;
-        struct ll_fid mds_rootfid;
-        struct mds_server_data *mds_server_data;
+        struct mds_fs_operations        *mds_fsops;
+
+        int                              mds_max_mdsize;
+        struct file                     *mds_rcvd_filp;
+        struct semaphore                 mds_transno_sem;
+        __u64                            mds_last_committed;
+        __u64                            mds_last_rcvd;
+        __u64                            mds_mount_count;
+        struct ll_fid                    mds_rootfid;
+        struct mds_server_data          *mds_server_data;
+
+        wait_queue_head_t                mds_next_transno_waitq;
+        __u64                            mds_next_recovery_transno;
+        int                              mds_recoverable_clients;
+        struct list_head                 mds_recovery_queue;
+        struct list_head                 mds_delayed_reply_queue;
+        spinlock_t                       mds_processing_task_lock;
+        pid_t                            mds_processing_task;
 };
 
 struct ldlm_obd {
index a1524bf..12af650 100644 (file)
@@ -77,7 +77,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
         LASSERT(list_empty(&lock->l_pending_chain));
 
         spin_lock_bh(&waiting_locks_spinlock);
-        lock->l_callback_timeout = jiffies + (obd_timeout * HZ);
+        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
 
index c23fd85..6672c3e 100644 (file)
@@ -230,7 +230,6 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
 
         LDLM_DEBUG(lock, "sending request");
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
         if (rc != ELDLM_OK) {
                 LASSERT(!is_replay);
@@ -407,7 +406,6 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
         req->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
@@ -464,7 +462,6 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 req->rq_replen = lustre_msg_size(0, NULL);
 
                 rc = ptlrpc_queue_wait(req);
-                rc = ptlrpc_check_status(req, rc);
                 ptlrpc_req_finished(req);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
index c5590fa..bf5fac3 100644 (file)
@@ -46,6 +46,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         int rq_portal, rp_portal;
         char *name;
         struct client_obd *cli = &obddev->u.cli;
+        struct obd_import *imp = &cli->cl_import;
         obd_uuid_t server_uuid;
         ENTRY;
 
@@ -85,14 +86,17 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
                                                    sizeof(server_uuid)));
 
-        cli->cl_import.imp_connection = ptlrpc_uuid_to_connection(server_uuid);
-        if (!cli->cl_import.imp_connection)
+        imp->imp_connection = ptlrpc_uuid_to_connection(server_uuid);
+        if (!imp->imp_connection)
                 RETURN(-ENOENT);
+        
+        INIT_LIST_HEAD(&imp->imp_request_list);
+        spin_lock_init(&imp->imp_lock);
 
         ptlrpc_init_client(rq_portal, rp_portal, name,
                            &obddev->obd_ldlm_client);
-        cli->cl_import.imp_client = &obddev->obd_ldlm_client;
-        cli->cl_import.imp_obd = obddev;
+        imp->imp_client = &obddev->obd_ldlm_client;
+        imp->imp_obd = obddev;
 
         cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
 
@@ -122,6 +126,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         char *tmp[] = {cli->cl_target_uuid, obd->obd_uuid};
         int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
         struct ptlrpc_connection *c;
+        struct obd_import *imp = &cli->cl_import;
 
         ENTRY;
         down(&cli->cl_sem);
@@ -140,6 +145,12 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (obd->obd_namespace == NULL)
                 GOTO(out_disco, rc = -ENOMEM);
 
+        INIT_LIST_HEAD(&imp->imp_chain);
+        imp->imp_last_xid = 0;
+        imp->imp_max_transno = 0;
+        imp->imp_peer_last_xid = 0;
+        imp->imp_peer_committed_transno = 0;
+
         request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 2, size, tmp);
         if (!request)
                 GOTO(out_ldlm, rc = -ENOMEM);
@@ -153,16 +164,15 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         recovd_conn_manage(c, recovd, recover);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out_req, rc);
 
         if (rq_opc == MDS_CONNECT)
-                cli->cl_import.imp_flags |= IMP_REPLAYABLE;
-        list_add(&cli->cl_import.imp_chain, &c->c_imports);
+                imp->imp_flags |= IMP_REPLAYABLE;
+        list_add(&imp->imp_chain, &c->c_imports);
         c->c_level = LUSTRE_CONN_FULL;
-        cli->cl_import.imp_handle.addr = request->rq_repmsg->addr;
-        cli->cl_import.imp_handle.cookie = request->rq_repmsg->cookie;
+        imp->imp_handle.addr = request->rq_repmsg->addr;
+        imp->imp_handle.cookie = request->rq_repmsg->cookie;
 
         EXIT;
 out_req:
@@ -171,9 +181,21 @@ out_req:
 out_ldlm:
                 ldlm_namespace_free(obd->obd_namespace);
                 obd->obd_namespace = NULL;
+                if (rq_opc == MDS_CONNECT) {
+                        /* Don't class_disconnect OSCs, because the LOV
+                         * cares about them even if they can't connect to the
+                         * OST.
+                         *
+                         * This is leak-bait, but without either a way to
+                         * operate on the osc without an export or separate
+                         * methods for connect-to-osc and connect-osc-to-ost
+                         * it's not clear what else to do.
+                         */
 out_disco:
-                class_disconnect(conn);
-                MOD_DEC_USE_COUNT;
+                        cli->cl_conn_count--;
+                        class_disconnect(conn);
+                        MOD_DEC_USE_COUNT;
+                }
         }
 out_sem:
         up(&cli->cl_sem);
@@ -210,12 +232,16 @@ int client_obd_disconnect(struct lustre_handle *conn)
 
         ldlm_namespace_free(obd->obd_namespace);
         obd->obd_namespace = NULL;
-        request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL, NULL);
+        request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL,
+                                  NULL);
         if (!request)
                 GOTO(out_disco, rc = -ENOMEM);
-
+        
         request->rq_replen = lustre_msg_size(0, NULL);
 
+        /* Process disconnects even if we're waiting for recovery. */
+        request->rq_level = LUSTRE_CONN_RECOVD;
+        
         rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out_req, rc);
index 8786ee8..7666663 100644 (file)
@@ -69,7 +69,8 @@ int target_handle_connect(struct ptlrpc_request *req)
 
         rc = obd_connect(&conn, target, cluuid, ptlrpc_recovd,
                          target_revoke_connection);
-        if (rc)
+        /* EALREADY indicates a reconnection, send the reply normally. */
+        if (rc && rc != EALREADY)
                 GOTO(out, rc);
 
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
@@ -99,6 +100,7 @@ int target_handle_connect(struct ptlrpc_request *req)
         dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
         dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
         dlmimp->imp_obd = /* LDLM! */ NULL;
+        spin_lock_init(&dlmimp->imp_lock);
         
         req->rq_connection->c_level = LUSTRE_CONN_FULL;
 out:
index dbc0485..a67b023 100644 (file)
@@ -158,7 +158,8 @@ out_mdc:
         mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
                   S_IFREG, &fd->fd_mdshandle, &req);
 out_req:
-        ptlrpc_free_req(req);
+        ptlrpc_req_finished(req); /* once for reply */
+        ptlrpc_req_finished(req); /* once for an early "commit" */
 //out_fd:
         fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
         kmem_cache_free(ll_file_data_slab, fd);
@@ -344,11 +345,7 @@ out_mdc:
                         rc = -abs(rc2);
                 GOTO(out_fd, rc);
         }
-        CDEBUG(D_HA, "matched req %p xid "LPD64" transno "LPD64" op "
-               "%d->%s:%d\n", fd->fd_req, fd->fd_req->rq_xid,
-               fd->fd_req->rq_repmsg->transno, fd->fd_req->rq_reqmsg->opc,
-               fd->fd_req->rq_import->imp_connection->c_remote_uuid,
-               fd->fd_req->rq_import->imp_client->cli_request_portal);
+        DEBUG_REQ(D_HA, fd->fd_req, "matched open for this close: ");
         ptlrpc_req_finished(fd->fd_req);
 
         if (atomic_dec_and_test(&lli->lli_open_count)) {
index b688fb9..8acd1bb 100644 (file)
@@ -33,26 +33,16 @@ static void abort_inflight_for_import(struct obd_import *imp)
         imp->imp_flags |= IMP_INVALID;
         spin_unlock(&imp->imp_connection->c_lock);
 
-        list_for_each_safe(tmp, n, &imp->imp_connection->c_sending_head) {
+        list_for_each_safe(tmp, n, &imp->imp_request_list) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
-                if (req->rq_import != imp)
-                        continue;
-
                 if (req->rq_flags & PTL_RPC_FL_REPLIED) {
                         /* no need to replay, just discard */
-                        CERROR("uncommitted req xid "LPD64" op %d to OST %s\n",
-                               (unsigned long long)req->rq_xid,
-                               req->rq_reqmsg->opc,
-                               imp->imp_obd->u.cli.cl_target_uuid);
+                        DEBUG_REQ(D_ERROR, req, "uncommitted");
                         ptlrpc_req_finished(req);
                 } else {
-                        CERROR("inflight req xid "LPD64" op %d to OST %s\n",
-                               (unsigned long long)req->rq_xid,
-                               req->rq_reqmsg->opc,
-                               imp->imp_obd->u.cli.cl_target_uuid);
-
+                        DEBUG_REQ(D_ERROR, req, "inflight");
                         req->rq_flags |= PTL_RPC_FL_ERR;
                         wake_up(&req->rq_wait_for_rep);
                 }
@@ -61,9 +51,11 @@ static void abort_inflight_for_import(struct obd_import *imp)
         list_for_each_safe(tmp, n, &imp->imp_connection->c_delayed_head) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
-                CERROR("aborting waiting req xid "LPD64" op %d to OST %s\n",
-                       (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
-                       imp->imp_obd->u.cli.cl_target_uuid);
+
+                if (req->rq_import != imp)
+                        continue;
+
+                DEBUG_REQ(D_ERROR, req, "aborting waiting req");
                 req->rq_flags |= PTL_RPC_FL_ERR;
                 wake_up(&req->rq_wait_for_rep);
         }
@@ -149,53 +141,32 @@ static void reconnect_osc(struct obd_import *imp)
                        imp->imp_obd->obd_uuid);
 }
 
-static int reconnect_mdc(struct obd_import *imp)
+static void reconnect_mdc(struct obd_import *imp)
 {
-        return ptlrpc_reconnect_import(imp, MDS_CONNECT);
+        int rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+        if (!rc)
+                ptlrpc_replay(imp, 0 /* all reqs */);
+        else if (rc == EALREADY)
+                ptlrpc_replay(imp, 1 /* only unreplied reqs */);
 }
 
 static int ll_reconnect(struct ptlrpc_connection *conn)
 {
         struct list_head *tmp;
-        int need_replay = 0;
 
         ENTRY;
-
-        /* XXX c_lock semantics! */
-        conn->c_level = LUSTRE_CONN_CON;
-
-        /* XXX this code MUST be shared with class_obd_connect! */
         list_for_each(tmp, &conn->c_imports) {
                 struct obd_import *imp = list_entry(tmp, struct obd_import,
                                                     imp_chain);
                 if (imp->imp_obd->obd_type->typ_ops->o_brw) {
-                        /* XXX what to do if we fail? */
                         reconnect_osc(imp);
                 } else {
-                        int rc = reconnect_mdc(imp);
-                        if (!rc) {
-                                need_replay = 1;
-                        }
-                        /* make sure we don't try to replay for dead imps?
-                         *
-                         * else imp->imp_connection = NULL;
-                         *
-                         */
-
+                        reconnect_mdc(imp);
                 }
         }
 
-        if (!need_replay) {
-                /* all done! */
-                conn->c_level = LUSTRE_CONN_FULL;
-                RETURN(0);
-        }
-
-        conn->c_level = LUSTRE_CONN_RECOVD;
-        /* this will replay, up the c_level, recovd_conn_fixed and continue
-         * reqs. also, makes a mean cup of coffee.
-         */
-        RETURN(ptlrpc_replay(conn));
+        conn->c_level = LUSTRE_CONN_FULL;
+        RETURN(0);
 }
 
 int ll_recover(struct recovd_data *rd, int phase)
index 01e00ce..cb7136c 100644 (file)
@@ -574,7 +574,7 @@ void ll_umount_begin(struct super_block *sb)
                 /* XXX should just be dealing with imports, probably through
                  * XXX iocontrol, need next-gen recovery! */
                 conn->c_flags |= CONN_INVALID;
-                invalidate_request_list(&conn->c_sending_head);
+                /* invalidate_request_list(&conn->c_sending_head); */
                 invalidate_request_list(&conn->c_delayed_head);
                 spin_unlock(&conn->c_lock);
         }
index c840368..15fe873 100644 (file)
@@ -158,6 +158,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         for (i = 0; i < desc->ld_tgt_count; i++) {
                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
+                int rc2;
 
                 if (!tgt) {
                         CERROR("Target %s not attached\n", uuidarray[i]);
@@ -171,18 +172,27 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
                                  recover);
-                if (rc) {
-                        CERROR("Target %s connect error %d\n",
-                               uuidarray[i], rc);
-                        GOTO(out_disc, rc);
+
+                /* Register even if connect failed, so that we get reactivation
+                 * notices.
+                 */
+                rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
+                                    sizeof(struct obd_device *), obd, NULL);
+                if (rc2) {
+                        CERROR("Target %s REGISTER_LOV error %d\n",
+                               uuidarray[i], rc2);
+                        GOTO(out_disc, rc2);
                 }
-                rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
-                                   sizeof(struct obd_device *), obd, NULL);
+
+                /* But mark failed-connect OSCs as inactive! */
                 if (rc) {
-                        CERROR("Target %s REGISTER_LOV error %d\n",
+                        CDEBUG(D_INFO, "Target %s connect error %d\n",
                                uuidarray[i], rc);
-                        GOTO(out_disc, rc);
+                        LASSERT(lov->tgts[i].active == 0);
+                        rc = 0;
+                        continue;
                 }
+                
                 desc->ld_active_tgt_count++;
                 lov->tgts[i].active = 1;
         }
@@ -227,19 +237,17 @@ static int lov_disconnect(struct lustre_handle *conn)
                 goto out_local;
 
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->tgts[i].active) {
-                        CERROR("Skipping disconnect for inactive OSC %s\n",
-                               lov->tgts[i].uuid);
-                        continue;
-                }
-
-                lov->desc.ld_active_tgt_count--;
-                lov->tgts[i].active = 0;
                 rc = obd_disconnect(&lov->tgts[i].conn);
                 if (rc) {
-                        CERROR("Target %s disconnect error %d\n",
-                               lov->tgts[i].uuid, rc);
-                        RETURN(rc);
+                        if (lov->tgts[i].active) {
+                                CERROR("Target %s disconnect error %d\n",
+                                       lov->tgts[i].uuid, rc);
+                        }
+                        rc = 0;
+                }
+                if (lov->tgts[i].active) {
+                        lov->desc.ld_active_tgt_count--;
+                        lov->tgts[i].active = 0;
                 }
         }
         OBD_FREE(lov->tgts, lov->bufsize);
@@ -313,10 +321,24 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
 
         lov->tgts[i].active = activate;
-        if (activate)
+        if (activate) {
+                /*
+                 * foreach(export)
+                 *     foreach(open_file)
+                 *         if (file_handle uses this_osc)
+                 *             if (has_no_filehandle)
+                 *                 open(file_handle, this_osc);
+                 */
+                /* XXX reconnect? */
                 lov->desc.ld_active_tgt_count++;
-        else
+        } else {
+                /*
+                 * Should I invalidate filehandles that refer to this OSC, so
+                 * that I reopen them during reactivation?
+                 */
+                /* XXX disconnect from OSC? */
                 lov->desc.ld_active_tgt_count--;
+        }
 
         EXIT;
  out:
@@ -332,7 +354,7 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
         ENTRY;
 
         if (data->ioc_inllen1 < 1) {
-                CERROR("osc setup requires an MDC UUID\n");
+                CERROR("LOV setup requires an MDC UUID\n");
                 RETURN(-EINVAL);
         }
 
@@ -400,6 +422,10 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
 
         lsm = *ea;
 
+        /* Can't create more stripes than we have targets (incl inactive). */
+        if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
+                GOTO(out_tmp, rc = -EINVAL);
+
         /* Free the user lsm if it needs to be changed, to avoid memory leaks */
         if (!lsm || (lsm &&
                      lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
@@ -494,7 +520,7 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
 
  out_tmp:
         obdo_free(tmp);
-        return rc;
+        RETURN(rc);
 
  out_cleanup:
         while (i-- > 0) {
@@ -547,6 +573,12 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
 
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+                int err;
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        /* Orphan clean up will (someday) fix this up. */
+                        continue;
+                }
+
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
                 if (lfh)
@@ -554,11 +586,15 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
                                sizeof(lfh->lfh_handles[i]));
                 else
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
-                rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
-                if (rc)
-                        CERROR("Error destroying objid "LPX64" subobj "LPX64
-                               " on OST idx %d\n: rc = %d",
-                               oa->o_id, loi->loi_id, loi->loi_ost_idx, rc);
+                err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
+                                  NULL);
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
+                        CERROR("Error destroying objid "LPX64" subobj "
+                               LPX64" on OST idx %d\n: rc = %d",
+                               oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+                        if (!rc)
+                                rc = err;
+                }
         }
         RETURN(rc);
 }
@@ -620,7 +656,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_file_handles *lfh = NULL;
-        int rc = 0, i;
+        int i;
         int new = 1;
         ENTRY;
 
@@ -649,6 +685,9 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                 if (loi->loi_id == 0)
                         continue;
 
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
+
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
                 /* create data objects with "parent" OA */
@@ -661,17 +700,16 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
 
                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
-                if (err) {
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("Error getattr objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
-                        if (!rc)
-                                rc = err;
-                        continue; /* XXX or break? */
+                        RETURN(err);
                 }
                 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
         }
-        RETURN(rc);
+
+        RETURN(0);
 }
 
 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
@@ -744,11 +782,12 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *lsm)
 {
-        struct obdo *tmp;
+        struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_file_handles *lfh = NULL;
+        struct lustre_handle *handle;
         int new = 1;
         int rc = 0, i;
         ENTRY;
@@ -783,20 +822,22 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         oa->o_size = 0;
         oa->o_blocks = 0;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                int err;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        continue;
+                }
 
                 /* create data objects with "parent" OA */
                 memcpy(tmp, oa, sizeof(*tmp));
                 tmp->o_id = loi->loi_id;
 
-                err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
-                if (err) {
+                rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+                if (rc && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("Error open objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                oa->o_id, lsm->lsm_oinfo[i].loi_id,
                                loi->loi_ost_idx, rc);
-                        if (!rc)
-                                rc = err;
+                        goto out_handles;
                 }
 
                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
@@ -806,31 +847,40 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
                                sizeof(lfh->lfh_handles[i]));
         }
 
-        if (tmp->o_valid & OBD_MD_FLHANDLE) {
-                struct lustre_handle *handle = obdo_handle(oa);
+        handle = obdo_handle(oa);
+        
+        lfh->lfh_count = lsm->lsm_stripe_count;
+        get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
+        
+        handle->addr = (__u64)(unsigned long)lfh;
+        handle->cookie = lfh->lfh_cookie;
+        oa->o_valid |= OBD_MD_FLHANDLE;
+        list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
 
-                lfh->lfh_count = lsm->lsm_stripe_count;
-                get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
-
-                handle->addr = (__u64)(unsigned long)lfh;
-                handle->cookie = lfh->lfh_cookie;
-                oa->o_valid |= OBD_MD_FLHANDLE;
-                list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
-        } else
-                goto out_handles;
-
-        /* FIXME: returning an error, but having opened some objects is a bad
-         *        idea, since they will likely never be closed.  We either
-         *        need to not return an error if _some_ objects could be
-         *        opened, and leave it to read/write to return -EIO (with
-         *        hopefully partial error status) or close all opened objects
-         *        and return an error.  I think the former is preferred.
-         */
 out_tmp:
         obdo_free(tmp);
         RETURN(rc);
 
 out_handles:
+        for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
+                int err;
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
+
+                memcpy(tmp, oa, sizeof(*tmp));
+                tmp->o_id = loi->loi_id;
+                memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
+                       sizeof(lfh->lfh_handles[i]));
+
+                err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
+                if (err) {
+                        CERROR("Error closing objid "LPX64" subobj "LPX64
+                               " on OST idx %d after open error: rc = %d\n",
+                               oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
+                }
+        }
+       
         OBD_FREE(lfh->lfh_handles,
                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
 out_lfh:
@@ -870,6 +920,9 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
+                
+                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                        continue;
 
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
@@ -1119,6 +1172,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 RETURN(-EINVAL);
         }
 
+        /* XXX assert that we're not in recovery */
+
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
@@ -1128,6 +1183,7 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 struct ldlm_extent sub_ext;
                 struct lov_stripe_md submd;
 
+                *flags = 0;
                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
                 if (sub_ext.start == sub_ext.end)
index 433d365..63c1ef0 100644 (file)
@@ -37,7 +37,6 @@ static int mdc_reint(struct ptlrpc_request *request, int level)
         request->rq_level = level;
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
 
         if (rc) {
                 CERROR("error in handling %d\n", rc);
index c409b3d..a9a5d9a 100644 (file)
@@ -56,7 +56,6 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
 
         mds_pack_req_body(req);
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
         if (!rc) {
                 body = lustre_msg_buf(req->rq_repmsg, 0);
@@ -100,7 +99,6 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
         req->rq_replen = lustre_msg_size(2, size);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
  out:
         RETURN(rc);
@@ -136,7 +134,6 @@ int mdc_getattr(struct lustre_handle *conn,
         mds_pack_req_body(req);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
         if (!rc) {
                 body = lustre_msg_buf(req->rq_repmsg, 0);
@@ -225,6 +222,8 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
 
+        DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+                  body->fid1.generation, body->fid1.id);
         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
 }
 
@@ -496,7 +495,6 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         req->rq_replen = lustre_msg_size(1, size);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
         if (!rc) {
                 body = lustre_msg_buf(req->rq_repmsg, 0);
                 mds_unpack_body(body);
@@ -533,7 +531,6 @@ int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
         req->rq_replen = lustre_msg_size(0, NULL);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
         EXIT;
  out:
@@ -580,7 +577,6 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
 
         req->rq_replen = lustre_msg_size(1, &size);
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
         if (rc) {
                 ptlrpc_abort_bulk(desc);
                 GOTO(out2, rc);
@@ -611,7 +607,6 @@ static int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
         req->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
 
         if (rc)
                 GOTO(out, rc);
index d78ad53..4f5f6e3 100644 (file)
@@ -8,6 +8,7 @@
  *   Author: Peter Braam <braam@clusterfs.com>
  *   Author: Andreas Dilger <adilger@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
+ *   Author: Mike Shaver <shaver@clusterfs.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
@@ -46,8 +47,9 @@ static kmem_cache_t *mds_file_cache;
 extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                            obd_uuid_t *uuidarray);
 extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
-extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
-                                struct ptlrpc_request *req);
+extern void mds_start_transno(struct mds_obd *mds);
+extern int mds_finish_transno(struct mds_obd *mds, void *handle,
+                              struct ptlrpc_request *req, int rc);
 static int mds_cleanup(struct obd_device * obddev);
 
 extern struct lprocfs_vars status_var_nm_1[];
@@ -63,7 +65,7 @@ static int mds_bulk_timeout(void *data)
         struct ptlrpc_bulk_desc *desc = data;
 
         ENTRY;
-        CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+        recovd_conn_fail(desc->bd_connection);
         RETURN(1);
 }
 
@@ -113,7 +115,8 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         }
 
         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
+                          &lwi);
         if (rc) {
                 if (rc != -ETIMEDOUT)
                         LBUG();
@@ -301,27 +304,53 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                         CERROR("FYI: NULL mcd - simultaneous connects\n");
                         continue;
                 }
-                if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) {
+                if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
+                        /* XXX make handle-found-export a subroutine */
                         LASSERT(exp->exp_obd == obd);
 
-                        if (!list_empty(&exp->exp_conn_chain)) {
-                                CERROR("existing uuid/export, list not empty!\n");
-                                spin_unlock(&obd->obd_dev_lock);
+                        spin_unlock(&obd->obd_dev_lock);
+                        if (exp->exp_connection) {
+                                struct lustre_handle *hdl;
+                                hdl = &exp->exp_ldlm_data.led_import.imp_handle;
+                                /* Might be a re-connect after a partition. */
+                                if (!memcmp(conn, hdl, sizeof *conn)) {
+                                        CERROR("%s reconnecting\n", cluuid);
+                                        conn->addr = (__u64) (unsigned long)exp;
+                                        conn->cookie = exp->exp_cookie;
+                                        rc = EALREADY;
+                                } else {
+                                        CERROR("%s reconnecting from %s, "
+                                               "handle mismatch (ours %Lx/%Lx, "
+                                               "theirs %Lx/%Lx)\n", cluuid,
+                                               exp->exp_connection->
+                                               c_remote_uuid, hdl->addr,
+                                               hdl->cookie, conn->addr,
+                                               conn->cookie);
+                                        /* XXX disconnect them here? */
+                                        memset(conn, 0, sizeof *conn);
+                                        rc = -EALREADY;
+                                }
                                 MOD_DEC_USE_COUNT;
-                                RETURN(-EALREADY);
+                                RETURN(rc);
                         }
                         conn->addr = (__u64) (unsigned long)exp;
                         conn->cookie = exp->exp_cookie;
-                        spin_unlock(&obd->obd_dev_lock);
                         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
                                cluuid, exp);
                         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
                                (long long)conn->addr, (long long)conn->cookie);
-                        MOD_DEC_USE_COUNT;
                         RETURN(0);
                 }
         }
         spin_unlock(&obd->obd_dev_lock);
+
+        if (obd->u.mds.mds_recoverable_clients != 0) {
+                CERROR("denying connection for new client %s: in recovery\n",
+                       cluuid);
+                MOD_DEC_USE_COUNT;
+                RETURN(-EBUSY);
+        }
+
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
          * corruption when multiple different clients connect) is solved.
@@ -351,7 +380,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
 
-        rc = mds_client_add(med, -1);
+        rc = mds_client_add(&obd->u.mds, med, -1);
         if (rc)
                 GOTO(out_mcd, rc);
 
@@ -836,13 +865,16 @@ static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
         push_ctxt(&saved, &mds->mds_ctxt, &uc);
+        mds_start_transno(mds);
         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
-        if (!handle)
-                GOTO(out_ea, rc = -ENOMEM);
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                mds_finish_transno(mds, handle, req, rc);
+                GOTO(out_ea, rc);
+        }
 
         rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
 
         rc2 = mds_fs_commit(mds, inode, handle);
         if (rc2 && !rc)
@@ -1058,9 +1090,162 @@ int mds_reint(struct ptlrpc_request *req, int offset)
         return rc;
 }
 
+/* forward declaration */
+int mds_handle(struct ptlrpc_request *req);
+
+static int check_for_next_transno(struct mds_obd *mds)
+{
+        struct ptlrpc_request *req;
+        req = list_entry(mds->mds_recovery_queue.next, 
+                         struct ptlrpc_request, rq_list);
+        return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
+}
+
+static void process_recovery_queue(struct mds_obd *mds)
+{
+        struct ptlrpc_request *req;
+        
+        for (;;) {
+                spin_lock(&mds->mds_processing_task_lock);
+                req = list_entry(mds->mds_recovery_queue.next, 
+                                 struct ptlrpc_request, rq_list);
+
+                if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
+                        spin_unlock(&mds->mds_processing_task_lock);
+                        wait_event(mds->mds_next_transno_waitq,
+                                   check_for_next_transno(mds));
+                        continue;
+                }
+                list_del(&req->rq_list);
+                spin_unlock(&mds->mds_processing_task_lock);
+
+                DEBUG_REQ(D_HA, req, "");
+                mds_handle(req);
+                
+                if (list_empty(&mds->mds_recovery_queue))
+                        break;
+        }
+}
+
+static int queue_recovery_request(struct ptlrpc_request *req,
+                                  struct mds_obd *mds)
+{
+        struct list_head *tmp;
+        int inserted = 0, transno = req->rq_reqmsg->transno;
+
+        if (!transno) {
+                DEBUG_REQ(D_HA, req, "not queueing");
+                return 1;
+        }
+
+        spin_lock(&mds->mds_processing_task_lock);
+
+        if (mds->mds_processing_task == current->pid) {
+                /* Processing the queue right now, don't re-add. */
+                spin_unlock(&mds->mds_processing_task_lock);
+                return 1;
+        }
+
+        /* XXX O(n^2) */
+        list_for_each(tmp, &mds->mds_recovery_queue) {
+                struct ptlrpc_request *reqiter = 
+                        list_entry(tmp, struct ptlrpc_request, rq_list);
+                if (reqiter->rq_reqmsg->transno > transno) {
+                        list_add_tail(&req->rq_list, &reqiter->rq_list);
+                        inserted = 1;
+                        break;
+                }
+        }
+
+        if (!inserted)
+                list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
+
+        if (mds->mds_processing_task != 0) {
+                /* Someone else is processing this queue, we'll leave it to
+                 * them.
+                 */
+                spin_unlock(&mds->mds_processing_task_lock);
+                if (transno == mds->mds_next_recovery_transno)
+                        wake_up(&mds->mds_next_transno_waitq);
+                return 0;
+        }
+
+        /* Nobody is processing, and we know there's (at least) one to process
+         * now, so we'll do the honours.
+         */
+        mds->mds_processing_task = current->pid;
+        spin_unlock(&mds->mds_processing_task_lock);
+
+        process_recovery_queue(mds);
+        return 0;
+}
+
+static int filter_recovery_request(struct ptlrpc_request *req, 
+                                   struct mds_obd *mds, int *process)
+{
+        switch (req->rq_reqmsg->opc) {
+        case MDS_CONNECT:
+        case MDS_DISCONNECT:
+        case MDS_OPEN:
+               *process = 1;
+               RETURN(0);
+            
+        case MDS_GETSTATUS: /* used in unmounting */
+        case MDS_REINT:
+        case LDLM_ENQUEUE:
+                *process = queue_recovery_request(req, mds);
+                RETURN(0);
+                
+        default:
+                DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+                *process = 0;
+                RETURN(ptlrpc_error(req->rq_svc, req));
+        }
+}
+
+static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+        struct mds_obd *mds = mds_req2mds(req);
+
+        if (rc) {
+                /* Just like ptlrpc_error, but without the sending. */
+                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                &req->rq_repmsg);
+                req->rq_type = PTL_RPC_MSG_ERR;
+        }
+
+        list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
+        if (--mds->mds_recoverable_clients == 0) {
+                struct list_head *tmp, *n;
+
+                CDEBUG(D_HA,
+                       "all clients recovered, sending delayed replies\n");
+                list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
+                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                        DEBUG_REQ(D_HA, req, "delayed:");
+                        ptlrpc_reply(req->rq_svc, req);
+                }
+        } else {
+                CDEBUG(D_HA, "%d recoverable clients remain\n",
+                       mds->mds_recoverable_clients);
+        }
+
+        return 1;
+}
+
+static char *reint_names[] = {
+        [REINT_SETATTR] "setattr",
+        [REINT_CREATE]  "create",
+        [REINT_LINK]    "link",
+        [REINT_UNLINK]  "unlink",
+        [REINT_RENAME]  "rename"
+};
+
 int mds_handle(struct ptlrpc_request *req)
 {
         int rc;
+        int should_process;
+        struct mds_obd *mds = NULL; /* quell gcc overwarning */
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
@@ -1069,49 +1254,67 @@ int mds_handle(struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
 
-        if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL)
-                GOTO(out, rc = -ENOTCONN);
-
         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
 
+        if (req->rq_reqmsg->opc != MDS_CONNECT) {
+                if (req->rq_export == NULL)
+                        GOTO(out, rc = -ENOTCONN);
+
+                mds = mds_req2mds(req);
+                if (mds->mds_recoverable_clients != 0) {
+                        rc = filter_recovery_request(req, mds, &should_process);
+                        if (rc || !should_process)
+                                RETURN(rc);
+                }
+        }
+
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT:
-                CDEBUG(D_INODE, "connect\n");
+                DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
                 rc = target_handle_connect(req);
+                /* Make sure that last_rcvd is correct. */
+                if (!rc) {
+                        /* Now that we have an export, set mds. */
+                        mds = mds_req2mds(req);
+                        mds_fsync_super(mds->mds_sb);
+                }
                 break;
 
         case MDS_DISCONNECT:
-                CDEBUG(D_INODE, "disconnect\n");
+                DEBUG_REQ(D_INODE, req, "disconnect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
                 rc = target_handle_disconnect(req);
+                /* Make sure that last_rcvd is correct. */
+                if (!rc)
+                        mds_fsync_super(mds->mds_sb);
                 goto out;
 
         case MDS_GETSTATUS:
-                CDEBUG(D_INODE, "getstatus\n");
+                DEBUG_REQ(D_INODE, req, "getstatus");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
                 rc = mds_getstatus(req);
                 break;
 
         case MDS_GETLOVINFO:
-                CDEBUG(D_INODE, "getlovinfo\n");
+                DEBUG_REQ(D_INODE, req, "getlovinfo");
                 rc = mds_getlovinfo(req);
                 break;
 
         case MDS_GETATTR:
-                CDEBUG(D_INODE, "getattr\n");
+                DEBUG_REQ(D_INODE, req, "getattr");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
                 rc = mds_getattr(0, req);
                 break;
 
         case MDS_STATFS:
-                CDEBUG(D_INODE, "statfs\n");
+                DEBUG_REQ(D_INODE, req, "statfs");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
                 rc = mds_statfs(req);
                 break;
 
         case MDS_READPAGE:
-                CDEBUG(D_INODE, "readpage\n");
+                DEBUG_REQ(D_INODE, req, "readpage\n");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
                 rc = mds_readpage(req);
 
@@ -1121,7 +1324,13 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_REINT: {
                 int size = sizeof(struct mds_body);
-                CDEBUG(D_INODE, "reint\n");
+                int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), 
+                        realopc = opc & REINT_OPCODE_MASK;
+                        
+                DEBUG_REQ(D_INODE, req, "reint (%s%s)",
+                          reint_names[realopc],
+                          opc & REINT_REPLAYING ? "|REPLAYING" : "");
+                          
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
 
                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
@@ -1136,30 +1345,30 @@ int mds_handle(struct ptlrpc_request *req)
                 }
 
         case MDS_OPEN:
-                CDEBUG(D_INODE, "open\n");
+                DEBUG_REQ(D_INODE, req, "open");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
                 rc = mds_open(req);
                 break;
 
         case MDS_CLOSE:
-                CDEBUG(D_INODE, "close\n");
+                DEBUG_REQ(D_INODE, req, "close");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
                 rc = mds_close(req);
                 break;
 
         case LDLM_ENQUEUE:
-                CDEBUG(D_INODE, "enqueue\n");
+                DEBUG_REQ(D_INODE, req, "enqueue");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
                 rc = ldlm_handle_enqueue(req);
                 break;
         case LDLM_CONVERT:
-                CDEBUG(D_INODE, "convert\n");
+                DEBUG_REQ(D_INODE, req, "convert");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
                 rc = ldlm_handle_convert(req);
                 break;
         case LDLM_BL_CALLBACK:
         case LDLM_CP_CALLBACK:
-                CDEBUG(D_INODE, "callback\n");
+                DEBUG_REQ(D_INODE, req, "callback");
                 CERROR("callbacks should not happen on MDS\n");
                 LBUG();
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
@@ -1173,7 +1382,6 @@ int mds_handle(struct ptlrpc_request *req)
 
         if (!rc) {
                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                struct mds_obd *mds = mds_req2mds(req);
 
                 req->rq_repmsg->last_xid =
                         HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
@@ -1185,7 +1393,17 @@ int mds_handle(struct ptlrpc_request *req)
                        cpu_to_le32(req->rq_xid));
         }
  out:
-        if (rc) {
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+                struct mds_obd *mds = mds_req2mds(req);
+                LASSERT(mds->mds_recoverable_clients);
+                DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+                return mds_queue_final_reply(req, rc);
+        }
+        
+        /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
+        if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
+                   rc != EALREADY)) {
                 CERROR("mds: processing error (opcode %d): %d\n",
                        req->rq_reqmsg->opc, rc);
                 ptlrpc_error(req->rq_svc, req);
@@ -1205,7 +1423,6 @@ int mds_handle(struct ptlrpc_request *req)
  *
  * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
  */
-static
 int mds_update_server_data(struct mds_obd *mds)
 {
         struct mds_server_data *msd = mds->mds_server_data;
@@ -1238,12 +1455,14 @@ int mds_update_server_data(struct mds_obd *mds)
 }
 
 /* Do recovery actions for the MDS */
-static int mds_recover(struct obd_device *obddev)
+static int mds_recovery_complete(struct obd_device *obddev)
 {
         struct mds_obd *mds = &obddev->u.mds;
         struct obd_run_ctxt saved;
         int rc;
 
+        LASSERT(mds->mds_recoverable_clients == 0);
+
         /* This happens at the end when recovery is complete */
         ++mds->mds_mount_count;
         push_ctxt(&saved, &mds->mds_ctxt, NULL);
@@ -1283,7 +1502,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!mds->mds_sb)
                 GOTO(err_put, rc = -ENODEV);
 
-        spin_lock_init(&mds->mds_last_lock);
+        init_MUTEX(&mds->mds_transno_sem);
         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
         rc = mds_fs_setup(obddev, mnt);
         if (rc) {
@@ -1298,14 +1517,14 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_fs, rc = -ENOMEM);
         }
 
-
-        rc = mds_recover(obddev);
-        if (rc)
-                GOTO(err_fs, rc);
-
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mds_ldlm_client", &obddev->obd_ldlm_client);
 
+        spin_lock_init(&mds->mds_processing_task_lock);
+        mds->mds_processing_task = 0;
+        INIT_LIST_HEAD(&mds->mds_recovery_queue);
+        INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
+        
         RETURN(0);
 
 err_fs:
index 3975f4f..9bba857 100644 (file)
@@ -37,17 +37,21 @@ struct mds_fs_type {
 
 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
 
+#define LAST_RCVD "last_rcvd"
+
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we have just read the data from the last_rcvd file and
  * we know its offset.
  */
-int mds_client_add(struct mds_export_data *med, int cl_off)
+int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
 {
+        int new_client = (cl_off == -1);
+
         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
          * there's no need for extra complication here
          */
-        if (cl_off == -1) {
+        if (new_client) {
                 cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
         repeat:
                 if (cl_off >= MDS_MAX_CLIENTS) {
@@ -73,12 +77,35 @@ int mds_client_add(struct mds_export_data *med, int cl_off)
                cl_off, med->med_mcd->mcd_uuid);
 
         med->med_off = cl_off;
+
+        if (new_client) {
+                struct obd_run_ctxt saved;
+                loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
+                ssize_t written;
+                
+                push_ctxt(&saved, &mds->mds_ctxt, NULL);
+                written = lustre_fwrite(mds->mds_rcvd_filp,
+                                                (char *)med->med_mcd,
+                                                sizeof(*med->med_mcd), &off);
+                pop_ctxt(&saved);
+
+                if (written != sizeof(*med->med_mcd)) {
+                        if (written < 0)
+                                RETURN(written);
+                        RETURN(-EIO);
+                }
+        }
         return 0;
 }
 
 int mds_client_free(struct obd_export *exp)
 {
         struct mds_export_data *med = &exp->exp_mds_data;
+        struct mds_obd *mds = &exp->exp_obd->u.mds;
+        struct mds_client_data zero_mcd;
+        struct obd_run_ctxt saved;
+        int written;
+        loff_t off;
 
         if (!med->med_mcd)
                 RETURN(0);
@@ -92,6 +119,24 @@ int mds_client_free(struct obd_export *exp)
                 LBUG();
         }
 
+        off = med->med_off;
+
+        memset(&zero_mcd, 0, sizeof zero_mcd);
+        push_ctxt(&saved, &mds->mds_ctxt, NULL);
+        written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
+                                sizeof zero_mcd, &off);
+        pop_ctxt(&saved);
+
+        if (written != sizeof zero_mcd) {
+                CERROR("error zeroing out client %s off %d in %s: %d\n",
+                       med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
+                       written);
+                LBUG();
+        } else {
+                CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
+                       med->med_mcd->mcd_uuid, med->med_off);
+        }
+        
         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
 
         return 0;
@@ -105,19 +150,16 @@ static int mds_server_free_data(struct mds_obd *mds)
         return 0;
 }
 
-#define LAST_RCVD "last_rcvd"
-
 static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
 {
         struct mds_obd *mds = &obddev->u.mds;
         struct mds_server_data *msd;
         struct mds_client_data *mcd = NULL;
-        loff_t fsize = f->f_dentry->d_inode->i_size;
         loff_t off = 0;
         int cl_off;
+        int max_off = f->f_dentry->d_inode->i_size / sizeof(*mcd);
         __u64 last_rcvd = 0;
         __u64 last_mount;
-        int clients = 0;
         int rc = 0;
 
         OBD_ALLOC(msd, sizeof(*msd));
@@ -154,9 +196,11 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
         CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
                (unsigned long long)last_mount);
 
-        for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
-             off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
-             off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
+        for (off = MDS_LR_CLIENT, cl_off = 0;
+             off < max_off;
+             off += MDS_LR_SIZE, cl_off++) {
+                int mount_age;
+
                 if (!mcd) {
                         OBD_ALLOC(mcd, sizeof(*mcd));
                         if (!mcd)
@@ -172,13 +216,19 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
                         break;
                 }
 
+                if (mcd->mcd_uuid[0] == '\0') {
+                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                               cl_off);
+                        continue;
+                }
+
                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
 
                 /* The exports are cleaned up by mds_disconnect, so they
                  * need to be set up like real exports also.
                  */
-                if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
-                                  < MDS_MOUNT_RECOV)) {
+                mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
+                if (last_rcvd && mount_age < MDS_MOUNT_RECOV) {
                         struct obd_export *exp = class_new_export(obddev);
                         struct mds_export_data *med;
 
@@ -189,17 +239,17 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
 
                         med = &exp->exp_mds_data;
                         med->med_mcd = mcd;
-                        mds_client_add(med, cl_off);
+                        mds_client_add(mds, med, cl_off);
                         /* XXX put this in a helper if it gets more complex */
                         INIT_LIST_HEAD(&med->med_open_head);
                         spin_lock_init(&med->med_open_lock);
 
                         mcd = NULL;
-                        clients++;
+                        mds->mds_recoverable_clients++;
                         MOD_INC_USE_COUNT;
                 } else {
                         CDEBUG(D_INFO,
-                               "ignored client %d, UUID '%s', last_mount %Ld\n",
+                               "discarded client %d, UUID '%s', count %Ld\n",
                                cl_off, mcd->mcd_uuid,
                                (long long)le64_to_cpu(mcd->mcd_mount_count));
                 }
@@ -211,15 +261,16 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
                         mds->mds_last_rcvd = last_rcvd;
                 }
         }
-        CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d/%d clients\n",
-               (unsigned long long)mds->mds_last_rcvd, clients, cl_off);
+
+        mds->mds_last_committed = mds->mds_last_rcvd;
+        if (mds->mds_recoverable_clients) {
+                CERROR("need recovery: %d recoverable clients, last_rcvd %Lu\n",
+                       mds->mds_recoverable_clients, mds->mds_last_rcvd);
+        }
 
         if (mcd)
                 OBD_FREE(mcd, sizeof(*mcd));
 
-        /* After recovery, there can be no local uncommitted transactions */
-        mds->mds_last_committed = mds->mds_last_rcvd;
-
         return 0;
 
 err_msd:
index 854b357..f158bc2 100644 (file)
 
 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
 
+void mds_start_transno(struct mds_obd *mds)
+{
+        ENTRY;
+        down(&mds->mds_transno_sem);
+}
+
 /* Assumes caller has already pushed us into the kernel context. */
-int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
-                         struct ptlrpc_request *req)
+int mds_finish_transno(struct mds_obd *mds, void *handle,
+                       struct ptlrpc_request *req, int rc)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_client_data *mcd = med->med_mcd;
         __u64 last_rcvd;
         loff_t off;
-        int rc;
+        ssize_t written;
+
+        /* Propagate error code. */
+        if (rc)
+                goto out;
 
         /* we don't allocate new transnos for replayed requests */
-        if (req->rq_level == LUSTRE_CONN_RECOVD)
-                RETURN(0);
+        if (req->rq_level == LUSTRE_CONN_RECOVD) {
+                rc = 0;
+                goto out;
+        }
 
         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
 
-        spin_lock(&mds->mds_last_lock);
         last_rcvd = ++mds->mds_last_rcvd;
-        spin_unlock(&mds->mds_last_lock);
         req->rq_repmsg->transno = HTON__u64(last_rcvd);
         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
 
         mds_fs_set_last_rcvd(mds, handle);
-        rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
-        CDEBUG(D_INODE, "wrote trans #"LPD64" for client '%s' at #%d: rc = "
-               "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, rc);
+        written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
+                                &off);
+        CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
+               "%d\n", last_rcvd, mcd->mcd_uuid, med->med_off, written);
 
-        if (rc == sizeof(*mcd))
-                rc = 0;
-        else {
-                CERROR("error writing to last_rcvd file: rc = %d\n", rc);
-                if (rc >= 0)
-                        rc = -EIO;
-        }
+        if (written == sizeof(*mcd))
+                GOTO(out, rc = 0);
+        CERROR("error writing to last_rcvd file: rc = %d\n", rc);
+        if (written >= 0)
+                GOTO(out, rc = -EIO);
+
+        rc = 0;
 
+ out:
+        EXIT;
+        up(&mds->mds_transno_sem);
         return rc;
 }
 
@@ -129,9 +143,13 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
                        to_kdev_t(inode->i_sb->s_dev));
 
+        mds_start_transno(mds);
         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
-        if (!handle)
-                GOTO(out_setattr_de, rc = PTR_ERR(handle));
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                (void)mds_finish_transno(mds, handle, req, rc);
+                GOTO(out_setattr_de, rc);
+        }
 
         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
 
@@ -141,8 +159,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 mds_pack_inode2body(body, inode);
         }
 
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
 
         err = mds_fs_commit(mds, de->d_inode, handle);
         if (err) {
@@ -238,27 +255,34 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         rec->ur_mode |= S_ISGID;
         }
 
+        /* From here on, we must exit via a path that calls mds_finish_transno,
+         * so that we release the mds_transno_sem (and, in the case of success,
+         * update the transno correctly).  out_create_commit and
+         * out_transno_dchild are good candidates.
+         */
+        mds_start_transno(mds);
+
         switch (type) {
         case S_IFREG:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
-                if (!handle)
-                        GOTO(out_create_dchild, PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_transno_dchild, rc = PTR_ERR(handle));
                 rc = vfs_create(dir, dchild, rec->ur_mode);
                 EXIT;
                 break;
         }
         case S_IFDIR:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
-                if (!handle)
-                        GOTO(out_create_dchild, PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_transno_dchild, rc = PTR_ERR(handle));
                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
                 EXIT;
                 break;
         }
         case S_IFLNK:{
                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
-                if (!handle)
-                        GOTO(out_create_dchild, PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_transno_dchild, rc = PTR_ERR(handle));
                 rc = vfs_symlink(dir, dchild, rec->ur_name);
                 EXIT;
                 break;
@@ -269,15 +293,16 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         case S_IFSOCK:{
                 int rdev = rec->ur_rdev;
                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
-                if (!handle)
-                        GOTO(out_create_dchild, PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_transno_dchild, rc = PTR_ERR(handle));
                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
                 EXIT;
                 break;
         }
         default:
                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
-                GOTO(out_create_dchild, rc = -EINVAL);
+                handle = NULL; /* quell uninitialized warning */
+                GOTO(out_transno_dchild, rc = -EINVAL);
         }
 
         if (rc) {
@@ -299,7 +324,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if (rec->ur_fid2->id) {
                         LASSERT(rec->ur_opcode & REINT_REPLAYING);
                         inode->i_generation = rec->ur_fid2->generation;
-                        /* Dirtied and committed by this setattr: */
+                        /* Dirtied and committed by the upcoming setattr. */
                         CDEBUG(D_INODE, "recreated ino %ld with gen %ld\n",
                                inode->i_ino, inode->i_generation);
                 } else {
@@ -312,18 +337,19 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         /* XXX should we abort here in case of error? */
                 }
 
-                rc = mds_update_last_rcvd(mds, handle, req);
-                if (rc) {
-                        CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
-                        GOTO(out_create_unlink, rc);
-                }
-
                 body = lustre_msg_buf(req->rq_repmsg, offset);
                 mds_pack_inode2fid(&body->fid1, inode);
                 mds_pack_inode2body(body, inode);
         }
         EXIT;
 out_create_commit:
+        if (rc) {
+                rc = mds_finish_transno(mds, handle, req, rc);
+        } else {
+                rc = mds_finish_transno(mds, handle, req, rc);
+                if (rc)
+                        GOTO(out_create_unlink, rc);
+        }
         err = mds_fs_commit(mds, dir, handle);
         if (err) {
                 CERROR("error on commit: err = %d\n", err);
@@ -340,6 +366,12 @@ out_create:
         req->rq_status = rc;
         return 0;
 
+out_transno_dchild:
+        /* Need to release the transno lock, and then put the dchild. */
+        LASSERT(rc);
+        mds_finish_transno(mds, handle, req, rc);
+        goto out_create_dchild;
+
 out_create_unlink:
         /* Destroy the file we just created.  This should not need extra
          * journal credits, as we have already modified all of the blocks
@@ -431,11 +463,12 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
                        to_kdev_t(dir->i_sb->s_dev));
 
+        mds_start_transno(mds);
         switch (rec->ur_mode /* & S_IFMT ? */) {
         case S_IFDIR:
                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
-                if (!handle)
-                        GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
                 rc = vfs_rmdir(dir, dchild);
                 break;
         case S_IFREG:
@@ -449,19 +482,18 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         case S_IFIFO:
         case S_IFSOCK:
                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
-                if (!handle)
-                        GOTO(out_unlink_cancel, rc = PTR_ERR(handle));
+                if (IS_ERR(handle))
+                        GOTO(out_unlink_cancel_transno, rc = PTR_ERR(handle));
                 rc = vfs_unlink(dir, dchild);
                 break;
         default:
                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
                 handle = NULL;
                 LBUG();
-                GOTO(out_unlink_cancel, rc = -EINVAL);
+                GOTO(out_unlink_cancel_transno, rc = -EINVAL);
         }
 
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
         err = mds_fs_commit(mds, dir, handle);
         if (err) {
                 CERROR("error on commit: err = %d\n", err);
@@ -487,6 +519,10 @@ out_unlink:
         l_dput(de);
         req->rq_status = rc;
         return 0;
+
+out_unlink_cancel_transno:
+        rc = mds_finish_transno(mds, handle, req, rc);
+        goto out_unlink_cancel;
 }
 
 static int mds_reint_link(struct mds_update_record *rec, int offset,
@@ -589,15 +625,18 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
 
+        mds_start_transno(mds);
         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
-        if (!handle)
-                GOTO(out_link_dchild, rc = PTR_ERR(handle));
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                mds_finish_transno(mds, handle, req, rc);
+                GOTO(out_link_dchild, rc);
+        }
 
         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
         if (rc)
                 CERROR("link error %d\n", rc);
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
 
         err = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
         if (err) {
@@ -720,16 +759,20 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
 
+        mds_start_transno(mds);
         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
-        if (!handle)
-                GOTO(out_rename_denew, rc = PTR_ERR(handle));
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                mds_finish_transno(mds, handle, req, rc);
+                GOTO(out_rename_denew, rc);
+        }
+
         lock_kernel();
         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
                         NULL);
         unlock_kernel();
 
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
 
         err = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
         if (err) {
index cc10150..60816e7 100644 (file)
@@ -332,29 +332,29 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                 INIT_LIST_HEAD(&obd->obd_imports);
                 spin_lock_init(&obd->obd_dev_lock);
 
-                        if (data->ioc_inlbuf2) {
-                                int len = strlen(data->ioc_inlbuf2) + 1;
-                                OBD_ALLOC(obd->obd_name, len);
-                                if (!obd->obd_name) {
-                                        CERROR("no memory\n");
-                                        LBUG();
-                                }
-                                memcpy(obd->obd_name, data->ioc_inlbuf2, len);
-                        } else {
-                                CERROR("WARNING: unnamed obd device\n");
+                if (data->ioc_inlbuf2) {
+                        int len = strlen(data->ioc_inlbuf2) + 1;
+                        OBD_ALLOC(obd->obd_name, len);
+                        if (!obd->obd_name) {
+                                CERROR("no memory\n");
+                                LBUG();
                         }
-                        if (data->ioc_inlbuf3) {
-                                int len = strlen(data->ioc_inlbuf3);
-                                if (len >= sizeof(obd->obd_uuid)) {
-                                        CERROR("uuid must be < %d bytes long\n",
-                                               sizeof(obd->obd_uuid));
-                                        if (obd->obd_name)
-                                                OBD_FREE(obd->obd_name,
-                                                         strlen(obd->obd_name) + 1);
-                                        GOTO(out, err=-EINVAL);
-                                }
-                                memcpy(obd->obd_uuid, data->ioc_inlbuf3, len);
+                        memcpy(obd->obd_name, data->ioc_inlbuf2, len);
+                } else {
+                        CERROR("WARNING: unnamed obd device\n");
+                }
+                if (data->ioc_inlbuf3) {
+                        int len = strlen(data->ioc_inlbuf3);
+                        if (len >= sizeof(obd->obd_uuid)) {
+                                CERROR("uuid must be < %d bytes long\n",
+                                       sizeof(obd->obd_uuid));
+                                if (obd->obd_name)
+                                        OBD_FREE(obd->obd_name,
+                                                 strlen(obd->obd_name) + 1);
+                                GOTO(out, err=-EINVAL);
                         }
+                        memcpy(obd->obd_uuid, data->ioc_inlbuf3, len);
+                }
                 /* do the attach */
                 if (OBP(obd, attach))
                         err = OBP(obd,attach)(obd, sizeof(*data), data);
index 8d6fa4c..f31a97a 100644 (file)
@@ -408,7 +408,7 @@ void class_disconnect_all(struct obd_device *obddev)
                         spin_unlock(&obddev->obd_dev_lock);
                         CERROR("force disconnecting %s:%s export %p\n",
                                export->exp_obd->obd_type->typ_name,
-                               export->exp_uuid, export);
+                               export->exp_connection->c_remote_uuid, export);
                         rc = obd_disconnect(&conn);
                         if (rc < 0) {
                                 /* AED: not so sure about this...  We can't
index 68918cc..43ae0ca 100644 (file)
@@ -136,7 +136,6 @@ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc) {
                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                 GOTO(out, rc);
@@ -173,7 +172,6 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out, rc);
 
@@ -208,7 +206,6 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out, rc);
 
@@ -242,7 +239,6 @@ static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
 
         ptlrpc_req_finished(request);
         return rc;
@@ -278,7 +274,6 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out_req, rc);
 
@@ -328,7 +323,6 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out, rc);
 
@@ -365,7 +359,6 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out, rc);
 
@@ -422,8 +415,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         obd_count page_count, struct brw_page *pga,
                         struct obd_brw_set *set)
 {
-        struct ptlrpc_connection *connection =
-                client_conn2cli(conn)->cl_import.imp_connection;
+        struct obd_import *imp = class_conn2cliimp(conn);
+        struct ptlrpc_connection *connection = imp->imp_connection;
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
@@ -435,8 +428,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(struct niobuf_remote);
 
-        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
-                                  NULL);
+        request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -454,9 +446,9 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         ost_pack_ioo(&iooptr, lsm, page_count);
         /* end almost identical to brw_write case */
 
-        spin_lock(&connection->c_lock);
-        xid = ++connection->c_xid_out;       /* single xid for all pages */
-        spin_unlock(&connection->c_lock);
+        spin_lock(&imp->imp_lock);
+        xid = ++imp->imp_last_xid;       /* single xid for all pages */
+        spin_unlock(&imp->imp_lock);
 
         obd_kmap_get(page_count, 0);
 
@@ -495,7 +487,6 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
 
         /*
          * XXX: If there is an error during the processing of the callback,
@@ -584,7 +575,6 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
         size[1] = page_count * sizeof(*remote);
         request->rq_replen = lustre_msg_size(2, size);
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc)
                 GOTO(out_unmap, rc);
 
@@ -766,7 +756,6 @@ static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
         if (rc) {
                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                 GOTO(out, rc);
index 228115f..ced4655 100644 (file)
@@ -215,7 +215,7 @@ static int ost_bulk_timeout(void *data)
         struct ptlrpc_bulk_desc *desc = data;
 
         ENTRY;
-        CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+        recovd_conn_fail(desc->bd_connection);
         RETURN(1);
 }
 
index 28f1a5c..8f4aceb 100644 (file)
@@ -299,9 +299,9 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
          */
         atomic_set(&request->rq_refcount, 2);
 
-        spin_lock(&conn->c_lock);
-        request->rq_xid = HTON__u32(++conn->c_xid_out);
-        spin_unlock(&conn->c_lock);
+        spin_lock(&imp->imp_lock);
+        request->rq_xid = HTON__u32(++imp->imp_last_xid);
+        spin_unlock(&imp->imp_lock);
 
         request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
         request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
@@ -312,19 +312,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
         RETURN(request);
 }
 
-void ptlrpc_req_finished(struct ptlrpc_request *request)
-{
-        if (request == NULL)
-                return;
-
-        if (atomic_dec_and_test(&request->rq_refcount))
-                ptlrpc_free_req(request);
-        else
-                DEBUG_REQ(D_INFO, request, "refcount now %u",
-                          atomic_read(&request->rq_refcount));
-}
-
-void ptlrpc_free_req(struct ptlrpc_request *request)
+static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
 {
         ENTRY;
         if (request == NULL) {
@@ -351,10 +339,12 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
                 request->rq_reqmsg = NULL;
         }
 
-        if (request->rq_connection) {
-                spin_lock(&request->rq_connection->c_lock);
+        if (request->rq_import) {
+                if (!locked)
+                        spin_lock(&request->rq_import->imp_lock);
                 list_del_init(&request->rq_list);
-                spin_unlock(&request->rq_connection->c_lock);
+                if (!locked)
+                        spin_unlock(&request->rq_import->imp_lock);
         }
 
         ptlrpc_put_connection(request->rq_connection);
@@ -362,62 +352,87 @@ void ptlrpc_free_req(struct ptlrpc_request *request)
         EXIT;
 }
 
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+        __ptlrpc_free_req(request, 0);
+}
+
+static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
+{
+        ENTRY;
+        if (request == NULL)
+                RETURN(1);
+
+        if (atomic_dec_and_test(&request->rq_refcount)) {
+                __ptlrpc_free_req(request, locked);
+                RETURN(1);
+        }
+
+        DEBUG_REQ(D_INFO, request, "refcount now %u",
+                  atomic_read(&request->rq_refcount));
+        RETURN(0);
+}
+
+void ptlrpc_req_finished(struct ptlrpc_request *request)
+{
+        __ptlrpc_req_finished(request, 0);
+}
+
 static int ptlrpc_check_reply(struct ptlrpc_request *req)
 {
         int rc = 0;
 
         if (req->rq_repmsg != NULL) {
-                struct ptlrpc_connection *conn = req->rq_import->imp_connection;
+                struct obd_import *imp = req->rq_import;
+                struct ptlrpc_connection *conn = imp->imp_connection;
+                ENTRY;
                 if (req->rq_level > conn->c_level) {
-                        CDEBUG(D_HA,
-                               "rep to xid "LPD64" op %d to %s:%d: "
-                               "recovery started, ignoring (%d > %d)\n",
-                               (unsigned long long)req->rq_xid,
-                               req->rq_reqmsg->opc, conn->c_remote_uuid,
-                               req->rq_import->imp_client->cli_request_portal,
+                        DEBUG_REQ(D_HA, req,
+                               "recovery started, ignoring (%d > %d)",
                                req->rq_level, conn->c_level);
                         req->rq_repmsg = NULL;
                         GOTO(out, rc = 0);
                 }
                 req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
+                spin_lock(&imp->imp_lock);
+                if (req->rq_transno > imp->imp_max_transno) {
+                        imp->imp_max_transno = req->rq_transno;
+                } else if (req->rq_transno != 0) {
+                        if (conn->c_level == LUSTRE_CONN_FULL) {
+                                CERROR("got transno "LPD64" after "
+                                       LPD64": recovery may not work\n",
+                                       req->rq_transno, imp->imp_max_transno);
+                        }
+                }
+                spin_unlock(&imp->imp_lock);
                 req->rq_flags |= PTL_RPC_FL_REPLIED;
                 GOTO(out, rc = 1);
         }
 
         if (req->rq_flags & PTL_RPC_FL_RESEND) {
-                CERROR("-- RESTART --\n");
+                DEBUG_REQ(D_ERROR, req, "RESEND:");
                 GOTO(out, rc = 1);
         }
 
         if (req->rq_flags & PTL_RPC_FL_ERR) {
-                CERROR("-- ABORTED --\n");
+                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                 GOTO(out, rc = 1);
         }
 
+        if (req->rq_flags & PTL_RPC_FL_RESTART) {
+                DEBUG_REQ(D_ERROR, req, "RESTART:");
+                GOTO(out, rc = 1);
+        }
  out:
-        CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc);
+        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
         return rc;
 }
 
-int ptlrpc_check_status(struct ptlrpc_request *req, int err)
+static int ptlrpc_check_status(struct ptlrpc_request *req)
 {
+        int err;
         ENTRY;
 
-        if (err != 0) {
-                CERROR("err is %d\n", err);
-                RETURN(err);
-        }
-
-        if (req == NULL) {
-                CERROR("req == NULL\n");
-                RETURN(-ENOMEM);
-        }
-
-        if (req->rq_repmsg == NULL) {
-                CERROR("req->rq_repmsg == NULL\n");
-                RETURN(-ENOMEM);
-        }
-
         err = req->rq_repmsg->status;
         if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
                 CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
@@ -426,14 +441,12 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err)
 
         if (err != 0) {
                 if (err < 0)
-                        CERROR("req->rq_repmsg->status is %d\n", err);
+                        CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err);
                 else
                         CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n", err);
-                /* XXX: translate this error from net to host */
-                RETURN(err);
         }
 
-        RETURN(0);
+        RETURN(err);
 }
 
 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
@@ -455,14 +468,13 @@ static int ptlrpc_abort(struct ptlrpc_request *request)
         return 0;
 }
 
-/* caller must hold conn->c_lock */
-void ptlrpc_free_committed(struct ptlrpc_connection *conn)
+/* caller must hold imp->imp_lock */
+void ptlrpc_free_committed(struct obd_import *imp)
 {
         struct list_head *tmp, *saved;
         struct ptlrpc_request *req;
 
-restart:
-        list_for_each_safe(tmp, saved, &conn->c_sending_head) {
+        list_for_each_safe(tmp, saved, &imp->imp_request_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 if (req->rq_flags & PTL_RPC_FL_REPLAY) {
@@ -470,33 +482,27 @@ restart:
                         continue;
                 }
 
-                if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+                /* If neither replied-to nor restarted, keep it. */
+                if (!(req->rq_flags &
+                      (PTL_RPC_FL_REPLIED | PTL_RPC_FL_RESTART))) {
                         DEBUG_REQ(D_HA, req, "keeping (in-flight)");
                         continue;
                 }
 
+                /* This needs to match the commit test in ptlrpc_queue_wait() */
+                if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
+                    req->rq_transno == 0) {
+                        DEBUG_REQ(D_HA, req, "keeping (queue_wait will free)");
+                        continue;
+                }
+
                 /* not yet committed */
-                if (req->rq_transno > conn->c_last_committed)
+                if (req->rq_transno > imp->imp_peer_committed_transno)
                         break;
 
                 DEBUG_REQ(D_HA, req, "committing (last_committed %Lu)",
-                          (long long)conn->c_last_committed);
-                if (atomic_dec_and_test(&req->rq_refcount)) {
-                        /* We do this to prevent free_req deadlock.  Restarting
-                         * after each removal is not so bad, as we are almost
-                         * always deleting the first item in the list.
-                         *
-                         * If we use a recursive lock here, we can skip the
-                         * unlock/lock/restart sequence.
-                         */
-                        spin_unlock(&conn->c_lock);
-                        ptlrpc_free_req(req);
-                        spin_lock(&conn->c_lock);
-                        goto restart;
-                } else {
-                        list_del(&req->rq_list);
-                        list_add(&req->rq_list, &conn->c_dying_head);
-                }
+                          imp->imp_peer_committed_transno);
+                __ptlrpc_req_finished(req, 1);
         }
 
         EXIT;
@@ -512,35 +518,18 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
 
         LASSERT(conn);
 
-restart1:
-        spin_lock(&conn->c_lock);
-        list_for_each_safe(tmp, saved, &conn->c_sending_head) {
+        spin_lock(&imp->imp_lock);
+        list_for_each_safe(tmp, saved, &imp->imp_request_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                if (req->rq_import != imp)
-                        continue;
+
                 /* XXX we should make sure that nobody's sleeping on these! */
                 DEBUG_REQ(D_HA, req, "cleaning up from sending list");
                 list_del_init(&req->rq_list);
                 req->rq_import = NULL;
-                spin_unlock(&conn->c_lock);
-                ptlrpc_req_finished(req);
-                goto restart1;
+                __ptlrpc_req_finished(req, 0);
         }
-restart2:
-        list_for_each_safe(tmp, saved, &conn->c_dying_head) {
-                req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                if (req->rq_import != imp)
-                        continue;
-                DEBUG_REQ(D_ERROR, req, "on dying list at cleanup");
-                list_del_init(&req->rq_list);
-                req->rq_import = NULL;
-                spin_unlock(&conn->c_lock);
-                ptlrpc_req_finished(req);
-                spin_lock(&conn->c_lock);
-                goto restart2;
-        }
-        spin_unlock(&conn->c_lock);
-
+        spin_unlock(&imp->imp_lock);
+        
         EXIT;
         return;
 }
@@ -548,8 +537,7 @@ restart2:
 void ptlrpc_continue_req(struct ptlrpc_request *req)
 {
         ENTRY;
-        CDEBUG(D_HA, "continue delayed request "LPD64" opc %d\n",
-               req->rq_xid, req->rq_reqmsg->opc);
+        DEBUG_REQ(D_HA, req, "continuing delayed request");
         req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
         req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
         wake_up(&req->rq_wait_for_rep);
@@ -559,8 +547,7 @@ void ptlrpc_continue_req(struct ptlrpc_request *req)
 void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
         ENTRY;
-        CDEBUG(D_HA, "resend request "LPD64", opc %d\n",
-               req->rq_xid, req->rq_reqmsg->opc);
+        DEBUG_REQ(D_HA, req, "resending");
         req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
         req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
         req->rq_status = -EAGAIN;
@@ -574,10 +561,9 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
 void ptlrpc_restart_req(struct ptlrpc_request *req)
 {
         ENTRY;
-        CDEBUG(D_HA, "restart completed request "LPD64", opc %d\n",
-               req->rq_xid, req->rq_reqmsg->opc);
+        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
         req->rq_status = -ERESTARTSYS;
-        req->rq_flags |= PTL_RPC_FL_RECOVERY;
+        req->rq_flags |= PTL_RPC_FL_RESTART;
         req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
         wake_up(&req->rq_wait_for_rep);
         EXIT;
@@ -654,21 +640,16 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 {
         int rc = 0;
         struct l_wait_info lwi;
-        //struct ptlrpc_client *cli = req->rq_import->imp_client;
-        struct ptlrpc_connection *conn = req->rq_import->imp_connection;
+        struct obd_import *imp = req->rq_import;
+        struct ptlrpc_connection *conn = imp->imp_connection;
         ENTRY;
 
         init_waitqueue_head(&req->rq_wait_for_rep);
         req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */
-        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"
-               LPX64":%x:%d\n", 
-               NTOH__u32(req->rq_reqmsg->status), 
-               req->rq_xid,
-               conn->c_peer.peer_nid,
-               NTOH__u32(req->rq_reqmsg->opc)
-               );
+        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
+               NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
+               conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));
 
-        //DEBUG_REQ(D_HA, req, "subsys: %s:", cli->cli_name);
 
         /* XXX probably both an import and connection level are needed */
         if (req->rq_level > conn->c_level) {
@@ -703,18 +684,20 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         EIO_IF_INVALID(conn, req);
 
         list_del(&req->rq_list);
-        list_add_tail(&req->rq_list, &conn->c_sending_head);
+        list_add_tail(&req->rq_list, &imp->imp_request_list);
         spin_unlock(&conn->c_lock);
         rc = ptl_send_rpc(req);
         if (rc) {
                 CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
                        req->rq_reqmsg->opc);
-                /* the sleep below will time out, triggering recovery */
+                /* sleep for a jiffy, then trigger recovery */
+                lwi = LWI_TIMEOUT_INTR(1, expired_request,
+                                       interrupted_request, req);
+        } else {
+                DEBUG_REQ(D_NET, req, "-- sleeping");
+                lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
+                                       interrupted_request, req);
         }
-
-        DEBUG_REQ(D_NET, req, "-- sleeping");
-        lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request,
-                               interrupted_request, req);
         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
         DEBUG_REQ(D_NET, req, "-- done sleeping");
 
@@ -761,7 +744,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                 GOTO(out, rc = -EINVAL);
         }
 #endif
-        CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);
+        CDEBUG(D_NET, "got rep "LPU64"\n", req->rq_xid);
         if (req->rq_repmsg->status == 0)
                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                        req->rq_replen, req->rq_repmsg->status);
@@ -773,8 +756,10 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
          *
          * But don't commit anything that's kept indefinitely for replay (has
          * the PTL_RPC_FL_REPLAY flag set), such as open requests.
+         *
+         * This needs to match the commit test in ptlrpc_free_committed().
          */
-        if ((req->rq_import->imp_flags & IMP_REPLAYABLE) == 0 ||
+        if (!(req->rq_import->imp_flags & IMP_REPLAYABLE) ||
             (req->rq_repmsg->transno == 0 &&
              (req->rq_flags & PTL_RPC_FL_REPLAY) == 0)) {
                 /* This import doesn't support replay, so we can just "commit"
@@ -782,20 +767,17 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                  */
                 DEBUG_REQ(D_HA, req, "not replayable, committing:");
                 list_del_init(&req->rq_list);
-                spin_unlock(&conn->c_lock);
-                ptlrpc_req_finished(req); /* Must be called unlocked. */
-                spin_lock(&conn->c_lock);
-        } else /* if (req->rq_import->imp_flags & IMP_REPLAYABLE) */ {
+                __ptlrpc_req_finished(req, 1);
+        }
+        if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
                 /* Replay-enabled imports return commit-status information. */
-                /* XXX this needs to be per-import, or multiple MDS services on
-                 * XXX the same system are going to interfere messily with each
-                 * XXX others' transno spaces.
-                 */
-                conn->c_last_xid = req->rq_repmsg->last_xid;
-                conn->c_last_committed = req->rq_repmsg->last_committed;
-                ptlrpc_free_committed(conn);
+                imp->imp_peer_last_xid = req->rq_repmsg->last_xid;
+                imp->imp_peer_committed_transno = 
+                        req->rq_repmsg->last_committed;
+                ptlrpc_free_committed(imp);
         }
 
+        rc = ptlrpc_check_status(req);
         spin_unlock(&conn->c_lock);
 
         EXIT;
index df2a2c2..2458b08 100644 (file)
@@ -80,21 +80,17 @@ struct ptlrpc_connection *ptlrpc_get_connection(struct lustre_peer *peer,
                 GOTO(out, c);
 
         c->c_level = LUSTRE_CONN_NEW;
-        c->c_xid_in = 1;
-        c->c_xid_out = 1;
         c->c_generation = 1;
         c->c_epoch = 1;
         c->c_bootcount = 0;
         c->c_flags = 0;
         if (uuid)
                 strcpy(c->c_remote_uuid, uuid);
-        INIT_LIST_HEAD(&c->c_delayed_head);
-        INIT_LIST_HEAD(&c->c_sending_head);
-        INIT_LIST_HEAD(&c->c_dying_head);
         INIT_LIST_HEAD(&c->c_imports);
         INIT_LIST_HEAD(&c->c_exports);
         INIT_LIST_HEAD(&c->c_sb_chain);
         INIT_LIST_HEAD(&c->c_recovd_data.rd_managed_chain);
+        INIT_LIST_HEAD(&c->c_delayed_head);
         atomic_set(&c->c_refcount, 0);
         ptlrpc_connection_addref(c);
         spin_lock_init(&c->c_lock);
@@ -164,8 +160,8 @@ void ptlrpc_cleanup_connection(void)
         }
         list_for_each_safe(tmp, pos, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                CERROR("Connection %p has refcount %d at cleanup (nid=%lu)!\n",
-                       c, atomic_read(&c->c_refcount),
+                CERROR("Connection %p/%s has refcount %d (nid=%lu)\n",
+                       c, c->c_remote_uuid, atomic_read(&c->c_refcount),
                        (unsigned long)c->c_peer.peer_nid);
                 list_del(&c->c_link);
                 OBD_FREE(c, sizeof(*c));
index 1520cf9..0bbc4b0 100644 (file)
@@ -124,8 +124,8 @@ void recovd_conn_fail(struct ptlrpc_connection *conn)
                 return;
         }
 
-        CERROR("connection %p to %s failed\n", conn, conn->c_remote_uuid);
-        CERROR("peer is %08x %08lx %08lx\n", conn->c_peer.peer_nid,
+        CERROR("connection %p to %s (%08x %08lx %08lx) failed\n", conn,
+               conn->c_remote_uuid, conn->c_peer.peer_nid,
                conn->c_peer.peer_ni.nal_idx, conn->c_peer.peer_ni.handle_idx);
         list_del(&rd->rd_managed_chain);
         list_add_tail(&rd->rd_managed_chain, &recovd->recovd_troubled_items);
index 060258f..9d955e6 100644 (file)
@@ -6,11 +6,11 @@
  * This code is issued under the GNU General Public License.
  * See the file COPYING in this distribution
  *
- * Copryright (C) 1996 Peter J. Braam <braam@stelias.com>
- * Copryright (C) 1999 Stelias Computing Inc. <braam@stelias.com>
- * Copryright (C) 1999 Seagate Technology Inc.
- * Copryright (C) 2001 Mountain View Data, Inc.
- * Copryright (C) 2002 Cluster File Systems, Inc.
+ * Copyright (C) 1996 Peter J. Braam <braam@stelias.com>
+ * Copyright (C) 1999 Stelias Computing Inc. <braam@stelias.com>
+ * Copyright (C) 1999 Seagate Technology Inc.
+ * Copyright (C) 2001 Mountain View Data, Inc.
+ * Copyright (C) 2002 Cluster File Systems, Inc.
  *
  */
 
@@ -40,7 +40,6 @@ int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc)
         request->rq_level = LUSTRE_CONN_NEW;
         request->rq_replen = lustre_msg_size(0, NULL);
         /*
-
          * This address is the export that represents our client-side LDLM
          * service (for ASTs).  We should only have one on this list, so we
          * just grab the first one.
@@ -52,24 +51,55 @@ int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc)
         request->rq_reqmsg->addr = (__u64)(unsigned long)ldlmexp;
         request->rq_reqmsg->cookie = ldlmexp->exp_cookie;
         rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
-        if (rc) {
+        switch (rc) {
+            case EALREADY:
+            case -EALREADY:
+                /* already connected! */
+                memset(&old_hdl, 0, sizeof(old_hdl));
+                if (!memcmp(&old_hdl.addr, &request->rq_repmsg->addr,
+                            sizeof (old_hdl.addr)) &&
+                    !memcmp(&old_hdl.cookie, &request->rq_repmsg->cookie,
+                            sizeof (old_hdl.cookie))) {
+                        CERROR("%s@%s didn't like our handle %Lx/%Lx, failed\n",
+                               cli->cl_target_uuid, conn->c_remote_uuid,
+                               (__u64)(unsigned long)ldlmexp,
+                               ldlmexp->exp_cookie);
+                        GOTO(out_disc, rc = -ENOTCONN);
+                }
+
+                old_hdl.addr = request->rq_repmsg->addr;
+                old_hdl.cookie = request->rq_repmsg->cookie;
+                if (memcmp(&imp->imp_handle, &old_hdl, sizeof(old_hdl))) {
+                        CERROR("%s@%s changed handle from %Lx/%Lx to %Lx/%Lx; "
+                               "copying, but this may foreshadow disaster\n",
+                               cli->cl_target_uuid, conn->c_remote_uuid,
+                               old_hdl.addr, old_hdl.cookie,
+                               imp->imp_handle.addr, imp->imp_handle.cookie);
+                        imp->imp_handle.addr = request->rq_repmsg->addr;
+                        imp->imp_handle.cookie = request->rq_repmsg->cookie;
+                        GOTO(out_disc, rc = EALREADY);
+                }
+                
+                CERROR("reconnected to %s@%s after partition\n",
+                       cli->cl_target_uuid, conn->c_remote_uuid);
+                GOTO(out_disc, rc = EALREADY);
+            case 0:
+                old_hdl = imp->imp_handle;
+                imp->imp_handle.addr = request->rq_repmsg->addr;
+                imp->imp_handle.cookie = request->rq_repmsg->cookie;
+                CERROR("now connected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n",
+                       cli->cl_target_uuid, conn->c_remote_uuid,
+                       imp->imp_handle.addr, imp->imp_handle.cookie,
+                       old_hdl.addr, old_hdl.cookie);
+                GOTO(out_disc, rc = 0);
+            default:
                 CERROR("cannot connect to %s@%s: rc = %d\n",
                        cli->cl_target_uuid, conn->c_remote_uuid, rc);
-                ptlrpc_free_req(request);
-                GOTO(out_disc, rc = -ENOTCONN);
+                GOTO(out_disc, rc = -ENOTCONN); /* XXX preserve rc? */
         }
-        
-        old_hdl = imp->imp_handle;
-        imp->imp_handle.addr = request->rq_repmsg->addr;
-        imp->imp_handle.cookie = request->rq_repmsg->cookie;
-        CERROR("reconnected to %s@%s (%Lx/%Lx, was %Lx/%Lx)!\n",
-               cli->cl_target_uuid, conn->c_remote_uuid,
-               imp->imp_handle.addr, imp->imp_handle.cookie,
-               old_hdl.addr, old_hdl.cookie);
-        ptlrpc_req_finished(request);
 
  out_disc:
+        ptlrpc_req_finished(request);
         return rc;
 }
 
@@ -113,23 +143,16 @@ int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn)
 #define REPLAY_RESEND        2 /* Resend required. */
 #define REPLAY_RESEND_IGNORE 3 /* Resend, ignore the reply (already saw it). */
 #define REPLAY_RESTART       4 /* Have to restart the call, sorry! */
-#define REPLAY_NO_STATE      5 /* Request doesn't change MDS state: skip. */
 
-static int replay_state(struct ptlrpc_request *req, __u64 last_xid)
+static int replay_state(struct ptlrpc_request *req, __u64 committed)
 {
         /* This request must always be replayed. */
         if (req->rq_flags & PTL_RPC_FL_REPLAY)
                 return REPLAY_REPLAY;
 
         /* Uncommitted request */
-        if (req->rq_xid > last_xid) {
+        if (req->rq_transno > committed) {
                 if (req->rq_flags & PTL_RPC_FL_REPLIED) {
-                        if (req->rq_transno == 0) {
-                                /* If no transno was returned, no state was
-                                   altered on the MDS. */
-                                return REPLAY_NO_STATE;
-                        }
-
                         /* Saw reply, so resend and ignore new reply. */
                         return REPLAY_RESEND_IGNORE;
                 }
@@ -149,7 +172,6 @@ static int replay_state(struct ptlrpc_request *req, __u64 last_xid)
 static char *replay_state2str(int state) {
         static char *state_strings[] = {
                 "COMMITTED", "REPLAY", "RESEND", "RESEND_IGNORE", "RESTART",
-                "NO_STATE"
         };
         static char *unknown_state = "UNKNOWN";
 
@@ -161,36 +183,52 @@ static char *replay_state2str(int state) {
         return state_strings[state];
 }
 
-int ptlrpc_replay(struct ptlrpc_connection *conn)
+int ptlrpc_replay(struct obd_import *imp, int unreplied_only)
 {
-        int rc = 0;
+        int rc = 0, state;
         struct list_head *tmp, *pos;
         struct ptlrpc_request *req;
+        struct ptlrpc_connection *conn = imp->imp_connection;
+        __u64 committed = imp->imp_peer_committed_transno;
         ENTRY;
 
-        spin_lock(&conn->c_lock);
+        spin_lock(&imp->imp_lock);
 
-        CDEBUG(D_HA, "connection %p to %s has last_xid "LPD64"\n",
-               conn, conn->c_remote_uuid, conn->c_last_xid);
+        CDEBUG(D_HA, "import %p from %s has committed "LPD64"\n",
+               imp, imp->imp_obd->u.cli.cl_target_uuid, committed);
 
-        list_for_each(tmp, &conn->c_sending_head) {
-                int state;
+        list_for_each(tmp, &imp->imp_request_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                state = replay_state(req, conn->c_last_xid);
+                state = replay_state(req, committed);
                 DEBUG_REQ(D_HA, req, "SENDING: %s: ", replay_state2str(state));
         }
 
         list_for_each(tmp, &conn->c_delayed_head) {
-                int state;
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                state = replay_state(req, conn->c_last_xid);
-                DEBUG_REQ(D_HA, req, "DELAYED: ");
+                state = replay_state(req, committed);
+                DEBUG_REQ(D_HA, req, "DELAYED: %s: ", replay_state2str(state));
         }
 
-        list_for_each_safe(tmp, pos, &conn->c_sending_head) { 
+        list_for_each_safe(tmp, pos, &imp->imp_request_list) { 
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                
-                switch (replay_state(req, conn->c_last_xid)) {
+
+                if (unreplied_only) {
+                        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
+                                DEBUG_REQ(D_HA, req, "UNREPLIED:");
+                                ptlrpc_restart_req(req);
+                        }
+                        continue;
+                }
+
+                state = replay_state(req, committed);
+
+                if (req->rq_transno == imp->imp_max_transno) {
+                        req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
+                        DEBUG_REQ(D_HA, req, "last for replay");
+                        LASSERT(state != REPLAY_COMMITTED);
+                }
+
+                switch (state) {
                     case REPLAY_REPLAY:
                         DEBUG_REQ(D_HA, req, "REPLAY:");
                         rc = ptlrpc_replay_req(req);
@@ -208,14 +246,8 @@ int ptlrpc_replay(struct ptlrpc_connection *conn)
                         }
                         break;
 
-
                     case REPLAY_COMMITTED:
-                        DEBUG_REQ(D_HA, req, "COMMITTED:");
-                        /* XXX commit now? */
-                        break;
-
-                    case REPLAY_NO_STATE:
-                        DEBUG_REQ(D_HA, req, "NO_STATE:");
+                        DEBUG_REQ(D_ERROR, req, "COMMITTED:");
                         /* XXX commit now? */
                         break;
 
index f33fa17..7263ac0 100644 (file)
@@ -254,7 +254,6 @@ EXPORT_SYMBOL(ptlrpc_prep_bulk);
 EXPORT_SYMBOL(ptlrpc_free_bulk);
 EXPORT_SYMBOL(ptlrpc_prep_bulk_page);
 EXPORT_SYMBOL(ptlrpc_free_bulk_page);
-EXPORT_SYMBOL(ptlrpc_check_status);
 EXPORT_SYMBOL(ll_brw_sync_wait);
 
 /* service.c */
index 9e7e42c..d74ba89 100755 (executable)
@@ -888,7 +888,12 @@ class LOV(Module):
             osc = lookup(self.dom_node.parentNode, osc_uuid)
             if osc:
                 n = OSC(osc)
-                n.prepare()
+                try:
+                    # Ignore connection failures, because the LOV will DTRT with
+                    # an unconnected OSC.
+                    n.prepare(ignore_connect_failure=1)
+                except CommandError:
+                    print "Error preparing OSC %s (inactive)\n" % osc_uuid
             else:
                 panic('osc not found:', osc_uuid)
         mdc_uuid = prepare_mdc(self.dom_node.parentNode, self.mds_uuid)
@@ -1089,19 +1094,23 @@ class OSC(Module):
         self.lookup_server(self.ost_uuid)
         self.add_module('lustre/osc', 'osc')
 
-    def prepare(self):
+    def prepare(self, ignore_connect_failure = 0):
         if is_prepared(self.uuid):
             return
         self.info(self.obd_uuid, self.ost_uuid)
         srv = self.get_server()
-        if local_net(srv):
-            lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem)
-        else:
-            r =  find_route(srv)
-            if r:
-                lctl.add_route_host(r[0], srv.uuid, r[1], r[2])
+        try:
+            if local_net(srv):
+                lctl.connect(srv.net_type, srv.nid, srv.port, srv.uuid, srv.send_mem, srv.recv_mem)
             else:
-                panic ("no route to",  srv.nid)
+                r =  find_route(srv)
+                if r:
+                    lctl.add_route_host(r[0], srv.uuid, r[1], r[2])
+                else:
+                    panic ("no route to",  srv.nid)
+        except CommandError:
+            if (ignore_connect_failure == 0):
+                pass
             
         lctl.newdev(attach="osc %s %s" % (self.name, self.uuid),
                     setup ="%s %s" %(self.obd_uuid, srv.uuid))