extern struct dentry_operations ll_d_ops;
-int ll_lock(struct inode *dir, struct dentry *dentry,
- struct lookup_intent *it, struct lustre_handle *lockh)
-{
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- char *tgt = NULL;
- int tgtlen = 0;
- int err, lock_mode;
-
- /* CREAT needs to be tested before open (both could be set) */
- if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) {
- lock_mode = LCK_PW;
- } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
- IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK|
- IT_LINK | IT_LINK2 | IT_LOOKUP)) {
- /* XXXphil PW for LINK2/RENAME2? */
- lock_mode = LCK_PR;
- } else if (it->it_op & IT_SYMLINK) {
- lock_mode = LCK_PW;
- tgt = it->it_data;
- tgtlen = strlen(tgt);
- it->it_data = NULL;
- } else {
- LBUG();
- RETURN(-EINVAL);
- }
-
- err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode,
- dir, dentry, lockh, tgt, tgtlen, dir, sizeof(*dir));
-
- RETURN(err);
-}
-
int ll_unlock(__u32 mode, struct lustre_handle *lockh)
{
ENTRY;
}
#endif
-static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
- struct lookup_intent *it)
+static int ll_intent_to_lock_mode(struct lookup_intent *it)
{
- struct ptlrpc_request *request = NULL;
- struct inode * inode = NULL;
- struct ll_sb_info *sbi = ll_i2sbi(dir);
- struct ll_read_inode2_cookie lic;
+ /* CREAT needs to be tested before open (both could be set) */
+ if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) {
+ return LCK_PW;
+ } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
+ IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK|
+ IT_LINK | IT_LINK2 | IT_LOOKUP | IT_SYMLINK)) {
+ return LCK_PW;
+ }
+
+ LBUG();
+ RETURN(-EINVAL);
+}
+
+#define LL_LOOKUP_POSITIVE 1
+#define LL_LOOKUP_NEGATIVE 2
+
+typedef int (*intent_finish_cb)(int flag, struct ptlrpc_request *,
+ struct dentry *, struct lookup_intent *,
+ int offset, obd_id ino);
+
+int ll_intent_lock(struct inode *parent, struct dentry *dentry,
+ struct lookup_intent *it,
+ intent_finish_cb intent_finish)
+{
+ struct ll_sb_info *sbi = ll_i2sbi(parent);
struct lustre_handle lockh;
- struct lookup_intent lookup_it = { IT_LOOKUP };
- int rc, offset;
+ struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
+ struct ptlrpc_request *request = NULL;
+ char *tgt = NULL;
+ int rc, lock_mode, tgtlen = 0, offset, flag = LL_LOOKUP_POSITIVE;
obd_id ino = 0;
ENTRY;
dentry->d_name.name, ldlm_it2str(it->it_op));
if (dentry->d_name.len > EXT2_NAME_LEN)
- RETURN(ERR_PTR(-ENAMETOOLONG));
+ RETURN(-ENAMETOOLONG);
- rc = ll_lock(dir, dentry, it, &lockh);
+ lock_mode = ll_intent_to_lock_mode(it);
+ if (it->it_op & IT_SYMLINK) {
+ tgt = it->it_data;
+ tgtlen = strlen(tgt);
+ it->it_data = NULL;
+ }
+
+ rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode,
+ parent, dentry, &lockh, tgt, tgtlen, parent,
+ sizeof(*parent));
if (rc < 0)
- RETURN(ERR_PTR(rc));
+ RETURN(rc);
memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
request = (struct ptlrpc_request *)it->it_data;
if (it->it_disposition) {
+ struct mds_body *mds_body;
int mode, symlen = 0;
obd_flag valid;
+ /* it_disposition == 1 indicates that the server performed the
+ * intent on our behalf. This long block is all about fixing up
+ * the local state so that it is correct as of the moment
+ * _before_ the operation was applied; that way, the VFS will
+ * think that everything is normal and call Lustre's regular
+ * FS function.
+ *
+ * If we're performing a creation, that means that unless the
+ * creation failed with EEXIST, we should fake up a negative
+ * dentry. Likewise for the target of a hard link.
+ *
+ * For everything else, we want to lookup to succeed. */
+
+ /* One additional note: we add an extra reference to the request
+ * because we need to keep it around until ll_create gets
+ * called. For anything else which results in
+ * LL_LOOKUP_POSITIVE, we can do the iget() immediately with the
+ * contents of the reply (in the intent_finish callback). In
+ * the create case, however, we need to wait until
+ * ll_create_node to do the iget() or the VFS will abort with
+ * -EEXISTS. */
+
offset = 1;
- lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
- ino = lic.lic_body->fid1.id;
- mode = lic.lic_body->mode;
+ mds_body = lustre_msg_buf(request->rq_repmsg, offset);
+ ino = mds_body->fid1.id;
+ mode = mds_body->mode;
if (it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) {
- mdc_store_create_replay_data(request, dir->i_sb);
+ mdc_store_create_replay_data(request, parent->i_sb);
/* For create ops, we want the lookup to be negative,
* unless the create failed in a way that indicates
* that the file is already there */
- if (it->it_status != -EEXIST)
- GOTO(negative, NULL);
+ if (it->it_status != -EEXIST) {
+ atomic_inc(&request->rq_refcount);
+ GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+ }
} else if (it->it_op & (IT_GETATTR | IT_SETATTR | IT_LOOKUP |
IT_READLINK)) {
/* For check ops, we want the lookup to succeed */
it->it_data = NULL;
if (it->it_status)
- GOTO(neg_req, NULL);
+ GOTO(out, flag = LL_LOOKUP_NEGATIVE);
} else if (it->it_op & (IT_RENAME | IT_LINK)) {
/* For rename, we want the source lookup to succeed */
if (it->it_status) {
* the file truly doesn't exist */
it->it_data = NULL;
if (it->it_status == -ENOENT)
- GOTO(neg_req, NULL);
- goto iget;
+ GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+ GOTO(out, flag = LL_LOOKUP_POSITIVE);
} else if (it->it_op == IT_OPEN) {
it->it_data = NULL;
if (it->it_status && it->it_status != -EEXIST)
- GOTO(neg_req, NULL);
+ GOTO(out, flag = LL_LOOKUP_NEGATIVE);
} else if (it->it_op & (IT_RENAME2 | IT_LINK2)) {
+ struct mds_body *body =
+ lustre_msg_buf(request->rq_repmsg, offset);
it->it_data = NULL;
/* This means the target lookup is negative */
- if (lic.lic_body->valid == 0)
- GOTO(neg_req, NULL);
- goto iget;
+ if (body->valid == 0)
+ GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+ GOTO(out, flag = LL_LOOKUP_POSITIVE);
}
/* Do a getattr now that we have the lock */
valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
if (it->it_op == IT_READLINK) {
valid |= OBD_MD_LINKNAME;
- symlen = lic.lic_body->size;
+ symlen = mds_body->size;
}
ptlrpc_req_finished(request);
request = NULL;
}
offset = 0;
} else {
- struct ll_inode_info *lli = ll_i2info(dir);
+ struct ll_inode_info *lli = ll_i2info(parent);
int mode;
+ /* it_disposition == 0 indicates that it just did a simple lock
+ * request, for which we are very thankful. move along with
+ * the local lookup then. */
+
memcpy(&lli->lli_intent_lock_handle, &lockh, sizeof(lockh));
offset = 0;
- ino = ll_inode_by_name(dir, dentry, &mode);
+ ino = ll_inode_by_name(parent, dentry, &mode);
if (!ino) {
CERROR("inode %*s not found by name\n",
dentry->d_name.len, dentry->d_name.name);
}
}
- iget:
- lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
- if (S_ISREG(lic.lic_body->mode) &&
- lic.lic_body->valid & OBD_MD_FLEASIZE) {
- LASSERT(request->rq_repmsg->bufcount > offset);
- lic.lic_lmm = lustre_msg_buf(request->rq_repmsg, offset + 1);
- } else
- lic.lic_lmm = NULL;
+ EXIT;
+ out:
+ if (intent_finish != NULL)
+ rc = intent_finish(flag, request, dentry, it, offset, ino);
+ else
+ ptlrpc_req_finished(request);
- /* No rpc's happen during iget4, -ENOMEM's are possible */
- LASSERT(ino != 0);
- inode = ll_iget(dir->i_sb, ino, &lic);
- if (!inode) {
- ptlrpc_free_req(request);
+ if (it->it_op == IT_LOOKUP || rc < 0)
ll_intent_release(dentry, it);
- RETURN(ERR_PTR(-ENOMEM));
+
+ return rc;
+
+ drop_req:
+ ptlrpc_free_req(request);
+ drop_lock:
+#warning FIXME: must release lock here
+ return rc;
+}
+
+static int lookup2_finish(int flag, struct ptlrpc_request *request,
+ struct dentry *dentry, struct lookup_intent *it,
+ int offset, obd_id ino)
+{
+ struct inode *inode = NULL;
+ struct ll_read_inode2_cookie lic;
+
+ if (flag == LL_LOOKUP_POSITIVE) {
+ ENTRY;
+ lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
+
+ if (S_ISREG(lic.lic_body->mode) &&
+ lic.lic_body->valid & OBD_MD_FLEASIZE) {
+ LASSERT(request->rq_repmsg->bufcount > offset);
+ lic.lic_lmm = lustre_msg_buf(request->rq_repmsg,
+ offset + 1);
+ } else {
+ lic.lic_lmm = NULL;
+ }
+
+ /* No rpc's happen during iget4, -ENOMEM's are possible */
+ LASSERT(ino != 0);
+ inode = ll_iget(dentry->d_sb, ino, &lic);
+ if (!inode) {
+ /* XXX make sure that request is freed in this case;
+ * I think it is, but double-check refcounts. -phil */
+ RETURN(-ENOMEM);
+ }
+ EXIT;
+ } else {
+ ENTRY;
}
- EXIT;
- neg_req:
ptlrpc_req_finished(request);
- negative:
+
dentry->d_op = &ll_d_ops;
if (ll_d2d(dentry) == NULL) {
ll_set_dd(dentry);
it->it_status);
}
- if (it->it_op == IT_LOOKUP)
- ll_intent_release(dentry, it);
+ RETURN(0);
+}
- return NULL;
+static struct dentry *ll_lookup2(struct inode *parent, struct dentry *dentry,
+ struct lookup_intent *it)
+{
+ int rc;
- drop_req:
- ptlrpc_free_req(request);
- drop_lock:
-#warning FIXME: must release lock here
- return ERR_PTR(rc);
+ rc = ll_intent_lock(parent, dentry, it, lookup2_finish);
+ if (rc < 0) {
+ CERROR("ll_intent_lock: %d\n", rc);
+ return ERR_PTR(rc);
+ }
+
+ return 0;
}
static struct inode *ll_create_node(struct inode *dir, const char *name,
<!DOCTYPE lustre>
<lustre>
<ldlm name='ldlm' uuid='ldlm_UUID'/>
- <node uuid='localhost_UUID' name='localhost'>
+ <node name='localhost' uuid='localhost_UUID'>
<profile>
<ldlm_ref uuidref='ldlm_UUID'/>
<network_ref uuidref='NET_localhost_tcp_UUID'/>
<ost_ref uuidref='OST_localhost_2_UUID'/>
<mountpoint_ref uuidref='MNT_localhost_UUID'/>
</profile>
- <network type='tcp' uuid='NET_localhost_tcp_UUID' name='NET_localhost_tcp'>
+ <network name='NET_localhost_tcp' uuid='NET_localhost_tcp_UUID' type='tcp'>
<server>localhost</server>
<port>988</port>
</network>
<network_ref uuidref='NET_localhost_tcp_UUID'/>
<node_ref uuidref='localhost_UUID'/>
</mds>
- <lov uuid='lov1_UUID' name='lov1'>
+ <lov name='lov1' uuid='lov1_UUID'>
<mds_ref uuidref='mds1_UUID'/>
- <devices pattern='0' stripesize='65536' stripecount='0'>
+ <devices stripecount='0' stripesize='65536' pattern='0'>
<osc_ref uuidref='OSC_localhost_UUID'/>
<osc_ref uuidref='OSC_localhost_2_UUID'/>
</devices>
</lov>
- <lovconfig uuid='LVCFG_lov1_UUID' name='LVCFG_lov1'>
+ <lovconfig name='LVCFG_lov1' uuid='LVCFG_lov1_UUID'>
<lov_ref uuidref='lov1_UUID'/>
</lovconfig>
- <obd type='obdfilter' uuid='OBD_localhost_UUID' name='OBD_localhost'>
+ <obd uuid='OBD_localhost_UUID' name='OBD_localhost' type='obdfilter'>
<fstype>extN</fstype>
<device size='100000'>/tmp/ost1</device>
<autoformat>no</autoformat>
<network_ref uuidref='NET_localhost_tcp_UUID'/>
<obd_ref uuidref='OBD_localhost_UUID'/>
</ost>
- <obd type='obdfilter' uuid='OBD_localhost_2_UUID' name='OBD_localhost_2'>
+ <obd name='OBD_localhost_2' uuid='OBD_localhost_2_UUID' type='obdfilter'>
<fstype>extN</fstype>
<device size='100000'>/tmp/ost2</device>
<autoformat>no</autoformat>
</obd>
- <osc uuid='OSC_localhost_2_UUID' name='OSC_localhost_2'>
+ <osc name='OSC_localhost_2' uuid='OSC_localhost_2_UUID'>
<ost_ref uuidref='OST_localhost_2_UUID'/>
<obd_ref uuidref='OBD_localhost_2_UUID'/>
</osc>
- <ost uuid='OST_localhost_2_UUID' name='OST_localhost_2'>
+ <ost name='OST_localhost_2' uuid='OST_localhost_2_UUID'>
<network_ref uuidref='NET_localhost_tcp_UUID'/>
<obd_ref uuidref='OBD_localhost_2_UUID'/>
</ost>