struct mea *mea2; /* mea of inode2 */
};
+#define MDS_MODE_DONT_LOCK (1 << 30)
+#define MDS_MODE_REPLAY (1 << 31)
+
struct mds_rec_setattr {
__u32 sa_opcode;
__u32 sa_fsuid;
struct lmv_tgt_desc *tgts;
struct obd_export *exp;
int rc, rc2, i;
- ENTRY;
if (lmv->connected)
- RETURN(0);
+ return 0;
lmv->connected = 1;
cluuid = &lmv->cluuid;
lmv_set_timeouts(obd);
class_export_put(exp);
- RETURN (0);
+ return 0;
out_disc:
while (i-- > 0) {
RETURN(rc);
/* FIXME: choose right MDC here */
+ CWARN("this method isn't implemented yet\n");
rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
RETURN(rc);
}
+/*
+ * Enqueue a lock on every slave object of a split directory.
+ *
+ * data->mea1 describes the split directory.  Slave i is identified by
+ * mea->mea_fids[i] and is enqueued through the target selected by that
+ * fid's mds field; its handle is stored in lockh[i], so lockh must have
+ * room for mea->mea_count handles.  A slave whose target has no export
+ * is skipped and lockh[i] is left untouched.
+ *
+ * Returns 0 on success.  On failure returns the md_enqueue() error or the
+ * intent status, after dropping the locks recorded for slaves 0..i-1.
+ */
+int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
+                       struct lookup_intent *it, int lockmode,
+                       struct mdc_op_data *data, struct lustre_handle *lockh,
+                       void *lmm, int lmmsize,
+                       ldlm_completion_callback cb_completion,
+                       ldlm_blocking_callback cb_blocking, void *cb_data)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct mea *mea = data->mea1;
+        struct mdc_op_data data2;
+        int i, rc, mds;
+        ENTRY;
+
+        LASSERT(mea != NULL);
+        for (i = 0; i < mea->mea_count; i++) {
+                memset(&data2, 0, sizeof(data2));
+                data2.fid1 = mea->mea_fids[i];
+                mds = data2.fid1.mds;
+
+                /* the request goes to target @mds, which need not be
+                 * target @i: test the export that is really used below */
+                if (lmv->tgts[mds].ltd_exp == NULL)
+                        continue;
+
+                rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
+                                &data2, lockh + i, lmm, lmmsize, cb_completion,
+                                cb_blocking, cb_data);
+                CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n",
+                       (unsigned long) mea->mea_fids[i].mds,
+                       (unsigned long) mea->mea_fids[i].id,
+                       (unsigned long) mea->mea_fids[i].generation,
+                       rc, it->d.lustre.it_status);
+                if (rc)
+                        GOTO(cleanup, rc);
+                if (it->d.lustre.it_data) {
+                        struct ptlrpc_request *req;
+
+                        req = (struct ptlrpc_request *) it->d.lustre.it_data;
+                        ptlrpc_req_finished(req);
+                        /* the intent is reused for the next slave: do not
+                         * keep a pointer to the request just released */
+                        it->d.lustre.it_data = NULL;
+                }
+
+                if (it->d.lustre.it_status)
+                        GOTO(cleanup, rc = it->d.lustre.it_status);
+        }
+        RETURN(0);
+
+cleanup:
+        /* drop all taken locks */
+        while (--i >= 0) {
+                if (lockh[i].cookie)
+                        ldlm_lock_decref(lockh + i, lockmode);
+                lockh[i].cookie = 0;
+        }
+        RETURN(rc);
+}
+
int lmv_enqueue(struct obd_export *exp, int lock_type,
struct lookup_intent *it, int lock_mode,
struct mdc_op_data *data, struct lustre_handle *lockh,
if (rc)
RETURN(rc);
+ if (it->it_op == IT_UNLINK) {
+ rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
+ data, lockh, lmm, lmmsize,
+ cb_completion, cb_blocking, cb_data);
+ RETURN(rc);
+ }
+
if (data->namelen) {
obj = lmv_grab_obj(obd, &data->fid1, 0);
if (obj) {
RETURN(rc);
}
+/*
+ * Unlink every slave object of a split directory.
+ *
+ * data->mea1 describes the split directory.  For each slave fid an unlink
+ * with create_mode MDS_MODE_DONT_LOCK | S_IFDIR is sent to the target
+ * selected by the fid's mds field.  Any reply is released here, so *req is
+ * NULL on return.  A slave whose target has no export is skipped.
+ *
+ * Returns 0 on success or the first md_unlink() error; the walk stops at
+ * the first failure.
+ */
+int lmv_unlink_slaves(struct obd_export *exp,
+                      struct mdc_op_data *data, struct ptlrpc_request **req)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct mea *mea = data->mea1;
+        struct mdc_op_data data2;
+        int i, rc = 0, mds;
+        ENTRY;
+
+        LASSERT(mea != NULL);
+        for (i = 0; i < mea->mea_count; i++) {
+                memset(&data2, 0, sizeof(data2));
+                data2.fid1 = mea->mea_fids[i];
+                data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
+                mds = data2.fid1.mds;
+
+                /* the request goes to target @mds, which need not be
+                 * target @i: test the export that is really used below */
+                if (lmv->tgts[mds].ltd_exp == NULL)
+                        continue;
+
+                rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
+                CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n",
+                       (unsigned long) mea->mea_fids[i].mds,
+                       (unsigned long) mea->mea_fids[i].id,
+                       (unsigned long) mea->mea_fids[i].generation, rc);
+                if (*req) {
+                        ptlrpc_req_finished(*req);
+                        *req = NULL;
+                }
+                if (rc)
+                        break;
+        }
+        RETURN(rc);
+}
+
int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
struct ptlrpc_request **request)
{
struct lmv_obd *lmv = &obd->u.lmv;
int rc, i = 0;
ENTRY;
-
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- if (data->namelen != 0) {
+ if (data->namelen == 0 && data->mea1 != NULL) {
+ /* mds asks to remove slave objects */
+ rc = lmv_unlink_slaves(exp, data, request);
+ RETURN(rc);
+ } else if (data->namelen != 0) {
struct lmv_obj *obj;
obj = lmv_grab_obj(obd, &data->fid1, 0);
if (obj) {
char **pbuf, int *size)
{
struct fs_extent *fs_extents;
- struct ldlm_extent *extents;
+ struct ldlm_extent *extents = NULL;
struct inode *inode;
struct inode *cache_inode;
struct fsfilt_operations *cache_fsfilt = NULL;
/* get ready for the reply */
reply_buffers = 3;
req->rq_replen = lustre_msg_size(3, repsize);
- } else if (it->it_op & IT_UNLINK) {
- size[2] = sizeof(struct mds_rec_unlink);
- size[3] = data->namelen + 1;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
- size, NULL);
- if (!req)
- RETURN(-ENOMEM);
-
- /* pack the intent */
- lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
- lit->opc = (__u64)it->it_op;
-
- /* pack the intended request */
- mdc_unlink_pack(req->rq_reqmsg, 2, data);
- /* get ready for the reply */
- reply_buffers = 4;
- req->rq_replen = lustre_msg_size(4, repsize);
} else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_CHDIR)) {
int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
size[2] = sizeof(struct mds_body);
reply_buffers = 3;
req->rq_replen = lustre_msg_size(3, repsize);
} else if (it->it_op == IT_READDIR) {
- policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
size, NULL);
if (!req)
/* get ready for the reply */
reply_buffers = 1;
req->rq_replen = lustre_msg_size(1, repsize);
- } else {
+ } else if (it->it_op == IT_UNLINK) {
+ size[2] = sizeof(struct mds_body);
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 3,
+ size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ /* pack the intended request */
+ mdc_getattr_pack(req->rq_reqmsg, 0, 2, 0, data);
+
+ /* pack the intent */
+ lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+ lit->opc = (__u64)it->it_op;
+
+ /* get ready for the reply */
+ reply_buffers = 3;
+ req->rq_replen = lustre_msg_size(3, repsize);
+ } else {
LBUG();
RETURN(-EINVAL);
}
ret_mode = LCK_CW;
if (mds_splitting_expected(obd, dentry)) {
/* splitting possible. serialize any access */
- CERROR("%s: gonna split %lu/%lu\n",
+ CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
obd->obd_name,
(unsigned long) dentry->d_inode->i_ino,
(unsigned long) dentry->d_inode->i_generation);
LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
- rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize,
- NULL);
+ rc = lustre_pack_reply(req, 3, repsize, NULL);
if (rc)
RETURN(req->rq_status = rc);
RETURN(ELDLM_LOCK_ABORTED);
}
break;
+ case IT_UNLINK:
+ rc = mds_lock_and_check_slave(offset, req, &lockh);
+ if ((rep->lock_policy_res2 = rc)) {
+ if (rc == ENOLCK)
+ rep->lock_policy_res2 = 0;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
default:
CERROR("Unhandled intent "LPD64"\n", it->opc);
LBUG();
int mds_choose_mdsnum(struct obd_device *, const char *, int, int);
int mds_lmv_postsetup(struct obd_device *);
int mds_splitting_expected(struct obd_device *, struct dentry *);
+int mds_lock_slave_objs(struct obd_device *, struct dentry *,
+ struct lustre_handle **);
+int mds_unlink_slave_objs(struct obd_device *, struct dentry *);
+void mds_unlock_slave_objs(struct obd_device *, struct dentry *,
+ struct lustre_handle *);
+int mds_lock_and_check_slave(int, struct ptlrpc_request *, struct lustre_handle *);
+
#endif /* _MDS_INTERNAL_H */
}
if (rc > 0)
rc = 0;
+
RETURN(rc);
}
static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
{
struct mds_obd *mds = &dc->obd->u.mds;
- struct dir_cache *ca;
struct list_head *cur, *tmp;
+ struct dir_cache *ca;
+ int rc;
ENTRY;
ca = dc->cache + mdsnum;
ca->brwc.count = PAGE_SIZE;
ca->brwc.flag = 0;
ca->oa.o_mds = mdsnum;
- obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
- (struct lov_stripe_md *) dc->mea,
- 1, &ca->brwc, NULL);
+ rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
+ (struct lov_stripe_md *) dc->mea,
+ 1, &ca->brwc, NULL);
+ if (rc)
+ RETURN(rc);
- list_del(&page->list);
- __free_page(page);
+ }
+ RETURN(0);
+}
+
+/*
+ * Remove from the original directory (dc->dentry) every entry recorded in
+ * the pages cached for MDS @mdsnum.
+ *
+ * Each cached page holds a sequence of struct dir_entry records; a record
+ * with namelen == 0 ends the page.  The walk is best effort: a name that
+ * cannot be looked up or removed is reported and skipped.
+ *
+ * Always returns 0.
+ */
+static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
+{
+        struct list_head *cur, *tmp;
+        struct dentry *dentry;
+        struct dir_cache *ca;
+        struct dir_entry *de;
+        struct page *page;
+        char *buf, *end;
+        int rc;
+        ENTRY;
+
+        ca = dc->cache + mdsnum;
+        list_for_each_safe(cur, tmp, &ca->list) {
+                page = list_entry(cur, struct page, list);
+                buf = page_address(page);
+                end = buf + PAGE_SIZE;
+
+                de = (struct dir_entry *) buf;
+                while ((char *) de < end && de->namelen) {
+                        /* lookup an inode */
+                        LASSERT(de->namelen <= 255);
+
+                        dentry = ll_lookup_one_len(de->name, dc->dentry,
+                                                   de->namelen);
+                        if (IS_ERR(dentry)) {
+                                /* the name is length-counted: bound the
+                                 * print with a precision, not a width */
+                                CERROR("can't lookup %.*s: %d\n",
+                                       (int) de->namelen, de->name,
+                                       (int) PTR_ERR(dentry));
+                                goto next;
+                        }
+                        LASSERT(dentry->d_inode != NULL);
+                        rc = fsfilt_del_dir_entry(dc->obd, dentry);
+                        if (rc)
+                                CERROR("can't remove %.*s: %d\n",
+                                       (int) de->namelen, de->name, rc);
+                        l_dput(dentry);
+next:
+                        de = (struct dir_entry *)
+                                ((char *) de + DIR_REC_LEN(de->namelen));
+                }
         }
         RETURN(0);
 }
}
err = vfs_readdir(file, filldir, &dc);
-
filp_close(file, 0);
+ if (err)
+ GOTO(cleanup, err);
for (i = 0; i < mea->mea_count; i++) {
- if (dc.cache[i].cached)
- flush_buffer_onto_mds(&dc, i);
+ if (!dc.cache[i].cached)
+ continue;
+ err = flush_buffer_onto_mds(&dc, i);
+ if (err)
+ GOTO(cleanup, err);
}
+ for (i = 0; i < mea->mea_count; i++) {
+ if (!dc.cache[i].cached)
+ continue;
+ err = remove_entries_from_orig_dir(&dc, i);
+ if (err)
+ GOTO(cleanup, err);
+ }
+
+cleanup:
+ for (i = 0; i < mea->mea_count; i++) {
+ struct list_head *cur, *tmp;
+ if (!dc.cache[i].cached)
+ continue;
+ list_for_each_safe(cur, tmp, &dc.cache[i].list) {
+ struct page *page;
+ page = list_entry(cur, struct page, list);
+ list_del(&page->list);
+ __free_page(page);
+ }
+ }
OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
OBD_FREE(file_name, nlen);
- return 0;
+ RETURN(err);
}
#define MAX_DIR_SIZE (64 * 1024)
err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
de->namelen, de->ino,
de->generation, de->mds);
- /* FIXME: remove entries from the original dir */
-#warning "removing entries from the original dir"
LASSERT(err == 0);
de = (struct dir_entry *)
((char *) de + DIR_REC_LEN(de->namelen));
RETURN(i);
}
+/*
+ * Take LCK_EX locks on all slave objects of a split directory.
+ *
+ * Nothing is done (and 0 is returned) when the MDS is not clustered, when
+ * @dentry is not a directory, when it has no mea, or when the mea has no
+ * slaves (the object is itself a slave).  Otherwise an array of
+ * mea_count lock handles is allocated and stored in *rlockh, and the locks
+ * are requested through the LMV export with an IT_UNLINK intent.
+ *
+ * *rlockh is written only when the array is allocated, so the caller is
+ * expected to initialise it to NULL.  The array stays allocated when the
+ * enqueue fails; mds_unlock_slave_objs() releases it.
+ *
+ * Returns 0 on success, -ENOMEM, or the error from mds_get_lmv_attr() or
+ * md_enqueue().
+ */
+int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
+                        struct lustre_handle **rlockh)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mdc_op_data op_data;
+        struct lookup_intent it;
+        struct mea *mea = NULL;
+        int mea_size, rc;
+        ENTRY;
+
+        LASSERT(rlockh != NULL);
+        LASSERT(dentry != NULL);
+        LASSERT(dentry->d_inode != NULL);
+
+        /* clustered MD ? */
+        if (!mds->mds_lmv_obd)
+                RETURN(0);
+
+        /* a dir can be splitted only */
+        if (!S_ISDIR(dentry->d_inode->i_mode))
+                RETURN(0);
+
+        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        if (rc)
+                RETURN(rc);
+
+        if (mea == NULL)
+                RETURN(0);
+        if (mea->mea_count == 0) {
+                /* this is slave object */
+                GOTO(cleanup, rc = 0);
+        }
+
+        CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
+        if (*rlockh == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+        memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
+
+        memset(&op_data, 0, sizeof(op_data));
+        op_data.mea1 = mea;
+        /* the enqueue path reads more of the intent than it_op (status and
+         * request pointer), so it must not carry stack garbage */
+        memset(&it, 0, sizeof(it));
+        it.it_op = IT_UNLINK;
+        rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
+                        *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
+                        NULL);
+cleanup:
+        OBD_FREE(mea, mea_size);
+        RETURN(rc);
+}
+
+/*
+ * Release the LCK_EX references recorded in @lockh for the slave objects
+ * of the split directory @dentry, then free the handle array.
+ *
+ * A NULL @lockh means nothing was locked and is a no-op.  The number of
+ * handles is taken from the directory's mea; when the mea cannot be read
+ * the locks are reported as leaked and nothing is released.
+ */
+void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
+                           struct lustre_handle *lockh)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lustre_handle *slot;
+        struct mea *mea = NULL;
+        int mea_size, rc, idx;
+
+        if (!lockh)
+                return;
+
+        LASSERT(mds->mds_lmv_obd != NULL);
+        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+
+        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        if (rc != 0) {
+                CERROR("locks are leaked\n");
+                return;
+        }
+        LASSERT(mea_size != 0);
+        LASSERT(mea != NULL);
+        LASSERT(mea->mea_count != 0);
+
+        CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        /* a zero cookie marks a slot for which no lock was taken */
+        for (idx = 0, slot = lockh; idx < mea->mea_count; idx++, slot++) {
+                if (slot->cookie == 0)
+                        continue;
+                ldlm_lock_decref(slot, LCK_EX);
+        }
+
+        OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
+        OBD_FREE(mea, mea_size);
+}
+
+/*
+ * Ask the LMV to unlink the slave objects of the split directory @dentry.
+ *
+ * Nothing is done (and 0 is returned) when the MDS is not clustered, when
+ * @dentry is not a directory, when it has no mea, or when the mea has no
+ * slaves.  Otherwise md_unlink() is called with only mea1 set in the op
+ * data; no reply request is expected back.
+ *
+ * Returns 0 on success or the error from mds_get_lmv_attr()/md_unlink().
+ */
+int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct ptlrpc_request *req = NULL;
+        struct mdc_op_data op_data;
+        struct mea *mea = NULL;
+        int mea_size, rc;
+        ENTRY;
+
+        /* clustered MD ? */
+        if (!mds->mds_lmv_obd)
+                RETURN(0);
+
+        /* a dir can be splitted only */
+        if (!S_ISDIR(dentry->d_inode->i_mode))
+                RETURN(0);
+
+        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        if (rc)
+                RETURN(rc);
+
+        if (mea == NULL)
+                RETURN(0);
+        if (mea->mea_count == 0)
+                GOTO(cleanup, rc = 0);
+
+        CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        memset(&op_data, 0, sizeof(op_data));
+        op_data.mea1 = mea;
+        rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
+        LASSERT(req == NULL);
+cleanup:
+        OBD_FREE(mea, mea_size);
+        RETURN(rc);
+}
+
+/* State shared between mds_is_dir_empty() and its readdir callback. */
+struct ide_tracking {
+ int entries; /* entries with a non-zero inode number seen so far */
+ int empty; /* starts at 1; cleared once a name other than "." or ".." is seen */
+};
+
+/*
+ * readdir callback used by mds_is_dir_empty().
+ *
+ * @__buf points to a struct ide_tracking.  Entries with a zero inode
+ * number are ignored.  "." and ".." are accepted as long as no more than
+ * two entries have been counted; anything else clears the empty flag and
+ * returns -ENOTEMPTY.
+ */
+int mds_ide_filldir(void *__buf, const char *name, int namelen,
+                    loff_t offset, ino_t ino, unsigned int d_type)
+{
+        struct ide_tracking *track = __buf;
+
+        if (ino == 0)
+                return 0;
+
+        track->entries++;
+        if (track->entries <= 2 && namelen <= 2) {
+                /* only "." and ".." may live in an empty directory */
+                if (name[0] == '.' && namelen == 1)
+                        return 0;
+                if (name[0] == '.' && name[1] == '.' && namelen == 2)
+                        return 0;
+        }
+
+        track->empty = 0;
+        return -ENOTEMPTY;
+}
+
+/*
+ * Check whether the directory behind @dentry holds only "." and "..".
+ *
+ * The directory is opened by inode number through the "__iopen__/" name
+ * and scanned with mds_ide_filldir().
+ *
+ * Returns 1 when the directory is empty, 0 when it is not (or when
+ * vfs_readdir() fails), and a negative errno when the name buffer cannot
+ * be allocated or the directory cannot be opened.  Callers must therefore
+ * test for a negative value before treating the result as a boolean.
+ */
+int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
+{
+        struct ide_tracking it;
+        struct file *file;
+        char *file_name;
+        int nlen, rc;
+
+        it.entries = 0;
+        it.empty = 1;
+
+        /* "__iopen__/0x" + every hex digit an inode number can have + NUL;
+         * the former fixed "+ 10" only covered 8 hex digits */
+        nlen = strlen("__iopen__/0x") +
+               2 * sizeof(dentry->d_inode->i_ino) + 1;
+        OBD_ALLOC(file_name, nlen);
+        if (!file_name)
+                return -ENOMEM;
+        snprintf(file_name, nlen, "__iopen__/0x%lx", dentry->d_inode->i_ino);
+
+        file = filp_open(file_name, O_RDONLY, 0);
+        if (IS_ERR(file)) {
+                CERROR("can't open directory %s: %d\n",
+                       file_name, (int) PTR_ERR(file));
+                GOTO(cleanup, rc = PTR_ERR(file));
+        }
+
+        rc = vfs_readdir(file, mds_ide_filldir, &it);
+        filp_close(file, 0);
+
+        if (it.empty && rc == 0)
+                rc = 1;
+        else
+                rc = 0;
+
+cleanup:
+        OBD_FREE(file_name, nlen);
+        return rc;
+}
+
+/*
+ * Handle an IT_UNLINK intent on a slave object: lock the directory named
+ * by the mds_body at @offset of @req in LCK_EX mode and verify that it is
+ * empty.
+ *
+ * Returns 0 with the lock left in @lockh when the directory is empty.
+ * Otherwise the lock is dropped and a negative errno is returned:
+ * -EFAULT when the body cannot be unpacked, the lookup error when the
+ * object cannot be found, -ENOTEMPTY when entries remain, or the error
+ * reported by mds_is_dir_empty() when the check itself failed.
+ */
+int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
+                             struct lustre_handle *lockh)
+{
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct dentry *dentry = NULL;
+        struct lvfs_run_ctxt saved;
+        int cleanup_phase = 0;
+        struct mds_body *body;
+        struct lvfs_ucred uc;
+        int rc, update_mode;
+        ENTRY;
+
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR("Can't swab mds_body\n");
+                GOTO(cleanup, rc = -EFAULT);
+        }
+        CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
+               (unsigned long) body->fid1.id,
+               (unsigned long) body->fid1.generation);
+        dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
+                                       &update_mode, NULL, 0,
+                                       MDS_INODELOCK_UPDATE);
+        if (IS_ERR(dentry)) {
+                CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
+                GOTO(cleanup, rc = PTR_ERR(dentry));
+        }
+        cleanup_phase = 1;
+
+        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+
+        uc.luc_fsuid = body->fsuid;
+        uc.luc_fsgid = body->fsgid;
+        uc.luc_cap = body->capability;
+        uc.luc_suppgid1 = body->suppgid;
+        uc.luc_suppgid2 = -1;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+
+        /* mds_is_dir_empty() returns 1/0 or a negative errno; a failed
+         * check must not be mistaken for "empty" */
+        rc = mds_is_dir_empty(obd, dentry);
+        if (rc < 0)
+                CERROR("can't check whether dir is empty: %d\n", rc);
+        else
+                rc = rc ? 0 : -ENOTEMPTY;
+
+cleanup:
+        switch(cleanup_phase) {
+        case 1:
+                /* the lock is kept only when the slave may be removed */
+                if (rc)
+                        ldlm_lock_decref(lockh, LCK_EX);
+                l_dput(dentry);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+        default:
+                break;
+        }
+        RETURN(rc);
+}
+
/* new, local dentry will be added soon. we need no aliases here */
d_drop(new_child);
- child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
- lockh, NULL, NULL, 0,
- MDS_INODELOCK_UPDATE);
+ if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
+ child = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+ } else {
+ child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
+ LCK_EX, lockh, NULL, NULL, 0,
+ MDS_INODELOCK_UPDATE);
+ }
+
if (IS_ERR(child)) {
CERROR("can't get victim\n");
GOTO(cleanup, rc = PTR_ERR(child));
cleanup:
switch(cleanup_phase) {
case 2:
- ldlm_lock_decref(lockh, LCK_EX);
+ if (!(rec->ur_mode & MDS_MODE_DONT_LOCK))
+ ldlm_lock_decref(lockh, LCK_EX);
dput(child);
case 1:
dput(new_child);
struct lustre_handle *child_lockh,
struct dentry *dchild)
{
+ struct obd_device *obd = req->rq_export->exp_obd;
struct mds_obd *mds = mds_req2mds(req);
struct mdc_op_data op_data;
int rc = 0, cleanup_phase = 0;
struct ptlrpc_request *request = NULL;
+ void *handle;
ENTRY;
LASSERT(offset == 0 || offset == 2);
DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)",
rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
(unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ DEBUG_REQ(D_HA, req, "unlink %*s (remote inode %u/%u/%u)",
+ rec->ur_namelen - 1, rec->ur_name,
+ (unsigned)dchild->d_mdsnum,
+ (unsigned) dchild->d_inum,
+ (unsigned) dchild->d_generation);
/* time to drop i_nlink on remote MDS */
op_data.fid1.mds = dchild->d_mdsnum;
op_data.fid1.id = dchild->d_inum;
op_data.fid1.generation = dchild->d_generation;
op_data.create_mode = rec->ur_mode;
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ op_data.create_mode |= MDS_MODE_REPLAY;
op_data.namelen = 0;
op_data.name = NULL;
rc = md_unlink(mds->mds_lmv_exp, &op_data, &request);
mds_copy_unlink_reply(req, request);
ptlrpc_req_finished(request);
}
- if (rc == 0)
+ if (rc == 0) {
+ handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
+ NULL);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, dchild);
+ rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
+ rc, 0);
+ }
+cleanup:
req->rq_status = rc;
#ifdef S_PDIROPS
struct inode *child_inode;
struct lustre_handle parent_lockh[2] = {{0}, {0}};
struct lustre_handle child_lockh = {0}, child_reuse_lockh = {0};
+ struct lustre_handle * slave_lockh = NULL;
char fidname[LL_FID_NAMELEN];
void *handle = NULL;
int rc = 0, log_unlink = 0, cleanup_phase = 0;
unlink_by_fid = 1;
rec->ur_name = fidname;
rc = mds_create_local_dentry(rec, obd);
- LASSERT(rc == 0);
- }
- rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
- parent_lockh, &dparent, LCK_PW,
- MDS_INODELOCK_UPDATE, &update_mode,
- rec->ur_name, rec->ur_namelen,
- &child_lockh, &dchild, LCK_EX,
- MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE);
+ if (rc == -ENOENT || (rec->ur_mode & MDS_MODE_REPLAY)) {
+ DEBUG_REQ(D_HA, req,
+ "drop nlink on inode %u/%u/%u (replay)",
+ (unsigned) rec->ur_fid1->mds,
+ (unsigned) rec->ur_fid1->id,
+ (unsigned) rec->ur_fid1->generation);
+ req->rq_status = 0;
+ RETURN(0);
+ }
+ }
+
+ if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
+ /* master mds for directory asks slave removing
+ * inode is already locked */
+ dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
+ LCK_PW, parent_lockh,
+ &update_mode, rec->ur_name,
+ rec->ur_namelen,
+ MDS_INODELOCK_UPDATE);
+ if (IS_ERR(dparent))
+ GOTO(cleanup, rc = PTR_ERR(dparent));
+ dchild = ll_lookup_one_len(rec->ur_name, dparent,
+ rec->ur_namelen - 1);
+ if (IS_ERR(dchild))
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ child_lockh.cookie = 0;
+ LASSERT(!(dchild->d_flags & DCACHE_CROSS_REF));
+ LASSERT(dchild->d_inode != NULL);
+ LASSERT(S_ISDIR(dchild->d_inode->i_mode));
+ } else {
+ rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
+ parent_lockh, &dparent,
+ LCK_PW, MDS_INODELOCK_UPDATE,
+ &update_mode, rec->ur_name,
+ rec->ur_namelen, &child_lockh,
+ &dchild, LCK_EX,
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE);
+ }
if (rc)
GOTO(cleanup, rc);
cleanup_phase = 2; /* dchild has a lock */
+ /* We have to do these checks ourselves, in case we are making an
+ * orphan. The client tells us whether rmdir() or unlink() was called,
+ * so we need to return appropriate errors (bug 72).
+ *
+ * We don't have to check permissions, because vfs_rename (called from
+ * mds_open_unlink_rename) also calls may_delete. */
+ if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
+ if (!S_ISDIR(child_inode->i_mode))
+ GOTO(cleanup, rc = -ENOTDIR);
+ } else {
+ if (S_ISDIR(child_inode->i_mode))
+ GOTO(cleanup, rc = -EISDIR);
+ }
+
+ /* handle splitted dir */
+ rc = mds_lock_slave_objs(obd, dchild, &slave_lockh);
+ if (rc)
+ GOTO(cleanup, rc);
+
/* Step 4: Get a lock on the ino to sync with creation WRT inode
* reuse (see bug 2029). */
rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
}
}
- /* We have to do these checks ourselves, in case we are making an
- * orphan. The client tells us whether rmdir() or unlink() was called,
- * so we need to return appropriate errors (bug 72).
- *
- * We don't have to check permissions, because vfs_rename (called from
- * mds_open_unlink_rename) also calls may_delete. */
- if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
- if (!S_ISDIR(child_inode->i_mode))
- GOTO(cleanup, rc = -ENOTDIR);
- } else {
- if (S_ISDIR(child_inode->i_mode))
- GOTO(cleanup, rc = -EISDIR);
- }
-
/* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
switch (child_inode->i_mode & S_IFMT) {
case S_IFDIR:
LASSERT(atomic_read(&dchild->d_inode->i_count) > 0);
if (rc == 0 && dchild->d_inode->i_nlink == 0 &&
mds_open_orphan_count(dchild->d_inode) > 0) {
+
/* filesystem is really going to destroy an inode
* we have to delay this till inode is opened -bzzz */
mds_open_unlink_rename(rec, obd, dparent, dchild, NULL);
}
+ /* handle splitted dir */
+ mds_unlink_slave_objs(obd, dchild);
rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
rc, 0);
if (!rc)
else
ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
case 2: /* child lock */
- ldlm_lock_decref(&child_lockh, LCK_EX);
+ mds_unlock_slave_objs(obd, dchild, slave_lockh);
+ if (child_lockh.cookie)
+ ldlm_lock_decref(&child_lockh, LCK_EX);
case 1: /* child and parent dentry, parent lock */
#ifdef S_PDIROPS
if (parent_lockh[1].cookie != 0)
--- /dev/null
+#!/bin/bash
+#
+# Tests for split directories (see the test_* functions below).
+#
+# Run select tests by setting ONLY, or as arguments to the script.
+# Skip specific tests by setting EXCEPT.
+#
+# e.g. ONLY="22 23" or ONLY="`seq 32 39`" or EXCEPT="31"
+# Abort on the first failing command that is not part of a condition.
+set -e
+
+# Script arguments are used as the test list unless ONLY is already set.
+ONLY=${ONLY:-"$*"}
+# bug number for skipped test: 2108
+ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-""}
+# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
+#case `uname -r` in
+#2.6.*) ALWAYS_EXCEPT="$ALWAYS_EXCEPT 54c 55" # bug 3117
+#esac
+
+[ "$ALWAYS_EXCEPT$EXCEPT" ] && echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT"
+
+# Make helpers next to this script and in ../utils reachable through PATH.
+SRCDIR=`dirname $0`
+export PATH=$PWD/$SRCDIR:$SRCDIR:$SRCDIR/../utils:$PATH
+
+TMP=${TMP:-/tmp}
+FSTYPE=${FSTYPE:-ext3}
+
+# Every helper program below can be overridden from the environment.
+CHECKSTAT=${CHECKSTAT:-"checkstat -v"}
+CREATETEST=${CREATETEST:-createtest}
+LFS=${LFS:-lfs}
+LSTRIPE=${LSTRIPE:-"$LFS setstripe"}
+LFIND=${LFIND:-"$LFS find"}
+LVERIFY=${LVERIFY:-ll_dirstripe_verify}
+LCTL=${LCTL:-lctl}
+MCREATE=${MCREATE:-mcreate}
+OPENFILE=${OPENFILE:-openfile}
+OPENUNLINK=${OPENUNLINK:-openunlink}
+TOEXCL=${TOEXCL:-toexcl}
+TRUNCATE=${TRUNCATE:-truncate}
+MUNLINK=${MUNLINK:-munlink}
+SOCKETSERVER=${SOCKETSERVER:-socketserver}
+SOCKETCLIENT=${SOCKETCLIENT:-socketclient}
+IOPENTEST1=${IOPENTEST1:-iopentest1}
+IOPENTEST2=${IOPENTEST2:-iopentest2}
+
+# A non-root caller runs everything as itself; root gets a "runas" prefix
+# for user id RUNAS_ID (500 unless overridden).
+if [ $UID -ne 0 ]; then
+ RUNAS_ID="$UID"
+ RUNAS=""
+else
+ RUNAS_ID=${RUNAS_ID:-500}
+ RUNAS=${RUNAS:-"runas -u $RUNAS_ID"}
+fi
+
+export NAME=${NAME:-lmv}
+
+# run_one returns to this directory after every test.
+SAVE_PWD=$PWD
+
+# Unmount via llmountcleanup.sh; a failing cleanup ends the run with
+# status 20.  CLEAN names the command run_one uses after every test.
+clean() {
+	echo -n "cln.."
+	if ! sh llmountcleanup.sh > /dev/null; then
+		exit 20
+	fi
+	I_MOUNTED=no
+}
+CLEAN=${CLEAN:-clean}
+
+# Remount via llrmount.sh; a failing mount ends the run with status 10.
+# START names the command run_one uses when the filesystem is not mounted.
+start() {
+	echo -n "mnt.."
+	if ! sh llrmount.sh > /dev/null; then
+		exit 10
+	fi
+	I_MOUNTED=yes
+	echo "done"
+}
+START=${START:-start}
+
+# Print a message and also hand it to "lctl mark"; a failing or missing
+# lctl is ignored.
+log() {
+	local msg="$*"
+
+	echo "$msg"
+	lctl mark "$msg" 2> /dev/null || :
+}
+
+# Run a command under strace, writing the trace to $TMP/<command>.strace.
+# Returns the exit status of the strace invocation (stored in RC).
+trace() {
+	log "STARTING: $*"
+	# collect the status inside an || list so that "set -e" cannot end
+	# the script before the result has been logged
+	RC=0
+	strace -o $TMP/$1.strace -ttt $* || RC=$?
+	log "FINISHED: $*: rc $RC"
+	return $RC
+}
+TRACE=${TRACE:-""}
+
+# Succeed when the number in /proc/fs/lustre/kernel_version is at least $1.
+# Returns 1 when the file is missing or the version is too old.
+check_kernel_version() {
+	VERSION_FILE=/proc/fs/lustre/kernel_version
+	WANT_VER=$1
+	if [ ! -f $VERSION_FILE ]; then
+		echo "can't find kernel version" && return 1
+	fi
+	GOT_VER=$(cat $VERSION_FILE)
+	if [ $GOT_VER -ge $WANT_VER ]; then
+		return 0
+	fi
+	log "test needs at least kernel version $WANT_VER, running $GOT_VER"
+	return 1
+}
+
+# Run test_$1 (described by $2): mount first when $DIR is not mounted,
+# switch on full portals debugging, report a failure through error(),
+# then go back to the start directory and run $CLEAN.
+run_one() {
+	mount | grep -q $DIR || $START
+	echo -1 >/proc/sys/portals/debug
+	log "== test $1: $2"
+	export TESTNAME=test_$1
+	if test_$1; then
+		:
+	else
+		# $? still holds the status of test_$1 here
+		error "test_$1: exit with rc=$?"
+	fi
+	unset TESTNAME
+	pass
+	cd $SAVE_PWD
+	$CLEAN
+}
+
+# Turn the ONLY, EXCEPT and ALWAYS_EXCEPT lists into marker variables
+# (ONLY_<id>=true, EXCEPT_<id>=true) that run_test looks up by name.
+build_test_filter() {
+	for O in $ONLY; do
+		eval "ONLY_${O}=true"
+	done
+
+	for E in $EXCEPT $ALWAYS_EXCEPT; do
+		eval "EXCEPT_${E}=true"
+	done
+}
+
+# Helper for basetest: echoes its arguments unquoted, so they are split
+# again using whatever IFS is in effect during the call.
+_basetest() {
+ echo $*
+}
+
+# Print the base of a test id.  IFS is set to the lowercase letters only
+# for the _basetest call, so the id is cut at its first letter; this is
+# presumably meant to turn e.g. "1a" into "1" for the ONLY_/EXCEPT_
+# lookups by base id in run_test.
+basetest() {
+ IFS=abcdefghijklmnopqrstuvwxyz _basetest $1
+}
+
+# Run test $1 (described by $2) unless the filters exclude it.
+# With ONLY set, the test runs when its id or its base id is listed;
+# otherwise a "." is printed.  Without ONLY, the test is skipped when its
+# id or its base id is listed in EXCEPT/ALWAYS_EXCEPT.
+run_test() {
+	base=$(basetest $1)
+
+	if [ "$ONLY" ]; then
+		for testname in "ONLY_$1" "ONLY_$base"; do
+			if [ ${!testname}x != x ]; then
+				run_one $1 "$2"
+				return $?
+			fi
+		done
+		echo -n "."
+		return 0
+	fi
+
+	testname=EXCEPT_$1
+	if [ ${!testname}x != x ]; then
+		echo "skipping excluded test $1"
+		return 0
+	fi
+
+	testname=EXCEPT_$base
+	if [ ${!testname}x != x ]; then
+		echo "skipping excluded test $1 (base $base)"
+		return 0
+	fi
+
+	run_one $1 "$2"
+	return $?
+}
+
+# Start with no failure log when SANITYLOG is set; never fail here.
+[ "$SANITYLOG" ] && rm -f $SANITYLOG || true
+
+# Report a test failure.  Without SANITYLOG the script exits with status
+# 1; with SANITYLOG the failure is appended to that file and the run
+# continues.
+error() {
+	log "FAIL: $@"
+	if [ -z "$SANITYLOG" ]; then
+		exit 1
+	fi
+	echo "FAIL: $TESTNAME $@" >> $SANITYLOG
+}
+
+# Printed by run_one once a test has finished.
+pass() {
+	echo "PASS"
+}
+
+# Find where $NAME is mounted; mount it with llmount.sh when it is not.
+MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`"
+if [ -z "$MOUNT" ]; then
+	sh llmount.sh
+	MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`"
+	[ -z "$MOUNT" ] && error "NAME=$NAME not mounted"
+	I_MOUNTED=yes
+fi
+
+[ `echo $MOUNT | wc -w` -gt 1 ] && error "NAME=$NAME mounted more than once"
+
+# The test directory must be inside the mount point.
+DIR=${DIR:-$MOUNT}
+[ -z "`echo $DIR | grep $MOUNT`" ] && echo "$DIR not in $MOUNT" && exit 99
+
+LOVNAME=`cat /proc/fs/lustre/llite/fs0/lov/common_name`
+OSTCOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/numobd`
+STRIPECOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/stripecount`
+STRIPESIZE=`cat /proc/fs/lustre/lov/$LOVNAME/stripesize`
+
+[ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo
+[ -f $DIR/d52b/foo ] && chattr -i $DIR/d52b/foo
+rm -rf $DIR/[Rdfs][1-9]*
+# The glob above does not match the directories created by the tests in
+# this script; remove what an earlier failed run may have left behind so
+# that their mkdir calls do not fail.
+rm -rf $DIR/1a0 $DIR/1b0 $DIR/1b1 $DIR/2a0
+
+build_test_filter
+
+echo preparing for tests involving mounts
+EXT2_DEV=${EXT2_DEV:-/tmp/SANITY.LOOP}
+touch $EXT2_DEV
+mke2fs -j -F $EXT2_DEV 8000 > /dev/null
+
+# rmdir must refuse a directory that still holds 4000 files; rm -rf must
+# then be able to remove it.
+test_1a() {
+	local dir=$DIR/1a0
+
+	mkdir $dir || error
+	createmany -o $dir/f 4000
+	if rmdir $dir; then
+		error
+	fi
+	rm -rf $dir || error
+}
+run_test 1a " remove splitted dir ============================="
+
+# Fill a directory with 4000 files and delete them all, then check that
+# rmdir keeps failing while 3, 2 and 1 new files remain and succeeds only
+# once the directory is empty.
+test_1b() {
+	local dir=$DIR/1b0
+	local left
+
+	mkdir $dir || error
+	createmany -o $dir/f 4000
+	find $dir -type f | xargs rm -f
+	NUM=$(ls $dir | wc -l)
+	if [ $NUM -ne 0 ] ; then
+		echo "dir must be empty"
+		error
+	fi
+	touch $dir/file0
+	touch $dir/file1
+	touch $dir/file2
+
+	# file0 goes while 3 are left, file1 while 2, file2 while 1
+	for left in 3 2 1; do
+		echo "$left files left"
+		if rmdir $dir; then
+			error
+		fi
+		rm -f $dir/file$((3 - left))
+	done
+
+	echo "0 files left"
+	rmdir $dir || error
+}
+run_test 1b " remove splitted dir ============================="
+
+# Same sequence as test_1b, run on $DIR/1b1.
+# NOTE(review): the title says "cross-node" but nothing here places the
+# directory on another node - confirm the intended setup.
+test_1c() {
+	local dir=$DIR/1b1
+	local left
+
+	mkdir $dir || error
+	createmany -o $dir/f 4000
+	find $dir -type f | xargs rm -f
+	NUM=$(ls $dir | wc -l)
+	if [ $NUM -ne 0 ] ; then
+		echo "dir must be empty"
+		error
+	fi
+	touch $dir/file0
+	touch $dir/file1
+	touch $dir/file2
+
+	# file0 goes while 3 are left, file1 while 2, file2 while 1
+	for left in 3 2 1; do
+		echo "$left files left"
+		if rmdir $dir; then
+			error
+		fi
+		rm -f $dir/file$((3 - left))
+	done
+
+	echo "0 files left"
+	rmdir $dir || error
+}
+run_test 1c " remove splitted cross-node dir ============================="
+
+# A directory filled with 5000 files must list exactly 5000 names.
+test_2a() {
+	local dir=$DIR/2a0
+
+	mkdir $dir || error
+	createmany -o $dir/f 5000
+	NUM=$(ls $dir | wc -l)
+	echo "found $NUM files"
+	if [ $NUM -ne 5000 ]; then
+		echo "wrong number of files: $NUM"
+		error
+	fi
+	rm -rf $dir || error
+}
+run_test 2a " list splitted dir ============================="
+
+# Restore TMPDIR/TMP/HOME from saved copies when there are any.  Nothing
+# in this script sets the OLD* variables, so keep the current values
+# instead of blanking them for the cleanup commands below.
+TMPDIR=${OLDTMPDIR:-$TMPDIR}
+TMP=${OLDTMP:-$TMP}
+HOME=${OLDHOME:-$HOME}
+
+log "cleanup: ======================================================"
+if [ "`mount | grep ^$NAME`" ]; then
+	rm -rf $DIR/[Rdfs][1-9]*
+	# unmount only what this script mounted itself
+	if [ "$I_MOUNTED" = "yes" ]; then
+		sh llmountcleanup.sh || error
+	fi
+fi
+
+echo '=========================== finished ==============================='
+# A failure log left behind by error() makes the whole run fail.
+[ -f "$SANITYLOG" ] && cat $SANITYLOG && exit 1 || true