#include "mds_internal.h"
-extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
-int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
- struct ptlrpc_request *req, int rc, __u32 op_data);
-extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
- struct ldlm_res_id *p1_res_id,
- struct ldlm_res_id *p2_res_id,
- struct ldlm_res_id *c1_res_id,
- struct ldlm_res_id *c2_res_id,
- struct lustre_handle *p1_lockh,
- struct lustre_handle *p2_lockh,
- struct lustre_handle *c1_lockh,
- struct lustre_handle *c2_lockh);
-
struct mds_file_data *mds_dentry_open(struct dentry *dentry,
struct vfsmount *mnt,
int flags,
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct inode *inode;
- int mode;
struct mds_file_data *mfd;
- int error;
+ int mode, error;
mfd = mds_mfd_new();
- if (!mfd) {
+ if (mfd == NULL) {
CERROR("mds: out of memory\n");
GOTO(cleanup_dentry, error = -ENOMEM);
}
- mode = (flags+1) & O_ACCMODE;
+ mode = (flags + 1) & O_ACCMODE;
inode = dentry->d_inode;
if (mode & FMODE_WRITE) {
struct ptlrpc_request *req,
struct lustre_handle *child_lockh)
{
+ struct ptlrpc_request *oldreq = req->rq_export->exp_outstanding_reply;
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct mds_client_data *mcd = med->med_mcd;
struct mds_obd *mds = mds_req2mds(req);
struct dentry *parent, *child;
struct ldlm_reply *rep;
struct mds_body *body;
- int disp, rc;
+ int rc;
struct list_head *t;
int put_child = 1;
ENTRY;
/* copy rc, transno and disp; steal locks */
req->rq_transno = mcd->mcd_last_transno;
req->rq_status = mcd->mcd_last_result;
- disp = rep->lock_policy_res1 = mcd->mcd_last_data;
+ intent_set_disposition(rep, mcd->mcd_last_data);
- if (req->rq_export->exp_outstanding_reply)
+ if (oldreq)
mds_steal_ack_locks(req->rq_export, req);
- /* We never care about these. */
- disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
- if (!disp) {
+ /* Only replay if create or open actually happened. */
+ if (!intent_disposition(rep, DISP_OPEN_CREATE | DISP_OPEN_OPEN) ) {
EXIT;
return; /* error looking up parent or child */
}
GOTO(out_dput, 0); /* child not present to open */
}
- /* At this point, we know we have a child, which means that we'll send
- * it back _unless_ it was open failed, _and_ we didn't create the file.
- * I love you guys. No, really.
+ /* At this point, we know we have a child. We'll send
+ * it back _unless_ it not created and open failed.
*/
- if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
+ if (intent_disposition(rep, DISP_OPEN_OPEN) &&
+ !intent_disposition(rep, DISP_OPEN_CREATE) &&
req->rq_status) {
GOTO(out_dput, 0);
}
if (S_ISREG(child->d_inode->i_mode)) {
rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
child->d_inode);
+
if (rc)
LASSERT(rc == req->rq_status);
+
+ /* If we have LOV EA data, the OST holds size, mtime */
+ if (!(body->valid & OBD_MD_FLEASIZE))
+ body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME);
} else {
/* XXX need to check this case */
}
/* If we didn't get as far as trying to open, then some locking thing
* probably went wrong, and we'll just bail here.
*/
- if ((disp & IT_OPEN_OPEN) == 0)
+ if (!intent_disposition(rep, DISP_OPEN_OPEN))
GOTO(out_dput, 0);
/* If we failed, then we must have failed opening, so don't look for
mfd = NULL;
list_for_each(t, &med->med_open_head) {
mfd = list_entry(t, struct mds_file_data, mfd_list);
- if (mfd->mfd_xid == req->rq_xid)
+ if (mfd->mfd_xid == req->rq_xid)
break;
mfd = NULL;
}
- if (req->rq_export->exp_outstanding_reply) {
+ if (oldreq) {
/* if we're not recovering, it had better be found */
LASSERT(mfd);
} else if (mfd == NULL) {
EXIT;
}
+int mds_pin(struct ptlrpc_request *req)
+{
+ struct mds_obd *mds = mds_req2mds(req);
+ struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+ struct mds_file_data *mfd = NULL;
+ struct mds_body *body;
+ struct dentry *dchild;
+ struct obd_run_ctxt saved;
+ char fidname[LL_FID_NAMELEN];
+ int fidlen = 0, rc, cleanup_phase = 0, size = sizeof(*body);
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+
+ down(&pending_dir->i_sem);
+ fidlen = ll_fid2str(fidname, body->fid1.id, body->fid1.generation);
+ dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+ if (IS_ERR(dchild)) {
+ up(&pending_dir->i_sem);
+ rc = PTR_ERR(dchild);
+ CERROR("error looking up %s in PENDING: rc = %d\n",
+ fidname, rc);
+ RETURN(rc);
+ }
+
+ cleanup_phase = 2;
+
+ if (dchild->d_inode) {
+ up(&pending_dir->i_sem);
+ mds_inode_set_orphan(dchild->d_inode);
+ mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+ mds_pack_inode2body(body, dchild->d_inode);
+ GOTO(openit, rc = 0);
+ }
+ dput(dchild);
+ up(&pending_dir->i_sem);
+
+ /* We didn't find it in PENDING so it isn't an orphan. See
+ * if it's a regular inode. */
+ dchild = mds_fid2dentry(mds, &body->fid1, NULL);
+ if (!IS_ERR(dchild)) {
+ mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+ mds_pack_inode2body(body, dchild->d_inode);
+ GOTO(openit, rc = 0);
+ }
+
+ /* We didn't find this inode on disk, but we're trying to pin it.
+ * This should never happen. */
+ CERROR("ENOENT during mds_pin for fid "LPU64"/%u\n", body->fid1.id,
+ body->fid1.generation);
+ RETURN(-ENOENT);
+
+ openit:
+ /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
+ mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, body->flags, req);
+ if (IS_ERR(mfd)) {
+ dchild = NULL; /* prevent a double dput in cleanup phase 2 */
+ GOTO(cleanup, rc = PTR_ERR(mfd));
+ }
+
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc) {
+ CERROR("out of memoryK\n");
+ GOTO(cleanup, rc);
+ }
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+ cleanup_phase = 4; /* mfd allocated */
+ body->handle.cookie = mfd->mfd_handle.h_cookie;
+ CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
+ mfd->mfd_handle.h_cookie);
+ GOTO(cleanup, rc = 0);
+
+ cleanup:
+ push_ctxt(&saved, &mds->mds_ctxt, NULL);
+ rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
+ req, rc, 0);
+ pop_ctxt(&saved, &mds->mds_ctxt, NULL);
+ /* XXX what do we do here if mds_finish_transno itself failed? */
+ switch (cleanup_phase) {
+ case 4:
+ if (rc)
+ mds_mfd_destroy(mfd);
+ case 2:
+ if (rc || S_ISLNK(dchild->d_inode->i_mode))
+ l_dput(dchild);
+ }
+ return rc;
+}
+
int mds_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req, struct lustre_handle *child_lockh)
{
+ /* XXX ALLOCATE _something_ - 464 bytes on stack here */
static const char acc_table [] = {[O_RDONLY] MAY_READ,
[O_WRONLY] MAY_WRITE,
[O_RDWR] MAY_READ | MAY_WRITE};
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
- struct ldlm_reply *rep;
- struct mds_body *body;
- struct dentry *dchild = NULL, *parent;
+ struct ldlm_reply *rep = NULL;
+ struct mds_body *body = NULL;
+ struct dentry *dchild = NULL, *parent = NULL;
struct mds_export_data *med;
struct mds_file_data *mfd = NULL;
struct ldlm_res_id child_res_id = { .name = {0} };
struct lustre_handle parent_lockh;
int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
- int cleanup_phase = 0;
+ int cleanup_phase = 0, acc_mode;
void *handle = NULL;
- int acc_mode;
ENTRY;
- LASSERT(offset == 2); /* only called via intent */
- rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
- body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+ if (offset == 2) { /* intent */
+ rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+ body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+ } else if (offset == 0) { /* non-intent reint */
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+ } else {
+ body = NULL;
+ LBUG();
+ }
MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
+ /* Step 0: If we are passed a fid, then we assume the client already
+ * opened this file and is only replaying the RPC, so we open the
+ * inode by fid (at some large expense in security).
+ */
+ if (rec->ur_fid2->id) {
+ struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+ char fidname[LL_FID_NAMELEN];
+ int fidlen = 0;
+
+ down(&pending_dir->i_sem);
+ fidlen = ll_fid2str(fidname, rec->ur_fid2->id,
+ rec->ur_fid2->generation);
+ dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+ if (IS_ERR(dchild)) {
+ up(&pending_dir->i_sem);
+ rc = PTR_ERR(dchild);
+ CERROR("error looking up %s in PENDING: rc = %d\n",
+ fidname, rc);
+ RETURN(rc);
+ }
+
+ if (dchild->d_inode) {
+ up(&pending_dir->i_sem);
+ mds_inode_set_orphan(dchild->d_inode);
+ mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+ mds_pack_inode2body(body, dchild->d_inode);
+ cleanup_phase = 2;
+ GOTO(openit, rc = 0);
+ }
+ dput(dchild);
+ up(&pending_dir->i_sem);
+
+ /* We didn't find it in PENDING so it isn't an orphan. See
+ * if it was a regular inode that was previously created.
+ */
+ dchild = mds_fid2dentry(mds, rec->ur_fid2, NULL);
+ if (!IS_ERR(dchild)) {
+ mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+ mds_pack_inode2body(body, dchild->d_inode);
+ cleanup_phase = 2;
+ GOTO(openit, rc = 0);
+ }
+
+ /* We didn't find the correct inode on disk either, so we
+ * need to re-create it via a regular replay. Do that below.
+ */
+ LASSERT(rec->ur_flags & O_CREAT);
+ }
+ LASSERT(offset == 2); /* If we got here, we must be called via intent */
+
med = &req->rq_export->exp_mds_data;
- rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
req->rq_status = -ENOMEM;
if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
RETURN(-EINVAL);
- acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
+ acc_mode = acc_table[rec->ur_flags & O_ACCMODE];
if ((rec->ur_flags & O_TRUNC) != 0)
acc_mode |= MAY_WRITE;
/* Step 1: Find and lock the parent */
+ intent_set_disposition(rep, DISP_LOOKUP_EXECD);
parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
&parent_lockh);
cleanup_phase = 2; /* child dentry */
if (dchild->d_inode)
- rep->lock_policy_res1 |= IT_OPEN_POS;
+ intent_set_disposition(rep, DISP_LOOKUP_POS);
else
- rep->lock_policy_res1 |= IT_OPEN_NEG;
+ intent_set_disposition(rep, DISP_LOOKUP_NEG);
/* Step 3: If the child was negative, and we're supposed to,
* create it. */
if (!dchild->d_inode) {
+ unsigned long ino = rec->ur_fid2->id;
+
if (!(rec->ur_flags & O_CREAT)) {
/* It's negative and we weren't supposed to create it */
GOTO(cleanup, rc = -ENOENT);
}
- rep->lock_policy_res1 |= IT_OPEN_CREATE;
- handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
+ intent_set_disposition(rep, DISP_OPEN_CREATE);
+ handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE,
+ NULL);
if (IS_ERR(handle)) {
rc = PTR_ERR(handle);
handle = NULL;
GOTO(cleanup, rc);
}
+ if (ino)
+ dchild->d_fsdata = (void *)(unsigned long)ino;
+
rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
- if (rc)
+ if (dchild->d_fsdata == (void *)(unsigned long)ino)
+ dchild->d_fsdata = NULL;
+
+ if (rc) {
+ CDEBUG(D_INODE, "error during create: %d\n", rc);
GOTO(cleanup, rc);
- created = 1;
+ } else {
+ struct iattr iattr;
+ struct inode *inode = dchild->d_inode;
+
+ if (ino) {
+ LASSERT(ino == inode->i_ino);
+ /* Written as part of setattr */
+ inode->i_generation = rec->ur_fid2->generation;
+ CDEBUG(D_HA, "recreated ino %lu with gen %x\n",
+ inode->i_ino, inode->i_generation);
+ }
+
+ created = 1;
+ LTIME_S(iattr.ia_atime) = rec->ur_time;
+ LTIME_S(iattr.ia_ctime) = rec->ur_time;
+ LTIME_S(iattr.ia_mtime) = rec->ur_time;
+
+ iattr.ia_uid = rec->ur_uid;
+ if (parent->d_inode->i_mode & S_ISGID) {
+ iattr.ia_gid = parent->d_inode->i_gid;
+ } else
+ iattr.ia_gid = rec->ur_gid;
+
+ iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
+ ATTR_MTIME | ATTR_CTIME;
+
+ rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
+ if (rc) {
+ CERROR("error on setattr: rc = %d\n", rc);
+ /* XXX should we abort here in case of error? */
+ }
+ }
+
child_mode = LCK_PW;
acc_mode = 0; /* Don't check for permissions */
}
+ LASSERT(!mds_inode_is_orphan(dchild->d_inode));
+
/* Step 4: It's positive, so lock the child */
child_res_id.name[0] = dchild->d_inode->i_ino;
child_res_id.name[1] = dchild->d_inode->i_generation;
reacquire:
lock_flags = 0;
+ /* For the open(O_CREAT) case, this would technically be a lock
+ * inversion (getting a VFS lock after starting a transaction),
+ * but in that case we cannot possibly block on this lock because
+ * we just created the child and also hold a write lock on the
+ * parent, so nobody could be holding the lock yet.
+ */
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
&lock_flags, ldlm_completion_ast,
/* An append-only file must be opened in append mode for
* writing */
- if (IS_APPEND(dchild->d_inode) &&
- (acc_mode & MAY_WRITE) != 0 &&
+ if (IS_APPEND(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0 &&
((rec->ur_flags & O_APPEND) == 0 ||
(rec->ur_flags & O_TRUNC) != 0))
- GOTO (cleanup, rc = -EPERM);
+ GOTO(cleanup, rc = -EPERM);
rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
if (rc)
GOTO(cleanup, rc);
+
+ /* If we have LOV EA data, the OST holds size, mtime */
+ if (!(body->valid & OBD_MD_FLEASIZE))
+ body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME);
}
if (!created && (rec->ur_flags & O_CREAT) &&
GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
}
- /* If we're opening a file without an EA, the client needs a write
- * lock. */
- if (S_ISREG(dchild->d_inode->i_mode) &&
+ /* If we're opening a file without an EA for write, the client needs
+ * a write lock. */
+ if (S_ISREG(dchild->d_inode->i_mode) && (rec->ur_flags & O_ACCMODE) &&
child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
ldlm_lock_decref(child_lockh, child_mode);
child_mode = LCK_PW;
GOTO(cleanup, rc = -ENOTDIR);
/* Step 5: mds_open it */
- rep->lock_policy_res1 |= IT_OPEN_OPEN;
-
+ intent_set_disposition(rep, DISP_OPEN_OPEN);
+ openit:
/* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
- if (!mfd) {
- CERROR("mds: out of memory\n");
- dchild = NULL; /* prevent a double dput in step 2 */
- GOTO(cleanup, rc = -ENOMEM);
+ if (IS_ERR(mfd)) {
+ dchild = NULL; /* prevent a double dput in cleanup phase 2 */
+ GOTO(cleanup, rc = PTR_ERR(mfd));
}
cleanup_phase = 4; /* mfd allocated */
cleanup:
rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
req, rc, rep->lock_policy_res1);
+ /* XXX what do we do here if mds_finish_transno itself failed? */
switch (cleanup_phase) {
case 4:
if (rc && !S_ISLNK(dchild->d_inode->i_mode))
* ldlm_intent_policy: if we found the dentry, or we tried to
* open it (meaning that we created, if it wasn't found), then
* we return the lock to the caller and client. */
- if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
+ if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
+ !intent_disposition(rep, DISP_OPEN_OPEN))
ldlm_lock_decref(child_lockh, child_mode);
case 2:
if (rc || S_ISLNK(dchild->d_inode->i_mode))
l_dput(dchild);
case 1:
- l_dput(parent);
- if (rc) {
- ldlm_lock_decref(&parent_lockh, parent_mode);
- } else {
- memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- req->rq_ack_locks[0].mode = parent_mode;
+ if (parent) {
+ l_dput(parent);
+ if (rc) {
+ ldlm_lock_decref(&parent_lockh, parent_mode);
+ } else {
+ memcpy(&req->rq_ack_locks[0].lock,&parent_lockh,
+ sizeof(parent_lockh));
+ req->rq_ack_locks[0].mode = parent_mode;
+ }
}
}
RETURN(rc);