#include <linux/lprocfs_status.h>
#include <linux/lustre_commit_confd.h>
-#ifdef CONFIG_SNAPFS
-#include <linux/lustre_smfs.h>
-#include <linux/lustre_snap.h>
-#endif
#include "mds_internal.h"
static int mds_intent_policy(struct ldlm_namespace *ns,
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
- OBD_FAIL_MDS_SENDPAGE, rc);
+ OBD_FAIL_MDS_SENDPAGE, rc = -EIO);
GOTO(abort_bulk, rc);
}
if (!inode)
RETURN(ERR_PTR(-ENOENT));
+ if (is_bad_inode(inode)) {
+ CERROR("bad inode returned %lu/%u\n",
+ inode->i_ino, inode->i_generation);
+ dput(result);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
/* here we disabled generation check, as root inode i_generation
* of cache mds and real mds are different. */
if (inode->i_ino != mds->mds_rootfid.id && generation &&
dentry->d_name.len, dentry->d_name.name,
ll_bdevname(dentry->d_inode->i_sb, btmp),
dentry->d_inode->i_ino);
+ /* child inode->i_alloc_sem protects orphan_dec_test and
+ * is_orphan race, mds_mfd_close drops it */
+ DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
rc = mds_mfd_close(NULL, 0, obd, mfd,
!(export->exp_flags & OBD_OPT_FAILOVER));
-
if (rc)
CDEBUG(D_INODE, "Error closing file: %d\n", rc);
spin_lock(&med->med_open_lock);
rc = lustre_pack_reply(req, 1, size, NULL);
if (rc) {
- CERROR("out of memory\n");
+ CERROR("lustre_pack_reply failed: rc %d\n", rc);
GOTO(out, req->rq_status = rc);
}
RETURN(rc);
}
-static int mds_getattr_name(struct ptlrpc_request *req, int offset,
+static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
struct lustre_handle *child_lockh, int child_part)
{
struct obd_device *obd = req->rq_export->exp_obd;
struct mds_body *body;
struct dentry *dparent = NULL, *dchild = NULL;
struct lvfs_ucred uc;
- struct lustre_handle parent_lockh[2];
- int namesize, update_mode;
- int rc = 0, cleanup_phase = 0, resent_req = 0, reply_offset;
- struct clonefs_info *clone_info = NULL;
- char *name;
+ struct lustre_handle parent_lockh[2] = {{0}, {0}};
+ unsigned int namesize;
+ int rc = 0, cleanup_phase = 0, resent_req = 0, update_mode, reply_offset;
+ char *name = NULL;
ENTRY;
LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
}
namesize = req->rq_reqmsg->buflens[offset + 1];
+ /* A namesize of less than 2 means the name is empty (the request
+ probably came from a revalidate by cfid), so leave name unset. */
+ if (namesize <= 1)
+ name = NULL;
+
LASSERT (offset == 1 || offset == 3);
- /* if requests were at offset 3, the getattr reply goes back at 1 */
+ /* if requests were at offset 3, the getattr reply goes back at 1 */
if (offset == 3) {
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
reply_offset = 1;
intent_set_disposition(rep, DISP_LOOKUP_EXECD);
LASSERT(namesize > 0);
- if (namesize == 1) {
- /* we have no dentry here, drop LOOKUP bit */
- child_part &= ~MDS_INODELOCK_LOOKUP;
- CDEBUG(D_OTHER, "%s: request to retrieve attrs for %lu/%lu\n",
- obd->obd_name, (unsigned long) body->fid1.id,
- (unsigned long) body->fid1.generation);
- dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
- parent_lockh, &update_mode,
- NULL, 0, child_part);
- if (IS_ERR(dchild)) {
- CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
- GOTO(cleanup, rc = PTR_ERR(dchild));
- }
- memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
-#ifdef S_PDIROPS
- if (parent_lockh[1].cookie)
- ldlm_lock_decref(parent_lockh + 1, update_mode);
-#endif
- cleanup_phase = 2;
- goto fill_inode;
+ if (child_lockh->cookie != 0) {
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
+ resent_req = 1;
}
-
+#if 0
#if HAVE_LOOKUP_RAW
/* FIXME: handle raw lookup */
if (body->valid == OBD_MD_FLID) {
GOTO(cleanup, rc);
}
#endif
-
- if (child_lockh->cookie != 0) {
- LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
- resent_req = 1;
- }
-
+#endif
if (resent_req == 0) {
- rc = mds_get_parent_child_locked(obd, mds, &body->fid1,
- parent_lockh, &dparent,
- LCK_PR, MDS_INODELOCK_LOOKUP,
- &update_mode, name, namesize,
- child_lockh, &dchild, LCK_PR,
- child_part, clone_info);
+ if (name) {
+ rc = mds_get_parent_child_locked(obd, mds, &body->fid1,
+ parent_lockh, &dparent,
+ LCK_PR,
+ MDS_INODELOCK_LOOKUP,
+ &update_mode,
+ name, namesize,
+ child_lockh, &dchild,
+ LCK_PR, child_part);
+ } else {
+ /* we have no dentry here, drop LOOKUP bit */
+ /* FIXME: do we need MDS_INODELOCK_LOOKUP here or not? */
+ child_part &= ~MDS_INODELOCK_LOOKUP;
+ CDEBUG(D_OTHER, "%s: retrieve attrs for %lu/%lu\n",
+ obd->obd_name, (unsigned long) body->fid1.id,
+ (unsigned long) body->fid1.generation);
+
+#if 0
+ dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL,
+ LCK_PR, parent_lockh,
+ &update_mode,
+ NULL, 0, child_part);
+#else
+ dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL,
+ LCK_PR, parent_lockh,
+ &update_mode,
+ NULL, 0,
+ MDS_INODELOCK_UPDATE);
+#endif
+ if (IS_ERR(dchild)) {
+ CERROR("can't find inode: %d\n",
+ (int) PTR_ERR(dchild));
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ }
+ memcpy(child_lockh, parent_lockh,
+ sizeof(parent_lockh[0]));
+#ifdef S_PDIROPS
+ if (parent_lockh[1].cookie)
+ ldlm_lock_decref(parent_lockh + 1, update_mode);
+#endif
+ cleanup_phase = 2;
+ goto fill_inode;
+ }
if (rc)
GOTO(cleanup, rc);
struct ldlm_resource *res;
DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
granted_lock = ldlm_handle2lock(child_lockh);
- LASSERT(granted_lock);
+
+ LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
+ body->fid1.id, body->fid1.generation,
+ child_lockh->cookie);
res = granted_lock->l_resource;
child_fid.id = res->lr_name.name[0];
if (resent_req == 0) {
if (rc && DENTRY_VALID(dchild))
ldlm_lock_decref(child_lockh, LCK_PR);
- if (dparent) {
+ if (name) {
ldlm_lock_decref(parent_lockh, LCK_PR);
+ }
#ifdef S_PDIROPS
- if (parent_lockh[1].cookie != 0)
- ldlm_lock_decref(parent_lockh + 1,
- update_mode);
+ if (parent_lockh[1].cookie != 0)
+ ldlm_lock_decref(parent_lockh + 1, update_mode);
#endif
- }
if (dparent)
l_dput(dparent);
}
push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
de = mds_fid2dentry(mds, &body->fid1, NULL);
if (IS_ERR(de)) {
- rc = req->rq_status = -ENOENT;
- GOTO(out_pop, PTR_ERR(de));
+ rc = req->rq_status = PTR_ERR(de);
+ GOTO(out_pop, rc);
}
rc = mds_getattr_pack_msg(req, de->d_inode, offset);
if (rc != 0) {
- CERROR ("mds_getattr_pack_msg: %d\n", rc);
- GOTO (out_pop, rc);
+ CERROR("mds_getattr_pack_msg: %d\n", rc);
+ GOTO(out_pop, rc);
}
req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
case MDS_STATFS:
case MDS_GETSTATUS:
case MDS_GETATTR:
- case MDS_GETATTR_NAME:
+ case MDS_GETATTR_LOCK:
case MDS_READPAGE:
case MDS_REINT:
case MDS_CLOSE:
return rc;
}
+static char str[PTL_NALFMT_SIZE];
int mds_handle(struct ptlrpc_request *req)
{
int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
int recovering;
if (req->rq_export == NULL) {
- CERROR("lustre_mds: operation %d on unconnected MDS\n",
- req->rq_reqmsg->opc);
+ CERROR("operation %d on unconnected MDS from NID %s\n",
+ req->rq_reqmsg->opc,
+ ptlrpc_peernid2str(&req->rq_peer, str));
req->rq_status = -ENOTCONN;
GOTO(out, rc = -ENOTCONN);
}
/* sanity check: if the xid matches, the request must
* be marked as a resent or replayed */
- if (req->rq_xid == med->med_mcd->mcd_last_xid)
+ if (req->rq_xid == med->med_mcd->mcd_last_xid) {
LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
(MSG_RESENT | MSG_REPLAY),
"rq_xid "LPU64" matches last_xid, "
"expected RESENT flag\n",
req->rq_xid);
+ }
/* else: note the opposite is not always true; a
* RESENT req after a failover will usually not match
* the last_xid, since it was likely never
rc = mds_getattr(req, MDS_REQ_REC_OFF);
break;
- case MDS_GETATTR_NAME: {
+ case MDS_GETATTR_LOCK: {
struct lustre_handle lockh;
DEBUG_REQ(D_INODE, req, "getattr_name");
- OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+ OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_LOCK_NET, 0);
/* If this request gets a reconstructed reply, we won't be
- * acquiring any new locks in mds_getattr_name, so we don't
+ * acquiring any new locks in mds_getattr_lock, so we don't
* want to cancel.
*/
lockh.cookie = 0;
- rc = mds_getattr_name(req, MDS_REQ_REC_OFF, &lockh,
+ rc = mds_getattr_lock(req, MDS_REQ_REC_OFF, &lockh,
MDS_INODELOCK_UPDATE);
/* this non-intent call (from an ioctl) is special */
req->rq_status = rc;
OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
ldlm_server_blocking_ast, NULL);
+ fail = OBD_FAIL_LDLM_REPLY;
break;
case LDLM_CONVERT:
DEBUG_REQ(D_INODE, req, "convert");
struct mds_obd *mds = &obd->u.mds;
char *options = NULL;
struct vfsmount *mnt;
+ char ns_name[48];
unsigned long page;
int rc = 0;
ENTRY;
mds->mds_max_cookiesize = sizeof(struct llog_cookie);
atomic_set(&mds->mds_real_clients, 0);
- obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
- LDLM_NAMESPACE_SERVER);
+ sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
+ obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+
if (obd->obd_namespace == NULL) {
mds_cleanup(obd, 0);
GOTO(err_put, rc = -ENOMEM);
rc = mds_fs_setup(obd, mnt);
if (rc) {
- CERROR("MDS filesystem method init failed: rc = %d\n", rc);
+ CERROR("%s: MDS filesystem method init failed: rc = %d\n",
+ obd->obd_name, rc);
GOTO(err_ns, rc);
}
{
struct mds_obd *mds = &obd->u.mds;
struct llog_ctxt *ctxt;
- int rc, item = 0;
+ int rc, item = 0, valsize;
+ __u32 group;
ENTRY;
LASSERT(!obd->obd_recovering);
GOTO(out, rc);
item = rc;
+ group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+ valsize = sizeof(group);
+ rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
+ valsize, &group);
+ if (rc)
+ GOTO(out, rc);
+
rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
NULL, NULL, NULL);
if (rc) {
static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
int offset,
struct ldlm_lock *new_lock,
+ struct ldlm_lock **old_lock,
struct lustre_handle *lockh)
{
struct obd_export *exp = req->rq_export;
continue;
if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
lockh->cookie = lock->l_handle.h_cookie;
+ LDLM_DEBUG(lock, "restoring lock cookie");
DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
lockh->cookie);
+ if (old_lock)
+ *old_lock = LDLM_LOCK_GET(lock);
l_unlock(&obd->obd_namespace->ns_lock);
return;
}
struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
struct ldlm_reply *rep;
struct lustre_handle lockh[2] = {{0}, {0}};
- struct ldlm_lock *new_lock;
+ struct ldlm_lock *new_lock = NULL;
int getattr_part = MDS_INODELOCK_UPDATE;
int rc, repsize[4] = { sizeof(struct ldlm_reply),
sizeof(struct mds_body),
rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
intent_set_disposition(rep, DISP_IT_EXECD);
- fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
- lock, lockh);
/* execute policy */
switch ((long)it->opc) {
case IT_CREAT|IT_OPEN:
/* XXX swab here to assert that an mds_open reint
* packet is following */
+ fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, NULL, lockh);
rep->lock_policy_res2 = mds_reint(req, offset, lockh);
#if 0
/* We abort the lock if the lookup was negative and
case IT_GETATTR:
getattr_part |= MDS_INODELOCK_LOOKUP;
case IT_READDIR:
- rep->lock_policy_res2 = mds_getattr_name(req, offset, lockh,
+ fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+ lock, &new_lock, lockh);
+ rep->lock_policy_res2 = mds_getattr_lock(req, offset, lockh,
getattr_part);
/* FIXME: LDLM can set req->rq_status. MDS sets
policy_res{1,2} with disposition and status.
* drop it below anyways because lock replay is done separately by the
* client afterwards. For regular RPCs we want to give the new lock to
* the client instead of whatever lock it was about to get. */
- new_lock = ldlm_handle2lock(&lockh[0]);
+ if (new_lock == NULL)
+ new_lock = ldlm_handle2lock(&lockh[0]);
if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
RETURN(0);
- LASSERT(new_lock != NULL);
+ LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
+ it->opc, lockh[0].cookie);
+
/* If we've already given this lock to a client once, then we should
* have no readers or writers. Otherwise, we should have one reader