X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=4f5f6e3e4d556340af75e764bf57106d815232eb;hp=d78ad53764b14f0c442b6de8bbd488432537e033;hb=4d477d1468cf4be4c37681610b3d726fd27f229f;hpb=ef1e48d8856d66a6b23bca3fe6ef9deb81a9b90d diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d78ad53..4f5f6e3 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -8,6 +8,7 @@ * Author: Peter Braam * Author: Andreas Dilger * Author: Phil Schwan + * Author: Mike Shaver * * This file is part of Lustre, http://www.lustre.org. * @@ -46,8 +47,9 @@ static kmem_cache_t *mds_file_cache; extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count, obd_uuid_t *uuidarray); extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc); -extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle, - struct ptlrpc_request *req); +extern void mds_start_transno(struct mds_obd *mds); +extern int mds_finish_transno(struct mds_obd *mds, void *handle, + struct ptlrpc_request *req, int rc); static int mds_cleanup(struct obd_device * obddev); extern struct lprocfs_vars status_var_nm_1[]; @@ -63,7 +65,7 @@ static int mds_bulk_timeout(void *data) struct ptlrpc_bulk_desc *desc = data; ENTRY; - CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); + recovd_conn_fail(desc->bd_connection); RETURN(1); } @@ -113,7 +115,8 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, + &lwi); if (rc) { if (rc != -ETIMEDOUT) LBUG(); @@ -301,27 +304,53 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, CERROR("FYI: NULL mcd - simultaneous connects\n"); continue; } - if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) { + if (!memcmp(cluuid, mcd->mcd_uuid, sizeof mcd->mcd_uuid)) { + /* XXX make handle-found-export a subroutine */ LASSERT(exp->exp_obd == obd); - if (!list_empty(&exp->exp_conn_chain)) { - CERROR("existing uuid/export, list not empty!\n"); - spin_unlock(&obd->obd_dev_lock); + spin_unlock(&obd->obd_dev_lock); + if (exp->exp_connection) { + struct lustre_handle *hdl; + hdl = &exp->exp_ldlm_data.led_import.imp_handle; + /* Might be a re-connect after a partition. */ + if (!memcmp(conn, hdl, sizeof *conn)) { + CERROR("%s reconnecting\n", cluuid); + conn->addr = (__u64) (unsigned long)exp; + conn->cookie = exp->exp_cookie; + rc = EALREADY; + } else { + CERROR("%s reconnecting from %s, " + "handle mismatch (ours %Lx/%Lx, " + "theirs %Lx/%Lx)\n", cluuid, + exp->exp_connection-> + c_remote_uuid, hdl->addr, + hdl->cookie, conn->addr, + conn->cookie); + /* XXX disconnect them here? 
*/ + memset(conn, 0, sizeof *conn); + rc = -EALREADY; + } MOD_DEC_USE_COUNT; - RETURN(-EALREADY); + RETURN(rc); } conn->addr = (__u64) (unsigned long)exp; conn->cookie = exp->exp_cookie; - spin_unlock(&obd->obd_dev_lock); CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", cluuid, exp); CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n", (long long)conn->addr, (long long)conn->cookie); - MOD_DEC_USE_COUNT; RETURN(0); } } spin_unlock(&obd->obd_dev_lock); + + if (obd->u.mds.mds_recoverable_clients != 0) { + CERROR("denying connection for new client %s: in recovery\n", + cluuid); + MOD_DEC_USE_COUNT; + RETURN(-EBUSY); + } + /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. @@ -351,7 +380,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); - rc = mds_client_add(med, -1); + rc = mds_client_add(&obd->u.mds, med, -1); if (rc) GOTO(out_mcd, rc); @@ -836,13 +865,16 @@ static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req, uc.ouc_fsgid = body->fsgid; uc.ouc_cap = body->capability; push_ctxt(&saved, &mds->mds_ctxt, &uc); + mds_start_transno(mds); handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR); - if (!handle) - GOTO(out_ea, rc = -ENOMEM); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + mds_finish_transno(mds, handle, req, rc); + GOTO(out_ea, rc); + } rc = mds_fs_set_md(mds, inode, handle, lmm, lmm_size); - if (!rc) - rc = mds_update_last_rcvd(mds, handle, req); + rc = mds_finish_transno(mds, handle, req, rc); rc2 = mds_fs_commit(mds, inode, handle); if (rc2 && !rc) @@ -1058,9 +1090,162 @@ int mds_reint(struct ptlrpc_request *req, int offset) return rc; } +/* forward declaration */ +int mds_handle(struct ptlrpc_request *req); + +static int check_for_next_transno(struct mds_obd *mds) +{ + struct ptlrpc_request *req; + req = list_entry(mds->mds_recovery_queue.next, + struct ptlrpc_request, rq_list); + return req->rq_reqmsg->transno == mds->mds_next_recovery_transno; +} + +static void process_recovery_queue(struct mds_obd *mds) +{ + struct ptlrpc_request *req; + + for (;;) { + spin_lock(&mds->mds_processing_task_lock); + req = list_entry(mds->mds_recovery_queue.next, + struct ptlrpc_request, rq_list); + + if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) { + spin_unlock(&mds->mds_processing_task_lock); + wait_event(mds->mds_next_transno_waitq, + check_for_next_transno(mds)); + continue; + } + list_del(&req->rq_list); + spin_unlock(&mds->mds_processing_task_lock); + + DEBUG_REQ(D_HA, req, ""); + mds_handle(req); + + if (list_empty(&mds->mds_recovery_queue)) + break; + } +} + +static int queue_recovery_request(struct ptlrpc_request *req, + struct mds_obd *mds) +{ + struct list_head *tmp; + int inserted = 0, transno = req->rq_reqmsg->transno; + + if (!transno) { + DEBUG_REQ(D_HA, req, "not queueing"); + return 1; + } + + spin_lock(&mds->mds_processing_task_lock); + + if (mds->mds_processing_task == current->pid) { + /* Processing the queue right now, don't re-add. 
*/ + spin_unlock(&mds->mds_processing_task_lock); + return 1; + } + + /* XXX O(n^2) */ + list_for_each(tmp, &mds->mds_recovery_queue) { + struct ptlrpc_request *reqiter = + list_entry(tmp, struct ptlrpc_request, rq_list); + if (reqiter->rq_reqmsg->transno > transno) { + list_add_tail(&req->rq_list, &reqiter->rq_list); + inserted = 1; + break; + } + } + + if (!inserted) + list_add_tail(&req->rq_list, &mds->mds_recovery_queue); + + if (mds->mds_processing_task != 0) { + /* Someone else is processing this queue, we'll leave it to + * them. + */ + spin_unlock(&mds->mds_processing_task_lock); + if (transno == mds->mds_next_recovery_transno) + wake_up(&mds->mds_next_transno_waitq); + return 0; + } + + /* Nobody is processing, and we know there's (at least) one to process + * now, so we'll do the honours. + */ + mds->mds_processing_task = current->pid; + spin_unlock(&mds->mds_processing_task_lock); + + process_recovery_queue(mds); + return 0; +} + +static int filter_recovery_request(struct ptlrpc_request *req, + struct mds_obd *mds, int *process) +{ + switch (req->rq_reqmsg->opc) { + case MDS_CONNECT: + case MDS_DISCONNECT: + case MDS_OPEN: + *process = 1; + RETURN(0); + + case MDS_GETSTATUS: /* used in unmounting */ + case MDS_REINT: + case LDLM_ENQUEUE: + *process = queue_recovery_request(req, mds); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = 0; + RETURN(ptlrpc_error(req->rq_svc, req)); + } +} + +static int mds_queue_final_reply(struct ptlrpc_request *req, int rc) +{ + struct mds_obd *mds = mds_req2mds(req); + + if (rc) { + /* Just like ptlrpc_error, but without the sending. */ + lustre_pack_msg(0, NULL, NULL, &req->rq_replen, + &req->rq_repmsg); + req->rq_type = PTL_RPC_MSG_ERR; + } + + list_add(&req->rq_list, &mds->mds_delayed_reply_queue); + if (--mds->mds_recoverable_clients == 0) { + struct list_head *tmp, *n; + + CDEBUG(D_HA, + "all clients recovered, sending delayed replies\n"); + list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + DEBUG_REQ(D_HA, req, "delayed:"); + ptlrpc_reply(req->rq_svc, req); + } + } else { + CDEBUG(D_HA, "%d recoverable clients remain\n", + mds->mds_recoverable_clients); + } + + return 1; +} + +static char *reint_names[] = { + [REINT_SETATTR] "setattr", + [REINT_CREATE] "create", + [REINT_LINK] "link", + [REINT_UNLINK] "unlink", + [REINT_RENAME] "rename" +}; + int mds_handle(struct ptlrpc_request *req) { int rc; + int should_process; + struct mds_obd *mds = NULL; /* quell gcc overwarning */ ENTRY; rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen); @@ -1069,49 +1254,67 @@ int mds_handle(struct ptlrpc_request *req) GOTO(out, rc); } - if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL) - GOTO(out, rc = -ENOTCONN); - LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME)); + if (req->rq_reqmsg->opc != MDS_CONNECT) { + if (req->rq_export == NULL) + GOTO(out, rc = -ENOTCONN); + + mds = mds_req2mds(req); + if (mds->mds_recoverable_clients != 0) { + rc = filter_recovery_request(req, mds, &should_process); + if (rc || !should_process) + RETURN(rc); + } + } + switch (req->rq_reqmsg->opc) { case MDS_CONNECT: - CDEBUG(D_INODE, "connect\n"); + DEBUG_REQ(D_INODE, req, "connect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); rc = target_handle_connect(req); + /* Make sure that last_rcvd is correct. */ + if (!rc) { + /* Now that we have an export, set mds. 
*/ + mds = mds_req2mds(req); + mds_fsync_super(mds->mds_sb); + } break; case MDS_DISCONNECT: - CDEBUG(D_INODE, "disconnect\n"); + DEBUG_REQ(D_INODE, req, "disconnect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0); rc = target_handle_disconnect(req); + /* Make sure that last_rcvd is correct. */ + if (!rc) + mds_fsync_super(mds->mds_sb); goto out; case MDS_GETSTATUS: - CDEBUG(D_INODE, "getstatus\n"); + DEBUG_REQ(D_INODE, req, "getstatus"); OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0); rc = mds_getstatus(req); break; case MDS_GETLOVINFO: - CDEBUG(D_INODE, "getlovinfo\n"); + DEBUG_REQ(D_INODE, req, "getlovinfo"); rc = mds_getlovinfo(req); break; case MDS_GETATTR: - CDEBUG(D_INODE, "getattr\n"); + DEBUG_REQ(D_INODE, req, "getattr"); OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); rc = mds_getattr(0, req); break; case MDS_STATFS: - CDEBUG(D_INODE, "statfs\n"); + DEBUG_REQ(D_INODE, req, "statfs"); OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0); rc = mds_statfs(req); break; case MDS_READPAGE: - CDEBUG(D_INODE, "readpage\n"); + DEBUG_REQ(D_INODE, req, "readpage\n"); OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); rc = mds_readpage(req); @@ -1121,7 +1324,13 @@ int mds_handle(struct ptlrpc_request *req) case MDS_REINT: { int size = sizeof(struct mds_body); - CDEBUG(D_INODE, "reint\n"); + int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0), + realopc = opc & REINT_OPCODE_MASK; + + DEBUG_REQ(D_INODE, req, "reint (%s%s)", + reint_names[realopc], + opc & REINT_REPLAYING ? "|REPLAYING" : ""); + OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, @@ -1136,30 +1345,30 @@ int mds_handle(struct ptlrpc_request *req) } case MDS_OPEN: - CDEBUG(D_INODE, "open\n"); + DEBUG_REQ(D_INODE, req, "open"); OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0); rc = mds_open(req); break; case MDS_CLOSE: - CDEBUG(D_INODE, "close\n"); + DEBUG_REQ(D_INODE, req, "close"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); rc = mds_close(req); break; case LDLM_ENQUEUE: - CDEBUG(D_INODE, "enqueue\n"); + DEBUG_REQ(D_INODE, req, "enqueue"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); rc = ldlm_handle_enqueue(req); break; case LDLM_CONVERT: - CDEBUG(D_INODE, "convert\n"); + DEBUG_REQ(D_INODE, req, "convert"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); rc = ldlm_handle_convert(req); break; case LDLM_BL_CALLBACK: case LDLM_CP_CALLBACK: - CDEBUG(D_INODE, "callback\n"); + DEBUG_REQ(D_INODE, req, "callback"); CERROR("callbacks should not happen on MDS\n"); LBUG(); OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); @@ -1173,7 +1382,6 @@ int mds_handle(struct ptlrpc_request *req) if (!rc) { struct mds_export_data *med = &req->rq_export->exp_mds_data; - struct mds_obd *mds = mds_req2mds(req); req->rq_repmsg->last_xid = HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid)); @@ -1185,7 +1393,17 @@ int mds_handle(struct ptlrpc_request *req) cpu_to_le32(req->rq_xid)); } out: - if (rc) { + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { + struct mds_obd *mds = mds_req2mds(req); + LASSERT(mds->mds_recoverable_clients); + DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); + return mds_queue_final_reply(req, rc); + } + + /* MDS_CONNECT / EALREADY (note: not -EALREADY!) isn't an error */ + if (rc && (req->rq_reqmsg->opc != MDS_CONNECT || + rc != EALREADY)) { CERROR("mds: processing error (opcode %d): %d\n", req->rq_reqmsg->opc, rc); ptlrpc_error(req->rq_svc, req); @@ -1205,7 +1423,6 @@ int mds_handle(struct ptlrpc_request *req) * * Also assumes for mds_last_rcvd that we are not modifying it (no locking). 
*/ -static int mds_update_server_data(struct mds_obd *mds) { struct mds_server_data *msd = mds->mds_server_data; @@ -1238,12 +1455,14 @@ int mds_update_server_data(struct mds_obd *mds) } /* Do recovery actions for the MDS */ -static int mds_recover(struct obd_device *obddev) +static int mds_recovery_complete(struct obd_device *obddev) { struct mds_obd *mds = &obddev->u.mds; struct obd_run_ctxt saved; int rc; + LASSERT(mds->mds_recoverable_clients == 0); + /* This happens at the end when recovery is complete */ ++mds->mds_mount_count; push_ctxt(&saved, &mds->mds_ctxt, NULL); @@ -1283,7 +1502,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) if (!mds->mds_sb) GOTO(err_put, rc = -ENODEV); - spin_lock_init(&mds->mds_last_lock); + init_MUTEX(&mds->mds_transno_sem); mds->mds_max_mdsize = sizeof(struct lov_mds_md); rc = mds_fs_setup(obddev, mnt); if (rc) { @@ -1298,14 +1517,14 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_fs, rc = -ENOMEM); } - - rc = mds_recover(obddev); - if (rc) - GOTO(err_fs, rc); - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "mds_ldlm_client", &obddev->obd_ldlm_client); + spin_lock_init(&mds->mds_processing_task_lock); + mds->mds_processing_task = 0; + INIT_LIST_HEAD(&mds->mds_recovery_queue); + INIT_LIST_HEAD(&mds->mds_delayed_reply_queue); + RETURN(0); err_fs:
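
Note on the replay ordering introduced above: requests that carry a transaction number are inserted into mds_recovery_queue sorted by transno (queue_recovery_request), and process_recovery_queue only dispatches the head of the queue once its transno matches mds_next_recovery_transno, otherwise it waits for the missing replay to arrive. The following is a minimal, self-contained userspace C sketch of that ordering, not Lustre code: the types replay_req/replay_queue and the helper names are invented for illustration, and the sketch advances next_transno inline, whereas in the patch the next expected transno is maintained by the transaction path (mds_start_transno/mds_finish_transno) and waiters are woken via mds_next_transno_waitq.

    /* Illustrative sketch only -- simplified, single-threaded analogue of the
     * transno-ordered recovery queue added in this patch. */
    #include <stdio.h>
    #include <stdlib.h>

    struct replay_req {
            unsigned long long transno;     /* transno carried in the request */
            struct replay_req *next;
    };

    struct replay_queue {
            struct replay_req *head;                /* sorted, smallest transno first */
            unsigned long long next_transno;        /* next transno we may process */
    };

    /* Insert in ascending transno order, mirroring the O(n^2) list walk in
     * queue_recovery_request(). */
    static void queue_replay(struct replay_queue *q, unsigned long long transno)
    {
            struct replay_req *req = malloc(sizeof(*req));
            struct replay_req **p = &q->head;

            req->transno = transno;
            while (*p && (*p)->transno <= transno)
                    p = &(*p)->next;
            req->next = *p;
            *p = req;
    }

    /* Handle queued requests strictly in transno order; stop (and, in the real
     * code, sleep on mds_next_transno_waitq) as soon as there is a gap. */
    static void process_replay_queue(struct replay_queue *q)
    {
            while (q->head && q->head->transno == q->next_transno) {
                    struct replay_req *req = q->head;

                    q->head = req->next;
                    printf("replaying transno %llu\n", req->transno);
                    q->next_transno++;      /* simplification: advanced inline here */
                    free(req);
            }
            if (q->head)
                    printf("waiting for transno %llu (have %llu)\n",
                           q->next_transno, q->head->transno);
    }

    int main(void)
    {
            struct replay_queue q = { .head = NULL, .next_transno = 1 };

            /* Replayed requests may arrive out of order after a restart. */
            queue_replay(&q, 3);
            queue_replay(&q, 1);
            queue_replay(&q, 2);
            process_replay_queue(&q);       /* replays 1, 2, 3 in order */
            return 0;
    }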