/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001-2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.sf.net/projects/lustre/
*
* You should have received a copy of the GNU General Public License
* along with Lustre; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
*/
#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_MDC
#include <linux/module.h>
+#include <linux/pagemap.h>
#include <linux/miscdevice.h>
#include <linux/lustre_mds.h>
#include <linux/lustre_lite.h>
#define REQUEST_MINOR 244
extern int mds_queue_req(struct ptlrpc_request *);
-extern struct lprocfs_vars status_var_nm_1[];
-extern struct lprocfs_vars status_class_var[];
+struct mdc_rpc_lock mdc_rpc_lock;
+EXPORT_SYMBOL(mdc_rpc_lock);
/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
body = lustre_msg_buf(req->rq_reqmsg, 0);
req->rq_level = level;
req->rq_replen = lustre_msg_size(1, &size);
-
+
mds_pack_req_body(req);
req->rq_reqmsg->flags |= msg_flags;
+ mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
+ mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
size[0] = 512;
size[1] = 8192;
req->rq_replen = lustre_msg_size(2, size);
-
+ mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
+ mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
out:
RETURN(rc);
if (!req)
GOTO(out, rc = -ENOMEM);
+ /* XXX FIXME bug 249 */
+ req->rq_request_portal = MDS_GETATTR_PORTAL;
+
body = lustre_msg_buf(req->rq_reqmsg, 0);
ll_ino2fid(&body->fid1, ino, 0, type);
body->valid = valid;
req->rq_replen = lustre_msg_size(bufcount, size);
mds_pack_req_body(req);
+ mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
-
+ mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
mds_unpack_body(body);
CDEBUG(D_NET, "mode: %o\n", body->mode);
}
- EXIT;
+ GOTO(out, rc);
out:
*request = req;
return rc;
req->rq_replen = lustre_msg_size(bufcount, size);
mds_pack_req_body(req);
+ mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
-
+ mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
if (!rc) {
body = lustre_msg_buf(req->rq_repmsg, 0);
mds_unpack_body(body);
return rc;
}
-void d_delete_aliases(struct inode *inode)
+/* This should be called with both the request and the reply still packed. */
+void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
+ int repoff)
{
- struct dentry *dentry = NULL;
- struct list_head *tmp;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- ENTRY;
-
- spin_lock(&dcache_lock);
- list_for_each(tmp, &inode->i_dentry) {
- dentry = list_entry(tmp, struct dentry, d_alias);
-
- list_del_init(&dentry->d_hash);
- list_add(&dentry->d_hash, &sbi->ll_orphan_dentry_list);
- }
+ struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
+ struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
- spin_unlock(&dcache_lock);
- EXIT;
+ memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
+ DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+ rec->cr_replayfid.generation, rec->cr_replayfid.id);
}
static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, __u32 data_len, int flag)
+ void *data, int flag)
{
int rc;
struct lustre_handle lockh;
ENTRY;
+
switch (flag) {
case LDLM_CB_BLOCKING:
ldlm_lock2handle(lock, &lockh);
break;
case LDLM_CB_CANCELING: {
/* Invalidate all dentries associated with this inode */
- struct inode *inode;
+ struct inode *inode = lock->l_data;
LASSERT(data != NULL);
- LASSERT(data_len == sizeof(*inode));
/* XXX what tells us that 'data' is a valid inode at all?
* we should probably validate the lock handle first?
*/
- inode = igrab(data);
+
+ inode = igrab(inode);
if (inode == NULL) /* inode->i_state & I_FREEING */
break;
}
if (inode != inode->i_sb->s_root->d_inode)
- d_delete_aliases(inode);
+ d_unhash_aliases(inode);
iput(inode);
break;
RETURN(0);
}
-/* This should be called with both the request and the reply still packed. */
-void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
- int repoff)
-{
- struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
- struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
-
- memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
- DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
- rec->cr_replayfid.generation, rec->cr_replayfid.id);
-}
-
/* We always reserve enough space in the reply packet for a stripe MD, because
* we don't know in advance the file type.
*
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_conn2obd(conn);
- __u64 res_id[RES_NAME_SIZE] = {dir->i_ino, (__u64)dir->i_generation};
+ struct ldlm_res_id res_id =
+ { .name = {dir->i_ino, dir->i_generation} };
int size[6] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
int rc, flags = LDLM_FL_HAS_INTENT;
int repsize[3] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
obddev->u.cli.cl_max_mds_easize};
+ struct mdc_unlink_data *d = data;
struct ldlm_reply *dlm_rep;
struct ldlm_intent *lit;
struct ldlm_request *lockreq;
LDLM_DEBUG_NOLOCK("mdsintent %s parent dir %lu",
ldlm_it2str(it->it_op), dir->i_ino);
- if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
- switch (it->it_op) {
- case IT_MKDIR:
- it->it_mode |= S_IFDIR;
- break;
- case (IT_CREAT|IT_OPEN):
- case IT_CREAT:
- it->it_mode |= S_IFREG;
- break;
- case IT_SYMLINK:
- it->it_mode |= S_IFLNK;
- break;
- }
+ if (it->it_op & IT_OPEN) {
+ it->it_mode |= S_IFREG;
it->it_mode &= ~current->fs->umask;
size[2] = sizeof(struct mds_rec_create);
size[3] = de->d_name.len + 1;
- size[4] = tgtlen + 1;
- req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
- size, NULL);
- if (!req)
- RETURN(-ENOMEM);
-
- /* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1);
- lit->opc = NTOH__u64((__u64)it->it_op);
-
- /* pack the intended request */
- mds_create_pack(req, 2, dir, it->it_mode, 0, current->fsuid,
- current->fsgid, CURRENT_TIME, de->d_name.name,
- de->d_name.len, tgt, tgtlen);
- req->rq_replen = lustre_msg_size(3, repsize);
- } else if (it->it_op == IT_RENAME2) {
- struct dentry *old_de = it->it_data;
-
- size[2] = sizeof(struct mds_rec_rename);
- size[3] = old_de->d_name.len + 1;
- size[4] = de->d_name.len + 1;
- req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
- size, NULL);
- if (!req)
- RETURN(-ENOMEM);
-
- /* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1);
- lit->opc = NTOH__u64((__u64)it->it_op);
-
- /* pack the intended request */
- mds_rename_pack(req, 2, old_de->d_parent->d_inode, dir,
- old_de->d_name.name, old_de->d_name.len,
- de->d_name.name, de->d_name.len);
- req->rq_replen = lustre_msg_size(3, repsize);
- } else if (it->it_op == IT_LINK2) {
- struct dentry *old_de = it->it_data;
-
- size[2] = sizeof(struct mds_rec_link);
- size[3] = de->d_name.len + 1;
req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
size, NULL);
if (!req)
RETURN(-ENOMEM);
+ req->rq_flags |= PTL_RPC_FL_REPLAY;
+
/* pack the intent */
lit = lustre_msg_buf(req->rq_reqmsg, 1);
lit->opc = NTOH__u64((__u64)it->it_op);
/* pack the intended request */
- mds_link_pack(req, 2, old_de->d_inode, dir,
- de->d_name.name, de->d_name.len);
+ mds_open_pack(req, 2, dir, it->it_mode, 0, current->fsuid,
+ current->fsgid, CURRENT_TIME, it->it_flags,
+ de->d_name.name, de->d_name.len, tgt, tgtlen);
req->rq_replen = lustre_msg_size(3, repsize);
- } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
+ } else if (it->it_op & IT_UNLINK) {
size[2] = sizeof(struct mds_rec_unlink);
- size[3] = de->d_name.len + 1;
+ size[3] = d->unl_len + 1;
req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
size, NULL);
if (!req)
lit->opc = NTOH__u64((__u64)it->it_op);
/* pack the intended request */
- mds_unlink_pack(req, 2, dir, NULL,
- it->it_op == IT_UNLINK ? S_IFREG : S_IFDIR,
- de->d_name.name, de->d_name.len);
-
+ mds_unlink_pack(req, 2, d->unl_dir,
+ d->unl_de, d->unl_mode,
+ d->unl_name, d->unl_len);
req->rq_replen = lustre_msg_size(3, repsize);
- } else if (it->it_op & (IT_GETATTR | IT_RENAME | IT_LINK |
- IT_OPEN | IT_SETATTR | IT_LOOKUP | IT_READLINK)) {
+ } else if (it->it_op & (IT_GETATTR| IT_SETATTR | IT_LOOKUP)) {
+ int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
size[2] = sizeof(struct mds_body);
size[3] = de->d_name.len + 1;
lit->opc = NTOH__u64((__u64)it->it_op);
/* pack the intended request */
- mds_getattr_pack(req, 2, dir, de->d_name.name, de->d_name.len);
-
+ mds_getattr_pack(req, valid, 2, it->it_flags, dir,
+ de->d_name.name, de->d_name.len);
/* get ready for the reply */
req->rq_replen = lustre_msg_size(3, repsize);
} else if (it->it_op == IT_READDIR) {
/* get ready for the reply */
req->rq_replen = lustre_msg_size(1, repsize);
- } else {
+ } else {
LBUG();
RETURN(-EINVAL);
}
+ mdc_get_rpc_lock(&mdc_rpc_lock, it);
rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id,
lock_type, NULL, 0, lock_mode, &flags,
- ldlm_completion_ast, mdc_blocking_ast, data,
- datalen, lockh);
-
- if (it->it_op != IT_READDIR) {
- /* XXX This should become a lustre_msg flag, but for now... */
- __u32 *opp = lustre_msg_buf(req->rq_reqmsg, 2);
- *opp |= REINT_REPLAYING;
+ ldlm_completion_ast, mdc_blocking_ast, dir, NULL,
+ lockh);
+
+ /* If we successfully created, mark the request so that replay will
+ * do the right thing */
+ if (req->rq_transno) {
+ struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 2);
+ rec->cr_opcode |= REINT_REPLAYING;
}
-
- if (rc == -ENOENT) {
- /* This can go when we're sure that this can never happen */
- LBUG();
+ /* Similarly, if we're going to replay this request, we don't want to
+ * actually get a lock, just perform the intent. */
+ if (req->rq_transno || (req->rq_flags & PTL_RPC_FL_REPLAY)) {
+ lockreq = lustre_msg_buf(req->rq_reqmsg, 0);
+ lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
}
+
+ dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
+
+ /* This can go when we're sure that this can never happen */
+ LASSERT(rc != -ENOENT);
if (rc == ELDLM_LOCK_ABORTED) {
lock_mode = 0;
memset(lockh, 0, sizeof(*lockh));
- /* rc = 0 */
} else if (rc != 0) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
RETURN(rc);
- } else {
- /* The server almost certainly gave us a lock other than the one
- * that we asked for. If we already have a matching lock, then
- * cancel this one--we don't need two. */
+ } else { /* rc = 0 */
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
struct lustre_handle lockh2;
LASSERT(lock);
+ /* If the server gave us back a different lock mode, we should
+ * fix up our variables. */
+ if (lock->l_req_mode != lock_mode) {
+ ldlm_lock_addref(lockh, lock->l_req_mode);
+ ldlm_lock_decref(lockh, lock_mode);
+ lock_mode = lock->l_req_mode;
+ }
+
+ /* The server almost certainly gave us a lock other than the
+ * one that we asked for. If we already have a matching lock,
+ * then cancel this one--we don't need two. */
LDLM_DEBUG(lock, "matching against this");
memcpy(&lockh2, lockh, sizeof(lockh2));
- if (ldlm_lock_match(NULL, NULL, LDLM_PLAIN, NULL, 0, LCK_NL,
- &lockh2)) {
- /* We already have a lock; cancel the old one */
- ldlm_lock_decref(lockh, lock_mode);
- /* FIXME: bug 563 */
- //ldlm_cli_cancel(lockh);
+ if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
+ LDLM_PLAIN, NULL, 0, LCK_NL, &lockh2)) {
+ /* We already have a lock; cancel the new one */
+ ldlm_lock_decref_and_cancel(lockh, lock_mode);
memcpy(lockh, &lockh2, sizeof(lockh2));
}
LDLM_LOCK_PUT(lock);
}
- /* On replay, we don't want the lock granted. */
- lockreq = lustre_msg_buf(req->rq_reqmsg, 0);
- lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
-
- dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
it->it_disposition = (int) dlm_rep->lock_policy_res1;
it->it_status = (int) dlm_rep->lock_policy_res2;
it->it_lock_mode = lock_mode;
it->it_data = req;
- RETURN(0);
+ RETURN(rc);
+}
+
+void mdc_lock_set_inode(struct lustre_handle *lockh, struct inode *inode)
+{
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ ENTRY;
+
+ LASSERT(lock != NULL);
+ lock->l_data = inode;
+ LDLM_LOCK_PUT(lock);
+ EXIT;
}
int mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
int flags)
{
- __u64 res_id[RES_NAME_SIZE] = {inode->i_ino, inode->i_generation};
+ struct ldlm_res_id res_id =
+ { .name = {inode->i_ino, inode->i_generation} };
struct obd_device *obddev = class_conn2obd(conn);
ENTRY;
- RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags));
+ RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags));
}
-struct replay_open_data {
- struct lustre_handle *fh;
-};
-
static void mdc_replay_open(struct ptlrpc_request *req)
{
- int offset;
- struct replay_open_data *saved;
+ struct lustre_handle old, *file_fh = req->rq_replay_data;
+ struct list_head *tmp;
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 0);
- if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA)
- offset = 2;
- else
- offset = 1;
-
- saved = lustre_msg_buf(req->rq_reqmsg, offset);
mds_unpack_body(body);
+ memcpy(&old, file_fh, sizeof(old));
CDEBUG(D_HA, "updating from "LPD64"/"LPD64" to "LPD64"/"LPD64"\n",
- saved->fh->addr, saved->fh->cookie,
- body->handle.addr, body->handle.cookie);
- memcpy(saved->fh, &body->handle, sizeof(body->handle));
+ file_fh->addr, file_fh->cookie, body->handle.addr,
+ body->handle.cookie);
+ memcpy(file_fh, &body->handle, sizeof(body->handle));
+
+ /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
+ list_for_each(tmp, &req->rq_import->imp_sending_list) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ if (req->rq_reqmsg->opc != MDS_CLOSE)
+ continue;
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ if (memcmp(&body->handle, &old, sizeof(old)))
+ continue;
+
+ DEBUG_REQ(D_HA, req, "updating close body with new fh");
+ memcpy(&body->handle, file_fh, sizeof(*file_fh));
+ }
}
-/* If lmm is non-NULL and lmm_size is non-zero, the stripe MD is stored on
- * the MDS. Otherwise, we have already read a copy from the MDS (probably
- * during mdc_enqueue() and we do not need to send it to the MDS again.
- *
- * In the future (when we support the non-intent case) we need to be able
- * to read the stripe MD from the MDS here (need to fix mds_open() too).
- */
-int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
- struct lov_mds_md *lmm, int lmm_size, struct lustre_handle *fh,
- struct ptlrpc_request **request)
+void mdc_set_open_replay_data(struct ll_file_data *fd)
{
- struct mds_body *body;
- struct replay_open_data *replay_data;
- int rc, size[3] = {sizeof(*body), sizeof(*replay_data)}, bufcount = 2;
- struct ptlrpc_request *req;
- ENTRY;
-
- if (lmm_size) {
- bufcount = 3;
- size[2] = size[1]; /* shuffle the replay data along */
- size[1] = lmm_size;
- }
-
- req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_OPEN, bufcount, size,
- NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
-
- req->rq_flags |= PTL_RPC_FL_REPLAY;
- body = lustre_msg_buf(req->rq_reqmsg, 0);
-
- ll_ino2fid(&body->fid1, ino, 0, type);
- body->flags = HTON__u32(flags);
- memcpy(&body->handle, fh, sizeof(body->handle));
-
- if (lmm_size) {
- body->flags |= HTON__u32(OBD_MD_FLEASIZE);
- if (lmm) {
- CDEBUG(D_INODE, "sending %u bytes MD for ino "LPU64"\n",
- lmm_size, ino);
- lustre_msg_set_op_flags(req->rq_reqmsg,MDS_OPEN_HAS_EA);
- memcpy(lustre_msg_buf(req->rq_reqmsg,1), lmm, lmm_size);
- }
- }
-
- req->rq_replen = lustre_msg_size(1, size);
-
- rc = ptlrpc_queue_wait(req);
- if (!rc) {
- body = lustre_msg_buf(req->rq_repmsg, 0);
- mds_unpack_body(body);
- memcpy(fh, &body->handle, sizeof(*fh));
-
- /* If open is replayed, we need to fix up the fh. */
- req->rq_replay_cb = mdc_replay_open;
- replay_data = lustre_msg_buf(req->rq_reqmsg, lmm ? 2 : 1);
- replay_data->fh = fh;
- }
-
- EXIT;
- out:
- *request = req;
- return rc;
+ fd->fd_req->rq_replay_cb = mdc_replay_open;
+ fd->fd_req->rq_replay_data = &fd->fd_mdshandle;
}
int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
char *addr, struct ptlrpc_request **request)
{
- struct ptlrpc_connection *connection =
+ struct obd_import *imp = class_conn2cliimp(conn);
+ struct ptlrpc_connection *connection =
client_conn2cli(conn)->cl_import.imp_connection;
struct ptlrpc_request *req = NULL;
struct ptlrpc_bulk_desc *desc = NULL;
struct ptlrpc_bulk_page *bulk = NULL;
struct mds_body *body;
+ unsigned long flags;
int rc, size = sizeof(*body);
ENTRY;
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
- req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_READPAGE, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
if (!req)
GOTO(out2, rc = -ENOMEM);
bulk = ptlrpc_prep_bulk_page(desc);
- bulk->bp_buflen = PAGE_SIZE;
+ if (bulk == NULL)
+ GOTO(out2, rc = -ENOMEM);
+
+ spin_lock_irqsave(&imp->imp_lock, flags);
+ bulk->bp_xid = ++imp->imp_last_bulk_xid;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
+ bulk->bp_buflen = PAGE_CACHE_SIZE;
bulk->bp_buf = addr;
- bulk->bp_xid = req->rq_xid;
+
desc->bd_ptl_ev_hdlr = NULL;
desc->bd_portal = MDS_BULK_PORTAL;
- rc = ptlrpc_register_bulk(desc);
+ rc = ptlrpc_register_bulk_put(desc);
if (rc) {
CERROR("couldn't setup bulk sink: error %d.\n", rc);
GOTO(out2, rc);
}
- mds_readdir_pack(req, offset, ino, type);
+ mds_readdir_pack(req, offset, ino, type, bulk->bp_xid);
req->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(req);
req->rq_replen = lustre_msg_size(1, &size);
+ mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
rc = ptlrpc_queue_wait(req);
+ mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
if (rc)
GOTO(out, rc);
static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
{
- return lprocfs_reg_obd(dev, status_var_nm_1, dev);
+ struct lprocfs_static_vars lvars;
+
+ lprocfs_init_vars(&lvars);
+ return lprocfs_obd_attach(dev, lvars.obd_vars);
}
static int mdc_detach(struct obd_device *dev)
{
- return lprocfs_dereg_obd(dev);
+ return lprocfs_obd_detach(dev);
}
/* Send a mostly-dummy GETSTATUS request and indicate that we're done replay. */
static int signal_completed_replay(struct obd_import *imp)
{
struct ll_fid fid;
-
+
return send_getstatus(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
}
int rc;
unsigned long flags;
struct ptlrpc_request *req;
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
ENTRY;
switch(phase) {
case PTLRPC_RECOVD_PHASE_PREPARE:
- ldlm_cli_cancel_unused(imp->imp_obd->obd_namespace,
- NULL, LDLM_FL_LOCAL_ONLY);
+ ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
RETURN(0);
+
+ case PTLRPC_RECOVD_PHASE_NOTCONN:
+ ldlm_namespace_cleanup(ns, 1);
+ ptlrpc_abort_inflight(imp, 0);
+ /* FALL THROUGH */
case PTLRPC_RECOVD_PHASE_RECOVER:
reconnect:
rc = ptlrpc_reconnect_import(imp, MDS_CONNECT, &req);
- /* We were still connected, just go about our business. */
- if (rc == EALREADY)
- GOTO(skip_replay, rc);
+ flags = req->rq_repmsg
+ ? lustre_msg_get_op_flags(req->rq_repmsg)
+ : 0;
+
+ if (rc == -EBUSY && (flags & MSG_CONNECT_RECOVERING))
+ CERROR("reconnect denied by recovery; should retry\n");
if (rc) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- /* We can't replay, which might be a problem. */
- if (!(lustre_msg_get_flags(req->rq_repmsg) &
- MSG_REPLAY_IN_PROGRESS)) {
if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
- CERROR("can't replay, invalidating\n");
- ldlm_namespace_cleanup(imp->imp_obd->obd_namespace,
- 1);
- ptlrpc_abort_inflight(imp);
+ CERROR("can't reconnect, invalidating\n");
+ ldlm_namespace_cleanup(ns, 1);
+ ptlrpc_abort_inflight(imp, 0);
}
- goto skip_replay;
- }
-
- rc = ptlrpc_replay(imp);
- if (rc)
- RETURN(rc);
-
- rc = ldlm_replay_locks(imp);
- if (rc)
+ ptlrpc_req_finished(req);
RETURN(rc);
+ }
- rc = signal_completed_replay(imp);
- if (rc)
- RETURN(rc);
+ if (flags & MSG_CONNECT_RECOVERING) {
+ /* Replay if they want it. */
+ DEBUG_REQ(D_HA, req, "MDS wants replay");
+ rc = ptlrpc_replay(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+
+ rc = ldlm_replay_locks(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+
+ rc = signal_completed_replay(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+ } else if (flags & MSG_CONNECT_RECONNECT) {
+ DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
+ /* Nothing else to do here. */
+ } else {
+ DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
+ /* Otherwise, clean everything up. */
+ ldlm_namespace_cleanup(ns, 1);
+ ptlrpc_abort_inflight(imp, 0);
+ }
- skip_replay:
ptlrpc_req_finished(req);
spin_lock_irqsave(&imp->imp_lock, flags);
imp->imp_level = LUSTRE_CONN_FULL;
rc = ptlrpc_resend(imp);
if (rc)
- RETURN(rc);
+ GOTO(check_rc, rc);
RETURN(0);
-
- case PTLRPC_RECOVD_PHASE_NOTCONN:
- ldlm_namespace_cleanup(imp->imp_obd->obd_namespace, 1);
- ptlrpc_abort_inflight(imp);
- goto reconnect;
+ check_rc:
+ /* If we get disconnected in the middle, recovery has probably
+ * failed. Reconnect and find out.
+ */
+ if (rc == -ENOTCONN)
+ goto reconnect;
+ RETURN(rc);
default:
RETURN(-EINVAL);
}
static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd,
- obd_uuid_t cluuid, struct recovd_obd *recovd,
+ struct obd_uuid *cluuid, struct recovd_obd *recovd,
ptlrpc_recovery_cb_t recover)
{
struct obd_import *imp = &obd->u.cli.cl_import;
static int __init ptlrpc_request_init(void)
{
- return class_register_type(&mdc_obd_ops, status_class_var,
+ struct lprocfs_static_vars lvars;
+ mdc_init_rpc_lock(&mdc_rpc_lock);
+ lprocfs_init_vars(&lvars);
+ return class_register_type(&mdc_obd_ops, lvars.module_vars,
LUSTRE_MDC_NAME);
}
class_unregister_type(LUSTRE_MDC_NAME);
}
-MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Metadata Client v1.0");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Lustre Metadata Client");
MODULE_LICENSE("GPL");
-EXPORT_SYMBOL(d_delete_aliases);
EXPORT_SYMBOL(mdc_getstatus);
EXPORT_SYMBOL(mdc_getlovinfo);
EXPORT_SYMBOL(mdc_enqueue);
EXPORT_SYMBOL(mdc_readpage);
EXPORT_SYMBOL(mdc_setattr);
EXPORT_SYMBOL(mdc_close);
-EXPORT_SYMBOL(mdc_open);
+EXPORT_SYMBOL(mdc_lock_set_inode);
+EXPORT_SYMBOL(mdc_set_open_replay_data);
EXPORT_SYMBOL(mdc_store_inode_generation);