#include <linux/init.h>
#include <linux/obd_class.h>
#include <linux/random.h>
+#include <linux/fs.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-#include <linux/smp_lock.h>
-#include <linux/buffer_head.h>
-#include <linux/workqueue.h>
-#include <linux/mount.h>
-#else
-#include <linux/locks.h>
+# include <linux/smp_lock.h>
+# include <linux/buffer_head.h>
+# include <linux/workqueue.h>
+# include <linux/mount.h>
+#else
+# include <linux/locks.h>
#endif
#include <linux/obd_lov.h>
#include <linux/lustre_mds.h>
#include <linux/lustre_fsfilt.h>
#include <linux/lprocfs_status.h>
-
-kmem_cache_t *mds_file_cache;
+#include "mds_internal.h"
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
struct obd_uuid *uuidarray);
extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc);
int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
struct ptlrpc_request *req, int rc, int disp);
-static int mds_cleanup(struct obd_device * obddev);
+static int mds_cleanup(struct obd_device * obddev, int force, int failover);
inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
{
static int mds_bulk_timeout(void *data)
{
struct ptlrpc_bulk_desc *desc = data;
+ struct obd_export *exp = desc->bd_export; /* export recorded in the bulk descriptor */
- ENTRY;
- recovd_conn_fail(desc->bd_connection);
+ CERROR("bulk send timed out: evicting %s@%s\n", /* name the client uuid and its remote connection uuid */
+ exp->exp_client_uuid.uuid,
+ exp->exp_connection->c_remote_uuid.uuid);
+ ptlrpc_fail_export(exp); /* timeout callback (installed via LWI_TIMEOUT): fail the export first ... */
+ ptlrpc_abort_bulk (desc); /* ... then abort the bulk transfer described by desc */
RETURN(1);
}
__u64 offset, __u64 xid)
{
struct ptlrpc_bulk_desc *desc;
- struct ptlrpc_bulk_page *bulk;
struct l_wait_info lwi;
- char *buf;
+ struct page *page;
int rc = 0;
ENTRY;
- desc = ptlrpc_prep_bulk(req->rq_connection);
+ LASSERT ((offset & (PAGE_CACHE_SIZE - 1)) == 0);
+
+ desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, MDS_BULK_PORTAL);
if (desc == NULL)
GOTO(out, rc = -ENOMEM);
- bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
+ LASSERT (PAGE_SIZE == PAGE_CACHE_SIZE);
+ page = alloc_pages (GFP_KERNEL, 0);
+ if (page == NULL)
GOTO(cleanup_bulk, rc = -ENOMEM);
- OBD_ALLOC(buf, PAGE_CACHE_SIZE);
- if (buf == NULL)
- GOTO(cleanup_bulk, rc = -ENOMEM);
+ rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
+ if (rc != 0)
+ GOTO(cleanup_buf, rc);
CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
file->f_dentry->d_inode->i_size);
- rc = fsfilt_readpage(req->rq_export->exp_obd, file, buf,
+ rc = fsfilt_readpage(req->rq_export->exp_obd, file, page_address (page),
PAGE_CACHE_SIZE, (loff_t *)&offset);
if (rc != PAGE_CACHE_SIZE)
GOTO(cleanup_buf, rc = -EIO);
- bulk->bp_xid = xid;
- bulk->bp_buf = buf;
- bulk->bp_buflen = PAGE_CACHE_SIZE;
- desc->bd_ptl_ev_hdlr = NULL;
- desc->bd_portal = MDS_BULK_PORTAL;
-
rc = ptlrpc_bulk_put(desc);
if (rc)
GOTO(cleanup_buf, rc);
}
lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
- rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
- &lwi);
+ rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete (desc), &lwi);
if (rc) {
- if (rc != -ETIMEDOUT)
- LBUG();
+ LASSERT (rc == -ETIMEDOUT);
GOTO(cleanup_buf, rc);
}
EXIT;
cleanup_buf:
- OBD_FREE(buf, PAGE_SIZE);
+ __free_pages (page, 0);
cleanup_bulk:
- ptlrpc_bulk_decref(desc);
+ ptlrpc_free_bulk (desc);
out:
return rc;
}
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
&flags, ldlm_completion_ast,
- mds_blocking_ast, NULL, NULL, lockh);
+ mds_blocking_ast, NULL, lockh);
if (rc != ELDLM_OK) {
l_dput(de);
retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
#endif
-
/* Look up an entry by inode number. */
/* this function ONLY returns valid dget'd dentries with an initialized inode
or errors */
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
struct vfsmount **mnt)
{
- /* stolen from NFS */
- struct super_block *sb = mds->mds_sb;
+ /* Resolve @fid by looking up the name "0x<ino in hex>" under
+ * mds->mds_fid_de.
+ *
+ * Returns the dentry produced by that lookup, or:
+ * ERR_PTR(-ESTALE) when fid->id is 0,
+ * the lookup's own error pointer when the lookup fails,
+ * ERR_PTR(-ENOENT) when the dentry has no inode or when
+ * fid->generation is non-zero and differs from
+ * the inode's generation.
+ * When @mnt is non-NULL it is set to mds->mds_vfsmnt and mntget() is
+ * called on it, but only on the success path. */
+ char fid_name[32];
unsigned long ino = fid->id;
__u32 generation = fid->generation;
struct inode *inode;
- struct list_head *lp;
struct dentry *result;
if (ino == 0)
RETURN(ERR_PTR(-ESTALE));
- inode = iget(sb, ino);
- if (inode == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
- CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
+ /* under ext3 this is neither supposed to return bad inodes
+ nor NULL inodes. */
+ result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
+ if (IS_ERR(result))
+ RETURN(result);
- if (is_bad_inode(inode) ||
- (generation && inode->i_generation != generation)) {
+ inode = result->d_inode;
+ if (!inode) {
+ /* Negative dentry: not expected (see above), but if it does
+ * happen the dentry returned by the lookup must still be
+ * released, exactly as on the generation-mismatch path. */
+ dput(result);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
+ CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
+ inode->i_ino, inode->i_generation, inode->i_sb);
+
+ if (generation && inode->i_generation != generation) {
/* we didn't find the right inode.. */
- CERROR("bad inode %lu, link: %d ct: %d or version %u/%u\n",
+ CERROR("bad inode %lu, link: %d ct: %d or generation %u/%u\n",
inode->i_ino, inode->i_nlink,
atomic_read(&inode->i_count), inode->i_generation,
generation);
- iput(inode);
+ dput(result);
RETURN(ERR_PTR(-ENOENT));
}
- /* now to find a dentry. If possible, get a well-connected one */
- if (mnt)
+ if (mnt) {
*mnt = mds->mds_vfsmnt;
- spin_lock(&dcache_lock);
- list_for_each(lp, &inode->i_dentry) {
- result = list_entry(lp, struct dentry, d_alias);
- if (!(result->d_flags & DCACHE_DISCONNECTED)) {
- dget_locked(result);
- result->d_vfs_flags |= DCACHE_REFERENCED;
- spin_unlock(&dcache_lock);
- iput(inode);
- if (mnt)
- mntget(*mnt);
- return result;
- }
- }
- spin_unlock(&dcache_lock);
- result = d_alloc_root(inode);
- if (result == NULL) {
- iput(inode);
- return ERR_PTR(-ENOMEM);
- }
- if (mnt)
mntget(*mnt);
- result->d_flags |= DCACHE_DISCONNECTED;
- return result;
+ }
+
+ RETURN(result);
}
* on the server, etc.
*/
static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
- struct obd_uuid *cluuid, struct recovd_obd *recovd,
- ptlrpc_recovery_cb_t recover)
+ struct obd_uuid *cluuid)
{
struct obd_export *exp;
struct mds_export_data *med;
struct mds_client_data *mcd;
- int rc;
+ int rc, abort_recovery;
ENTRY;
if (!conn || !obd || !cluuid)
/* Check for aborted recovery. */
spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_flags & OBD_ABORT_RECOVERY)
- target_abort_recovery(obd);
+ abort_recovery = obd->obd_abort_recovery;
spin_unlock_bh(&obd->obd_processing_task_lock);
+ if (abort_recovery)
+ target_abort_recovery(obd);
/* XXX There is a small race between checking the list and adding a
* new connection for the same UUID, but the real threat (list
exp = class_conn2export(conn);
LASSERT(exp);
med = &exp->exp_mds_data;
+ class_export_put(exp);
OBD_ALLOC(mcd, sizeof(*mcd));
if (!mcd) {
INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
- rc = mds_client_add(&obd->u.mds, med, -1);
+ rc = mds_client_add(obd, &obd->u.mds, med, -1);
if (rc)
GOTO(out_mcd, rc);
out_mcd:
OBD_FREE(mcd, sizeof(*mcd));
out_export:
- class_disconnect(conn);
+ class_disconnect(conn, 0);
return rc;
}
+/*
+ * Reference-taking callback handed to class_handle_hash() by mds_mfd_new().
+ * @mfdp is an untyped pointer to a struct mds_file_data; its refcount is
+ * bumped by one and the new value is reported at D_INFO level.
+ */
+static void mds_mfd_addref(void *mfdp)
+{
+        struct mds_file_data *file_data = mfdp;
+        atomic_t *refs = &file_data->mfd_refcount;
+
+        atomic_inc(refs);
+        CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", file_data,
+               atomic_read(refs));
+}
+}
+
+/*
+ * Allocate a struct mds_file_data and publish it in the handle hash.
+ *
+ * Returns the new object, or NULL (after logging) when allocation fails.
+ * The refcount starts at 2 -- presumably one reference for the handle hash
+ * and one for the caller; TODO confirm against the callers.
+ */
+struct mds_file_data *mds_mfd_new(void)
+{
+        struct mds_file_data *new_mfd;
+
+        OBD_ALLOC(new_mfd, sizeof(*new_mfd));
+        if (new_mfd != NULL) {
+                atomic_set(&new_mfd->mfd_refcount, 2);
+
+                /* the link must be initialised before the handle is hashed */
+                INIT_LIST_HEAD(&new_mfd->mfd_handle.h_link);
+                class_handle_hash(&new_mfd->mfd_handle, mds_mfd_addref);
+        } else {
+                CERROR("mds: out of memory\n");
+        }
+
+        return new_mfd;
+}
+
+/*
+ * Map a client-supplied handle to its mds_file_data via the handle's cookie.
+ *
+ * @handle must not be NULL.  Returns whatever class_handle2object() yields
+ * for the cookie; mds_close() treats NULL as "no such handle" and calls
+ * mds_mfd_put() on a non-NULL result, so the lookup presumably takes a
+ * reference through mds_mfd_addref() -- TODO confirm.
+ */
+static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+{
+        struct mds_file_data *found;
+        ENTRY;
+
+        LASSERT(handle != NULL);
+        found = class_handle2object(handle->cookie);
+        RETURN(found);
+}
+
+/*
+ * Drop one reference on @mfd and free it when the count reaches zero.
+ *
+ * The refcount is sanity-checked first: it must be positive and below
+ * 0x5a5a (presumably a guard against touching already-freed, poisoned
+ * memory -- TODO confirm).  By the time the last reference goes away the
+ * handle must already have been unhashed.
+ */
+void mds_mfd_put(struct mds_file_data *mfd)
+{
+        CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount) - 1);
+        LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+                atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+
+        /* not the last reference: nothing more to do */
+        if (!atomic_dec_and_test(&mfd->mfd_refcount))
+                return;
+
+        LASSERT(list_empty(&mfd->mfd_handle.h_link));
+        OBD_FREE(mfd, sizeof *mfd);
+}
+
+void mds_mfd_destroy(struct mds_file_data *mfd)
+{
+ class_handle_unhash(&mfd->mfd_handle); /* take the handle out of the handle hash first */
+ mds_mfd_put(mfd); /* then drop one reference; mds_mfd_put() frees the mfd at zero */
+}
+
/* Call with med->med_open_lock held, please. */
-inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
+static int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
{
- struct file *file = mfd->mfd_file;
- int rc;
struct dentry *de = NULL;
- LASSERT(file->private_data == mfd);
-
- LASSERT(mfd->mfd_servercookie != DEAD_HANDLE_MAGIC);
+#ifdef CONFIG_SMP
+ LASSERT(spin_is_locked(&med->med_open_lock)); /* enforce the "call with med_open_lock held" contract */
+#endif
list_del(&mfd->mfd_list);
- mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
- kmem_cache_free(mds_file_cache, mfd);
- if (file->f_dentry->d_parent) {
- LASSERT(atomic_read(&file->f_dentry->d_parent->d_count));
- de = dget(file->f_dentry->d_parent);
+ if (mfd->mfd_dentry->d_parent) {
+ LASSERT(atomic_read(&mfd->mfd_dentry->d_parent->d_count));
+ de = dget(mfd->mfd_dentry->d_parent); /* extra ref on the parent, released after the child below */
}
- rc = filp_close(file, 0);
+
+ /* this is the actual "close" */
+ l_dput(mfd->mfd_dentry);
+
if (de)
l_dput(de);
- RETURN(rc);
+
+ mds_mfd_destroy(mfd); /* unhash the handle and drop a reference on the mfd */
+ RETURN(0); /* this version has no failure path; always reports success */
}
-static int mds_disconnect(struct lustre_handle *conn)
+static int mds_disconnect(struct lustre_handle *conn, int failover)
{
struct obd_export *export = class_conn2export(conn);
- struct list_head *tmp, *n;
+ int rc;
+ unsigned long flags;
+ ENTRY;
+
+ ldlm_cancel_locks_for_export(export); /* NOTE(review): export is used here and below without a NULL check -- confirm the lookup cannot fail */
+
+ spin_lock_irqsave(&export->exp_lock, flags);
+ export->exp_failover = failover; /* read later by mds_destroy_export(), which skips mds_client_free() when set */
+ spin_unlock_irqrestore(&export->exp_lock, flags);
+
+ rc = class_disconnect(conn, failover); /* its result is what this function returns */
+ class_export_put(export); /* presumably balances the class_conn2export() lookup above, as in mds_connect() -- TODO confirm */
+
+ RETURN(rc);
+}
+
+static void mds_destroy_export(struct obd_export *export)
+{
struct mds_export_data *med = &export->exp_mds_data;
+ struct list_head *tmp, *n;
int rc;
+
ENTRY;
+ LASSERT(!strcmp(export->exp_obd->obd_type->typ_name,
+ LUSTRE_MDS_NAME));
/*
* Close any open files.
list_for_each_safe(tmp, n, &med->med_open_head) {
struct mds_file_data *mfd =
list_entry(tmp, struct mds_file_data, mfd_list);
- CERROR("force closing client file handle for %*s\n",
- mfd->mfd_file->f_dentry->d_name.len,
- mfd->mfd_file->f_dentry->d_name.name);
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+ struct dentry *dentry = mfd->mfd_dentry;
+ CERROR("force closing client file handle for %*s (%s:%lu)\n",
+ dentry->d_name.len, dentry->d_name.name,
+ kdevname(dentry->d_inode->i_sb->s_dev),
+ dentry->d_inode->i_ino);
+#endif
rc = mds_close_mfd(mfd, med);
if (rc)
CDEBUG(D_INODE, "Error closing file: %d\n", rc);
}
spin_unlock(&med->med_open_lock);
- ldlm_cancel_locks_for_export(export);
- if (med->med_outstanding_reply) {
+ if (export->exp_outstanding_reply) {
+ struct ptlrpc_request *req = export->exp_outstanding_reply;
+ unsigned long flags;
+
/* Fake the ack, so the locks get cancelled. */
- med->med_outstanding_reply->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
- med->med_outstanding_reply->rq_flags |= PTL_RPC_FL_ERR;
- wake_up(&med->med_outstanding_reply->rq_wait_for_rep);
- med->med_outstanding_reply = NULL;
- }
- mds_client_free(export);
+ LBUG ();
+ /* Actually we can't do this because it prevents us knowing
+ * if the ACK callback ran or not */
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_want_ack = 0;
+ req->rq_err = 1;
+ wake_up(&req->rq_wait_for_rep);
+ spin_unlock_irqrestore (&req->rq_lock, flags);
- rc = class_disconnect(conn);
+ export->exp_outstanding_reply = NULL;
+ }
- RETURN(rc);
+ if (!export->exp_failover)
+ mds_client_free(export);
+ EXIT;
}
/*
rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
CERROR("mds: out of memory for message: size=%d\n", size);
- req->rq_status = -ENOMEM;
+ req->rq_status = -ENOMEM; /* superfluous? */
RETURN(-ENOMEM);
}
*/
mds_fsync_super(mds->mds_sb);
- body = lustre_msg_buf(req->rq_repmsg, 0);
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
/* the last_committed and last_xid fields are filled in for all
struct mds_obd *mds = mds_req2mds(req);
struct mds_status_req *streq;
struct lov_desc *desc;
+ struct obd_uuid *uuid0;
int tgt_count;
int rc, size[2] = {sizeof(*desc)};
ENTRY;
- streq = lustre_msg_buf(req->rq_reqmsg, 0);
- streq->flags = NTOH__u32(streq->flags);
- streq->repbuf = NTOH__u32(streq->repbuf);
+ streq = lustre_swab_reqbuf (req, 0, sizeof (*streq),
+ lustre_swab_mds_status_req);
+ if (streq == NULL) {
+ CERROR ("Can't unpack mds_status_req\n");
+ RETURN (-EFAULT);
+ }
+
+ if (streq->repbuf > LOV_MAX_UUID_BUFFER_SIZE) {
+ CERROR ("Illegal request for uuid array > %d\n",
+ streq->repbuf);
+ RETURN (-EINVAL);
+ }
size[1] = streq->repbuf;
rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
if (rc) {
CERROR("mds: out of memory for message: size=%d\n", size[1]);
- req->rq_status = -ENOMEM;
RETURN(-ENOMEM);
}
RETURN(0);
}
- desc = lustre_msg_buf(req->rq_repmsg, 0);
- memcpy(desc, &mds->mds_lov_desc, sizeof *desc);
- lov_packdesc(desc);
- tgt_count = le32_to_cpu(desc->ld_tgt_count);
- if (tgt_count * sizeof(struct obd_uuid) > streq->repbuf) {
+ /* XXX We're sending the lov_desc in my byte order.
+ * Receiver will swab... */
+ desc = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*desc));
+ memcpy(desc, &mds->mds_lov_desc, sizeof (*desc));
+
+ tgt_count = mds->mds_lov_desc.ld_tgt_count;
+ uuid0 = lustre_msg_buf (req->rq_repmsg, 1,
+ tgt_count * sizeof (*uuid0));
+ if (uuid0 == NULL) {
CERROR("too many targets, enlarge client buffers\n");
req->rq_status = -ENOSPC;
RETURN(0);
}
- rc = mds_get_lovtgts(mds, tgt_count,
- lustre_msg_buf(req->rq_repmsg, 1));
+ rc = mds_get_lovtgts(mds, tgt_count, uuid0);
if (rc) {
CERROR("get_lovtgts error %d\n", rc);
req->rq_status = rc;
{
struct mds_obd *mds = &obd->u.mds;
struct lov_mds_md *lmm;
- int lmm_size = msg->buflens[offset];
+ int lmm_size;
int rc;
ENTRY;
- if (lmm_size == 0) {
+ lmm = lustre_msg_buf(msg, offset, 0);
+ if (lmm == NULL) {
+ /* Some problem with getting eadata when I sized the reply
+ * buffer... */
CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
inode->i_ino);
RETURN(0);
}
-
- lmm = lustre_msg_buf(msg, offset);
+ lmm_size = msg->buflens[offset];
/* I don't really like this, but it is a sanity check on the client
* MD request. However, if the client doesn't know how much space
// RETURN(-EINVAL);
}
- /* We don't need to store the reply size, because this buffer is
- * discarded right after unpacking, and the LOV can figure out the
- * size itself from the ost count.
- */
- if ((rc = fsfilt_get_md(obd, inode, lmm, lmm_size)) < 0) {
- CDEBUG(D_INFO, "No md for ino %lu: rc = %d\n",
- inode->i_ino, rc);
+ rc = fsfilt_get_md(obd, inode, lmm, lmm_size);
+ if (rc < 0) {
+ CERROR ("Error %d reading eadata for ino %lu\n",
+ rc, inode->i_ino);
} else if (rc > 0) {
body->valid |= OBD_MD_FLEASIZE;
+ body->eadatasize = rc;
rc = 0;
}
if (inode == NULL)
RETURN(-ENOENT);
- body = lustre_msg_buf(req->rq_repmsg, reply_off);
+ body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof (*body));
+ LASSERT (body != NULL); /* caller prepped reply */
mds_pack_inode2fid(&body->fid1, inode);
mds_pack_inode2body(body, inode);
- if (S_ISREG(inode->i_mode) && reqbody->valid & OBD_MD_FLEASIZE) {
+ if (S_ISREG(inode->i_mode) &&
+ (reqbody->valid & OBD_MD_FLEASIZE) != 0) {
rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1,
body, inode);
- } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) {
- char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
- int len = req->rq_repmsg->buflens[reply_off + 1];
+ } else if (S_ISLNK(inode->i_mode) &&
+ (reqbody->valid & OBD_MD_LINKNAME) != 0) {
+ char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1, 0);
+ int len;
+
+ LASSERT (symname != NULL); /* caller prepped reply */
+ len = req->rq_repmsg->buflens[reply_off + 1];
rc = inode->i_op->readlink(dentry, symname, len);
if (rc < 0) {
CERROR("readlink failed: %d\n", rc);
+ } else if (rc != len - 1) {
+ CERROR ("Unexpected readlink rc %d: expecting %d\n",
+ rc, len - 1);
+ rc = -EINVAL;
} else {
CDEBUG(D_INODE, "read symlink dest %s\n", symname);
body->valid |= OBD_MD_LINKNAME;
+ body->eadatasize = rc + 1;
+ symname[rc] = 0; /* NULL terminate */
rc = 0;
}
}
int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
ENTRY;
- body = lustre_msg_buf(req->rq_reqmsg, offset);
+ body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
+ LASSERT (body != NULL); /* checked by caller */
+ LASSERT_REQSWABBED (req, offset); /* swabbed by caller */
- if (S_ISREG(inode->i_mode) && body->valid & OBD_MD_FLEASIZE) {
+ if (S_ISREG(inode->i_mode) &&
+ (body->valid & OBD_MD_FLEASIZE) != 0) {
int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
rc, inode->i_ino);
} else
size[bufcount] = rc;
bufcount++;
- } else if (body->valid & OBD_MD_LINKNAME) {
- size[bufcount] = MIN(inode->i_size + 1, body->size);
+ } else if (S_ISLNK (inode->i_mode) &&
+ (body->valid & OBD_MD_LINKNAME) != 0) {
+ if (inode->i_size + 1 != body->eadatasize)
+ CERROR ("symlink size: %Lu, reply space: %d\n",
+ inode->i_size + 1, body->eadatasize);
+ size[bufcount] = MIN(inode->i_size + 1, body->eadatasize);
bufcount++;
- CDEBUG(D_INODE, "symlink size: %Lu, reply space: "LPU64"\n",
- inode->i_size + 1, body->size);
+ CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
+ inode->i_size + 1, body->eadatasize);
}
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
struct lustre_handle *client_lockh)
{
- struct mds_export_data *med = &req->rq_export->exp_mds_data;
- struct mds_client_data *mcd = med->med_mcd;
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_obd *mds = mds_req2mds(req);
struct dentry *parent, *child;
int namelen, rc = 0;
char *name;
- req->rq_transno = mcd->mcd_last_transno;
- req->rq_status = mcd->mcd_last_result;
-
- if (med->med_outstanding_reply)
- mds_steal_ack_locks(med, req);
+ if (req->rq_export->exp_outstanding_reply)
+ mds_steal_ack_locks(req->rq_export, req);
- if (req->rq_status)
- return;
+ body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
+ LASSERT (body != NULL); /* checked by caller */
+ LASSERT_REQSWABBED (req, offset); /* swabbed by caller */
- body = lustre_msg_buf(req->rq_reqmsg, offset);
- name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+ name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
+ LASSERT (name != NULL); /* checked by caller */
+ LASSERT_REQSWABBED (req, offset + 1); /* swabbed by caller */
namelen = req->rq_reqmsg->buflens[offset + 1];
+
+ LASSERT (offset == 2 || offset == 0);
/* requests were at offset 2, replies go back at 1 */
if (offset)
offset = 1;
LASSERT(!IS_ERR(parent));
dir = parent->d_inode;
LASSERT(dir);
- child = lookup_one_len(name, parent, namelen - 1);
+ child = ll_lookup_one_len(name, parent, namelen - 1);
LASSERT(!IS_ERR(child));
- if (!med->med_outstanding_reply) {
- /* XXX need to enqueue client lock */
- LBUG();
+ if (req->rq_repmsg == NULL) {
+ rc = mds_getattr_pack_msg(req, child->d_inode, offset);
+ /* XXX need to handle error here */
+ LASSERT (rc == 0);
}
- if (req->rq_repmsg == NULL)
- mds_getattr_pack_msg(req, child->d_inode, offset);
-
rc = mds_getattr_internal(obd, child, req, body, offset);
- LASSERT(!rc);
+ req->rq_status = rc;
l_dput(child);
l_dput(parent);
}
struct obd_ucred uc;
struct ldlm_res_id child_res_id = { .name = {0} };
struct lustre_handle parent_lockh;
- int namelen, flags = 0, rc = 0, cleanup_phase = 0;
+ int namesize;
+ int flags = 0, rc = 0, cleanup_phase = 0, req_was_resent;
char *name;
ENTRY;
LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
- MDS_CHECK_RESENT(req,
- reconstruct_getattr_name(offset, req, child_lockh));
+ /* Swab now, before anyone looks inside the request */
- if (req->rq_reqmsg->bufcount <= offset + 1) {
- LBUG();
- GOTO(cleanup, rc = -EINVAL);
+ body = lustre_swab_reqbuf (req, offset, sizeof (*body),
+ lustre_swab_mds_body);
+ if (body == NULL) {
+ CERROR ("Can't swab mds_body\n");
+ GOTO (cleanup, rc = -EFAULT);
}
- body = lustre_msg_buf(req->rq_reqmsg, offset);
- name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
- namelen = req->rq_reqmsg->buflens[offset + 1];
- /* requests were at offset 2, replies go back at 1 */
+ LASSERT_REQSWAB (req, offset + 1);
+ name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+ if (name == NULL) {
+ CERROR ("Can't unpack name\n");
+ GOTO (cleanup, rc = -EFAULT);
+ }
+ namesize = req->rq_reqmsg->buflens[offset + 1];
+
+ req_was_resent = lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
+ if (child_lockh->cookie) {
+ LASSERT(req_was_resent);
+ reconstruct_getattr_name(offset, req, child_lockh);
+ RETURN(0);
+ } else if (req_was_resent) {
+ DEBUG_REQ(D_HA, req, "no reply for RESENT req");
+ }
+
+ LASSERT (offset == 0 || offset == 2);
+ /* if requests were at offset 2, replies go back at 1 */
if (offset)
offset = 1;
cleanup_phase = 1; /* parent dentry and lock */
- CDEBUG(D_INODE, "parent ino %lu, name %*s\n", dir->i_ino,namelen,name);
+ CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
/* Step 2: Lookup child */
- dchild = lookup_one_len(name, de, namelen - 1);
+ dchild = ll_lookup_one_len(name, de, namesize - 1);
if (IS_ERR(dchild)) {
CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
GOTO(cleanup, rc = PTR_ERR(dchild));
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
&flags, ldlm_completion_ast, mds_blocking_ast,
- NULL, NULL, child_lockh);
+ NULL, child_lockh);
if (rc != ELDLM_OK) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
GOTO(cleanup, rc = -EIO);
cleanup_phase = 3; /* child lock */
- if (req->rq_repmsg == NULL)
- mds_getattr_pack_msg(req, dchild->d_inode, offset);
+ if (req->rq_repmsg == NULL) {
+ rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
+ if (rc != 0) {
+ CERROR ("mds_getattr_pack_msg: %d\n", rc);
+ GOTO (cleanup, rc);
+ }
+ }
rc = mds_getattr_internal(obd, dchild, req, body, offset);
GOTO(cleanup, rc); /* returns the lock to the client */
-
+
cleanup:
- rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
- req, rc, 0);
switch (cleanup_phase) {
case 3:
if (rc)
int rc = 0;
ENTRY;
- body = lustre_msg_buf(req->rq_reqmsg, offset);
+ body = lustre_swab_reqbuf (req, offset, sizeof (*body),
+ lustre_swab_mds_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack body\n");
+ RETURN (-EFAULT);
+ }
+
uc.ouc_fsuid = body->fsuid;
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
}
rc = mds_getattr_pack_msg(req, de->d_inode, offset);
+ if (rc != 0) {
+ CERROR ("mds_getattr_pack_msg: %d\n", rc);
+ GOTO (out_pop, rc);
+ }
req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
GOTO(out, rc);
}
- osfs = lustre_msg_buf(req->rq_repmsg, 0);
+ osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
if (rc) {
CERROR("mds: statfs failed: rc %d\n", rc);
GOTO(out, rc);
}
- obd_statfs_pack(osfs, osfs);
EXIT;
out:
return 0;
}
-static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
-{
- struct mds_file_data *mfd = NULL;
- ENTRY;
-
- if (!handle || !handle->addr)
- RETURN(NULL);
-
- mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
- if (!kmem_cache_validate(mds_file_cache, mfd))
- RETURN(NULL);
-
- if (mfd->mfd_servercookie != handle->cookie)
- RETURN(NULL);
-
- RETURN(mfd);
-}
-
-#if 0
-
-static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
- int offset, struct mds_body *body, struct inode *inode)
-{
- struct obd_device *obd = req->rq_export->exp_obd;
- struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, offset);
- int lmm_size = req->rq_reqmsg->buflens[offset];
- struct obd_run_ctxt saved;
- struct obd_ucred uc;
- void *handle;
- int rc, rc2;
- ENTRY;
-
- /* I don't really like this, but it is a sanity check on the client
- * MD request.
- */
- if (lmm_size > mds->mds_max_mdsize) {
- CERROR("Saving MD for inode %lu of %d bytes > max %d\n",
- inode->i_ino, lmm_size, mds->mds_max_mdsize);
- //RETURN(-EINVAL);
- }
-
- CDEBUG(D_INODE, "storing %d bytes MD for inode %lu\n",
- lmm_size, inode->i_ino);
- uc.ouc_fsuid = body->fsuid;
- uc.ouc_fsgid = body->fsgid;
- uc.ouc_cap = body->capability;
- push_ctxt(&saved, &mds->mds_ctxt, &uc);
- handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
- if (IS_ERR(handle)) {
- rc = PTR_ERR(handle);
- GOTO(out_ea, rc);
- }
-
- rc = fsfilt_set_md(obd, inode,handle,lmm,lmm_size);
- rc = mds_finish_transno(mds, inode, handle, req, rc, 0);
-out_ea:
- pop_ctxt(&saved, &mds->mds_ctxt, &uc);
-
- RETURN(rc);
-}
-
-#endif
-
static void reconstruct_close(struct ptlrpc_request *req)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
MDS_CHECK_RESENT(req, reconstruct_close(req));
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_swab_reqbuf(req, 0, sizeof (*body),
+ lustre_swab_mds_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack body\n");
+ RETURN (-EFAULT);
+ }
mfd = mds_handle2mfd(&body->handle);
if (mfd == NULL) {
DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
- ": addr "LPX64", cookie "LPX64"\n",
- body->fid1.id, body->handle.addr,
+ ": cookie "LPX64"\n", body->fid1.id,
body->handle.cookie);
RETURN(-ESTALE);
}
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
req->rq_status = -ENOMEM;
+ mds_mfd_put(mfd);
RETURN(-ENOMEM);
}
req->rq_status = rc;
}
+ mds_mfd_put(mfd);
RETURN(0);
}
struct file *file;
struct mds_body *body, *repbody;
struct obd_run_ctxt saved;
- int rc, size = sizeof(*body);
+ int rc, size = sizeof(*repbody);
struct obd_ucred uc;
ENTRY;
GOTO(out, rc = -ENOMEM);
}
- body = lustre_msg_buf(req->rq_reqmsg, 0);
+ body = lustre_swab_reqbuf (req, 0, sizeof (*body),
+ lustre_swab_mds_body);
+ if (body == NULL)
+ GOTO (out, rc = -EFAULT);
+
+ /* body->size is actually the offset -eeb */
+ if ((body->size & (PAGE_SIZE - 1)) != 0) {
+ CERROR ("offset "LPU64"not on a page boundary\n", body->size);
+ GOTO (out, rc = -EFAULT);
+ }
+
+ /* body->nlink is actually the #bytes to read -eeb */
+ if (body->nlink != PAGE_SIZE) {
+ CERROR ("size %d is not PAGE_SIZE\n", body->nlink);
+ GOTO (out, rc = -EFAULT);
+ }
+
uc.ouc_fsuid = body->fsuid;
uc.ouc_fsgid = body->fsgid;
uc.ouc_cap = body->capability;
if (IS_ERR(file))
GOTO(out_pop, rc = PTR_ERR(file));
- repbody = lustre_msg_buf(req->rq_repmsg, 0);
+ repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
repbody->size = file->f_dentry->d_inode->i_size;
repbody->valid = OBD_MD_FLSIZE;
doesn't send a reply when this function completes. Instead a
callback function would send the reply */
/* body->blocks is actually the xid -phil */
+ /* body->size is actually the offset -eeb */
rc = mds_sendpage(req, file, body->size, body->blocks);
filp_close(file, 0);
{
switch (req->rq_reqmsg->opc) {
case MDS_CONNECT: /* This will never get here, but for completeness. */
+ case OST_CONNECT: /* This will never get here, but for completeness. */
case MDS_DISCONNECT:
+ case OST_DISCONNECT:
*process = 1;
RETURN(0);
case MDS_CLOSE:
case MDS_GETSTATUS: /* used in unmounting */
+ case OBD_PING:
case MDS_REINT:
case LDLM_ENQUEUE:
*process = target_queue_recovery_request(req, obd);
DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
*process = 0;
/* XXX what should we set rq_status to here? */
- RETURN(ptlrpc_error(req->rq_svc, req));
+ req->rq_status = -EAGAIN;
+ RETURN(ptlrpc_error(req));
}
}
[REINT_OPEN] "open",
};
-void mds_steal_ack_locks(struct mds_export_data *med,
+/* Copy the ACK locks held by the export's outstanding reply into @req, then
+ * flag that old reply as resent and wake anyone sleeping on it.
+ *
+ * @exp: export whose exp_outstanding_reply is taken over; the callers in this
+ * file check that it is non-NULL before calling.
+ * @req: request that receives the rq_ack_locks array. */
+void mds_steal_ack_locks(struct obd_export *exp,
struct ptlrpc_request *req)
{
- struct ptlrpc_request *oldrep = med->med_outstanding_reply;
+ unsigned long flags;
+
+ struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
sizeof req->rq_ack_locks);
- oldrep->rq_flags |= PTL_RPC_FL_RESENT;
+ /* rq_resent and the wait queue belong to oldrep, so it is oldrep's
+ * rq_lock that has to be held while they are touched (the same rule
+ * mds_destroy_export() follows for the outstanding reply). */
+ spin_lock_irqsave (&oldrep->rq_lock, flags);
+ oldrep->rq_resent = 1;
wake_up(&oldrep->rq_wait_for_rep);
+ spin_unlock_irqrestore (&oldrep->rq_lock, flags);
DEBUG_REQ(D_HA, oldrep, "stole locks from");
DEBUG_REQ(D_HA, req, "stole locks for");
}
-static void mds_send_reply(struct ptlrpc_request *req, int rc)
-{
- int i;
- struct ptlrpc_req_ack_lock *ack_lock;
- struct l_wait_info lwi;
- struct mds_export_data *med =
- (req->rq_export && req->rq_ack_locks[0].mode) ?
- &req->rq_export->exp_mds_data : NULL;
-
- if (med) {
- med->med_outstanding_reply = req;
- req->rq_flags |= PTL_RPC_FL_WANT_ACK;
- init_waitqueue_head(&req->rq_wait_for_rep);
- }
-
- if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
- if (rc) {
- DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
- ptlrpc_error(req->rq_svc, req);
- } else {
- DEBUG_REQ(D_NET, req, "sending reply");
- ptlrpc_reply(req->rq_svc, req);
- }
- } else {
- obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
- DEBUG_REQ(D_ERROR, req, "dropping reply");
- if (!med && req->rq_repmsg)
- OBD_FREE(req->rq_repmsg, req->rq_replen);
- }
-
- if (!med) {
- DEBUG_REQ(D_HA, req, "not waiting for ack");
- return;
- }
-
- lwi = LWI_TIMEOUT(obd_timeout / 2 * HZ, NULL, NULL);
- rc = l_wait_event(req->rq_wait_for_rep,
- (req->rq_flags & PTL_RPC_FL_WANT_ACK) == 0 ||
- (req->rq_flags & PTL_RPC_FL_RESENT),
- &lwi);
-
- if (req->rq_flags & PTL_RPC_FL_RESENT) {
- /* The client resent this request, so abort the
- * waiting-ack portals stuff, and don't decref the
- * locks.
- */
- DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
- ptlrpc_abort(req);
- return;
- }
-
- if (rc == -ETIMEDOUT) {
- ptlrpc_abort(req);
- recovd_conn_fail(req->rq_export->exp_connection);
- DEBUG_REQ(D_HA, req, "cancelling locks for timeout");
- } else {
- DEBUG_REQ(D_HA, req, "cancelling locks for ack");
- }
-
- med->med_outstanding_reply = NULL;
-
- for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
- if (!ack_lock->mode)
- break;
- ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
- }
-}
-
int mds_handle(struct ptlrpc_request *req)
{
- int should_process, rc;
+ int should_process;
+ int rc = 0;
struct mds_obd *mds = NULL; /* quell gcc overwarning */
struct obd_device *obd = NULL;
ENTRY;
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
- DEBUG_REQ(D_ERROR, req, "invalid request (%d)", rc);
- GOTO(out, rc);
- }
-
OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
+ /* XXX identical to OST */
if (req->rq_reqmsg->opc != MDS_CONNECT) {
struct mds_export_data *med;
+ int recovering, abort_recovery;
+
if (req->rq_export == NULL) {
+ CERROR("lustre_mds: operation %d on unconnected MDS\n",
+ req->rq_reqmsg->opc);
req->rq_status = -ENOTCONN;
GOTO(out, rc = -ENOTCONN);
}
med = &req->rq_export->exp_mds_data;
obd = req->rq_export->exp_obd;
mds = &obd->u.mds;
+
+ /* Check for aborted recovery. */
spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_flags & OBD_ABORT_RECOVERY)
- target_abort_recovery(obd);
+ abort_recovery = obd->obd_abort_recovery;
+ recovering = obd->obd_recovering;
spin_unlock_bh(&obd->obd_processing_task_lock);
-
- if (obd->obd_flags & OBD_RECOVERING) {
+ if (abort_recovery) {
+ target_abort_recovery(obd);
+ } else if (recovering) {
rc = filter_recovery_request(req, obd, &should_process);
if (rc || !should_process)
RETURN(rc);
/* Make sure that last_rcvd is correct. */
if (!rc)
mds_fsync_super(mds->mds_sb);
- req->rq_status = rc;
+ req->rq_status = rc; /* superfluous? */
break;
case MDS_GETSTATUS:
* acquiring any new locks in mds_getattr_name, so we don't
* want to cancel.
*/
- lockh.addr = 0;
+ lockh.cookie = 0;
rc = mds_getattr_name(0, req, &lockh);
- if (rc == 0 && lockh.addr)
+ if (rc == 0 && lockh.cookie)
ldlm_lock_decref(&lockh, LCK_PR);
break;
}
break;
case MDS_REINT: {
- int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0);
+ /* Buffer 0 of a reint request starts with the reint opcode. It is
+ * only peeked at here (for the debug message below); NULL means
+ * lustre_msg_buf() could not supply sizeof(*opcp) bytes. */
+ __u32 *opcp = lustre_msg_buf (req->rq_reqmsg, 0, sizeof (*opcp));
+ __u32 opc;
int size[2] = {sizeof(struct mds_body), mds->mds_max_mdsize};
int bufcount;
- DEBUG_REQ(D_INODE, req, "reint (%s%s)",
- reint_names[opc & REINT_OPCODE_MASK],
- opc & REINT_REPLAYING ? "|REPLAYING" : "");
+ /* NB only peek inside req now; mds_reint() will swab it */
+ if (opcp == NULL) {
+ CERROR ("Can't inspect opcode\n");
+ rc = -EINVAL;
+ break;
+ }
+ /* Work on a local copy so the request buffer itself stays
+ * unswabbed for mds_reint(). */
+ opc = *opcp;
+ if (lustre_msg_swabbed (req->rq_reqmsg))
+ __swab32s (&opc);
+
+ /* Index reint_names[] only when opc is inside the table AND the
+ * slot is populated; everything else is logged as "unknown opcode".
+ * The bounds test must come first so that && short-circuits before
+ * any out-of-range read. */
+ DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
+ (opc < sizeof (reint_names) / sizeof (reint_names[0]) &&
+ reint_names[opc] != NULL) ? reint_names[opc] : "unknown opcode");
OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
rc = mds_close(req);
break;
+ case OBD_PING:
+ DEBUG_REQ(D_INODE, req, "ping");
+ rc = target_handle_ping(req);
+ break;
+
case LDLM_ENQUEUE:
DEBUG_REQ(D_INODE, req, "enqueue");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
break;
default:
- rc = ptlrpc_error(req->rq_svc, req);
+ req->rq_status = -ENOTSUPP;
+ rc = ptlrpc_error(req);
RETURN(rc);
}
struct obd_device *obd = list_entry(mds, struct obd_device,
u.mds);
req->rq_repmsg->last_xid =
- HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
- if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
+ le64_to_cpu (med->med_mcd->mcd_last_xid);
+
+ if (!obd->obd_no_transno) {
req->rq_repmsg->last_committed =
- HTON__u64(obd->obd_last_committed);
+ obd->obd_last_committed;
} else {
DEBUG_REQ(D_IOCTL, req,
"not sending last_committed update");
CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
", xid "LPU64"\n",
mds->mds_last_transno, obd->obd_last_committed,
- NTOH__u64(req->rq_xid));
+ req->rq_xid);
}
out:
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
- if (obd && (obd->obd_flags & OBD_RECOVERING)) {
+ if (obd && obd->obd_recovering) {
DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
return target_queue_final_reply(req, rc);
}
rc = req->rq_status = -ENOTCONN;
}
- mds_send_reply(req, rc);
+ target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
return 0;
}
struct mds_obd *mds = &obddev->u.mds;
struct vfsmount *mnt;
int rc = 0;
+ unsigned long page;
ENTRY;
+
#ifdef CONFIG_DEV_RDONLY
dev_clear_rdonly(2);
#endif
if (IS_ERR(obddev->obd_fsops))
RETURN(rc = PTR_ERR(obddev->obd_fsops));
- mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
+ if (!(page = __get_free_page(GFP_KERNEL)))
+ return -ENOMEM;
+
+ memset((void *)page, 0, PAGE_SIZE);
+ sprintf((char *)page, "iopen_nopriv");
+
+ mnt = do_kern_mount(data->ioc_inlbuf2, 0,
+ data->ioc_inlbuf1, (void *)page);
+ free_page(page);
if (IS_ERR(mnt)) {
rc = PTR_ERR(mnt);
CERROR("do_kern_mount failed: rc = %d\n", rc);
obddev->obd_namespace =
ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL) {
- mds_cleanup(obddev);
+ mds_cleanup(obddev, 0, 0);
GOTO(err_fs, rc = -ENOMEM);
}
RETURN(0);
err_fs:
- mds_fs_cleanup(obddev);
+ mds_fs_cleanup(obddev, 0);
err_put:
unlock_kernel();
mntput(mds->mds_vfsmnt);
return rc;
}
-static int mds_cleanup(struct obd_device *obddev)
+static int mds_cleanup(struct obd_device *obddev, int force, int failover)
{
struct super_block *sb;
struct mds_obd *mds = &obddev->u.mds;
RETURN(0);
mds_update_server_data(mds);
- mds_fs_cleanup(obddev);
+ mds_fs_cleanup(obddev, failover);
unlock_kernel();
+
+ /* 2 seems normal on mds, (may_umount() also expects 2
+ fwiw), but we only see 1 at this point in obdfilter. */
+ if (atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count) > 2){
+ CERROR("%s: mount point busy, mnt_count: %d\n",
+ obddev->obd_name,
+ atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count));
+ }
+
mntput(mds->mds_vfsmnt);
mds->mds_sb = 0;
ldlm_namespace_free(obddev->obd_namespace);
+ if (obddev->obd_recovering)
+ target_cancel_recovery_timer(obddev);
lock_kernel();
#ifdef CONFIG_DEV_RDONLY
dev_clear_rdonly(2);
inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
struct lustre_handle *lockh)
{
- struct mds_export_data *med = &req->rq_export->exp_mds_data;
- struct mds_client_data *mcd = med->med_mcd;
- struct ptlrpc_request *oldrep = med->med_outstanding_reply;
- struct ldlm_reply *dlm_rep;
+ struct obd_export *exp = req->rq_export;
+ struct obd_device *obd = exp->exp_obd;
+ struct ldlm_request *dlmreq =
+ lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
+ struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+ struct list_head *iter;
+
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+ return;
+
+ l_lock(&obd->obd_namespace->ns_lock);
+ list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+ struct ldlm_lock *lock;
+ lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+ if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+ lockh->cookie = lock->l_handle.h_cookie;
+ DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+ lockh->cookie);
+ l_unlock(&obd->obd_namespace->ns_lock);
+ return;
+ }
- if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) &&
- (mcd->mcd_last_xid == req->rq_xid) && (oldrep != NULL)) {
- DEBUG_REQ(D_HA, req, "restoring lock handle from %p", oldrep);
- dlm_rep = lustre_msg_buf(oldrep->rq_repmsg, 0);
- lockh->addr = dlm_rep->lock_handle.addr;
- lockh->cookie = dlm_rep->lock_handle.cookie;
}
+ l_unlock(&obd->obd_namespace->ns_lock);
+ DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+ remote_hdl.cookie);
}
static int ldlm_intent_policy(struct ldlm_namespace *ns,
if (req->rq_reqmsg->bufcount > 1) {
/* an intent needs to be considered */
- struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+ struct ldlm_intent *it;
struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
struct mds_body *mds_body;
struct ldlm_reply *rep;
- struct lustre_handle lockh;
+ struct lustre_handle lockh = { 0 };
struct ldlm_lock *new_lock;
int rc, offset = 2, repsize[3] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
mds->mds_max_mdsize};
- it->opc = NTOH__u64(it->opc);
+ it = lustre_swab_reqbuf (req, 1, sizeof (*it),
+ lustre_swab_ldlm_intent);
+ if (it == NULL) {
+ CERROR ("Intent missing\n");
+ rc = req->rq_status = -EFAULT;
+ RETURN (rc);
+ }
LDLM_DEBUG(lock, "intent policy, opc: %s",
ldlm_it2str(it->opc));
RETURN(rc);
}
- rep = lustre_msg_buf(req->rq_repmsg, 0);
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
rep->lock_policy_res1 = IT_INTENT_EXEC;
fixup_handle_for_resent_req(req, &lockh);
rep->lock_policy_res2 = req->rq_status;
RETURN(ELDLM_LOCK_ABORTED);
}
- mds_body = lustre_msg_buf(req->rq_repmsg, 1);
+ mds_body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*mds_body));
if (!(mds_body->valid & OBD_MD_FLEASIZE)) {
rep->lock_policy_res2 = rc;
RETURN(ELDLM_LOCK_ABORTED);
LBUG();
}
- if (flags & LDLM_FL_INTENT_ONLY) {
- LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
- RETURN(ELDLM_LOCK_ABORTED);
- }
-
/* By this point, whatever function we called above must have
* filled in 'lockh' or returned an error. We want to give the
* new lock to the client instead of whatever lock it was about
* to get. */
new_lock = ldlm_handle2lock(&lockh);
LASSERT(new_lock != NULL);
+
+ /* If we've already given this lock to a client once, then we
+ * should have no readers or writers. Otherwise, we should
+ * have one reader _or_ writer ref (which will be zeroed below)
+ * before returning the lock to a client.
+ */
+ if (new_lock->l_export == req->rq_export)
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+ else
+ LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+
+ /* If we're running an intent only, we want to abort the new
+ * lock, and let the client abort the original lock. */
+ if (flags & LDLM_FL_INTENT_ONLY) {
+ LDLM_DEBUG(lock, "INTENT_ONLY, aborting locks");
+ l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+ if (new_lock->l_readers)
+ ldlm_lock_decref(&lockh, LCK_PR);
+ else
+ ldlm_lock_decref(&lockh, LCK_PW);
+ l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+ LDLM_LOCK_PUT(new_lock);
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
*lockp = new_lock;
rep->lock_policy_res2 = req->rq_status;
if (new_lock->l_export == req->rq_export) {
/* Already gave this to the client, which means that we
* reconstructed a reply. */
- LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
MSG_RESENT);
RETURN(ELDLM_LOCK_REPLACED);
}
/* Fixup the lock to be given to the client */
l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
- LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
new_lock->l_readers = 0;
new_lock->l_writers = 0;
mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
MDS_BUFSIZE, MDS_MAXREQSIZE,
MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
- mds_handle, "mds");
+ mds_handle, "mds", obddev);
+
if (!mds->mds_service) {
CERROR("failed to start service\n");
RETURN(rc = -ENOMEM);
ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
MDS_BUFSIZE, MDS_MAXREQSIZE,
MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
- mds_handle, "mds");
+ mds_handle, "mds_setattr", obddev);
if (!mds->mds_setattr_service) {
CERROR("failed to start getattr service\n");
GOTO(err_thread, rc = -ENOMEM);
ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
MDS_BUFSIZE, MDS_MAXREQSIZE,
MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
- mds_handle, "mds");
+ mds_handle, "mds_readpage", obddev);
if (!mds->mds_readpage_service) {
CERROR("failed to start readpage service\n");
GOTO(err_thread2, rc = -ENOMEM);
}
-static int mdt_cleanup(struct obd_device *obddev)
+static int mdt_cleanup(struct obd_device *obddev, int force, int failover)
{
struct mds_obd *mds = &obddev->u.mds;
ENTRY;
/* use obd ops to offer management infrastructure */
static struct obd_ops mds_obd_ops = {
- o_owner: THIS_MODULE,
- o_attach: mds_attach,
- o_detach: mds_detach,
- o_connect: mds_connect,
- o_disconnect: mds_disconnect,
- o_setup: mds_setup,
- o_cleanup: mds_cleanup,
- o_iocontrol: mds_iocontrol
+ o_owner: THIS_MODULE,
+ o_attach: mds_attach,
+ o_detach: mds_detach,
+ o_connect: mds_connect,
+ o_disconnect: mds_disconnect,
+ o_setup: mds_setup,
+ o_cleanup: mds_cleanup,
+ o_iocontrol: mds_iocontrol,
+ o_destroy_export: mds_destroy_export
};
static struct obd_ops mdt_obd_ops = {
static int __init mds_init(void)
{
struct lprocfs_static_vars lvars;
- mds_file_cache = kmem_cache_create("ll_mds_file_data",
- sizeof(struct mds_file_data),
- 0, 0, NULL, NULL);
- if (mds_file_cache == NULL)
- return -ENOMEM;
lprocfs_init_multi_vars(0, &lvars);
class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
ldlm_unregister_intent();
class_unregister_type(LUSTRE_MDS_NAME);
class_unregister_type(LUSTRE_MDT_NAME);
- if (kmem_cache_destroy(mds_file_cache))
- CERROR("couldn't free MDS file cache\n");
}
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");