Whamcloud - gitweb
Landing of b_recovery (at last).
[fs/lustre-release.git] / lustre / mds / handler.c
index d78ad53..4f5f6e3 100644 (file)
@@ -8,6 +8,7 @@
  *   Author: Peter Braam <braam@clusterfs.com>
  *   Author: Andreas Dilger <adilger@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
+ *   Author: Mike Shaver <shaver@clusterfs.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
@@ -46,8 +47,9 @@ static kmem_cache_t *mds_file_cache;
 extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                            obd_uuid_t *uuidarray);
 extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
-extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
-                                struct ptlrpc_request *req);
+extern void mds_start_transno(struct mds_obd *mds);
+extern int mds_finish_transno(struct mds_obd *mds, void *handle,
+                              struct ptlrpc_request *req, int rc);
 static int mds_cleanup(struct obd_device * obddev);
 
 extern struct lprocfs_vars status_var_nm_1[];
@@ -63,7 +65,7 @@ static int mds_bulk_timeout(void *data)
         struct ptlrpc_bulk_desc *desc = data;
 
         ENTRY;
-        CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+        recovd_conn_fail(desc->bd_connection);
         RETURN(1);
 }
 
@@ -113,7 +115,8 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         }
 
         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
-        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi);
+        rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
+                          &lwi);
         if (rc) {
                 if (rc != -ETIMEDOUT)
                         LBUG();
@@ -301,27 +304,53 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                         CERROR("FYI: NULL mcd - simultaneous connects\n");
                         continue;
                 }
-                if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) {
+                if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
+                        /* XXX make handle-found-export a subroutine */
                         LASSERT(exp->exp_obd == obd);
 
-                        if (!list_empty(&exp->exp_conn_chain)) {
-                                CERROR("existing uuid/export, list not empty!\n");
-                                spin_unlock(&obd->obd_dev_lock);
+                        spin_unlock(&obd->obd_dev_lock);
+                        if (exp->exp_connection) {
+                                struct lustre_handle *hdl;
+                                hdl = &exp->exp_ldlm_data.led_import.imp_handle;
+                                /* Might be a re-connect after a partition. */
+                                if (!memcmp(conn, hdl, sizeof *conn)) {
+                                        CERROR("%s reconnecting\n", cluuid);
+                                        conn->addr = (__u64) (unsigned long)exp;
+                                        conn->cookie = exp->exp_cookie;
+                                        rc = EALREADY;
+                                } else {
+                                        CERROR("%s reconnecting from %s, "
+                                               "handle mismatch (ours %Lx/%Lx, "
+                                               "theirs %Lx/%Lx)\n", cluuid,
+                                               exp->exp_connection->
+                                               c_remote_uuid, hdl->addr,
+                                               hdl->cookie, conn->addr,
+                                               conn->cookie);
+                                        /* XXX disconnect them here? */
+                                        memset(conn, 0, sizeof *conn);
+                                        rc = -EALREADY;
+                                }
                                 MOD_DEC_USE_COUNT;
-                                RETURN(-EALREADY);
+                                RETURN(rc);
                         }
                         conn->addr = (__u64) (unsigned long)exp;
                         conn->cookie = exp->exp_cookie;
-                        spin_unlock(&obd->obd_dev_lock);
                         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
                                cluuid, exp);
                         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
                                (long long)conn->addr, (long long)conn->cookie);
-                        MOD_DEC_USE_COUNT;
                         RETURN(0);
                 }
         }
         spin_unlock(&obd->obd_dev_lock);
+
+        if (obd->u.mds.mds_recoverable_clients != 0) {
+                CERROR("denying connection for new client %s: in recovery\n",
+                       cluuid);
+                MOD_DEC_USE_COUNT;
+                RETURN(-EBUSY);
+        }
+
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
          * corruption when multiple different clients connect) is solved.
@@ -351,7 +380,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
 
-        rc = mds_client_add(med, -1);
+        rc = mds_client_add(&obd->u.mds, med, -1);
         if (rc)
                 GOTO(out_mcd, rc);
 
@@ -836,13 +865,16 @@ static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
         uc.ouc_fsgid = body->fsgid;
         uc.ouc_cap = body->capability;
         push_ctxt(&saved, &mds->mds_ctxt, &uc);
+        mds_start_transno(mds);
         handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
-        if (!handle)
-                GOTO(out_ea, rc = -ENOMEM);
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                mds_finish_transno(mds, handle, req, rc);
+                GOTO(out_ea, rc);
+        }
 
         rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
-        if (!rc)
-                rc = mds_update_last_rcvd(mds, handle, req);
+        rc = mds_finish_transno(mds, handle, req, rc);
 
         rc2 = mds_fs_commit(mds, inode, handle);
         if (rc2 && !rc)
@@ -1058,9 +1090,162 @@ int mds_reint(struct ptlrpc_request *req, int offset)
         return rc;
 }
 
+/* forward declaration */
+int mds_handle(struct ptlrpc_request *req);
+
+static int check_for_next_transno(struct mds_obd *mds)
+{
+        struct ptlrpc_request *req;
+        req = list_entry(mds->mds_recovery_queue.next, 
+                         struct ptlrpc_request, rq_list);
+        return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
+}
+
+static void process_recovery_queue(struct mds_obd *mds)
+{
+        struct ptlrpc_request *req;
+        
+        for (;;) {
+                spin_lock(&mds->mds_processing_task_lock);
+                req = list_entry(mds->mds_recovery_queue.next, 
+                                 struct ptlrpc_request, rq_list);
+
+                if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
+                        spin_unlock(&mds->mds_processing_task_lock);
+                        wait_event(mds->mds_next_transno_waitq,
+                                   check_for_next_transno(mds));
+                        continue;
+                }
+                list_del(&req->rq_list);
+                spin_unlock(&mds->mds_processing_task_lock);
+
+                DEBUG_REQ(D_HA, req, "");
+                mds_handle(req);
+                
+                if (list_empty(&mds->mds_recovery_queue))
+                        break;
+        }
+}
+
+static int queue_recovery_request(struct ptlrpc_request *req,
+                                  struct mds_obd *mds)
+{
+        struct list_head *tmp;
+        int inserted = 0, transno = req->rq_reqmsg->transno;
+
+        if (!transno) {
+                DEBUG_REQ(D_HA, req, "not queueing");
+                return 1;
+        }
+
+        spin_lock(&mds->mds_processing_task_lock);
+
+        if (mds->mds_processing_task == current->pid) {
+                /* Processing the queue right now, don't re-add. */
+                spin_unlock(&mds->mds_processing_task_lock);
+                return 1;
+        }
+
+        /* XXX O(n^2) */
+        list_for_each(tmp, &mds->mds_recovery_queue) {
+                struct ptlrpc_request *reqiter = 
+                        list_entry(tmp, struct ptlrpc_request, rq_list);
+                if (reqiter->rq_reqmsg->transno > transno) {
+                        list_add_tail(&req->rq_list, &reqiter->rq_list);
+                        inserted = 1;
+                        break;
+                }
+        }
+
+        if (!inserted)
+                list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
+
+        if (mds->mds_processing_task != 0) {
+                /* Someone else is processing this queue, we'll leave it to
+                 * them.
+                 */
+                spin_unlock(&mds->mds_processing_task_lock);
+                if (transno == mds->mds_next_recovery_transno)
+                        wake_up(&mds->mds_next_transno_waitq);
+                return 0;
+        }
+
+        /* Nobody is processing, and we know there's (at least) one to process
+         * now, so we'll do the honours.
+         */
+        mds->mds_processing_task = current->pid;
+        spin_unlock(&mds->mds_processing_task_lock);
+
+        process_recovery_queue(mds);
+        return 0;
+}
+
+static int filter_recovery_request(struct ptlrpc_request *req, 
+                                   struct mds_obd *mds, int *process)
+{
+        switch (req->rq_reqmsg->opc) {
+        case MDS_CONNECT:
+        case MDS_DISCONNECT:
+        case MDS_OPEN:
+               *process = 1;
+               RETURN(0);
+            
+        case MDS_GETSTATUS: /* used in unmounting */
+        case MDS_REINT:
+        case LDLM_ENQUEUE:
+                *process = queue_recovery_request(req, mds);
+                RETURN(0);
+                
+        default:
+                DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+                *process = 0;
+                RETURN(ptlrpc_error(req->rq_svc, req));
+        }
+}
+
+static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+        struct mds_obd *mds = mds_req2mds(req);
+
+        if (rc) {
+                /* Just like ptlrpc_error, but without the sending. */
+                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                &req->rq_repmsg);
+                req->rq_type = PTL_RPC_MSG_ERR;
+        }
+
+        list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
+        if (--mds->mds_recoverable_clients == 0) {
+                struct list_head *tmp, *n;
+
+                CDEBUG(D_HA,
+                       "all clients recovered, sending delayed replies\n");
+                list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
+                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                        DEBUG_REQ(D_HA, req, "delayed:");
+                        ptlrpc_reply(req->rq_svc, req);
+                }
+        } else {
+                CDEBUG(D_HA, "%d recoverable clients remain\n",
+                       mds->mds_recoverable_clients);
+        }
+
+        return 1;
+}
+
+static char *reint_names[] = {
+        [REINT_SETATTR] "setattr",
+        [REINT_CREATE]  "create",
+        [REINT_LINK]    "link",
+        [REINT_UNLINK]  "unlink",
+        [REINT_RENAME]  "rename"
+};
+
 int mds_handle(struct ptlrpc_request *req)
 {
         int rc;
+        int should_process;
+        struct mds_obd *mds = NULL; /* quell gcc overwarning */
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
@@ -1069,49 +1254,67 @@ int mds_handle(struct ptlrpc_request *req)
                 GOTO(out, rc);
         }
 
-        if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL)
-                GOTO(out, rc = -ENOTCONN);
-
         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
 
+        if (req->rq_reqmsg->opc != MDS_CONNECT) {
+                if (req->rq_export == NULL)
+                        GOTO(out, rc = -ENOTCONN);
+
+                mds = mds_req2mds(req);
+                if (mds->mds_recoverable_clients != 0) {
+                        rc = filter_recovery_request(req, mds, &should_process);
+                        if (rc || !should_process)
+                                RETURN(rc);
+                }
+        }
+
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT:
-                CDEBUG(D_INODE, "connect\n");
+                DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
                 rc = target_handle_connect(req);
+                /* Make sure that last_rcvd is correct. */
+                if (!rc) {
+                        /* Now that we have an export, set mds. */
+                        mds = mds_req2mds(req);
+                        mds_fsync_super(mds->mds_sb);
+                }
                 break;
 
         case MDS_DISCONNECT:
-                CDEBUG(D_INODE, "disconnect\n");
+                DEBUG_REQ(D_INODE, req, "disconnect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
                 rc = target_handle_disconnect(req);
+                /* Make sure that last_rcvd is correct. */
+                if (!rc)
+                        mds_fsync_super(mds->mds_sb);
                 goto out;
 
         case MDS_GETSTATUS:
-                CDEBUG(D_INODE, "getstatus\n");
+                DEBUG_REQ(D_INODE, req, "getstatus");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
                 rc = mds_getstatus(req);
                 break;
 
         case MDS_GETLOVINFO:
-                CDEBUG(D_INODE, "getlovinfo\n");
+                DEBUG_REQ(D_INODE, req, "getlovinfo");
                 rc = mds_getlovinfo(req);
                 break;
 
         case MDS_GETATTR:
-                CDEBUG(D_INODE, "getattr\n");
+                DEBUG_REQ(D_INODE, req, "getattr");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
                 rc = mds_getattr(0, req);
                 break;
 
         case MDS_STATFS:
-                CDEBUG(D_INODE, "statfs\n");
+                DEBUG_REQ(D_INODE, req, "statfs");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
                 rc = mds_statfs(req);
                 break;
 
         case MDS_READPAGE:
-                CDEBUG(D_INODE, "readpage\n");
+                DEBUG_REQ(D_INODE, req, "readpage\n");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
                 rc = mds_readpage(req);
 
@@ -1121,7 +1324,13 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_REINT: {
                 int size = sizeof(struct mds_body);
-                CDEBUG(D_INODE, "reint\n");
+                int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), 
+                        realopc = opc & REINT_OPCODE_MASK;
+                        
+                DEBUG_REQ(D_INODE, req, "reint (%s%s)",
+                          reint_names[realopc],
+                          opc & REINT_REPLAYING ? "|REPLAYING" : "");
+                          
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
 
                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
@@ -1136,30 +1345,30 @@ int mds_handle(struct ptlrpc_request *req)
                 }
 
         case MDS_OPEN:
-                CDEBUG(D_INODE, "open\n");
+                DEBUG_REQ(D_INODE, req, "open");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
                 rc = mds_open(req);
                 break;
 
         case MDS_CLOSE:
-                CDEBUG(D_INODE, "close\n");
+                DEBUG_REQ(D_INODE, req, "close");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
                 rc = mds_close(req);
                 break;
 
         case LDLM_ENQUEUE:
-                CDEBUG(D_INODE, "enqueue\n");
+                DEBUG_REQ(D_INODE, req, "enqueue");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
                 rc = ldlm_handle_enqueue(req);
                 break;
         case LDLM_CONVERT:
-                CDEBUG(D_INODE, "convert\n");
+                DEBUG_REQ(D_INODE, req, "convert");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
                 rc = ldlm_handle_convert(req);
                 break;
         case LDLM_BL_CALLBACK:
         case LDLM_CP_CALLBACK:
-                CDEBUG(D_INODE, "callback\n");
+                DEBUG_REQ(D_INODE, req, "callback");
                 CERROR("callbacks should not happen on MDS\n");
                 LBUG();
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
@@ -1173,7 +1382,6 @@ int mds_handle(struct ptlrpc_request *req)
 
         if (!rc) {
                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                struct mds_obd *mds = mds_req2mds(req);
 
                 req->rq_repmsg->last_xid =
                         HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
@@ -1185,7 +1393,17 @@ int mds_handle(struct ptlrpc_request *req)
                        cpu_to_le32(req->rq_xid));
         }
  out:
-        if (rc) {
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+                struct mds_obd *mds = mds_req2mds(req);
+                LASSERT(mds->mds_recoverable_clients);
+                DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+                return mds_queue_final_reply(req, rc);
+        }
+        
+        /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
+        if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
+                   rc != EALREADY)) {
                 CERROR("mds: processing error (opcode %d): %d\n",
                        req->rq_reqmsg->opc, rc);
                 ptlrpc_error(req->rq_svc, req);
@@ -1205,7 +1423,6 @@ int mds_handle(struct ptlrpc_request *req)
  *
  * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
  */
-static
 int mds_update_server_data(struct mds_obd *mds)
 {
         struct mds_server_data *msd = mds->mds_server_data;
@@ -1238,12 +1455,14 @@ int mds_update_server_data(struct mds_obd *mds)
 }
 
 /* Do recovery actions for the MDS */
-static int mds_recover(struct obd_device *obddev)
+static int mds_recovery_complete(struct obd_device *obddev)
 {
         struct mds_obd *mds = &obddev->u.mds;
         struct obd_run_ctxt saved;
         int rc;
 
+        LASSERT(mds->mds_recoverable_clients == 0);
+
         /* This happens at the end when recovery is complete */
         ++mds->mds_mount_count;
         push_ctxt(&saved, &mds->mds_ctxt, NULL);
@@ -1283,7 +1502,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!mds->mds_sb)
                 GOTO(err_put, rc = -ENODEV);
 
-        spin_lock_init(&mds->mds_last_lock);
+        init_MUTEX(&mds->mds_transno_sem);
         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
         rc = mds_fs_setup(obddev, mnt);
         if (rc) {
@@ -1298,14 +1517,14 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_fs, rc = -ENOMEM);
         }
 
-
-        rc = mds_recover(obddev);
-        if (rc)
-                GOTO(err_fs, rc);
-
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mds_ldlm_client", &obddev->obd_ldlm_client);
 
+        spin_lock_init(&mds->mds_processing_task_lock);
+        mds->mds_processing_task = 0;
+        INIT_LIST_HEAD(&mds->mds_recovery_queue);
+        INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
+        
         RETURN(0);
 
 err_fs: