#include <linux/init.h>
#include <linux/obd_class.h>
#include <linux/random.h>
-#include <linux/locks.h>
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/smp_lock.h>
#include <linux/buffer_head.h>
#include <linux/workqueue.h>
+#include <linux/mount.h>
+#else
+#include <linux/locks.h>
#endif
#include <linux/obd_lov.h>
#include <linux/lustre_mds.h>
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
struct obd_uuid *uuidarray);
extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc);
-extern void mds_start_transno(struct mds_obd *mds);
-extern int mds_finish_transno(struct mds_obd *mds, void *handle,
- struct ptlrpc_request *req, int rc);
+int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
+ struct ptlrpc_request *req, int rc, int disp);
static int mds_cleanup(struct obd_device * obddev);
inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
return result;
}
-static void mds_abort_recovery(void *data);
/* Establish a connection to the MDS.
*
struct obd_export *exp;
struct mds_export_data *med;
struct mds_client_data *mcd;
- struct mds_obd *mds = &obd->u.mds;
int rc;
ENTRY;
RETURN(-EINVAL);
/* Check for aborted recovery. */
- spin_lock_bh(&mds->mds_processing_task_lock);
+ spin_lock_bh(&obd->obd_processing_task_lock);
if (obd->obd_flags & OBD_ABORT_RECOVERY)
- mds_abort_recovery(mds);
- spin_unlock_bh(&mds->mds_processing_task_lock);
+ target_abort_recovery(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
/* XXX There is a small race between checking the list and adding a
* new connection for the same UUID, but the real threat (list
mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
kmem_cache_free(mds_file_cache, mfd);
- if (file->f_dentry->d_parent)
+ if (file->f_dentry->d_parent) {
+ LASSERT(atomic_read(&file->f_dentry->d_parent->d_count));
de = dget(file->f_dentry->d_parent);
+ }
rc = filp_close(file, 0);
if (de)
l_dput(de);
spin_unlock(&med->med_open_lock);
ldlm_cancel_locks_for_export(export);
+ if (med->med_outstanding_reply) {
+ /* Fake the ack, so the locks get cancelled. */
+ med->med_outstanding_reply->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
+ med->med_outstanding_reply->rq_flags |= PTL_RPC_FL_ERR;
+ wake_up(&med->med_outstanding_reply->rq_wait_for_rep);
+ med->med_outstanding_reply = NULL;
+ }
mds_client_free(export);
rc = class_disconnect(conn);
struct lustre_handle lockh;
int rc;
- LDLM_DEBUG0(lock, "already unused, calling ldlm_cli_cancel");
+ LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
ldlm_lock2handle(lock, &lockh);
rc = ldlm_cli_cancel(&lockh);
if (rc < 0)
CERROR("ldlm_cli_cancel: %d\n", rc);
} else {
- LDLM_DEBUG0(lock, "Lock still has references, will be "
- "cancelled later");
+ LDLM_DEBUG(lock, "Lock still has references, will be "
+ "cancelled later");
}
RETURN(0);
}
rc = 0;
}
- return rc;
+ RETURN(rc);
}
static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
return(rc);
}
+/* This is more copy-and-paste from getattr_name than I'd like. */
+static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
+ struct lustre_handle *client_lockh)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+ struct obd_device *obd = req->rq_export->exp_obd;
+ struct mds_obd *mds = mds_req2mds(req);
+ struct dentry *parent, *child;
+ struct mds_body *body;
+ struct inode *dir;
+ struct obd_run_ctxt saved;
+ struct obd_ucred uc;
+ int namelen, rc = 0;
+ char *name;
+
+ req->rq_transno = mcd->mcd_last_transno;
+ req->rq_status = mcd->mcd_last_result;
+
+ if (med->med_outstanding_reply)
+ mds_steal_ack_locks(med, req);
+
+ if (req->rq_status)
+ return;
+
+ body = lustre_msg_buf(req->rq_reqmsg, offset);
+ name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+ namelen = req->rq_reqmsg->buflens[offset + 1];
+ /* requests were at offset 2, replies go back at 1 */
+ if (offset)
+ offset = 1;
+
+ uc.ouc_fsuid = body->fsuid;
+ uc.ouc_fsgid = body->fsgid;
+ uc.ouc_cap = body->capability;
+ uc.ouc_suppgid1 = body->suppgid;
+ uc.ouc_suppgid2 = -1;
+ push_ctxt(&saved, &mds->mds_ctxt, &uc);
+ parent = mds_fid2dentry(mds, &body->fid1, NULL);
+ LASSERT(!IS_ERR(parent));
+ dir = parent->d_inode;
+ LASSERT(dir);
+ child = lookup_one_len(name, parent, namelen - 1);
+ LASSERT(!IS_ERR(child));
+
+ if (!med->med_outstanding_reply) {
+ /* XXX need to enqueue client lock */
+ LBUG();
+ }
+
+ if (req->rq_repmsg == NULL)
+ mds_getattr_pack_msg(req, child->d_inode, offset);
+
+ rc = mds_getattr_internal(obd, child, req, body, offset);
+ LASSERT(!rc);
+ l_dput(child);
+ l_dput(parent);
+}
+
static int mds_getattr_name(int offset, struct ptlrpc_request *req,
struct lustre_handle *child_lockh)
{
- struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
- int lock_mode;
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
struct obd_run_ctxt saved;
struct obd_ucred uc;
struct ldlm_res_id child_res_id = { .name = {0} };
struct lustre_handle parent_lockh;
- int namelen, flags = 0, rc = 0;
+ int namelen, flags = 0, rc = 0, cleanup_phase = 0;
char *name;
ENTRY;
LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
+ MDS_CHECK_RESENT(req,
+ reconstruct_getattr_name(offset, req, child_lockh));
+
if (req->rq_reqmsg->bufcount <= offset + 1) {
LBUG();
- GOTO(out_pre_de, rc = -EINVAL);
+ GOTO(cleanup, rc = -EINVAL);
}
body = lustre_msg_buf(req->rq_reqmsg, offset);
uc.ouc_fsuid = body->fsuid;
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
- uc.ouc_suppgid = body->suppgid;
+ uc.ouc_suppgid1 = body->suppgid;
+ uc.ouc_suppgid2 = -1;
push_ctxt(&saved, &mds->mds_ctxt, &uc);
/* Step 1: Lookup/lock parent */
de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
&parent_lockh);
if (IS_ERR(de))
- GOTO(out_pre_de, rc = PTR_ERR(de));
+ GOTO(cleanup, rc = PTR_ERR(de));
dir = de->d_inode;
LASSERT(dir);
+ cleanup_phase = 1; /* parent dentry and lock */
+
CDEBUG(D_INODE, "parent ino %lu, name %*s\n", dir->i_ino,namelen,name);
/* Step 2: Lookup child */
dchild = lookup_one_len(name, de, namelen - 1);
if (IS_ERR(dchild)) {
CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
- GOTO(out_step_1, rc = PTR_ERR(dchild));
- } else if (dchild->d_inode == NULL) {
- GOTO(out_step_2, rc = -ENOENT);
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ }
+
+ cleanup_phase = 2; /* child dentry */
+
+ if (dchild->d_inode == NULL) {
+ GOTO(cleanup, rc = -ENOENT);
}
/* Step 3: Lock child */
- if (it->opc == IT_SETATTR)
- lock_mode = LCK_PW;
- else
- lock_mode = LCK_PR;
child_res_id.name[0] = dchild->d_inode->i_ino;
child_res_id.name[1] = dchild->d_inode->i_generation;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
- child_res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
&flags, ldlm_completion_ast, mds_blocking_ast,
NULL, NULL, child_lockh);
if (rc != ELDLM_OK) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
- GOTO(out_step_2, rc = -EIO);
+ GOTO(cleanup, rc = -EIO);
}
+ cleanup_phase = 3; /* child lock */
+
if (req->rq_repmsg == NULL)
mds_getattr_pack_msg(req, dchild->d_inode, offset);
rc = mds_getattr_internal(obd, dchild, req, body, offset);
- if (rc)
- GOTO(out_step_3, rc);
- GOTO(out_step_2, rc); /* returns the lock to the client */
- out_step_3:
- ldlm_lock_decref(child_lockh, LCK_PR);
- out_step_2:
- l_dput(dchild);
- out_step_1:
- ldlm_lock_decref(&parent_lockh, LCK_PR);
- l_dput(de);
- out_pre_de:
+ GOTO(cleanup, rc); /* returns the lock to the client */
+
+ cleanup:
+ rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
+ req, rc, 0);
+ switch (cleanup_phase) {
+ case 3:
+ if (rc)
+ ldlm_lock_decref(child_lockh, LCK_PR);
+ case 2:
+ l_dput(dchild);
+
+ case 1:
+ if (rc) {
+ ldlm_lock_decref(&parent_lockh, LCK_PR);
+ } else {
+ memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ req->rq_ack_locks[0].mode = LCK_PR;
+ }
+ l_dput(de);
+ default: ;
+ }
req->rq_status = rc;
pop_ctxt(&saved, &mds->mds_ctxt, &uc);
return rc;
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
push_ctxt(&saved, &mds->mds_ctxt, &uc);
- mds_start_transno(mds);
handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
if (IS_ERR(handle)) {
rc = PTR_ERR(handle);
- mds_finish_transno(mds, handle, req, rc);
GOTO(out_ea, rc);
}
rc = fsfilt_set_md(obd, inode,handle,lmm,lmm_size);
- rc = mds_finish_transno(mds, handle, req, rc);
-
- rc2 = fsfilt_commit(obd, inode, handle);
- if (rc2 && !rc)
- rc = rc2;
+ rc = mds_finish_transno(mds, inode, handle, req, rc, 0);
out_ea:
pop_ctxt(&saved, &mds->mds_ctxt, &uc);
#endif
+static void reconstruct_close(struct ptlrpc_request *req)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+
+ req->rq_transno = mcd->mcd_last_transno;
+ req->rq_status = mcd->mcd_last_result;
+
+ /* XXX When open-unlink is working, we'll need to steal ack locks as
+ * XXX well, and make sure that we do the right unlinking after we
+ * XXX get the ack back.
+ */
+}
+
static int mds_close(struct ptlrpc_request *req)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
int rc;
ENTRY;
+ MDS_CHECK_RESENT(req, reconstruct_close(req));
+
body = lustre_msg_buf(req->rq_reqmsg, 0);
mfd = mds_handle2mfd(&body->handle);
return rc;
}
-/* forward declaration */
-int mds_handle(struct ptlrpc_request *req);
-
-static void abort_delayed_replies(struct mds_obd *mds)
-{
- struct ptlrpc_request *req;
- struct list_head *tmp, *n;
- list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "aborted:");
- req->rq_status = -ENOTCONN;
- req->rq_type = PTL_RPC_MSG_ERR;
- ptlrpc_reply(req->rq_svc, req);
- list_del(&req->rq_list);
- OBD_FREE(req, sizeof *req);
- }
-}
-
-static void mds_abort_recovery(void *data)
-{
- struct mds_obd *mds = data;
- struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
- CERROR("disconnecting clients and aborting recovery\n");
- mds->mds_recoverable_clients = 0;
- obd->obd_flags &= ~(OBD_RECOVERING | OBD_ABORT_RECOVERY);
- abort_delayed_replies(mds);
- spin_unlock_bh(&mds->mds_processing_task_lock);
- class_disconnect_all(obd);
- spin_lock_bh(&mds->mds_processing_task_lock);
-}
-
-static void mds_recovery_expired(unsigned long castmeharder)
-{
- struct mds_obd *mds = (struct mds_obd *)castmeharder;
- struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
- CERROR("recovery timed out, aborting\n");
- spin_lock_bh(&mds->mds_processing_task_lock);
- obd->obd_flags |= OBD_ABORT_RECOVERY;
- wake_up(&mds->mds_next_transno_waitq);
- spin_unlock_bh(&mds->mds_processing_task_lock);
-}
-
-static void reset_recovery_timer(struct mds_obd *mds)
-{
- CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
- MDS_RECOVERY_TIMEOUT / HZ);
- mod_timer(&mds->mds_recovery_timer, jiffies + MDS_RECOVERY_TIMEOUT);
-}
-
-static void start_recovery_timer(struct mds_obd *mds)
-{
- mds->mds_recovery_timer.function = mds_recovery_expired;
- mds->mds_recovery_timer.data = (unsigned long)mds;
- init_timer(&mds->mds_recovery_timer);
- reset_recovery_timer(mds);
-}
-
-static void cancel_recovery_timer(struct mds_obd *mds)
-{
- del_timer(&mds->mds_recovery_timer);
-}
-
-static int check_for_next_transno(struct mds_obd *mds)
-{
- struct ptlrpc_request *req;
- struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
- req = list_entry(mds->mds_recovery_queue.next,
- struct ptlrpc_request, rq_list);
- LASSERT(req->rq_reqmsg->transno >= mds->mds_next_recovery_transno);
-
- return req->rq_reqmsg->transno == mds->mds_next_recovery_transno ||
- (obd->obd_flags & OBD_RECOVERING) == 0;
-}
-
-static void process_recovery_queue(struct mds_obd *mds)
-{
- struct ptlrpc_request *req;
- struct obd_device *obd = list_entry(mds, struct obd_device, u.mds);
- int aborted = 0;
- ENTRY;
-
- for (;;) {
- spin_lock_bh(&mds->mds_processing_task_lock);
- LASSERT(mds->mds_processing_task == current->pid);
- req = list_entry(mds->mds_recovery_queue.next,
- struct ptlrpc_request, rq_list);
-
- if (req->rq_reqmsg->transno != mds->mds_next_recovery_transno) {
- spin_unlock_bh(&mds->mds_processing_task_lock);
- CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
- LPD64")\n",
- mds->mds_next_recovery_transno,
- req->rq_reqmsg->transno);
- wait_event(mds->mds_next_transno_waitq,
- check_for_next_transno(mds));
- spin_lock_bh(&mds->mds_processing_task_lock);
- if (obd->obd_flags & OBD_ABORT_RECOVERY) {
- mds_abort_recovery(mds);
- aborted = 1;
- }
- spin_unlock_bh(&mds->mds_processing_task_lock);
- if (aborted)
- return;
- continue;
- }
- list_del_init(&req->rq_list);
- spin_unlock_bh(&mds->mds_processing_task_lock);
-
- DEBUG_REQ(D_ERROR, req, "processing: ");
- (void)mds_handle(req);
- reset_recovery_timer(mds);
- mds_fsync_super(mds->mds_sb);
- OBD_FREE(req, sizeof *req);
- spin_lock_bh(&mds->mds_processing_task_lock);
- mds->mds_next_recovery_transno++;
- if (list_empty(&mds->mds_recovery_queue)) {
- mds->mds_processing_task = 0;
- spin_unlock_bh(&mds->mds_processing_task_lock);
- break;
- }
- spin_unlock_bh(&mds->mds_processing_task_lock);
- }
- EXIT;
-}
-
-static int queue_recovery_request(struct ptlrpc_request *req,
- struct mds_obd *mds)
-{
- struct list_head *tmp;
- int inserted = 0;
- __u64 transno = req->rq_reqmsg->transno;
- struct ptlrpc_request *saved_req;
-
- if (!transno) {
- INIT_LIST_HEAD(&req->rq_list);
- DEBUG_REQ(D_HA, req, "not queueing");
- return 1;
- }
-
- spin_lock_bh(&mds->mds_processing_task_lock);
-
- if (mds->mds_processing_task == current->pid) {
- /* Processing the queue right now, don't re-add. */
- LASSERT(list_empty(&req->rq_list));
- spin_unlock_bh(&mds->mds_processing_task_lock);
- return 1;
- }
-
- OBD_ALLOC(saved_req, sizeof *saved_req);
- if (!saved_req)
- LBUG();
- memcpy(saved_req, req, sizeof *req);
- req = saved_req;
- INIT_LIST_HEAD(&req->rq_list);
-
- /* XXX O(n^2) */
- list_for_each(tmp, &mds->mds_recovery_queue) {
- struct ptlrpc_request *reqiter =
- list_entry(tmp, struct ptlrpc_request, rq_list);
-
- if (reqiter->rq_reqmsg->transno > transno) {
- list_add_tail(&req->rq_list, &reqiter->rq_list);
- inserted = 1;
- break;
- }
- }
-
- if (!inserted) {
- list_add_tail(&req->rq_list, &mds->mds_recovery_queue);
- }
-
- if (mds->mds_processing_task != 0) {
- /* Someone else is processing this queue, we'll leave it to
- * them.
- */
- if (transno == mds->mds_next_recovery_transno)
- wake_up(&mds->mds_next_transno_waitq);
- spin_unlock_bh(&mds->mds_processing_task_lock);
- return 0;
- }
-
- /* Nobody is processing, and we know there's (at least) one to process
- * now, so we'll do the honours.
- */
- mds->mds_processing_task = current->pid;
- spin_unlock_bh(&mds->mds_processing_task_lock);
-
- process_recovery_queue(mds);
- return 0;
-}
-
static int filter_recovery_request(struct ptlrpc_request *req,
- struct mds_obd *mds, int *process)
+ struct obd_device *obd, int *process)
{
switch (req->rq_reqmsg->opc) {
case MDS_CONNECT: /* This will never get here, but for completeness. */
case MDS_GETSTATUS: /* used in unmounting */
case MDS_REINT:
case LDLM_ENQUEUE:
- *process = queue_recovery_request(req, mds);
+ *process = target_queue_recovery_request(req, obd);
RETURN(0);
default:
}
}
-static int mds_queue_final_reply(struct ptlrpc_request *req, int rc)
-{
- struct mds_obd *mds = mds_req2mds(req);
- struct obd_device *mds_obd = list_entry(mds, struct obd_device, u.mds);
- struct ptlrpc_request *saved_req;
-
- spin_lock_bh(&mds->mds_processing_task_lock);
- if (rc) {
- /* Just like ptlrpc_error, but without the sending. */
- lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
- &req->rq_repmsg);
- req->rq_type = PTL_RPC_MSG_ERR;
- }
-
- LASSERT(list_empty(&req->rq_list));
- OBD_ALLOC(saved_req, sizeof *saved_req);
- memcpy(saved_req, req, sizeof *saved_req);
- req = saved_req;
- list_add(&req->rq_list, &mds->mds_delayed_reply_queue);
- if (--mds->mds_recoverable_clients == 0) {
- struct list_head *tmp, *n;
- ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
- CDEBUG(D_ERROR,
- "all clients recovered, sending delayed replies\n");
- mds_obd->obd_flags &= ~OBD_RECOVERING;
- list_for_each_safe(tmp, n, &mds->mds_delayed_reply_queue) {
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- DEBUG_REQ(D_ERROR, req, "delayed:");
- ptlrpc_reply(req->rq_svc, req);
- list_del(&req->rq_list);
- OBD_FREE(req, sizeof *req);
- }
- cancel_recovery_timer(mds);
- } else {
- CERROR("%d recoverable clients remain\n",
- mds->mds_recoverable_clients);
- }
-
- spin_unlock_bh(&mds->mds_processing_task_lock);
- return 1;
-}
-
static char *reint_names[] = {
[REINT_SETATTR] "setattr",
[REINT_CREATE] "create",
[REINT_OPEN] "open",
};
+void mds_steal_ack_locks(struct mds_export_data *med,
+ struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+ memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
+ sizeof req->rq_ack_locks);
+ oldrep->rq_flags |= PTL_RPC_FL_RESENT;
+ wake_up(&oldrep->rq_wait_for_rep);
+ DEBUG_REQ(D_HA, oldrep, "stole locks from");
+ DEBUG_REQ(D_HA, req, "stole locks for");
+}
+
+static void mds_send_reply(struct ptlrpc_request *req, int rc)
+{
+ int i;
+ struct ptlrpc_req_ack_lock *ack_lock;
+ struct l_wait_info lwi;
+ struct mds_export_data *med =
+ (req->rq_export && req->rq_ack_locks[0].mode) ?
+ &req->rq_export->exp_mds_data : NULL;
+
+ if (med) {
+ med->med_outstanding_reply = req;
+ req->rq_flags |= PTL_RPC_FL_WANT_ACK;
+ init_waitqueue_head(&req->rq_wait_for_rep);
+ }
+
+ if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
+ if (rc) {
+ DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
+ ptlrpc_error(req->rq_svc, req);
+ } else {
+ DEBUG_REQ(D_NET, req, "sending reply");
+ ptlrpc_reply(req->rq_svc, req);
+ }
+ } else {
+ obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+ DEBUG_REQ(D_ERROR, req, "dropping reply");
+ if (!med && req->rq_repmsg)
+ OBD_FREE(req->rq_repmsg, req->rq_replen);
+ }
+
+ if (!med) {
+ DEBUG_REQ(D_HA, req, "not waiting for ack");
+ return;
+ }
+
+ lwi = LWI_TIMEOUT(obd_timeout / 2 * HZ, NULL, NULL);
+ rc = l_wait_event(req->rq_wait_for_rep,
+ (req->rq_flags & PTL_RPC_FL_WANT_ACK) == 0 ||
+ (req->rq_flags & PTL_RPC_FL_RESENT),
+ &lwi);
+
+ if (req->rq_flags & PTL_RPC_FL_RESENT) {
+ /* The client resent this request, so abort the
+ * waiting-ack portals stuff, and don't decref the
+ * locks.
+ */
+ DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
+ ptlrpc_abort(req);
+ return;
+ }
+
+ if (rc == -ETIMEDOUT) {
+ ptlrpc_abort(req);
+ recovd_conn_fail(req->rq_export->exp_connection);
+ DEBUG_REQ(D_HA, req, "cancelling locks for timeout");
+ } else {
+ DEBUG_REQ(D_HA, req, "cancelling locks for ack");
+ }
+
+ med->med_outstanding_reply = NULL;
+
+ for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
+ if (!ack_lock->mode)
+ break;
+ ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+ }
+}
+
int mds_handle(struct ptlrpc_request *req)
{
int should_process, rc;
struct mds_obd *mds = NULL; /* quell gcc overwarning */
- struct obd_device *mds_obd = NULL;
+ struct obd_device *obd = NULL;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
}
med = &req->rq_export->exp_mds_data;
- mds_obd = req->rq_export->exp_obd;
- mds = &mds_obd->u.mds;
- spin_lock_bh(&mds->mds_processing_task_lock);
- if (mds_obd->obd_flags & OBD_ABORT_RECOVERY)
- mds_abort_recovery(mds);
- spin_unlock_bh(&mds->mds_processing_task_lock);
-
- if (mds_obd->obd_flags & OBD_RECOVERING) {
- rc = filter_recovery_request(req, mds, &should_process);
+ obd = req->rq_export->exp_obd;
+ mds = &obd->u.mds;
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ if (obd->obd_flags & OBD_ABORT_RECOVERY)
+ target_abort_recovery(obd);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ if (obd->obd_flags & OBD_RECOVERING) {
+ rc = filter_recovery_request(req, obd, &should_process);
if (rc || !should_process)
RETURN(rc);
- } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
- if (req->rq_xid == med->med_last_xid) {
- DEBUG_REQ(D_HA, req, "resending reply");
- OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
- req->rq_replen = med->med_last_replen;
- memcpy(req->rq_repmsg, med->med_last_reply,
- req->rq_replen);
- ptlrpc_reply(req->rq_svc, req);
- return 0;
- }
- DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
}
-
}
switch (req->rq_reqmsg->opc) {
case MDS_CONNECT:
DEBUG_REQ(D_INODE, req, "connect");
OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
- rc = target_handle_connect(req);
+ rc = target_handle_connect(req, mds_handle);
/* Make sure that last_rcvd is correct. */
if (!rc) {
/* Now that we have an export, set mds. */
struct lustre_handle lockh;
DEBUG_REQ(D_INODE, req, "getattr_name");
OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+
+ /* If this request gets a reconstructed reply, we won't be
+ * acquiring any new locks in mds_getattr_name, so we don't
+ * want to cancel.
+ */
+ lockh.addr = 0;
rc = mds_getattr_name(0, req, &lockh);
- if (rc == 0)
+ if (rc == 0 && lockh.addr)
ldlm_lock_decref(&lockh, LCK_PR);
break;
}
break;
case MDS_READPAGE:
- DEBUG_REQ(D_INODE, req, "readpage\n");
+ DEBUG_REQ(D_INODE, req, "readpage");
OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
rc = mds_readpage(req);
DEBUG_REQ(D_IOCTL, req,
"not sending last_committed update");
}
- CDEBUG(D_INFO, "last_transno %Lu, last_committed %Lu, xid %d\n",
- (unsigned long long)mds->mds_last_rcvd,
- (unsigned long long)obd->obd_last_committed,
- cpu_to_le32(req->rq_xid));
+ CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
+ ", xid "LPU64"\n",
+ mds->mds_last_transno, obd->obd_last_committed,
+ NTOH__u64(req->rq_xid));
}
out:
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
- if (mds_obd && (mds_obd->obd_flags & OBD_RECOVERING)) {
+ if (obd && (obd->obd_flags & OBD_RECOVERING)) {
DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
- return mds_queue_final_reply(req, rc);
+ return target_queue_final_reply(req, rc);
}
/* Lost a race with recovery; let the error path DTRT. */
rc = req->rq_status = -ENOTCONN;
}
- if (req->rq_export && mds_obd &&
- (mds_obd->obd_flags & OBD_RECOVERING) == 0) {
- struct mds_export_data *med = &req->rq_export->exp_mds_data;
- if (med->med_last_reply)
- OBD_FREE(med->med_last_reply, med->med_last_replen);
- OBD_ALLOC(med->med_last_reply, req->rq_replen);
- med->med_last_replen = req->rq_replen;
- med->med_last_xid = req->rq_xid;
- memcpy(med->med_last_reply, req->rq_repmsg, req->rq_replen);
- /* XXX serialize */
- }
-
- if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
- if (rc) {
- DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
- ptlrpc_error(req->rq_svc, req);
- } else {
- DEBUG_REQ(D_NET, req, "sending reply");
- ptlrpc_reply(req->rq_svc, req);
- }
- } else {
- obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
- DEBUG_REQ(D_ERROR, req, "dropping reply");
- if (req->rq_repmsg)
- OBD_FREE(req->rq_repmsg, req->rq_replen);
- }
-
+ mds_send_reply(req, rc);
return 0;
}
* then the server last_rcvd value may be less than that of the clients.
* This will alert us that we may need to do client recovery.
*
- * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
+ * Also assumes for mds_last_transno that we are not modifying it (no locking).
*/
int mds_update_server_data(struct mds_obd *mds)
{
int rc;
push_ctxt(&saved, &mds->mds_ctxt, NULL);
- msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
+ msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
- CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
+ CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_transno is %Lu\n",
(unsigned long long)mds->mds_mount_count,
- (unsigned long long)mds->mds_last_rcvd);
+ (unsigned long long)mds->mds_last_transno);
rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
if (rc != sizeof(*msd)) {
CERROR("error writing MDS server data: rc = %d\n", rc);
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
#else
- rc = file_fsync(filp, filp->f_dentry, 1);
+ rc = file_fsync(filp, filp->f_dentry, 1);
#endif
if (rc)
CERROR("error flushing MDS server data: rc = %d\n", rc);
if (!mds->mds_sb)
GOTO(err_put, rc = -ENODEV);
- init_MUTEX(&mds->mds_transno_sem);
+ spin_lock_init(&mds->mds_transno_lock);
mds->mds_max_mdsize = sizeof(struct lov_mds_md);
rc = mds_fs_setup(obddev, mnt);
if (rc) {
GOTO(err_put, rc);
}
- if (obddev->obd_flags & OBD_RECOVERING)
- start_recovery_timer(mds);
-
obddev->obd_namespace =
ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL) {
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
"mds_ldlm_client", &obddev->obd_ldlm_client);
- spin_lock_init(&mds->mds_processing_task_lock);
- mds->mds_processing_task = 0;
mds->mds_has_lov_desc = 0;
- INIT_LIST_HEAD(&mds->mds_recovery_queue);
- INIT_LIST_HEAD(&mds->mds_delayed_reply_queue);
- init_waitqueue_head(&mds->mds_next_transno_waitq);
RETURN(0);
lock_kernel();
err_ops:
fsfilt_put_ops(obddev->obd_fsops);
- RETURN(rc);
+ return rc;
}
static int mds_cleanup(struct obd_device *obddev)
RETURN(0);
}
+inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+ struct lustre_handle *lockh)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+ struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+ struct ldlm_reply *dlm_rep;
+
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) &&
+ (mcd->mcd_last_xid == req->rq_xid) && (oldrep != NULL)) {
+ DEBUG_REQ(D_HA, req, "restoring lock handle from %p", oldrep);
+ dlm_rep = lustre_msg_buf(oldrep->rq_repmsg, 0);
+ lockh->addr = dlm_rep->lock_handle.addr;
+ lockh->cookie = dlm_rep->lock_handle.cookie;
+ }
+}
+
static int ldlm_intent_policy(struct ldlm_namespace *ns,
struct ldlm_lock **lockp, void *req_cookie,
ldlm_mode_t mode, int flags, void *data)
rep = lustre_msg_buf(req->rq_repmsg, 0);
rep->lock_policy_res1 = IT_INTENT_EXEC;
+ fixup_handle_for_resent_req(req, &lockh);
+
/* execute policy */
switch ((long)it->opc) {
case IT_OPEN:
case IT_GETATTR:
case IT_LOOKUP:
case IT_READDIR:
- case IT_SETATTR:
rc = mds_getattr_name(offset, req, &lockh);
/* FIXME: we need to sit down and decide on who should
* set req->rq_status, who should return negative and
}
if (flags & LDLM_FL_INTENT_ONLY) {
- LDLM_DEBUG0(lock, "INTENT_ONLY, aborting lock");
+ LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
RETURN(ELDLM_LOCK_ABORTED);
}
* to get. */
new_lock = ldlm_handle2lock(&lockh);
LASSERT(new_lock != NULL);
- mds_body = lustre_msg_buf(req->rq_repmsg, 1);
*lockp = new_lock;
+ rep->lock_policy_res2 = req->rq_status;
+
+ if (new_lock->l_export == req->rq_export) {
+ /* Already gave this to the client, which means that we
+ * reconstructed a reply. */
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT);
+ RETURN(ELDLM_LOCK_REPLACED);
+ }
+
/* Fixup the lock to be given to the client */
l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
LDLM_LOCK_PUT(new_lock);
l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
- rep->lock_policy_res2 = req->rq_status;
-
RETURN(ELDLM_LOCK_REPLACED);
} else {
int size = sizeof(struct ldlm_reply);
static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct mds_obd *mds = &obddev->u.mds;
- struct obd_uuid uuid = { "self" };
int i, rc = 0;
ENTRY;
mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
MDS_BUFSIZE, MDS_MAXREQSIZE,
MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
- &uuid, mds_handle, "mds");
+ mds_handle, "mds");
if (!mds->mds_service) {
CERROR("failed to start service\n");
RETURN(rc = -ENOMEM);
}
}
- mds->mds_getattr_service =
+ mds->mds_setattr_service =
ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
MDS_BUFSIZE, MDS_MAXREQSIZE,
- MDS_GETATTR_PORTAL, MDC_REPLY_PORTAL,
- &uuid, mds_handle, "mds");
- if (!mds->mds_getattr_service) {
+ MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
+ mds_handle, "mds");
+ if (!mds->mds_setattr_service) {
CERROR("failed to start getattr service\n");
GOTO(err_thread, rc = -ENOMEM);
}
for (i = 0; i < MDT_NUM_THREADS; i++) {
char name[32];
sprintf(name, "ll_mdt_attr_%02d", i);
- rc = ptlrpc_start_thread(obddev, mds->mds_getattr_service,
+ rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
name);
if (rc) {
- CERROR("cannot start MDT getattr thread #%d: rc %d\n",
+ CERROR("cannot start MDT setattr thread #%d: rc %d\n",
i, rc);
GOTO(err_thread2, rc);
}
}
+ mds->mds_readpage_service =
+ ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
+ MDS_BUFSIZE, MDS_MAXREQSIZE,
+ MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
+ mds_handle, "mds");
+ if (!mds->mds_readpage_service) {
+ CERROR("failed to start readpage service\n");
+ GOTO(err_thread2, rc = -ENOMEM);
+ }
+
+ for (i = 0; i < MDT_NUM_THREADS; i++) {
+ char name[32];
+ sprintf(name, "ll_mdt_rdpg_%02d", i);
+ rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
+ name);
+ if (rc) {
+ CERROR("cannot start MDT readpage thread #%d: rc %d\n",
+ i, rc);
+ GOTO(err_thread3, rc);
+ }
+ }
+
RETURN(0);
+err_thread3:
+ ptlrpc_stop_all_threads(mds->mds_readpage_service);
+ ptlrpc_unregister_service(mds->mds_readpage_service);
err_thread2:
- ptlrpc_stop_all_threads(mds->mds_getattr_service);
- ptlrpc_unregister_service(mds->mds_getattr_service);
+ ptlrpc_stop_all_threads(mds->mds_setattr_service);
+ ptlrpc_unregister_service(mds->mds_setattr_service);
err_thread:
ptlrpc_stop_all_threads(mds->mds_service);
ptlrpc_unregister_service(mds->mds_service);
struct mds_obd *mds = &obddev->u.mds;
ENTRY;
- ptlrpc_stop_all_threads(mds->mds_getattr_service);
- ptlrpc_unregister_service(mds->mds_getattr_service);
+ ptlrpc_stop_all_threads(mds->mds_readpage_service);
+ ptlrpc_unregister_service(mds->mds_readpage_service);
+
+ ptlrpc_stop_all_threads(mds->mds_setattr_service);
+ ptlrpc_unregister_service(mds->mds_setattr_service);
ptlrpc_stop_all_threads(mds->mds_service);
ptlrpc_unregister_service(mds->mds_service);