* Author: Peter Braam <braam@clusterfs.com>
* Author: Andreas Dilger <adilger@clusterfs.com>
* Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Mike Shaver <shaver@clusterfs.com>
*
* This file is part of Lustre, http://www.lustre.org.
*
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
obd_uuid_t *uuidarray);
extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc);
-extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
- struct ptlrpc_request *req);
+extern void mds_start_transno(struct mds_obd *mds);
+extern int mds_finish_transno(struct mds_obd *mds, void *handle,
+ struct ptlrpc_request *req, int rc);
static int mds_cleanup(struct obd_device * obddev);
extern struct lprocfs_vars status_var_nm_1[];
struct ptlrpc_bulk_desc *desc = data;
ENTRY;
- CERROR("(not yet) starting recovery of client %p\n", desc->bd_client);
+ recovd_conn_fail(desc->bd_connection);
RETURN(1);
}
}
lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi);
+ rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
+ &lwi);
if (rc) {
if (rc != -ETIMEDOUT)
LBUG();
CERROR("FYI: NULL mcd - simultaneous connects\n");
continue;
}
- if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) {
+ if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) {
+ /* XXX make handle-found-export a subroutine */
LASSERT(exp->exp_obd == obd);
- if (!list_empty(&exp->exp_conn_chain)) {
- CERROR("existing uuid/export, list not empty!\n");
- spin_unlock(&obd->obd_dev_lock);
+ spin_unlock(&obd->obd_dev_lock);
+ if (exp->exp_connection) {
+ struct lustre_handle *hdl;
+ hdl = &exp->exp_ldlm_data.led_import.imp_handle;
+ /* Might be a re-connect after a partition. */
+ if (!memcmp(conn, hdl, sizeof *conn)) {
+ CERROR("%s reconnecting\n", cluuid);
+ conn->addr = (__u64) (unsigned long)exp;
+ conn->cookie = exp->exp_cookie;
+ rc = EALREADY;
+ } else {
+ CERROR("%s reconnecting from %s, "
+ "handle mismatch (ours %Lx/%Lx, "
+ "theirs %Lx/%Lx)\n", cluuid,
+ exp->exp_connection->
+ c_remote_uuid, hdl->addr,
+ hdl->cookie, conn->addr,
+ conn->cookie);
+ /* XXX disconnect them here? */
+ memset(conn, 0, sizeof *conn);
+ rc = -EALREADY;
+ }
MOD_DEC_USE_COUNT;
- RETURN(-EALREADY);
+ RETURN(rc);
}
conn->addr = (__u64) (unsigned long)exp;
conn->cookie = exp->exp_cookie;
- spin_unlock(&obd->obd_dev_lock);
CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
cluuid, exp);
CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
(long long)conn->addr, (long long)conn->cookie);
- MOD_DEC_USE_COUNT;
RETURN(0);
}
}
spin_unlock(&obd->obd_dev_lock);
+
+ if (obd->u.mds.mds_recoverable_clients != 0) {
+ CERROR("denying connection for new client %s: in recovery\n",
+ cluuid);
+ MOD_DEC_USE_COUNT;
+ RETURN(-EBUSY);
+ }
+
/* XXX There is a small race between checking the list and adding a
* new connection for the same UUID, but the real threat (list
* corruption when multiple different clients connect) is solved.
INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
- rc = mds_client_add(med, -1);
+ rc = mds_client_add(&obd->u.mds, med, -1);
if (rc)
GOTO(out_mcd, rc);
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
push_ctxt(&saved, &mds->mds_ctxt, &uc);
+ mds_start_transno(mds);
handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR);
- if (!handle)
- GOTO(out_ea, rc = -ENOMEM);
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ mds_finish_transno(mds, handle, req, rc);
+ GOTO(out_ea, rc);
+ }
rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size);
- if (!rc)
- rc = mds_update_last_rcvd(mds, handle, req);
+ rc = mds_finish_transno(mds, handle, req, rc);
rc2 = mds_fs_commit(mds, inode, handle);
if (rc2 && !rc)
return rc;
}
+/* forward declaration */
+int mds_handle(struct ptlrpc_request *req);
+
+static int check_for_next_transno(struct mds_obd *mds)
+{
+ struct ptlrpc_request *req;
+ req = list_entry(mds->mds_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+ return req->rq_reqmsg->transno == mds->mds_next_recovery_transno;
+}
+
+static void process_recovery_queue(struct mds_obd *mds)
+{
+ struct ptlrpc_request *req;
+
+ for (;;) {
+ spin_lock(&mds->mds_processing_task_lock);
+ req = list_entry(mds->mds_recovery_queue.next,
+ struct ptlrpc_request, rq_list);
+
+ if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
+ spin_unlock(&mds->mds_processing_task_lock);
+ wait_event(mds->mds_next_transno_waitq,
+ check_for_next_transno(mds));
+ continue;
+ }
+ list_del(&req->rq_list);
+ spin_unlock(&mds->mds_processing_task_lock);
+
+ DEBUG_REQ(D_HA, req, "");
+ mds_handle(req);
+
+ if (list_empty(&mds->mds_recovery_queue))
+ break;
+ }
+}
+
+static int queue_recovery_request(struct ptlrpc_request *req,
+ struct mds_obd *mds)
+{
+ struct list_head *tmp;
+ int inserted = 0, transno = req->rq_reqmsg->transno;
+
+ if (!transno) {
+ DEBUG_REQ(D_HA, req, "not queueing");
+ return 1;
+ }
+
+ spin_lock(&mds->mds_processing_task_lock);
+
+ if (mds->mds_processing_task == current->pid) {
+ /* Processing the queue right now, don't re-add. */
+ spin_unlock(&mds->mds_processing_task_lock);
+ return 1;
+ }
+
+ /* XXX O(n^2) */
+ list_for_each(tmp, &mds->mds_recovery_queue) {
+ struct ptlrpc_request *reqiter =
+ list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (reqiter->rq_reqmsg->transno > transno) {
+ list_add_tail(&req->rq_list, &reqiter->rq_list);
+ inserted = 1;
+ break;
+ }
+ }
+
+ if (!inserted)
+ list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
+
+ if (mds->mds_processing_task != 0) {
+ /* Someone else is processing this queue, we'll leave it to
+ * them.
+ */
+ spin_unlock(&mds->mds_processing_task_lock);
+ if (transno == mds->mds_next_recovery_transno)
+ wake_up(&mds->mds_next_transno_waitq);
+ return 0;
+ }
+
+ /* Nobody is processing, and we know there's (at least) one to process
+ * now, so we'll do the honours.
+ */
+ mds->mds_processing_task = current->pid;
+ spin_unlock(&mds->mds_processing_task_lock);
+
+ process_recovery_queue(mds);
+ return 0;
+}
+
+static int filter_recovery_request(struct ptlrpc_request *req,
+ struct mds_obd *mds, int *process)
+{
+ switch (req->rq_reqmsg->opc) {
+ case MDS_CONNECT:
+ case MDS_DISCONNECT:
+ case MDS_OPEN:
+ *process = 1;
+ RETURN(0);
+
+ case MDS_GETSTATUS: /* used in unmounting */
+ case MDS_REINT:
+ case LDLM_ENQUEUE:
+ *process = queue_recovery_request(req, mds);
+ RETURN(0);
+
+ default:
+ DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+ *process = 0;
+ RETURN(ptlrpc_error(req->rq_svc, req));
+ }
+}
+
+static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
+{
+ struct mds_obd *mds = mds_req2mds(req);
+
+ if (rc) {
+ /* Just like ptlrpc_error, but without the sending. */
+ lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ req->rq_type = PTL_RPC_MSG_ERR;
+ }
+
+ list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
+ if (--mds->mds_recoverable_clients == 0) {
+ struct list_head *tmp, *n;
+
+ CDEBUG(D_HA,
+ "all clients recovered, sending delayed replies\n");
+ list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ DEBUG_REQ(D_HA, req, "delayed:");
+ ptlrpc_reply(req->rq_svc, req);
+ }
+ } else {
+ CDEBUG(D_HA, "%d recoverable clients remain\n",
+ mds->mds_recoverable_clients);
+ }
+
+ return 1;
+}
+
+static char *reint_names[] = {
+ [REINT_SETATTR] "setattr",
+ [REINT_CREATE] "create",
+ [REINT_LINK] "link",
+ [REINT_UNLINK] "unlink",
+ [REINT_RENAME] "rename"
+};
+
int mds_handle(struct ptlrpc_request *req)
{
int rc;
+ int should_process;
+ struct mds_obd *mds = NULL; /* quell gcc overwarning */
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
GOTO(out, rc);
}
- if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL)
- GOTO(out, rc = -ENOTCONN);
-
LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
+ if (req->rq_reqmsg->opc != MDS_CONNECT) {
+ if (req->rq_export == NULL)
+ GOTO(out, rc = -ENOTCONN);
+
+ mds = mds_req2mds(req);
+ if (mds->mds_recoverable_clients != 0) {
+ rc = filter_recovery_request(req, mds, &should_process);
+ if (rc || !should_process)
+ RETURN(rc);
+ }
+ }
+
switch (req->rq_reqmsg->opc) {
case MDS_CONNECT:
- CDEBUG(D_INODE, "connect\n");
+ DEBUG_REQ(D_INODE, req, "connect");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
rc = target_handle_connect(req);
+ /* Make sure that last_rcvd is correct. */
+ if (!rc) {
+ /* Now that we have an export, set mds. */
+ mds = mds_req2mds(req);
+ mds_fsync_super(mds->mds_sb);
+ }
break;
case MDS_DISCONNECT:
- CDEBUG(D_INODE, "disconnect\n");
+ DEBUG_REQ(D_INODE, req, "disconnect");
OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
rc = target_handle_disconnect(req);
+ /* Make sure that last_rcvd is correct. */
+ if (!rc)
+ mds_fsync_super(mds->mds_sb);
goto out;
case MDS_GETSTATUS:
- CDEBUG(D_INODE, "getstatus\n");
+ DEBUG_REQ(D_INODE, req, "getstatus");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
rc = mds_getstatus(req);
break;
case MDS_GETLOVINFO:
- CDEBUG(D_INODE, "getlovinfo\n");
+ DEBUG_REQ(D_INODE, req, "getlovinfo");
rc = mds_getlovinfo(req);
break;
case MDS_GETATTR:
- CDEBUG(D_INODE, "getattr\n");
+ DEBUG_REQ(D_INODE, req, "getattr");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
rc = mds_getattr(0, req);
break;
case MDS_STATFS:
- CDEBUG(D_INODE, "statfs\n");
+ DEBUG_REQ(D_INODE, req, "statfs");
OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
rc = mds_statfs(req);
break;
case MDS_READPAGE:
- CDEBUG(D_INODE, "readpage\n");
+ DEBUG_REQ(D_INODE, req, "readpage\n");
OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
rc = mds_readpage(req);
case MDS_REINT: {
int size = sizeof(struct mds_body);
- CDEBUG(D_INODE, "reint\n");
+ int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0),
+ realopc = opc & REINT_OPCODE_MASK;
+
+ DEBUG_REQ(D_INODE, req, "reint (%s%s)",
+ reint_names[realopc],
+ opc & REINT_REPLAYING ? "|REPLAYING" : "");
+
OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
}
case MDS_OPEN:
- CDEBUG(D_INODE, "open\n");
+ DEBUG_REQ(D_INODE, req, "open");
OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
rc = mds_open(req);
break;
case MDS_CLOSE:
- CDEBUG(D_INODE, "close\n");
+ DEBUG_REQ(D_INODE, req, "close");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
rc = mds_close(req);
break;
case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
+ DEBUG_REQ(D_INODE, req, "enqueue");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
rc = ldlm_handle_enqueue(req);
break;
case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
+ DEBUG_REQ(D_INODE, req, "convert");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
rc = ldlm_handle_convert(req);
break;
case LDLM_BL_CALLBACK:
case LDLM_CP_CALLBACK:
- CDEBUG(D_INODE, "callback\n");
+ DEBUG_REQ(D_INODE, req, "callback");
CERROR("callbacks should not happen on MDS\n");
LBUG();
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
if (!rc) {
struct mds_export_data *med = &req->rq_export->exp_mds_data;
- struct mds_obd *mds = mds_req2mds(req);
req->rq_repmsg->last_xid =
HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
cpu_to_le32(req->rq_xid));
}
out:
- if (rc) {
+
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+ struct mds_obd *mds = mds_req2mds(req);
+ LASSERT(mds->mds_recoverable_clients);
+ DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+ return mds_queue_final_reply(req, rc);
+ }
+
+ /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */
+ if (rc && (req->rq_reqmsg->opc != MDS_CONNECT ||
+ rc != EALREADY)) {
CERROR("mds: processing error (opcode %d): %d\n",
req->rq_reqmsg->opc, rc);
ptlrpc_error(req->rq_svc, req);
*
* Also assumes for mds_last_rcvd that we are not modifying it (no locking).
*/
-static
int mds_update_server_data(struct mds_obd *mds)
{
struct mds_server_data *msd = mds->mds_server_data;
}
/* Do recovery actions for the MDS */
-static int mds_recover(struct obd_device *obddev)
+static int mds_recovery_complete(struct obd_device *obddev)
{
struct mds_obd *mds = &obddev->u.mds;
struct obd_run_ctxt saved;
int rc;
+ LASSERT(mds->mds_recoverable_clients == 0);
+
/* This happens at the end when recovery is complete */
++mds->mds_mount_count;
push_ctxt(&saved, &mds->mds_ctxt, NULL);
if (!mds->mds_sb)
GOTO(err_put, rc = -ENODEV);
- spin_lock_init(&mds->mds_last_lock);
+ init_MUTEX(&mds->mds_transno_sem);
mds->mds_max_mdsize = sizeof(struct lov_mds_md);
rc = mds_fs_setup(obddev, mnt);
if (rc) {
GOTO(err_fs, rc = -ENOMEM);
}
-
- rc = mds_recover(obddev);
- if (rc)
- GOTO(err_fs, rc);
-
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"mds_ldlm_client", &obddev->obd_ldlm_client);
+ spin_lock_init(&mds->mds_processing_task_lock);
+ mds->mds_processing_task = 0;
+ INIT_LIST_HEAD(&mds->mds_recovery_queue);
+ INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
+
RETURN(0);
err_fs: