mainly about -EREMOTE in CMD.
}
rc = mo_attr_get(ctxt, next, la);
+ if (rc == -EREMOTE) {
+ /* This object is located on remote node */
+ req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
+ RCL_SERVER, 0);
+#ifdef CONFIG_FS_POSIX_ACL
+ req_capsule_set_size(&info->mti_pill, &RMF_EADATA,
+ RCL_SERVER, 0);
+#endif
+ rc = req_capsule_pack(&info->mti_pill);
+ if (rc == 0) {
+ repbody = req_capsule_server_get(&info->mti_pill,
+ &RMF_MDT_BODY);
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid |= OBD_MD_FLID;
+ }
+ RETURN(rc);
+ }
if (rc){
CERROR("getattr error for "DFID3": %d\n",
PFID3(mdt_object_fid(o)), rc);
/* now, to getattr*/
if (mdt_body_has_lov(la, reqbody)) {
- if (length > 0)
+ if (length > 0) {
rc = mo_xattr_get(info->mti_ctxt, next,
buffer, length, XATTR_NAME_LOV);
- /* else rc = 0; */
-
- if (rc < 0)
- RETURN(rc);
-
- if (S_ISDIR(la->la_mode))
- repbody->valid |= OBD_MD_FLDIREA;
- else
- repbody->valid |= OBD_MD_FLEASIZE;
- repbody->eadatasize = rc;
+ if (rc > 0) {
+ if (S_ISDIR(la->la_mode))
+ repbody->valid |= OBD_MD_FLDIREA;
+ else
+ repbody->valid |= OBD_MD_FLEASIZE;
+ repbody->eadatasize = rc;
+ rc = 0;
+ }
+ }
} else if (S_ISLNK(la->la_mode) &&
(reqbody->valid & OBD_MD_LINKNAME) != 0) {
/* FIXME How to readlink??
rc = mo_xattr_get(info->mti_ctxt, next,
buffer, length, "readlink");
*/ rc = 10;
- if (rc < 0) {
+ if (rc <= 0) {
CERROR("readlink failed: %d\n", rc);
- RETURN(rc);
+ rc = -EFAULT;
} else {
repbody->valid |= OBD_MD_LINKNAME;
repbody->eadatasize = rc + 1;
((char*)buffer)[rc] = 0; /* NULL terminate */
+ rc = 0;
CDEBUG(D_INODE, "symlink dest %s\n", (char*)buffer);
}
}
repbody->valid |= OBD_MD_FLMODEASIZE;
}
+ if (rc != 0)
+ RETURN(rc);
#ifdef CONFIG_FS_POSIX_ACL
if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
(reqbody->valid & OBD_MD_FLACL)) {
length = req_capsule_get_size(&info->mti_pill,
&RMF_EADATA,
RCL_SERVER);
- if (length > 0)
+ if (length > 0) {
rc = mo_xattr_get(info->mti_ctxt, next, buffer,
length, XATTR_NAME_ACL_ACCESS);
- else
- rc = 0;
-
- if (rc < 0) {
- CERROR("got acl size: %d\n", rc);
- RETURN(rc);
+ if (rc < 0) {
+ CERROR("got acl size: %d\n", rc);
+ } else {
+ repbody->aclsize = rc;
+ repbody->valid |= OBD_MD_FLACL;
+ }
}
- repbody->aclsize = rc;
- repbody->valid |= OBD_MD_FLACL;
}
#endif
- RETURN(0);
+ RETURN(rc);
}
static int mdt_getattr(struct mdt_thread_info *info)
RETURN(result);
}
-/* @ Huang Hua
+/*
* UPDATE lock should be taken against parent, and be release before exit;
* child_bits lock should be taken against child, and be returned back:
* (1)normal request should release the child lock;
name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
if (name == NULL)
RETURN(-EFAULT);
+
+ if (strlen(name) == 0) {
+ /* only open the child. parent is on another node. */
+ child = parent;
+ mdt_lock_handle_init(lhc);
+ lhc->mlh_mode = LCK_CR;
+ result = mdt_object_lock(info, child, lhc, child_bits);
+ if (result != 0) {
+ /* finally, we can get attr for child. */
+ result = mdt_getattr_internal(info, child);
+ if (result != 0)
+ mdt_object_unlock(info, child, lhc);
+ }
+ RETURN(result);
+ }
/*step 1: lock parent */
lhp = &info->mti_lh[MDT_LH_PARENT];
/*step 2: lookup child's fid by name */
result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
- if (result == -EREMOTE) {
- /* This object is located on remote node */
- req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
- RCL_SERVER, 0);
-#ifdef CONFIG_FS_POSIX_ACL
- req_capsule_set_size(&info->mti_pill, &RMF_EADATA,
- RCL_SERVER, 0);
-#endif
- result = req_capsule_pack(&info->mti_pill);
- if (result == 0) {
- struct mdt_body *repbody;
- repbody = req_capsule_server_get(&info->mti_pill,
- &RMF_MDT_BODY);
- repbody->fid1 = *child_fid;
- }
- GOTO(out_parent, result);
- }
if (result != 0)
GOTO(out_parent, result);
- /*step 3: find the child object by fid & lock it*/
+ /*
+ *step 3: find the child object by fid & lock it.
+ * regardless if it is local or remote.
+ */
+ mdt_lock_handle_init(lhc);
lhc->mlh_mode = LCK_CR;
child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
if (IS_ERR(child))
ENTRY;
rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE);
- if (lustre_handle_is_used(&lhc->mlh_lh))
+ if (lustre_handle_is_used(&lhc->mlh_lh)) {
ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
+ lhc->mlh_lh.cookie = 0;
+ }
RETURN(rc);
}
if (rc == 0) {
rc = mdt_object_sync(info);
if (rc == 0) {
- struct md_object *next;
- const struct lu_fid *fid;
+ struct md_object *next;
+ const struct lu_fid *fid;
next = mdt_object_child(info->mti_object);
fid = mdt_object_fid(info->mti_object);
rc = mo_attr_get(info->mti_ctxt,
- next,
- &info->mti_attr);
+ next, &info->mti_attr);
if (rc == 0) {
body = req_capsule_server_get(pill,
&RMF_MDT_BODY);
*/
static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
{
- int result;
- const struct mdt_body *body;
- struct mdt_object *obj;
- const struct lu_context *ctx;
- struct req_capsule *pill;
+ const struct mdt_body *body;
+ struct mdt_object *obj;
+ const struct lu_context *ctx;
+ struct req_capsule *pill;
+ int result;
ctx = info->mti_ctxt;
pill = &info->mti_pill;
flags = h->mh_flags;
LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
- req_capsule_init(&info->mti_pill,
- req, RCL_SERVER, info->mti_rep_buf_size);
+ req_capsule_init(&info->mti_pill, req, RCL_SERVER, info->mti_rep_buf_size);
if (h->mh_fmt != NULL) {
req_capsule_set(&info->mti_pill, h->mh_fmt);
struct ldlm_lock **lockp,
int flags)
{
- __u64 child_bits;
- struct ldlm_lock *old_lock = *lockp;
- struct ldlm_lock *new_lock = NULL;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct ldlm_reply *ldlm_rep;
- struct mdt_lock_handle tmp_lock;
+ struct ldlm_lock *old_lock = *lockp;
+ struct ldlm_lock *new_lock = NULL;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct ldlm_reply *ldlm_rep;
+ struct mdt_lock_handle tmp_lock;
struct mdt_lock_handle *lhc = &tmp_lock;
- int rc;
+ __u64 child_bits;
+ int rc;
ENTRY;
RETURN(-EINVAL);
}
-
- mdt_lock_handle_init(lhc);
rc = mdt_getattr_name_lock(info, lhc, child_bits);
ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
RETURN(ELDLM_LOCK_REPLACED);
}
+ /* TODO:
+ * These are copied from mds/hander.c, and should be factored into
+ * ldlm module in order to share these code, and be easy for merge.
+ */
+
/* Fixup the lock to be given to the client */
l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
new_lock->l_readers = 0;
rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
&info->mti_attr);
+ if (rc == -EREMOTE) {
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid |= OBD_MD_FLID;
+ RETURN(-EREMOTE);
+ }
if (rc != 0)
- GOTO(out, rc);
+ RETURN(rc);
mdt_pack_attr2body(repbody, &info->mti_attr, mdt_object_fid(o));
rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
lmm, info->mti_mdt->mdt_max_mdsize, "lov");
if (rc < 0)
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
if (S_ISDIR(info->mti_attr.la_mode))
repbody->valid |= OBD_MD_FLDIREA;
else
mfd = mdt_mfd_new();
if (mfd == NULL) {
CERROR("mds: out of memory\n");
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
}
if (flags & FMODE_WRITE) {
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
RETURN(rc);
-out:
- return rc;
}
-int mdt_pin(struct mdt_thread_info* info)
+int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
+ __u32 flags, struct ldlm_reply *rep)
{
struct mdt_object *o;
int rc;
ENTRY;
- o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1);
+ o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
if (!IS_ERR(o)) {
if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
- rc = mdt_object_open(info, o, info->mti_body->flags);
+ intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+ intent_set_disposition(rep, DISP_LOOKUP_POS);
+ rc = mdt_object_open(info, o, flags);
+ intent_set_disposition(rep, DISP_OPEN_OPEN);
mdt_object_put(info->mti_ctxt, o);
- } else
+ } else {
+ intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+ intent_set_disposition(rep, DISP_LOOKUP_NEG);
rc = -ENOENT;
+ }
} else
rc = PTR_ERR(o);
RETURN(rc);
}
+int mdt_pin(struct mdt_thread_info* info)
+{
+ int rc;
+ ENTRY;
+ rc = mdt_open_by_fid(info, &info->mti_body->fid1,
+ info->mti_body->flags, NULL);
+ RETURN(rc);
+}
+
/* Get an internal lock on the inode number (but not generation) to sync
* new inode creation with inode unlink (bug 2029). If child_lockh is NULL
* we just get the lock as a barrier to wait for other holders of this lock,
LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
/*TODO: MDS_CHECK_RESENT */;
-
ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ if (strlen(rr->rr_name) == 0) {
+ /* partial remote open */
+ RETURN(mdt_open_by_fid(info, rr->rr_fid1,
+ info->mti_attr.la_flags, ldlm_rep));
+ }
+
+ intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
lh = &info->mti_lh[MDT_LH_PARENT];
lh->mlh_mode = LCK_PW;
- parent = mdt_object_find_lock(info, rr->rr_fid1,
- lh, MDS_INODELOCK_UPDATE);
+ parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
+ MDS_INODELOCK_UPDATE);
if (IS_ERR(parent)) {
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+ /* FIXME: just simulate child not exist */
intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
GOTO(out, result = PTR_ERR(parent));
}
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
rr->rr_name, child_fid);
- if (result && result != -ENOENT) {
- if (result == -EREMOTE) {
- /* FIXME: POS or NEG? */
- intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
- body->fid1 = *child_fid;
- body->valid |= OBD_MD_FLID;
- }
+ if (result != 0 && result != -ENOENT) {
GOTO(out_parent, result);
}
if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
info->mti_attr.la_flags & MDS_OPEN_CREAT)
GOTO(out_parent, result = -EEXIST);
-
+ /* child_fid is filled by mdo_lookup(). */
+ LASSERT(lu_fid_eq(child_fid, info->mti_rr.rr_fid2));
}
child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
GOTO(destroy_child, result);
destroy_child:
- if (result != 0 && created) {
+ if (created && result != 0 && result != -EREMOTE) {
mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
mdt_object_child(child), rr->rr_name);
} else if (created) {
+ /* barrier with other thread */
mdt_lock_new_child(info, child, NULL);
}
out_child:
class_handle_unhash(&mfd->mfd_handle);
list_del_init(&mfd->mfd_list);
spin_unlock(&med->med_open_lock);
-
/* mdt_handle2mfd increase reference count, we must drop it here */
mdt_mfd_put(mfd);
(unsigned int)attr->la_valid);
/* MDS_CHECK_RESENT */
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ lh->mlh_mode = LCK_EX;
if (attr->la_valid & ATTR_FROM_OPEN) {
mo = mdt_object_find(info->mti_ctxt, info->mti_mdt,
rr->rr_fid1);
- if (IS_ERR(mo))
- RETURN(rc = PTR_ERR(mo));
} else {
__u64 lockpart = MDS_INODELOCK_UPDATE;
if (attr->la_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
lockpart |= MDS_INODELOCK_LOOKUP;
-
- lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_mode = LCK_EX;
mo = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
-
- if (IS_ERR(mo))
- RETURN(rc = PTR_ERR(mo));
}
+ if (IS_ERR(mo))
+ RETURN(rc = PTR_ERR(mo));
next = mdt_object_child(mo);
if (lu_object_exists(info->mti_ctxt, &mo->mot_obj.mo_lu) <= 0)
rc = mo_xattr_set(info->mti_ctxt, next,
rr->rr_eadata, rr->rr_eadatalen,
XATTR_NAME_LOV);
- if (rc)
- GOTO(out_unlock, rc);
/* FIXME & TODO Please deal with logcookies here*/
GOTO(out_unlock, rc);
ENTRY;
switch (info->mti_attr.la_mode & S_IFMT) {
+ case S_IFREG:
case S_IFDIR:{
if (strlen(info->mti_rr.rr_name) > 0)
rc = mdt_md_create(info);
rc = mdt_md_mkobj(info);
break;
}
- case S_IFREG:
case S_IFLNK:
case S_IFCHR:
case S_IFBLK:
case S_IFIFO:
case S_IFSOCK:{
+ /* special file should stay on the same node as parent */
+ LASSERT(strlen(info->mti_rr.rr_name) > 0);
+
rc = mdt_md_create(info);
break;
}
}
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+
/*step 2: find & lock the child */
lhc = &info->mti_lh[MDT_LH_CHILD];
lhc->mlh_mode = LCK_EX;
rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
rr->rr_name, child_fid);
- if (rc == -EREMOTE) {
- repbody->fid1 = *child_fid;
- repbody->valid |= OBD_MD_FLID;
- }
if (rc != 0)
GOTO(out_unlock_parent, rc);
+ /* we will lock the child regardless it is local or remote. No harm. */
mc = mdt_object_find_lock(info, child_fid, lhc, MDS_INODELOCK_FULL);
if (IS_ERR(mc))
GOTO(out_unlock_parent, rc = PTR_ERR(mc));
-
/*step 3: do some checking*/
if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
GOTO(out_unlock_child, rc = -EROFS);
/* step 4: delete it */
+ /* cmm will take care if child is local or remote */
rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
mdt_object_child(mc), rr->rr_name);
{
struct mdt_reint_record *rr = &info->mti_rr;
struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_object *mp;
struct mdt_object *ms;
- struct mdt_object *mt;
+ struct mdt_lock_handle *lhp;
struct mdt_lock_handle *lhs;
- struct mdt_lock_handle *lht;
int rc;
ENTRY;
/* MDS_CHECK_RESENT here */
/* step 1: lock the source */
- lhs = &info->mti_lh[MDT_LH_PARENT];
- lhs->mlh_mode = LCK_EX;
- ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
- if (IS_ERR(ms))
- RETURN(PTR_ERR(ms));
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ lhp->mlh_mode = LCK_EX;
+ mp = mdt_object_find_lock(info, rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
+ if (IS_ERR(mp))
+ RETURN(PTR_ERR(mp));
if (strlen(rr->rr_name) == 0) {
if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
GOTO(out_unlock_source, rc = -EROFS);
/* remote partial operation */
- rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
+ rc = mo_ref_add(info->mti_ctxt, mdt_object_child(mp));
GOTO(out_unlock_source, rc);
}
/*step 2: find & lock the target */
- lht = &info->mti_lh[MDT_LH_CHILD];
- lht->mlh_mode = LCK_EX;
- mt = mdt_object_find_lock(info, rr->rr_fid2, lht, MDS_INODELOCK_UPDATE);
- if (IS_ERR(mt))
- GOTO(out_unlock_source, rc = PTR_ERR(mt));
+ lhs = &info->mti_lh[MDT_LH_CHILD];
+ lhs->mlh_mode = LCK_EX;
+ ms = mdt_object_find_lock(info, rr->rr_fid2, lhs, MDS_INODELOCK_UPDATE);
+ if (IS_ERR(ms))
+ GOTO(out_unlock_source, rc = PTR_ERR(ms));
if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
GOTO(out_unlock_target, rc = -EROFS);
/* step 4: link it */
- rc = mdo_link(info->mti_ctxt, mdt_object_child(mt),
+ rc = mdo_link(info->mti_ctxt, mdt_object_child(mp),
mdt_object_child(ms), rr->rr_name);
GOTO(out_unlock_target, rc);
out_unlock_target:
- mdt_object_unlock_put(info, mt, lht);
-out_unlock_source:
mdt_object_unlock_put(info, ms, lhs);
+out_unlock_source:
+ mdt_object_unlock_put(info, mp, lhp);
return rc;
}
/*step 2: find & lock the target object if exists*/
rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
rr->rr_tgt, tgt_fid);
- if (rc && rc != -ENOENT)
+ if (rc != 0 && rc != -ENOENT)
GOTO(out_unlock_tgtdir, rc);
if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
/*step 3: find & lock the old object*/
rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
rr->rr_name, old_fid);
- if (rc)
+ if (rc != 0)
GOTO(out_unlock_target, rc);
lh_oldp = &info->mti_lh[MDT_LH_OLD];
/* new target object may not exist now */
rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
rr->rr_tgt, new_fid);
- if (rc && rc != -ENOENT)
+ if (rc != 0 && rc != -ENOENT)
GOTO(out_unlock_old, rc);
if (rc == 0) {
mdt_object_child(mtgtdir), old_fid,
rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
rr->rr_tgt);
- if (rc)
- GOTO(out_unlock_new, rc);
-
- /* step 7: TODO:check old object */
- /* if (mold is orphan)
- */
+ GOTO(out_unlock_new, rc);
out_unlock_new:
if (mnew) {