void class_exit_uuidlist(void);
/* mea.c */
-int mea_name2idx(struct mea *mea, char *name, int namelen);
+int mea_name2idx(struct mea *, char *, int);
+int raw_name2idx(int, const char *, int);
#endif /* __LINUX_OBD_CLASS_H */
}
if (reply->lock_desc.l_resource.lr_name.name[0] !=
- lock->l_resource->lr_name.name[0]) {
+ lock->l_resource->lr_name.name[0] ||
+ reply->lock_desc.l_resource.lr_name.name[1] !=
+ lock->l_resource->lr_name.name[1]) {
CDEBUG(D_INFO, "remote intent success, locking %ld "
"instead of %ld\n",
(long)reply->lock_desc.l_resource.lr_name.name[0],
struct lustre_handle plock;
int pmode;
+ if (it->it_op == IT_LOOKUP) {
+ /* unfortunately, we have to lie to MDC/MDS to
+ * retrieve attributes llite needs */
+ it->it_op = IT_GETATTR;
+ }
+
/* we got LOOKUP lock, but we really need attrs */
pmode = it->d.lustre.it_lock_mode;
if (pmode) {
rc = lmv_revalidate_slaves(exp, reqp, cfid,
it, 1, cb_blocking);
} else if (S_ISDIR(body->mode)) {
- CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
+ /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
(unsigned long) cfid->mds,
(unsigned long) cfid->id,
- (unsigned long) cfid->generation);
+ (unsigned long) cfid->generation);*/
}
lmv_put_obj(obj);
RETURN(rc);
* cfid != NULL specifies revalidation */
if (cfid) {
- /* this is revalidation during revalidation it's
- * enough to return 1 if we think attrs are uptodate
- * it may return updated attrs, though */
- mds = cfid->mds;
+ /* this is revalidation: we have to check is LOOKUP
+ * lock still valid for given fid. very important
+ * part is that we have to choose right mds because
+ * namespace is per mds */
+ rpfid = *pfid;
+ obj = lmv_grab_obj(obd, pfid, 0);
+ if (obj) {
+ mds = raw_name2idx(obj->objcount, (char *) name, len);
+ rpfid = obj->objs[mds].fid;
+ lmv_put_obj(obj);
+ }
+ mds = rpfid.mds;
+ CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
+ (unsigned long) cfid->mds,
+ (unsigned long) cfid->id,
+ (unsigned long) cfid->generation, mds);
rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
len, lmm, lmmsize, cfid, it, flags,
reqp, cb_blocking);
- CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu = %d\n",
- (unsigned long) cfid->mds,
- (unsigned long) cfid->id,
- (unsigned long) cfid->generation, rc);
RETURN(rc);
}
#else
#include <liblustre.h>
#endif
+#include <linux/ext2_fs.h>
#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
struct lmv_obd *lmv = &obd->u.lmv;
struct mea *mea = op_data->mea1;
struct mds_body *mds_body;
- int rc, i, free_mea = 0;
+ int rc, i, mds, free_mea = 0;
+ struct lmv_obj *obj;
ENTRY;
lmv_connect(obd);
/* TODO: where to create new directories?
* but we lookup by name may forward any request in slave
*/
repeat:
- i = mea_name2idx(mea, (char *) op_data->name, op_data->namelen);
- if (mea)
- op_data->fid1 = mea->mea_fids[i];
+ obj = lmv_grab_obj(obd, &op_data->fid1, 0);
+ if (obj) {
+ mds = raw_name2idx(obj->objcount, op_data->name,
+ op_data->namelen - 1);
+ op_data->fid1 = obj->objs[mds].fid;
+ lmv_put_obj(obj);
+ }
CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n",
op_data->namelen, op_data->name,
(unsigned long) op_data->fid1.mds,
(unsigned long) op_data->fid1.id,
(unsigned long) op_data->fid1.generation, mea);
- rc = md_create(lmv->tgts[i].exp, op_data, data, datalen,
- mode, uid, gid, rdev, request);
+ rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data,
+ datalen, mode, uid, gid, rdev, request);
if (rc == 0) {
if (*request == NULL)
RETURN(rc);
LASSERT(mds_body != NULL);
CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
(unsigned long) mds_body->fid1.id,
- (unsigned long) mds_body->fid1.generation, i);
- LASSERT(mds_body->mds == i);
+ (unsigned long) mds_body->fid1.generation,
+ op_data->fid1.mds);
+ LASSERT(mds_body->valid & OBD_MD_MDS ||
+ mds_body->mds == op_data->fid1.mds);
} else if (rc == -ESTALE) {
struct ptlrpc_request *req = NULL;
struct lustre_md md;
int mealen;
+ LBUG(); /* FIXME ASAP */
CDEBUG(D_OTHER, "it seems MDS splitted dir\n");
LASSERT(mea == NULL);
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mea *mea = data->mea2;
- int rc, i;
+ struct lmv_obj *obj;
+ int rc;
ENTRY;
lmv_connect(obd);
if (data->namelen != 0) {
/* usual link request */
- i = mea_name2idx(mea, (char *) data->name, data->namelen);
- if (mea)
- data->fid2 = mea->mea_fids[i];
- CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d mea %p\n",
+ obj = lmv_grab_obj(obd, &data->fid1, 0);
+ if (obj) {
+ rc = raw_name2idx(obj->objcount, data->name,
+ data->namelen);
+ data->fid1 = obj->objs[rc].fid;
+ lmv_put_obj(obj);
+ }
+ CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
(unsigned) data->fid2.mds, (unsigned) data->fid2.id,
(unsigned) data->fid2.generation, data->namelen,
data->name, (unsigned) data->fid1.mds,
(unsigned) data->fid1.id,
- (unsigned) data->fid1.generation, i, mea);
+ (unsigned) data->fid1.generation, data->fid1.mds);
} else {
/* request from MDS to acquire i_links for inode by fid1 */
- i = data->fid1.mds;
CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
(unsigned) data->fid1.mds, (unsigned) data->fid1.id,
(unsigned) data->fid1.generation);
}
- rc = md_link(lmv->tgts[i].exp, data, request);
+ rc = md_link(lmv->tgts[data->fid1.mds].exp, data, request);
RETURN(rc);
}
RETURN(0);
}
+void lmv_remove_dots(struct page *page)
+{
+ char *kaddr = page_address(page);
+ unsigned limit = PAGE_CACHE_SIZE;
+ unsigned offs, rec_len;
+ struct ext2_dir_entry_2 *p;
+
+ for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
+ p = (struct ext2_dir_entry_2 *)(kaddr + offs);
+ rec_len = le16_to_cpu(p->rec_len);
+
+ if ((p->name_len == 1 && p->name[0] == '.') ||
+ (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
+ p->inode = 0;
+ }
+}
+
int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
__u64 offset, struct page *page,
struct ptlrpc_request **request)
(unsigned long) offset);
}
rc = md_readpage(lmv->tgts[rfid.mds].exp, &rfid, offset, page, request);
+ if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
+ /* this page isn't from master object. to avoid
+ * ./.. duplication in directory, we have to remove them
+ * from all slave objects */
+ lmv_remove_dots(page);
+ }
lmv_put_obj(obj);
-#warning "we need fix for duplicate . and .. from slaves"
-
RETURN(rc);
}
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mea *mea = data->mea1;
int rc, i = 0;
ENTRY;
lmv_connect(obd);
if (data->namelen != 0) {
- i = mea_name2idx(mea, (char *) data->name, data->namelen);
- if (mea)
- data->fid1 = mea->mea_fids[i];
+ struct lmv_obj *obj;
+ obj = lmv_grab_obj(obd, &data->fid1, 0);
+ if (obj) {
+ i = raw_name2idx(obj->objcount, data->name,
+ data->namelen);
+ data->fid1 = obj->objs[i].fid;
+ lmv_put_obj(obj);
+ }
CDEBUG(D_OTHER, "unlink '%*s' in %lu/%lu/%lu -> %u\n",
data->namelen, data->name,
(unsigned long) data->fid1.mds,
(unsigned long) data->fid1.id,
(unsigned long) data->fid1.generation, i);
} else {
- i = data->fid1.mds;
CDEBUG(D_OTHER, "drop i_nlink on %lu/%lu/%lu\n",
(unsigned long) data->fid1.mds,
(unsigned long) data->fid1.id,
(unsigned long) data->fid1.generation);
}
- rc = md_unlink(lmv->tgts[i].exp, data, request);
+ rc = md_unlink(lmv->tgts[data->fid1.mds].exp, data, request);
RETURN(rc);
}
RETURN(rc);
}
+int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lov_stripe_md obj_md;
+ struct lov_stripe_md *obj_mdp = &obj_md;
+ int rc = 0;
+ ENTRY;
+ lmv_connect(obd);
+
+ LASSERT(ea == NULL);
+ LASSERT(oa->o_mds < lmv->count);
+
+ rc = obd_create(lmv->tgts[oa->o_mds].exp, oa, &obj_mdp, oti);
+ LASSERT(rc == 0);
+
+ RETURN(rc);
+}
+
/*
* to be called from MDS only
*/
ENTRY;
lmv_connect(obd);
- LASSERT(ea != NULL);
LASSERT(oa != NULL);
+
+ if (ea == NULL) {
+ rc = lmv_obd_create_single(exp, oa, NULL, oti);
+ RETURN(rc);
+ }
if (*ea == NULL) {
rc = obd_alloc_diskmd(exp, (struct lov_mds_md **) ea);
l_dput(dentry);
- return err;
+ RETURN(err);
#else
#error "rebuild kernel and lustre with ext3-mds-num patch!"
LASSERT(0);
return(rc);
}
-#define DENTRY_VALID(dentry) \
- ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
-
static int mds_getattr_name(int offset, struct ptlrpc_request *req,
struct lustre_handle *child_lockh, int child_part)
{
char fidname[LL_FID_NAMELEN];
struct inode *parent_inode;
struct obd_run_ctxt saved;
- struct dentry *new_child;
int err, namelen, mealen;
struct obd_ucred uc;
+ struct dentry *new;
struct mea *mea;
void *handle;
ENTRY;
RETURN(rc);
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+
+ if (!(body->oa.o_valid & OBD_MD_FLID)) {
+ /* this is request from another MDS to create remove dir inode */
+ unsigned int tmpname = ll_insecure_random_int();
+
+ handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+ LASSERT(!IS_ERR(handle));
+
+ sprintf(fidname, "%u", tmpname);
+ new = simple_mkdir(mds->mds_objects_dir, fidname,
+ body->oa.o_mode, 1);
+ LASSERT(!IS_ERR(new));
+ LASSERT(new->d_inode != NULL);
+
+ obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
+ repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+ rc = fsfilt_del_dir_entry(obd, new);
+ LASSERT(rc == 0);
+
+ rc = fsfilt_commit(obd, parent_inode, handle, 0);
+ LASSERT(rc == 0);
+
+ CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+ (unsigned long) new->d_inode->i_ino,
+ (unsigned long) new->d_inode->i_generation,
+ (unsigned) new->d_inode->i_mode);
+
+ l_dput(new);
+ pop_ctxt(&saved, &obd->obd_ctxt, &uc);
+ RETURN(0);
+ }
+
+ repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation);
down(&parent_inode->i_sem);
- new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
- if (new_child->d_inode != NULL) {
+ new = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+ if (new->d_inode != NULL) {
CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
repbody->oa.o_id, repbody->oa.o_generation);
LBUG();
/* FIXME: error handling here */
LASSERT(!IS_ERR(handle));
- rc = vfs_mkdir(parent_inode, new_child, body->oa.o_mode);
+ rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
up(&parent_inode->i_sem);
/* FIXME: error handling here */
if (rc)
OBD_ALLOC(mea, mealen);
LASSERT(mea != NULL);
mea->mea_count = 0;
- down(&new_child->d_inode->i_sem);
- handle = fsfilt_start(obd, new_child->d_inode, FSFILT_OP_SETATTR, NULL);
+ down(&new->d_inode->i_sem);
+ handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
LASSERT(!IS_ERR(handle));
- rc = fsfilt_set_md(obd, new_child->d_inode, handle, mea, mealen);
+ rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
LASSERT(rc == 0);
- fsfilt_commit(obd, new_child->d_inode, handle, 0);
+ fsfilt_commit(obd, new->d_inode, handle, 0);
LASSERT(rc == 0);
- up(&new_child->d_inode->i_sem);
+ up(&new->d_inode->i_sem);
OBD_FREE(mea, mealen);
err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
/* FIXME: error handling here */
LASSERT(err == 0);
- obdo_from_inode(&repbody->oa, new_child->d_inode, FILTER_VALID_FLAGS);
- repbody->oa.o_id = new_child->d_inode->i_ino;
- repbody->oa.o_generation = new_child->d_inode->i_generation;
+ obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n",
(unsigned long) repbody->oa.o_id,
- (unsigned long) new_child->d_inode->i_ino,
- (unsigned) new_child->d_inode->i_mode,
- (unsigned) new_child->d_inode->i_uid,
- (unsigned) new_child->d_inode->i_gid);
- dput(new_child);
+ (unsigned long) new->d_inode->i_ino,
+ (unsigned) new->d_inode->i_mode,
+ (unsigned) new->d_inode->i_uid,
+ (unsigned) new->d_inode->i_gid);
+ dput(new);
pop_ctxt(&saved, &obd->obd_ctxt, &uc);
RETURN(0);
}
{
struct obd_device *obd;
struct mds_obd *mds;
+ int rc;
ENTRY;
obd = class_exp2obd(exp);
atomic_read(&mds->mds_real_clients));
exp->exp_flags |= OBD_OPT_REAL_CLIENT;
}
+ rc = mds_lmv_connect(obd, mds->mds_lmv_name);
+ LASSERT(rc == 0);
RETURN(0);
}
};
#define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata)
+#define DENTRY_VALID(dentry) \
+ ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
{
int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
int);
int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
+int mds_choose_mdsnum(struct obd_device *, const char *, int);
#endif /* _MDS_INTERNAL_H */
b->flags = inode->i_flags;
b->rdev = inode->i_rdev;
/* Return the correct link count for orphan inodes */
- b->nlink = mds_inode_is_orphan(inode) ? 0 : inode->i_nlink;
+ if (mds_inode_is_orphan(inode)) {
+ b->nlink = 0;
+ } else if (S_ISDIR(inode->i_mode)) {
+ b->nlink = 1;
+ } else {
+ b->nlink = inode->i_nlink;
+ }
b->generation = inode->i_generation;
b->suppgid = -1;
b->mds = obd->u.mds.mds_num;
struct mds_obd *mds = &obd->u.mds;
ENTRY;
if (mds->mds_lmv_exp)
- obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0);
+ obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
+ mds->mds_max_cookiesize);
RETURN(0);
}
return 0;
}
-#define MAX_DIR_SIZE (32 * 1024)
+#define MAX_DIR_SIZE (64 * 1024)
/*
* must not be called on already splitted directories
int mds_try_to_split_dir(struct obd_device *obd,
struct dentry *dentry, struct mea **mea, int nstripes)
{
- ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
- struct ldlm_res_id res_id = { .name = {0} };
struct inode *dir = dentry->d_inode;
struct mds_obd *mds = &obd->u.mds;
- struct lustre_handle lockh;
struct mea *tmea = NULL;
struct obdo *oa = NULL;
- int rc, flags = 0;
- int mea_size = 0;
+ int rc, mea_size = 0;
void *handle;
ENTRY;
if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
RETURN(0);
-#if 0
+#if 1
if (dir->i_size < MAX_DIR_SIZE)
RETURN(0);
#endif
necessary amount of stripes, but on the other hand with this approach
of allocating maximal possible amount of MDS slots, it would be
easier to split the dir over more MDSes */
- rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea);
+ rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
if (!(*mea))
RETURN(-ENOMEM);
(*mea)->mea_count = nstripes;
+
+#warning "we have to take EX lock on a dir for splitting"
- /* convert lock on the dir in order tox
- * invalidate client's attributes -bzzz */
- res_id.name[0] = dir->i_ino;
- res_id.name[1] = dir->i_generation;
- rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
- LDLM_IBITS, &policy, LCK_PW, &flags,
- mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, &lockh);
- if (rc != ELDLM_OK) {
- CERROR("error: rc = %d\n", rc);
- }
-
/* 1) create directory objects on slave MDS'es */
/* FIXME: should this be OBD method? */
oa = obdo_alloc();
up(&dir->i_sem);
obdo_free(oa);
- ldlm_lock_decref(&lockh, LCK_PW);
-
/* 3) read through the dir and distribute it over objects */
scan_and_distribute(obd, dentry, *mea);
RETURN(rc);
}
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+ int i;
+
+ i = raw_name2idx(lmv->count, name, len);
+ RETURN(i);
+}
+
}
case S_IFDIR:{
int nstripes = 0;
- handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
- if (IS_ERR(handle))
- GOTO(cleanup, rc = PTR_ERR(handle));
+ int i;
+
+ /* as Peter asked, mkdir() should distribute new directories
+ * over the whole cluster in order to distribute namespace
+ * processing load. first, we calculate which MDS to use to
+ * put new directory's inode in */
+ i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1);
+ if (i == mds->mds_num) {
+ /* inode will be created locally */
- rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+ handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
- if (rec->ur_eadata)
- nstripes = *(u16 *)rec->ur_eadata;
+ rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+
+ if (rec->ur_eadata)
+ nstripes = *(u16 *)rec->ur_eadata;
#if 1
- /* this is for current testing yet. after the testing
- * directory will split if size reaches some limite -bzzz */
- if (rc == 0) {
+ /* this is for current testing yet. after the testing
+ * directory will split if size reaches some limite -bzzz */
+ if (rc == 0) {
#else
- if (rc == 0 && nstripes) {
+ if (rc == 0 && nstripes) {
#endif
- /* FIXME: error handling here */
- mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+ /* FIXME: error handling here */
+ mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+ }
+ } else if (!DENTRY_VALID(dchild)) {
+ /* inode will be created on another MDS */
+ struct obdo *oa = NULL;
+ struct mds_body *body;
+
+ /* first, create that inode */
+ oa = obdo_alloc();
+ LASSERT(oa != NULL);
+ oa->o_mds = i;
+ obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+ OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+ OBD_MD_FLUID | OBD_MD_FLGID);
+ oa->o_mode = dir->i_mode;
+ CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
+ obd->obd_name, i);
+ rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
+ LASSERT(rc == 0);
+
+ /* now, add new dir entry for it */
+ handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
+ rec->ur_namelen - 1,
+ oa->o_id, oa->o_generation,
+ i);
+ LASSERT(rc == 0);
+
+ /* fill reply */
+ body = lustre_msg_buf(req->rq_repmsg,
+ offset, sizeof (*body));
+ body->valid |= OBD_MD_FLID | OBD_MD_MDS;
+ body->fid1.id = oa->o_id;
+ body->fid1.mds = i;
+ body->fid1.generation = oa->o_generation;
+ obdo_free(oa);
+ } else {
+ /* requested name exists in the directory */
+ rc = -EEXIST;
}
EXIT;
break;
if (rc) {
CDEBUG(D_INODE, "error during create: %d\n", rc);
GOTO(cleanup, rc);
- } else {
+ } else if (dchild->d_inode) {
struct iattr iattr;
struct inode *inode = dchild->d_inode;
struct mds_body *body;
char *fidname = rec->ur_name;
struct dentry *child = NULL;
struct lustre_handle lockh;
- unsigned mode;
void *handle;
ENTRY;
CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n",
(unsigned long) child->d_inode->i_ino,
(unsigned long) child->d_inode->i_generation, rc);
- else
+ else {
+ if (S_ISDIR(child->d_inode->i_mode)) {
+ fids_dir->i_nlink++;
+ mark_inode_dirty(fids_dir);
+ }
mark_inode_dirty(child->d_inode);
+ }
fsfilt_commit(obd, fids_dir, handle, 0);
rec->ur_fid1->id = fids_dir->i_ino;
static int mds_copy_unlink_reply(struct ptlrpc_request *master,
struct ptlrpc_request *slave)
{
- struct lov_mds_md *eadata;
void *cookie, *cookie2;
struct mds_body *body2;
struct mds_body *body;
LASSERT(body2 != NULL);
if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) {
- CWARN("empty reply\n");
RETURN(0);
}
LASSERT(offset == 0 || offset == 2);
- DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
- rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)\n",
rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
(unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
LASSERT(unlink_by_fid == 0);
LASSERT(dchild->d_mdsnum != mds->mds_num);
mds_reint_unlink_remote(rec, offset, req, parent_lockh,
- dparent, &child_lockh, dchild);
+ dparent, &child_lockh, dchild);
RETURN(0);
}
cleanup_phase = 3; /* original name dentry */
inode = (*de_oldp)->d_inode;
- if (inode != NULL)
+ if (inode != NULL) {
inode = igrab(inode);
- if (inode == NULL)
- GOTO(cleanup, rc = -ENOENT);
+ if (inode == NULL)
+ GOTO(cleanup, rc = -ENOENT);
- c1_res_id.name[0] = inode->i_ino;
- c1_res_id.name[1] = inode->i_generation;
- iput(inode);
+ c1_res_id.name[0] = inode->i_ino;
+ c1_res_id.name[1] = inode->i_generation;
+ iput(inode);
+ } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) {
+ c1_res_id.name[0] = (*de_oldp)->d_inum;
+ c1_res_id.name[1] = (*de_oldp)->d_generation;
+ }
/* Step 4: Lookup the target child entry */
*de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
cleanup_phase = 4; /* target dentry */
inode = (*de_newp)->d_inode;
- if (inode != NULL)
+ if (inode != NULL) {
inode = igrab(inode);
- if (inode == NULL)
- goto retry_locks;
-
- c2_res_id.name[0] = inode->i_ino;
- c2_res_id.name[1] = inode->i_generation;
+ if (inode == NULL)
+ goto retry_locks;
- iput(inode);
+ c2_res_id.name[0] = inode->i_ino;
+ c2_res_id.name[1] = inode->i_generation;
+ iput(inode);
+ } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) {
+ c2_res_id.name[0] = (*de_newp)->d_inum;
+ c2_res_id.name[1] = (*de_newp)->d_generation;
+ }
retry_locks:
/* Step 5: Take locks on the parents and child(ren) */
GOTO(cleanup, rc);
}
- if ((*de_oldp)->d_inode == NULL)
+ if (!DENTRY_VALID(*de_oldp))
GOTO(cleanup, rc = -ENOENT);
/* Step 6b: Re-lookup target child to verify it hasn't changed */
/* mea.c */
EXPORT_SYMBOL(mea_name2idx);
-int raw_name2idx(int count, char *name, int namelen);
EXPORT_SYMBOL(raw_name2idx);
#ifdef LPROCFS
return c;
}
-int raw_name2idx(int count, char *name, int namelen)
+int raw_name2idx(int count, const char *name, int namelen)
{
unsigned int c;