From 11a4ea796e0edea3db95642ff5ef19be2817b244 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 11 Jun 2004 15:08:41 +0000 Subject: [PATCH] - moved dir entries are deleted from the original dir (master object) - unlink for splitted dir has been implemented. it uses IT_UNLINK to check dir's emptiness and works as follows: unlink request comes to mds holding dir; mds recognizes splitted dir and issues LCK_EX with IT_UNLINK intent. each slave object is checked and locked. if all the slaves and master object are empty, then mds unlinks them and unlocks slave objects - lmv_enqueue() handles splitted dir properly: issues given lock for each object - lmv_unlink() handles splitted dir properly: unlinks each slave object - mds_lock_slave_objs(), mds_unlock_slave_objs() and mds_unlink_slave_objs() have been introduced to implement splitted dir unlink - mds_lock_and_check_slave() is IT_UNLINK handler on mds side - mds_reint_unlink() sets MDS_MODE_REPLAY on "drop nlink" request in replay case - mds_reint_unlink() recognizes MDS_MODE_DONT_LOCK and doesn't try to lock slave object being removed (it was already locked earlier by the IT_UNLINK enqueue) - minor cleanups in lmv to avoid needless debug messages - sanity-lmv.sh has been added --- lustre/include/linux/lustre_idl.h | 3 + lustre/lmv/lmv_obd.c | 107 +++++++++++- lustre/lvfs/fsfilt_smfs.c | 2 +- lustre/mdc/mdc_locks.c | 39 ++--- lustre/mds/handler.c | 13 +- lustre/mds/mds_internal.h | 7 + lustre/mds/mds_lmv.c | 333 ++++++++++++++++++++++++++++++++++++-- lustre/mds/mds_reint.c | 122 ++++++++++---- lustre/tests/sanity-lmv.sh | 293 +++++++++++++++++++++++++++++++++ 9 files changed, 851 insertions(+), 68 deletions(-) create mode 100644 lustre/tests/sanity-lmv.sh diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 42eda84..85e6132 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -589,6 +589,9 @@ struct mdc_op_data { struct mea *mea2; /* mea of inode2 */ }; +#define 
MDS_MODE_DONT_LOCK (1 << 30) +#define MDS_MODE_REPLAY (1 << 31) + struct mds_rec_setattr { __u32 sa_opcode; __u32 sa_fsuid; diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 5b2d95b..cbad914 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -227,10 +227,9 @@ int lmv_check_connect(struct obd_device *obd) { struct lmv_tgt_desc *tgts; struct obd_export *exp; int rc, rc2, i; - ENTRY; if (lmv->connected) - RETURN(0); + return 0; lmv->connected = 1; cluuid = &lmv->cluuid; @@ -300,7 +299,7 @@ int lmv_check_connect(struct obd_device *obd) { lmv_set_timeouts(obd); class_export_put(exp); - RETURN (0); + return 0; out_disc: while (i-- > 0) { @@ -708,10 +707,64 @@ int lmv_done_writing(struct obd_export *exp, struct obdo *obdo) RETURN(rc); /* FIXME: choose right MDC here */ + CWARN("this method isn't implemented yet\n"); rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo); RETURN(rc); } +int lmv_enqueue_slaves(struct obd_export *exp, int locktype, + struct lookup_intent *it, int lockmode, + struct mdc_op_data *data, struct lustre_handle *lockh, + void *lmm, int lmmsize, + ldlm_completion_callback cb_completion, + ldlm_blocking_callback cb_blocking, void *cb_data) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct mea *mea = data->mea1; + struct mdc_op_data data2; + int i, rc, mds; + ENTRY; + + LASSERT(mea != NULL); + for (i = 0; i < mea->mea_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) + continue; + + memset(&data2, 0, sizeof(data2)); + data2.fid1 = mea->mea_fids[i]; + mds = data2.fid1.mds; + rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode, + &data2, lockh + i, lmm, lmmsize, cb_completion, + cb_blocking, cb_data); + CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n", + (unsigned long) mea->mea_fids[i].mds, + (unsigned long) mea->mea_fids[i].id, + (unsigned long) mea->mea_fids[i].generation, + rc, it->d.lustre.it_status); + if (rc) + GOTO(cleanup, rc); + if (it->d.lustre.it_data) { + 
struct ptlrpc_request *req; + req = (struct ptlrpc_request *) it->d.lustre.it_data; + ptlrpc_req_finished(req); + } + + if (it->d.lustre.it_status) + GOTO(cleanup, rc = it->d.lustre.it_status); + } + RETURN(0); + +cleanup: + /* drop all taken locks */ + while (--i >= 0) { + if (lockh[i].cookie) + ldlm_lock_decref(lockh + i, lockmode); + lockh[i].cookie = 0; + } + RETURN(rc); +} + int lmv_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, int lock_mode, struct mdc_op_data *data, struct lustre_handle *lockh, @@ -729,6 +782,13 @@ int lmv_enqueue(struct obd_export *exp, int lock_type, if (rc) RETURN(rc); + if (it->it_op == IT_UNLINK) { + rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode, + data, lockh, lmm, lmmsize, + cb_completion, cb_blocking, cb_data); + RETURN(rc); + } + if (data->namelen) { obj = lmv_grab_obj(obd, &data->fid1, 0); if (obj) { @@ -1103,6 +1163,40 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, RETURN(rc); } +int lmv_unlink_slaves(struct obd_export *exp, + struct mdc_op_data *data, struct ptlrpc_request **req) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct mea *mea = data->mea1; + struct mdc_op_data data2; + int i, rc = 0, mds; + ENTRY; + + LASSERT(mea != NULL); + for (i = 0; i < mea->mea_count; i++) { + if (lmv->tgts[i].ltd_exp == NULL) + continue; + + memset(&data2, 0, sizeof(data2)); + data2.fid1 = mea->mea_fids[i]; + data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR; + mds = data2.fid1.mds; + rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req); + CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n", + (unsigned long) mea->mea_fids[i].mds, + (unsigned long) mea->mea_fids[i].id, + (unsigned long) mea->mea_fids[i].generation, rc); + if (*req) { + ptlrpc_req_finished(*req); + *req = NULL; + } + if (rc) + break; + } + RETURN(rc); +} + int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, struct ptlrpc_request **request) { @@ -1110,12 +1204,15 @@ int 
lmv_unlink(struct obd_export *exp, struct mdc_op_data *data, struct lmv_obd *lmv = &obd->u.lmv; int rc, i = 0; ENTRY; - rc = lmv_check_connect(obd); if (rc) RETURN(rc); - if (data->namelen != 0) { + if (data->namelen == 0 && data->mea1 != NULL) { + /* mds asks to remove slave objects */ + rc = lmv_unlink_slaves(exp, data, request); + RETURN(rc); + } else if (data->namelen != 0) { struct lmv_obj *obj; obj = lmv_grab_obj(obd, &data->fid1, 0); if (obj) { diff --git a/lustre/lvfs/fsfilt_smfs.c b/lustre/lvfs/fsfilt_smfs.c index 31b6b95..f0059f3 100644 --- a/lustre/lvfs/fsfilt_smfs.c +++ b/lustre/lvfs/fsfilt_smfs.c @@ -825,7 +825,7 @@ static int fsfilt_smfs_get_ino_write_extents(struct super_block *sb, ino_t ino, char **pbuf, int *size) { struct fs_extent *fs_extents; - struct ldlm_extent *extents; + struct ldlm_extent *extents = NULL; struct inode *inode; struct inode *cache_inode; struct fsfilt_operations *cache_fsfilt = NULL; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 5e6983e..f36f7f4 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -237,23 +237,6 @@ int mdc_enqueue(struct obd_export *exp, /* get ready for the reply */ reply_buffers = 3; req->rq_replen = lustre_msg_size(3, repsize); - } else if (it->it_op & IT_UNLINK) { - size[2] = sizeof(struct mds_rec_unlink); - size[3] = data->namelen + 1; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4, - size, NULL); - if (!req) - RETURN(-ENOMEM); - - /* pack the intent */ - lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit)); - lit->opc = (__u64)it->it_op; - - /* pack the intended request */ - mdc_unlink_pack(req->rq_reqmsg, 2, data); - /* get ready for the reply */ - reply_buffers = 4; - req->rq_replen = lustre_msg_size(4, repsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_CHDIR)) { int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE; size[2] = sizeof(struct mds_body); @@ -277,7 +260,7 @@ int mdc_enqueue(struct obd_export *exp, reply_buffers = 3; req->rq_replen = 
lustre_msg_size(3, repsize); } else if (it->it_op == IT_READDIR) { - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1, size, NULL); if (!req) @@ -286,7 +269,25 @@ int mdc_enqueue(struct obd_export *exp, /* get ready for the reply */ reply_buffers = 1; req->rq_replen = lustre_msg_size(1, repsize); - } else { + } else if (it->it_op == IT_UNLINK) { + size[2] = sizeof(struct mds_body); + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 3, + size, NULL); + if (!req) + RETURN(-ENOMEM); + + /* pack the intended request */ + mdc_getattr_pack(req->rq_reqmsg, 0, 2, 0, data); + + /* pack the intent */ + lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit)); + lit->opc = (__u64)it->it_op; + + /* get ready for the reply */ + reply_buffers = 3; + req->rq_replen = lustre_msg_size(3, repsize); + } else { LBUG(); RETURN(-EINVAL); } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index dfdad07..02dfa13 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -182,7 +182,7 @@ int mds_lock_mode_for_dir(struct obd_device *obd, ret_mode = LCK_CW; if (mds_splitting_expected(obd, dentry)) { /* splitting possible. serialize any access */ - CERROR("%s: gonna split %lu/%lu\n", + CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n", obd->obd_name, (unsigned long) dentry->d_inode->i_ino, (unsigned long) dentry->d_inode->i_generation); @@ -2355,8 +2355,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 
4 : 3, repsize, - NULL); + rc = lustre_pack_reply(req, 3, repsize, NULL); if (rc) RETURN(req->rq_status = rc); @@ -2412,6 +2411,14 @@ static int mds_intent_policy(struct ldlm_namespace *ns, RETURN(ELDLM_LOCK_ABORTED); } break; + case IT_UNLINK: + rc = mds_lock_and_check_slave(offset, req, &lockh); + if ((rep->lock_policy_res2 = rc)) { + if (rc == ENOLCK) + rep->lock_policy_res2 = 0; + RETURN(ELDLM_LOCK_ABORTED); + } + break; default: CERROR("Unhandled intent "LPD64"\n", it->opc); LBUG(); diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index dddd484..0751a8e 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -130,5 +130,12 @@ int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *); int mds_choose_mdsnum(struct obd_device *, const char *, int, int); int mds_lmv_postsetup(struct obd_device *); int mds_splitting_expected(struct obd_device *, struct dentry *); +int mds_lock_slave_objs(struct obd_device *, struct dentry *, + struct lustre_handle **); +int mds_unlink_slave_objs(struct obd_device *, struct dentry *); +void mds_unlock_slave_objs(struct obd_device *, struct dentry *, + struct lustre_handle *); +int mds_lock_and_check_slave(int, struct ptlrpc_request *, struct lustre_handle *); + #endif /* _MDS_INTERNAL_H */ diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 9dca6e3..a3a9144 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -176,6 +176,7 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode, } if (rc > 0) rc = 0; + RETURN(rc); } @@ -270,8 +271,9 @@ next: static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum) { struct mds_obd *mds = &dc->obd->u.mds; - struct dir_cache *ca; struct list_head *cur, *tmp; + struct dir_cache *ca; + int rc; ENTRY; ca = dc->cache + mdsnum; @@ -294,12 +296,52 @@ static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum) ca->brwc.count = PAGE_SIZE; ca->brwc.flag = 0; ca->oa.o_mds = mdsnum; - 
obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa, - (struct lov_stripe_md *) dc->mea, - 1, &ca->brwc, NULL); + rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa, + (struct lov_stripe_md *) dc->mea, + 1, &ca->brwc, NULL); + if (rc) + RETURN(rc); - list_del(&page->list); - __free_page(page); + } + RETURN(0); +} + +static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum) +{ + struct list_head *cur, *tmp; + struct dentry *dentry; + struct dir_cache *ca; + struct dir_entry *de; + struct page *page; + char *buf, *end; + int rc; + ENTRY; + + ca = dc->cache + mdsnum; + list_for_each_safe(cur, tmp, &ca->list) { + page = list_entry(cur, struct page, list); + buf = page_address(page); + end = buf + PAGE_SIZE; + + de = (struct dir_entry *) buf; + while ((char *) de < end && de->namelen) { + /* lookup an inode */ + LASSERT(de->namelen <= 255); + + dentry = ll_lookup_one_len(de->name, dc->dentry, + de->namelen); + if (IS_ERR(dentry)) { + CERROR("can't lookup %*s: %d\n", de->namelen, + de->name, (int) PTR_ERR(dentry)); + goto next; + } + LASSERT(dentry->d_inode != NULL); + rc = fsfilt_del_dir_entry(dc->obd, dentry); + l_dput(dentry); +next: + de = (struct dir_entry *) + ((char *) de + DIR_REC_LEN(de->namelen)); + } } RETURN(0); } @@ -395,18 +437,42 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, } err = vfs_readdir(file, filldir, &dc); - filp_close(file, 0); + if (err) + GOTO(cleanup, err); for (i = 0; i < mea->mea_count; i++) { - if (dc.cache[i].cached) - flush_buffer_onto_mds(&dc, i); + if (!dc.cache[i].cached) + continue; + err = flush_buffer_onto_mds(&dc, i); + if (err) + GOTO(cleanup, err); } + for (i = 0; i < mea->mea_count; i++) { + if (!dc.cache[i].cached) + continue; + err = remove_entries_from_orig_dir(&dc, i); + if (err) + GOTO(cleanup, err); + } + +cleanup: + for (i = 0; i < mea->mea_count; i++) { + struct list_head *cur, *tmp; + if (!dc.cache[i].cached) + continue; + list_for_each_safe(cur, tmp, &dc.cache[i].list) 
{ + struct page *page; + page = list_entry(cur, struct page, list); + list_del(&page->list); + __free_page(page); + } + } OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count); OBD_FREE(file_name, nlen); - return 0; + RETURN(err); } #define MAX_DIR_SIZE (64 * 1024) @@ -646,8 +712,6 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, err = fsfilt_add_dir_entry(obd, res->dentry, de->name, de->namelen, de->ino, de->generation, de->mds); - /* FIXME: remove entries from the original dir */ -#warning "removing entries from the original dir" LASSERT(err == 0); de = (struct dir_entry *) ((char *) de + DIR_REC_LEN(de->namelen)); @@ -678,3 +742,248 @@ int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int fla RETURN(i); } +int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry, + struct lustre_handle **rlockh) +{ + struct mds_obd *mds = &obd->u.mds; + struct mdc_op_data op_data; + struct lookup_intent it; + struct mea *mea = NULL; + int mea_size, rc; + + LASSERT(rlockh != NULL); + LASSERT(dentry != NULL); + LASSERT(dentry->d_inode != NULL); + + /* clustered MD ? 
*/ + if (!mds->mds_lmv_obd) + return 0; + + /* a dir can be splitted only */ + if (!S_ISDIR(dentry->d_inode->i_mode)) + return 0; + + rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size); + if (rc) + return rc; + + if (mea == NULL) + return 0; + if (mea->mea_count == 0) { + /* this is slave object */ + GOTO(cleanup, rc = 0); + } + + CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name, + (unsigned long) dentry->d_inode->i_ino, + (unsigned long) dentry->d_inode->i_generation); + + OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count); + if (*rlockh == NULL) + GOTO(cleanup, rc = -ENOMEM); + memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count); + + memset(&op_data, 0, sizeof(op_data)); + op_data.mea1 = mea; + it.it_op = IT_UNLINK; + rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data, + *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast, + NULL); +cleanup: + OBD_FREE(mea, mea_size); + RETURN(rc); +} + +void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry, + struct lustre_handle *lockh) +{ + struct mds_obd *mds = &obd->u.mds; + struct mea *mea = NULL; + int mea_size, rc, i; + + if (lockh == NULL) + return; + + LASSERT(mds->mds_lmv_obd != NULL); + LASSERT(S_ISDIR(dentry->d_inode->i_mode)); + + rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size); + if (rc) { + CERROR("locks are leaked\n"); + return; + } + LASSERT(mea_size != 0); + LASSERT(mea != NULL); + LASSERT(mea->mea_count != 0); + + CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name, + (unsigned long) dentry->d_inode->i_ino, + (unsigned long) dentry->d_inode->i_generation); + + for (i = 0; i < mea->mea_count; i++) { + if (lockh[i].cookie != 0) + ldlm_lock_decref(lockh + i, LCK_EX); + } + + OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count); + OBD_FREE(mea, mea_size); + return; +} + +int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry) +{ + struct mds_obd *mds = &obd->u.mds; + 
struct ptlrpc_request *req = NULL; + struct mdc_op_data op_data; + struct mea *mea = NULL; + int mea_size, rc; + + /* clustered MD ? */ + if (!mds->mds_lmv_obd) + return 0; + + /* a dir can be splitted only */ + if (!S_ISDIR(dentry->d_inode->i_mode)) + RETURN(0); + + rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size); + if (rc) + RETURN(rc); + + if (mea == NULL) + return 0; + if (mea->mea_count == 0) + GOTO(cleanup, rc = 0); + + CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name, + (unsigned long) dentry->d_inode->i_ino, + (unsigned long) dentry->d_inode->i_generation); + + memset(&op_data, 0, sizeof(op_data)); + op_data.mea1 = mea; + rc = md_unlink(mds->mds_lmv_exp, &op_data, &req); + LASSERT(req == NULL); +cleanup: + OBD_FREE(mea, mea_size); + RETURN(rc); +} + +struct ide_tracking { + int entries; + int empty; +}; + +int mds_ide_filldir(void *__buf, const char *name, int namelen, + loff_t offset, ino_t ino, unsigned int d_type) +{ + struct ide_tracking *it = __buf; + + if (ino == 0) + return 0; + + it->entries++; + if (it->entries > 2) + goto noempty; + if (namelen > 2) + goto noempty; + if (name[0] == '.' && namelen == 1) + return 0; + if (name[0] == '.' && name[1] == '.' 
&& namelen == 2) + return 0; +noempty: + it->empty = 0; + return -ENOTEMPTY; +} + +int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry) +{ + struct ide_tracking it; + struct file * file; + char *file_name; + int nlen, i, rc; + + it.entries = 0; + it.empty = 1; + + nlen = strlen("__iopen__/") + 10 + 1; + OBD_ALLOC(file_name, nlen); + if (!file_name) + RETURN(-ENOMEM); + i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino); + + file = filp_open(file_name, O_RDONLY, 0); + if (IS_ERR(file)) { + CERROR("can't open directory %s: %d\n", + file_name, (int) PTR_ERR(file)); + GOTO(cleanup, rc = PTR_ERR(file)); + } + + rc = vfs_readdir(file, mds_ide_filldir, &it); + filp_close(file, 0); + + if (it.empty && rc == 0) + rc = 1; + else + rc = 0; + +cleanup: + OBD_FREE(file_name, nlen); + return rc; +} + +int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req, + struct lustre_handle *lockh) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct dentry *dentry = NULL; + struct lvfs_run_ctxt saved; + int cleanup_phase = 0; + struct mds_body *body; + struct lvfs_ucred uc; + int rc, update_mode; + ENTRY; + + body = lustre_swab_reqbuf(req, offset, sizeof(*body), + lustre_swab_mds_body); + if (body == NULL) { + CERROR("Can't swab mds_body\n"); + GOTO(cleanup, rc = -EFAULT); + } + CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name, + (unsigned long) body->fid1.id, + (unsigned long) body->fid1.generation); + dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh, + &update_mode, NULL, 0, + MDS_INODELOCK_UPDATE); + if (IS_ERR(dentry)) { + CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry)); + GOTO(cleanup, rc = PTR_ERR(dentry)); + } + cleanup_phase = 1; + + LASSERT(S_ISDIR(dentry->d_inode->i_mode)); + + uc.luc_fsuid = body->fsuid; + uc.luc_fsgid = body->fsgid; + uc.luc_cap = body->capability; + uc.luc_suppgid1 = body->suppgid; + uc.luc_suppgid2 = -1; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); + + rc = 0; + if 
(!mds_is_dir_empty(obd, dentry)) + rc = -ENOTEMPTY; + +cleanup: + switch(cleanup_phase) { + case 1: + if (rc) + ldlm_lock_decref(lockh, LCK_EX); + l_dput(dentry); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); + default: + break; + } + RETURN(rc); +} + diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 0b2a370..6b12ab4 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1368,9 +1368,14 @@ int mds_create_local_dentry(struct mds_update_record *rec, /* new, local dentry will be added soon. we need no aliases here */ d_drop(new_child); - child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, - lockh, NULL, NULL, 0, - MDS_INODELOCK_UPDATE); + if (rec->ur_mode & MDS_MODE_DONT_LOCK) { + child = mds_fid2dentry(mds, rec->ur_fid1, NULL); + } else { + child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, + LCK_EX, lockh, NULL, NULL, 0, + MDS_INODELOCK_UPDATE); + } + if (IS_ERR(child)) { CERROR("can't get victim\n"); GOTO(cleanup, rc = PTR_ERR(child)); @@ -1404,7 +1409,8 @@ int mds_create_local_dentry(struct mds_update_record *rec, cleanup: switch(cleanup_phase) { case 2: - ldlm_lock_decref(lockh, LCK_EX); + if (!(rec->ur_mode & MDS_MODE_DONT_LOCK)) + ldlm_lock_decref(lockh, LCK_EX); dput(child); case 1: dput(new_child); @@ -1479,10 +1485,12 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, struct lustre_handle *child_lockh, struct dentry *dchild) { + struct obd_device *obd = req->rq_export->exp_obd; struct mds_obd *mds = mds_req2mds(req); struct mdc_op_data op_data; int rc = 0, cleanup_phase = 0; struct ptlrpc_request *request = NULL; + void *handle; ENTRY; LASSERT(offset == 0 || offset == 2); @@ -1490,12 +1498,20 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)", rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum, (unsigned) dchild->d_inum, (unsigned) dchild->d_generation); + if 
(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + DEBUG_REQ(D_HA, req, "unlink %*s (remote inode %u/%u/%u)", + rec->ur_namelen - 1, rec->ur_name, + (unsigned)dchild->d_mdsnum, + (unsigned) dchild->d_inum, + (unsigned) dchild->d_generation); /* time to drop i_nlink on remote MDS */ op_data.fid1.mds = dchild->d_mdsnum; op_data.fid1.id = dchild->d_inum; op_data.fid1.generation = dchild->d_generation; op_data.create_mode = rec->ur_mode; + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + op_data.create_mode |= MDS_MODE_REPLAY; op_data.namelen = 0; op_data.name = NULL; rc = md_unlink(mds->mds_lmv_exp, &op_data, &request); @@ -1504,8 +1520,16 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset, mds_copy_unlink_reply(req, request); ptlrpc_req_finished(request); } - if (rc == 0) + if (rc == 0) { + handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR, + NULL); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, dchild); + rc = mds_finish_transno(mds, dparent->d_inode, handle, req, + rc, 0); + } +cleanup: req->rq_status = rc; #ifdef S_PDIROPS @@ -1534,6 +1558,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, struct inode *child_inode; struct lustre_handle parent_lockh[2] = {{0}, {0}}; struct lustre_handle child_lockh = {0}, child_reuse_lockh = {0}; + struct lustre_handle * slave_lockh = NULL; char fidname[LL_FID_NAMELEN]; void *handle = NULL; int rc = 0, log_unlink = 0, cleanup_phase = 0; @@ -1556,14 +1581,45 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, unlink_by_fid = 1; rec->ur_name = fidname; rc = mds_create_local_dentry(rec, obd); - LASSERT(rc == 0); - } - rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, - parent_lockh, &dparent, LCK_PW, - MDS_INODELOCK_UPDATE, &update_mode, - rec->ur_name, rec->ur_namelen, - &child_lockh, &dchild, LCK_EX, - MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE); + if (rc == -ENOENT 
|| (rec->ur_mode & MDS_MODE_REPLAY)) { + DEBUG_REQ(D_HA, req, + "drop nlink on inode %u/%u/%u (replay)", + (unsigned) rec->ur_fid1->mds, + (unsigned) rec->ur_fid1->id, + (unsigned) rec->ur_fid1->generation); + req->rq_status = 0; + RETURN(0); + } + } + + if (rec->ur_mode & MDS_MODE_DONT_LOCK) { + /* master mds for directory asks slave removing + * inode is already locked */ + dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, + LCK_PW, parent_lockh, + &update_mode, rec->ur_name, + rec->ur_namelen, + MDS_INODELOCK_UPDATE); + if (IS_ERR(dparent)) + GOTO(cleanup, rc = PTR_ERR(dparent)); + dchild = ll_lookup_one_len(rec->ur_name, dparent, + rec->ur_namelen - 1); + if (IS_ERR(dchild)) + GOTO(cleanup, rc = PTR_ERR(dchild)); + child_lockh.cookie = 0; + LASSERT(!(dchild->d_flags & DCACHE_CROSS_REF)); + LASSERT(dchild->d_inode != NULL); + LASSERT(S_ISDIR(dchild->d_inode->i_mode)); + } else { + rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, + parent_lockh, &dparent, + LCK_PW, MDS_INODELOCK_UPDATE, + &update_mode, rec->ur_name, + rec->ur_namelen, &child_lockh, + &dchild, LCK_EX, + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE); + } if (rc) GOTO(cleanup, rc); @@ -1588,6 +1644,25 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, cleanup_phase = 2; /* dchild has a lock */ + /* We have to do these checks ourselves, in case we are making an + * orphan. The client tells us whether rmdir() or unlink() was called, + * so we need to return appropriate errors (bug 72). + * + * We don't have to check permissions, because vfs_rename (called from + * mds_open_unlink_rename) also calls may_delete. 
*/ + if ((rec->ur_mode & S_IFMT) == S_IFDIR) { + if (!S_ISDIR(child_inode->i_mode)) + GOTO(cleanup, rc = -ENOTDIR); + } else { + if (S_ISDIR(child_inode->i_mode)) + GOTO(cleanup, rc = -EISDIR); + } + + /* handle splitted dir */ + rc = mds_lock_slave_objs(obd, dchild, &slave_lockh); + if (rc) + GOTO(cleanup, rc); + /* Step 4: Get a lock on the ino to sync with creation WRT inode * reuse (see bug 2029). */ rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh); @@ -1624,20 +1699,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, } } - /* We have to do these checks ourselves, in case we are making an - * orphan. The client tells us whether rmdir() or unlink() was called, - * so we need to return appropriate errors (bug 72). - * - * We don't have to check permissions, because vfs_rename (called from - * mds_open_unlink_rename) also calls may_delete. */ - if ((rec->ur_mode & S_IFMT) == S_IFDIR) { - if (!S_ISDIR(child_inode->i_mode)) - GOTO(cleanup, rc = -ENOTDIR); - } else { - if (S_ISDIR(child_inode->i_mode)) - GOTO(cleanup, rc = -EISDIR); - } - /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */ switch (child_inode->i_mode & S_IFMT) { case S_IFDIR: @@ -1713,10 +1774,13 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, LASSERT(atomic_read(&dchild->d_inode->i_count) > 0); if (rc == 0 && dchild->d_inode->i_nlink == 0 && mds_open_orphan_count(dchild->d_inode) > 0) { + /* filesystem is really going to destroy an inode * we have to delay this till inode is opened -bzzz */ mds_open_unlink_rename(rec, obd, dparent, dchild, NULL); } + /* handle splitted dir */ + mds_unlink_slave_objs(obd, dchild); rc = mds_finish_transno(mds, dparent->d_inode, handle, req, rc, 0); if (!rc) @@ -1732,7 +1796,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, else ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX); case 2: /* child lock */ - ldlm_lock_decref(&child_lockh, LCK_EX); + 
mds_unlock_slave_objs(obd, dchild, slave_lockh); + if (child_lockh.cookie) + ldlm_lock_decref(&child_lockh, LCK_EX); case 1: /* child and parent dentry, parent lock */ #ifdef S_PDIROPS if (parent_lockh[1].cookie != 0) diff --git a/lustre/tests/sanity-lmv.sh b/lustre/tests/sanity-lmv.sh new file mode 100644 index 0000000..77eaa92 --- /dev/null +++ b/lustre/tests/sanity-lmv.sh @@ -0,0 +1,293 @@ +#!/bin/bash +# +# Run select tests by setting ONLY, or as arguments to the script. +# Skip specific tests by setting EXCEPT. +# +# e.g. ONLY="22 23" or ONLY="`seq 32 39`" or EXCEPT="31" +set -e + +ONLY=${ONLY:-"$*"} +# bug number for skipped test: 2108 +ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-""} +# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! +#case `uname -r` in +#2.6.*) ALWAYS_EXCEPT="$ALWAYS_EXCEPT 54c 55" # bug 3117 +#esac + +[ "$ALWAYS_EXCEPT$EXCEPT" ] && echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT" + +SRCDIR=`dirname $0` +export PATH=$PWD/$SRCDIR:$SRCDIR:$SRCDIR/../utils:$PATH + +TMP=${TMP:-/tmp} +FSTYPE=${FSTYPE:-ext3} + +CHECKSTAT=${CHECKSTAT:-"checkstat -v"} +CREATETEST=${CREATETEST:-createtest} +LFS=${LFS:-lfs} +LSTRIPE=${LSTRIPE:-"$LFS setstripe"} +LFIND=${LFIND:-"$LFS find"} +LVERIFY=${LVERIFY:-ll_dirstripe_verify} +LCTL=${LCTL:-lctl} +MCREATE=${MCREATE:-mcreate} +OPENFILE=${OPENFILE:-openfile} +OPENUNLINK=${OPENUNLINK:-openunlink} +TOEXCL=${TOEXCL:-toexcl} +TRUNCATE=${TRUNCATE:-truncate} +MUNLINK=${MUNLINK:-munlink} +SOCKETSERVER=${SOCKETSERVER:-socketserver} +SOCKETCLIENT=${SOCKETCLIENT:-socketclient} +IOPENTEST1=${IOPENTEST1:-iopentest1} +IOPENTEST2=${IOPENTEST2:-iopentest2} + +if [ $UID -ne 0 ]; then + RUNAS_ID="$UID" + RUNAS="" +else + RUNAS_ID=${RUNAS_ID:-500} + RUNAS=${RUNAS:-"runas -u $RUNAS_ID"} +fi + +export NAME=${NAME:-lmv} + +SAVE_PWD=$PWD + +clean() { + echo -n "cln.." + sh llmountcleanup.sh > /dev/null || exit 20 + I_MOUNTED=no +} +CLEAN=${CLEAN:-clean} + +start() { + echo -n "mnt.." 
+ sh llrmount.sh > /dev/null || exit 10 + I_MOUNTED=yes + echo "done" +} +START=${START:-start} + +log() { + echo "$*" + lctl mark "$*" 2> /dev/null || true +} + +trace() { + log "STARTING: $*" + strace -o $TMP/$1.strace -ttt $* + RC=$? + log "FINISHED: $*: rc $RC" + return 1 +} +TRACE=${TRACE:-""} + +check_kernel_version() { + VERSION_FILE=/proc/fs/lustre/kernel_version + WANT_VER=$1 + [ ! -f $VERSION_FILE ] && echo "can't find kernel version" && return 1 + GOT_VER=`cat $VERSION_FILE` + [ $GOT_VER -ge $WANT_VER ] && return 0 + log "test needs at least kernel version $WANT_VER, running $GOT_VER" + return 1 +} + +run_one() { + if ! mount | grep -q $DIR; then + $START + fi + echo -1 >/proc/sys/portals/debug + log "== test $1: $2" + export TESTNAME=test_$1 + test_$1 || error "test_$1: exit with rc=$?" + unset TESTNAME + pass + cd $SAVE_PWD + $CLEAN +} + +build_test_filter() { + for O in $ONLY; do + eval ONLY_${O}=true + done + for E in $EXCEPT $ALWAYS_EXCEPT; do + eval EXCEPT_${E}=true + done +} + +_basetest() { + echo $* +} + +basetest() { + IFS=abcdefghijklmnopqrstuvwxyz _basetest $1 +} + +run_test() { + base=`basetest $1` + if [ "$ONLY" ]; then + testname=ONLY_$1 + if [ ${!testname}x != x ]; then + run_one $1 "$2" + return $? + fi + testname=ONLY_$base + if [ ${!testname}x != x ]; then + run_one $1 "$2" + return $? + fi + echo -n "." + return 0 + fi + testname=EXCEPT_$1 + if [ ${!testname}x != x ]; then + echo "skipping excluded test $1" + return 0 + fi + testname=EXCEPT_$base + if [ ${!testname}x != x ]; then + echo "skipping excluded test $1 (base $base)" + return 0 + fi + run_one $1 "$2" + return $? 
+} + +[ "$SANITYLOG" ] && rm -f $SANITYLOG || true + +error() { + log "FAIL: $@" + if [ "$SANITYLOG" ]; then + echo "FAIL: $TESTNAME $@" >> $SANITYLOG + else + exit 1 + fi +} + +pass() { + echo PASS +} + +MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`" +if [ -z "$MOUNT" ]; then + sh llmount.sh + MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`" + [ -z "$MOUNT" ] && error "NAME=$NAME not mounted" + I_MOUNTED=yes +fi + +[ `echo $MOUNT | wc -w` -gt 1 ] && error "NAME=$NAME mounted more than once" + +DIR=${DIR:-$MOUNT} +[ -z "`echo $DIR | grep $MOUNT`" ] && echo "$DIR not in $MOUNT" && exit 99 + +LOVNAME=`cat /proc/fs/lustre/llite/fs0/lov/common_name` +OSTCOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/numobd` +STRIPECOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/stripecount` +STRIPESIZE=`cat /proc/fs/lustre/lov/$LOVNAME/stripesize` + +[ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo +[ -f $DIR/d52b/foo ] && chattr -i $DIR/d52b/foo +rm -rf $DIR/[Rdfs][1-9]* + +build_test_filter + +echo preparing for tests involving mounts +EXT2_DEV=${EXT2_DEV:-/tmp/SANITY.LOOP} +touch $EXT2_DEV +mke2fs -j -F $EXT2_DEV 8000 > /dev/null + +test_1a() { + mkdir $DIR/1a0 || error + createmany -o $DIR/1a0/f 4000 + rmdir $DIR/1a0 && error + rm -rf $DIR/1a0 || error +} +run_test 1a " remove splitted dir =============================" + +test_1b() { + mkdir $DIR/1b0 || error + createmany -o $DIR/1b0/f 4000 + find $DIR/1b0 -type f | xargs rm -f + NUM=`ls $DIR/1b0 | wc -l` + if [ $NUM -ne 0 ] ; then + echo "dir must be empty" + error + fi + touch $DIR/1b0/file0 + touch $DIR/1b0/file1 + touch $DIR/1b0/file2 + + echo "3 files left" + rmdir $DIR/1b0 && error + rm -f $DIR/1b0/file0 + + echo "2 files left" + rmdir $DIR/1b0 && error + rm -f $DIR/1b0/file1 + + echo "1 files left" + rmdir $DIR/1b0 && error + rm -f $DIR/1b0/file2 + + echo "0 files left" + rmdir $DIR/1b0 || error +} +run_test 1b " remove splitted dir =============================" + +test_1c() { + mkdir $DIR/1b1 || error + 
createmany -o $DIR/1b1/f 4000 + find $DIR/1b1 -type f | xargs rm -f + NUM=`ls $DIR/1b1 | wc -l` + if [ $NUM -ne 0 ] ; then + echo "dir must be empty" + error + fi + touch $DIR/1b1/file0 + touch $DIR/1b1/file1 + touch $DIR/1b1/file2 + + echo "3 files left" + rmdir $DIR/1b1 && error + rm -f $DIR/1b1/file0 + + echo "2 files left" + rmdir $DIR/1b1 && error + rm -f $DIR/1b1/file1 + + echo "1 files left" + rmdir $DIR/1b1 && error + rm -f $DIR/1b1/file2 + + echo "0 files left" + rmdir $DIR/1b1 || error +} +run_test 1c " remove splitted cross-node dir =============================" + +test_2a() { + mkdir $DIR/2a0 || error + createmany -o $DIR/2a0/f 5000 + NUM=`ls $DIR/2a0 | wc -l` + echo "found $NUM files" + if [ $NUM -ne 5000 ]; then + echo "wrong number of files: $NUM" + error + fi + rm -rf $DIR/2a0 || error +} +run_test 2a " list splitted dir =============================" + +TMPDIR=$OLDTMPDIR +TMP=$OLDTMP +HOME=$OLDHOME + +log "cleanup: ======================================================" +if [ "`mount | grep ^$NAME`" ]; then + rm -rf $DIR/[Rdfs][1-9]* + if [ "$I_MOUNTED" = "yes" ]; then + sh llmountcleanup.sh || error + fi +fi + +echo '=========================== finished ===============================' +[ -f "$SANITYLOG" ] && cat $SANITYLOG && exit 1 || true -- 1.8.3.1