Whamcloud - gitweb
- moved dir entries are deleted from the original dir (master object)
authoralex <alex>
Fri, 11 Jun 2004 15:08:41 +0000 (15:08 +0000)
committeralex <alex>
Fri, 11 Jun 2004 15:08:41 +0000 (15:08 +0000)
- unlink for split dirs has been implemented. it uses IT_UNLINK to check the
  dir's emptiness and works as follows: the unlink request comes to the mds
  holding the dir; the mds recognizes the split dir and issues LCK_EX with the
  IT_UNLINK intent. each slave object is checked and locked. if all the slaves
  and the master object are empty, the mds unlinks them and unlocks the slave
  objects
- lmv_enqueue() handles a split dir properly: issues the given lock for each object
- lmv_unlink() handles a split dir properly: unlinks each slave object
- mds_lock_slave_objs(), mds_unlock_slave_objs() and mds_unlink_slave_objs()
  have been introduced to implement unlink of a split dir
- mds_lock_and_check_slave() is IT_UNLINK handler on mds side
- mds_reint_unlink() sets MDS_MODE_REPLAY on "drop nlink" request in replay case
- mds_reint_unlink() recognizes MDS_MODE_DONT_LOCK and doesn't try to lock
  the slave object being removed (it has already been locked earlier)
- minor cleanups in lmv to avoid needless debug messages
- sanity-lmv.sh has been added

lustre/include/linux/lustre_idl.h
lustre/lmv/lmv_obd.c
lustre/lvfs/fsfilt_smfs.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lmv.c
lustre/mds/mds_reint.c
lustre/tests/sanity-lmv.sh [new file with mode: 0644]

index 42eda84..85e6132 100644 (file)
@@ -589,6 +589,9 @@ struct mdc_op_data {
         struct mea *mea2;       /* mea of inode2 */
 };
 
+/* flags OR-ed into the mode of an unlink request (create_mode / ur_mode);
+ * unsigned constants, because shifting 1 into bit 31 of a signed int is
+ * undefined behaviour */
+#define MDS_MODE_DONT_LOCK      (1U << 30) /* object is already locked */
+#define MDS_MODE_REPLAY         (1U << 31) /* request is being replayed */
+
 struct mds_rec_setattr {
         __u32           sa_opcode;
         __u32           sa_fsuid;
index 5b2d95b..cbad914 100644 (file)
@@ -227,10 +227,9 @@ int lmv_check_connect(struct obd_device *obd) {
         struct lmv_tgt_desc *tgts;
         struct obd_export *exp;
         int rc, rc2, i;
-        ENTRY;
 
         if (lmv->connected)
-                RETURN(0);
+                return 0;
       
         lmv->connected = 1;
         cluuid = &lmv->cluuid;
@@ -300,7 +299,7 @@ int lmv_check_connect(struct obd_device *obd) {
         lmv_set_timeouts(obd);
 
         class_export_put(exp);
-        RETURN (0);
+        return 0;
 
  out_disc:
         while (i-- > 0) {
@@ -708,10 +707,64 @@ int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
                 RETURN(rc);
 
         /* FIXME: choose right MDC here */
+        CWARN("this method isn't implemented yet\n");
         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
         RETURN(rc);
 }
 
+/*
+ * Take the given lock on every slave object of a split directory.
+ *
+ * data->mea1 describes the slaves: one md_enqueue() is issued per entry of
+ * mea->mea_fids[] and its handle is stored in lockh[i], so lockh must have
+ * room for mea->mea_count handles.  The reply request attached to the intent
+ * is released after every successful enqueue.
+ *
+ * Returns 0 when every enqueue succeeded with a zero intent status.  On the
+ * first failure (enqueue error or non-zero it_status) the locks taken for the
+ * preceding slaves are dropped, their cookies are cleared and the error is
+ * returned.
+ */
+int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
+                         struct lookup_intent *it, int lockmode,
+                         struct mdc_op_data *data, struct lustre_handle *lockh,
+                         void *lmm, int lmmsize,
+                         ldlm_completion_callback cb_completion,
+                         ldlm_blocking_callback cb_blocking, void *cb_data)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct mea *mea = data->mea1;
+        struct mdc_op_data data2;
+        int i, rc, mds;
+        ENTRY;
+
+        LASSERT(mea != NULL);
+        for (i = 0; i < mea->mea_count; i++) {
+                memset(&data2, 0, sizeof(data2));
+                data2.fid1 = mea->mea_fids[i];
+                mds = data2.fid1.mds;
+
+                /* check the export that is really used below: the target is
+                 * selected by the mds number stored in the fid, not by the
+                 * position of the slave in the mea */
+                if (lmv->tgts[mds].ltd_exp == NULL)
+                        continue;
+
+                rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
+                                &data2, lockh + i, lmm, lmmsize, cb_completion,
+                                cb_blocking, cb_data);
+                CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n",
+                       (unsigned long) mea->mea_fids[i].mds,
+                       (unsigned long) mea->mea_fids[i].id,
+                       (unsigned long) mea->mea_fids[i].generation,
+                       rc, it->d.lustre.it_status);
+                if (rc)
+                        GOTO(cleanup, rc);
+                if (it->d.lustre.it_data) {
+                        struct ptlrpc_request *req;
+                        req = (struct ptlrpc_request *) it->d.lustre.it_data;
+                        ptlrpc_req_finished(req);
+                        /* our reference is gone, don't leave a stale pointer
+                         * behind in the intent */
+                        it->d.lustre.it_data = NULL;
+                }
+
+                if (it->d.lustre.it_status)
+                        GOTO(cleanup, rc = it->d.lustre.it_status);
+        }
+        RETURN(0);
+
+cleanup:
+        /* drop all the locks taken for the slaves before the failed one */
+        while (--i >= 0) {
+                if (lockh[i].cookie)
+                        ldlm_lock_decref(lockh + i, lockmode);
+                lockh[i].cookie = 0;
+        }
+        RETURN(rc);
+}
+
 int lmv_enqueue(struct obd_export *exp, int lock_type,
                 struct lookup_intent *it, int lock_mode,
                 struct mdc_op_data *data, struct lustre_handle *lockh,
@@ -729,6 +782,13 @@ int lmv_enqueue(struct obd_export *exp, int lock_type,
         if (rc)
                 RETURN(rc);
 
+        if (it->it_op == IT_UNLINK) {
+                rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
+                                        data, lockh, lmm, lmmsize,
+                                        cb_completion, cb_blocking, cb_data);
+                RETURN(rc);
+        }
+
         if (data->namelen) {
                 obj = lmv_grab_obj(obd, &data->fid1, 0);
                 if (obj) {
@@ -1103,6 +1163,40 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
         RETURN(rc);
 }
 
+/*
+ * Unlink every slave object of a split directory.
+ *
+ * data->mea1 lists the slaves.  For each of them an unlink carrying
+ * MDS_MODE_DONT_LOCK | S_IFDIR in create_mode is sent to the target selected
+ * by the mds number of the slave's fid.  The reply of each unlink is released
+ * here, so *req is NULL on return.
+ *
+ * Returns 0 on success or the error of the first failed unlink; the
+ * remaining slaves are not touched after a failure.
+ */
+int lmv_unlink_slaves(struct obd_export *exp,
+                         struct mdc_op_data *data, struct ptlrpc_request **req)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct mea *mea = data->mea1;
+        struct mdc_op_data data2;
+        int i, rc = 0, mds;
+        ENTRY;
+
+        LASSERT(mea != NULL);
+        for (i = 0; i < mea->mea_count; i++) {
+                memset(&data2, 0, sizeof(data2));
+                data2.fid1 = mea->mea_fids[i];
+                data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
+                mds = data2.fid1.mds;
+
+                /* check the export that is really used below: the target is
+                 * selected by the mds number stored in the fid, not by the
+                 * position of the slave in the mea */
+                if (lmv->tgts[mds].ltd_exp == NULL)
+                        continue;
+
+                rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
+                CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n",
+                       (unsigned long) mea->mea_fids[i].mds,
+                       (unsigned long) mea->mea_fids[i].id,
+                       (unsigned long) mea->mea_fids[i].generation, rc);
+                if (*req) {
+                        ptlrpc_req_finished(*req);
+                        *req = NULL;
+                }
+                if (rc)
+                        break;
+        }
+        RETURN(rc);
+}
+
 int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
                struct ptlrpc_request **request)
 {
@@ -1110,12 +1204,15 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
         struct lmv_obd *lmv = &obd->u.lmv;
         int rc, i = 0;
         ENTRY;
-
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
 
-        if (data->namelen != 0) {
+        if (data->namelen == 0 && data->mea1 != NULL) {
+                /* mds asks to remove slave objects */
+                rc = lmv_unlink_slaves(exp, data, request);
+                RETURN(rc);
+        } else if (data->namelen != 0) {
                 struct lmv_obj *obj;
                 obj = lmv_grab_obj(obd, &data->fid1, 0);
                 if (obj) {
index 31b6b95..f0059f3 100644 (file)
@@ -825,7 +825,7 @@ static int fsfilt_smfs_get_ino_write_extents(struct super_block *sb, ino_t ino,
                                              char **pbuf, int *size)
 {
         struct fs_extent *fs_extents;
-        struct ldlm_extent *extents;
+        struct ldlm_extent *extents = NULL;
         struct inode *inode;
         struct inode *cache_inode;
         struct fsfilt_operations *cache_fsfilt = NULL;
index 5e6983e..f36f7f4 100644 (file)
@@ -237,23 +237,6 @@ int mdc_enqueue(struct obd_export *exp,
                 /* get ready for the reply */
                 reply_buffers = 3;
                 req->rq_replen = lustre_msg_size(3, repsize);
-        } else if (it->it_op & IT_UNLINK) {
-                size[2] = sizeof(struct mds_rec_unlink);
-                size[3] = data->namelen + 1;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 4,
-                                      size, NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                /* pack the intent */
-                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
-                lit->opc = (__u64)it->it_op;
-
-                /* pack the intended request */
-                mdc_unlink_pack(req->rq_reqmsg, 2, data);
-                /* get ready for the reply */
-                reply_buffers = 4;
-                req->rq_replen = lustre_msg_size(4, repsize);
         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_CHDIR)) {
                 int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
                 size[2] = sizeof(struct mds_body);
@@ -277,7 +260,7 @@ int mdc_enqueue(struct obd_export *exp,
                 reply_buffers = 3;
                 req->rq_replen = lustre_msg_size(3, repsize);
         } else if (it->it_op == IT_READDIR) {
-               policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
                                       size, NULL);
                 if (!req)
@@ -286,7 +269,25 @@ int mdc_enqueue(struct obd_export *exp,
                 /* get ready for the reply */
                 reply_buffers = 1;
                 req->rq_replen = lustre_msg_size(1, repsize);
-        }  else {
+        } else if (it->it_op == IT_UNLINK) {
+                size[2] = sizeof(struct mds_body);
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 3,
+                                      size, NULL);
+                if (!req)
+                        RETURN(-ENOMEM);
+
+                /* pack the intended request */
+                mdc_getattr_pack(req->rq_reqmsg, 0,  2, 0, data);
+
+                /* pack the intent */
+                lit = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*lit));
+                lit->opc = (__u64)it->it_op;
+
+                /* get ready for the reply */
+                reply_buffers = 3;
+                req->rq_replen = lustre_msg_size(3, repsize);
+        } else {
                 LBUG();
                 RETURN(-EINVAL);
         }
index dfdad07..02dfa13 100644 (file)
@@ -182,7 +182,7 @@ int mds_lock_mode_for_dir(struct obd_device *obd,
                 ret_mode = LCK_CW;
                 if (mds_splitting_expected(obd, dentry)) {
                         /* splitting possible. serialize any access */
-                        CERROR("%s: gonna split %lu/%lu\n",
+                        CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
                                obd->obd_name,
                                (unsigned long) dentry->d_inode->i_ino,
                                (unsigned long) dentry->d_inode->i_generation);
@@ -2355,8 +2355,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
 
-        rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize,
-                               NULL);
+        rc = lustre_pack_reply(req, 3, repsize, NULL);
         if (rc)
                 RETURN(req->rq_status = rc);
 
@@ -2412,6 +2411,14 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
                         RETURN(ELDLM_LOCK_ABORTED);
                 }
                 break;
+        case IT_UNLINK:
+                rc = mds_lock_and_check_slave(offset, req, &lockh);
+                if ((rep->lock_policy_res2 = rc)) {
+                        if (rc == ENOLCK)
+                                rep->lock_policy_res2 = 0;
+                        RETURN(ELDLM_LOCK_ABORTED);
+                }
+                break;
         default:
                 CERROR("Unhandled intent "LPD64"\n", it->opc);
                 LBUG();
index dddd484..0751a8e 100644 (file)
@@ -130,5 +130,12 @@ int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
 int mds_choose_mdsnum(struct obd_device *, const char *, int, int);
 int mds_lmv_postsetup(struct obd_device *);
 int mds_splitting_expected(struct obd_device *, struct dentry *);
+/* helpers used to unlink a split directory */
+
+/* enqueue LCK_EX with the IT_UNLINK intent on the slave objects of the given
+ * dir; the array of lock handles is allocated here and returned through the
+ * last argument */
+int mds_lock_slave_objs(struct obd_device *, struct dentry *,
+                        struct lustre_handle **);
+/* unlink the slave objects of the given dir through the lmv export */
+int mds_unlink_slave_objs(struct obd_device *, struct dentry *);
+/* drop the LCK_EX references held in the handle array and free the array;
+ * a NULL array is ignored */
+void mds_unlock_slave_objs(struct obd_device *, struct dentry *,
+                           struct lustre_handle *);
+/* IT_UNLINK intent handler: lock the object named in the request with LCK_EX
+ * and check that the directory is empty */
+int mds_lock_and_check_slave(int, struct ptlrpc_request *, struct lustre_handle *);
+
 
 #endif /* _MDS_INTERNAL_H */
index 9dca6e3..a3a9144 100644 (file)
@@ -176,6 +176,7 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
        }
         if (rc > 0)
                 rc = 0;
+                        
        RETURN(rc);
 }
 
@@ -270,8 +271,9 @@ next:
 static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
 {
         struct mds_obd *mds = &dc->obd->u.mds;
-        struct dir_cache *ca;
         struct list_head *cur, *tmp;
+        struct dir_cache *ca;
+        int rc;
         ENTRY; 
         ca = dc->cache + mdsnum;
 
@@ -294,12 +296,52 @@ static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
                 ca->brwc.count = PAGE_SIZE;
                 ca->brwc.flag = 0;
                 ca->oa.o_mds = mdsnum;
-                obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
-                                (struct lov_stripe_md *) dc->mea,
-                                1, &ca->brwc, NULL);
+                rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
+                             (struct lov_stripe_md *) dc->mea,
+                             1, &ca->brwc, NULL);
+                if (rc)
+                        RETURN(rc);
 
-                list_del(&page->list);
-                __free_page(page);
+        }
+        RETURN(0);
+}
+
+/*
+ * Delete from the original (master) directory every entry cached for the
+ * given mds in dc->cache[mdsnum].
+ *
+ * Each cached page is walked as a sequence of struct dir_entry records; the
+ * walk of a page stops at its end or at a record with a zero namelen.  Every
+ * name is looked up under dc->dentry and removed with fsfilt_del_dir_entry().
+ * A failed lookup or removal is logged and the walk goes on, so that as many
+ * entries as possible are removed.
+ *
+ * Returns 0 if every removal succeeded, otherwise the error of the first
+ * removal that failed.
+ */
+static int remove_entries_from_orig_dir(struct dirsplit_control *dc, int mdsnum)
+{
+        struct list_head *cur, *tmp;
+        struct dentry *dentry;
+        struct dir_cache *ca;
+        struct dir_entry *de;
+        struct page *page;
+        char *buf, *end;
+        int rc, first_err = 0;
+        ENTRY;
+
+        ca = dc->cache + mdsnum;
+        list_for_each_safe(cur, tmp, &ca->list) {
+                page = list_entry(cur, struct page, list);
+                buf = page_address(page);
+                end = buf + PAGE_SIZE;
+
+                de = (struct dir_entry *) buf;
+                while ((char *) de < end && de->namelen) {
+                        /* lookup an inode */
+                        LASSERT(de->namelen <= 255);
+
+                        dentry = ll_lookup_one_len(de->name, dc->dentry,
+                                                   de->namelen);
+                        if (IS_ERR(dentry)) {
+                                /* "%.*s": the length must bound the name, a
+                                 * plain "%*s" would only set the field width */
+                                CERROR("can't lookup %.*s: %d\n",
+                                       (int) de->namelen, de->name,
+                                       (int) PTR_ERR(dentry));
+                                goto next;
+                        }
+                        LASSERT(dentry->d_inode != NULL);
+                        rc = fsfilt_del_dir_entry(dc->obd, dentry);
+                        if (rc) {
+                                CERROR("can't remove %.*s: %d\n",
+                                       (int) de->namelen, de->name, rc);
+                                /* remember the first failure for the caller */
+                                if (first_err == 0)
+                                        first_err = rc;
+                        }
+                        l_dput(dentry);
+next:
+                        de = (struct dir_entry *)
+                                ((char *) de + DIR_REC_LEN(de->namelen));
+                }
+        }
+        RETURN(first_err);
+}
@@ -395,18 +437,42 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
         }
 
         err = vfs_readdir(file, filldir, &dc);
-        
         filp_close(file, 0);
+        if (err)
+                GOTO(cleanup, err);
 
         for (i = 0; i < mea->mea_count; i++) {
-                if (dc.cache[i].cached)
-                        flush_buffer_onto_mds(&dc, i);
+                if (!dc.cache[i].cached)
+                        continue;
+                err = flush_buffer_onto_mds(&dc, i);
+                if (err)
+                        GOTO(cleanup, err);
         }
 
+        for (i = 0; i < mea->mea_count; i++) {
+                if (!dc.cache[i].cached)
+                        continue;
+                err = remove_entries_from_orig_dir(&dc, i);
+                if (err)
+                        GOTO(cleanup, err);
+        }
+
+cleanup:
+        for (i = 0; i < mea->mea_count; i++) {
+                struct list_head *cur, *tmp;
+                if (!dc.cache[i].cached)
+                        continue;
+                list_for_each_safe(cur, tmp, &dc.cache[i].list) {
+                        struct page *page;
+                        page = list_entry(cur, struct page, list);
+                        list_del(&page->list);
+                        __free_page(page);
+                }
+        }
         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
         OBD_FREE(file_name, nlen);
 
-        return 0;
+        RETURN(err);
 }
 
 #define MAX_DIR_SIZE    (64 * 1024)
@@ -646,8 +712,6 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                         err = fsfilt_add_dir_entry(obd, res->dentry, de->name,
                                                    de->namelen, de->ino,
                                                    de->generation, de->mds);
-                        /* FIXME: remove entries from the original dir */
-#warning "removing entries from the original dir"
                         LASSERT(err == 0);
                         de = (struct dir_entry *)
                                 ((char *) de + DIR_REC_LEN(de->namelen));
@@ -678,3 +742,248 @@ int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int fla
         RETURN(i);
 }
 
+/*
+ * Lock the slave objects of a split directory before it is unlinked.
+ *
+ * If dentry is the master object of a split directory, an array of
+ * mea->mea_count lock handles is allocated, stored in *rlockh and LCK_EX is
+ * enqueued with the IT_UNLINK intent through the lmv export.  *rlockh is left
+ * untouched when there is nothing to lock: no lmv device, not a directory,
+ * no mea, or a mea with mea_count == 0 (a slave object).
+ *
+ * Returns 0 on success or when there is nothing to lock, the error of
+ * mds_get_lmv_attr(), -ENOMEM, or the result of the enqueue.
+ */
+int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
+                        struct lustre_handle **rlockh)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mdc_op_data op_data;
+        struct lookup_intent it;
+        struct mea *mea = NULL;
+        int mea_size, rc;
+        ENTRY;
+
+        LASSERT(rlockh != NULL);
+        LASSERT(dentry != NULL);
+        LASSERT(dentry->d_inode != NULL);
+
+        /* clustered MD ? */
+        if (!mds->mds_lmv_obd)
+                RETURN(0);
+
+        /* only a directory can be split */
+        if (!S_ISDIR(dentry->d_inode->i_mode))
+                RETURN(0);
+
+        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        if (rc)
+                RETURN(rc);
+
+        if (mea == NULL)
+                RETURN(0);
+        if (mea->mea_count == 0) {
+                /* this is slave object */
+                GOTO(cleanup, rc = 0);
+        }
+
+        CDEBUG(D_OTHER, "%s: lock slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        OBD_ALLOC(*rlockh, sizeof(struct lustre_handle) * mea->mea_count);
+        if (*rlockh == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+        memset(*rlockh, 0, sizeof(struct lustre_handle) * mea->mea_count);
+
+        memset(&op_data, 0, sizeof(op_data));
+        op_data.mea1 = mea;
+        /* the intent lives on the stack: clear it so that the enqueue path
+         * never sees garbage in the fields we don't set explicitly */
+        memset(&it, 0, sizeof(it));
+        it.it_op = IT_UNLINK;
+        rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX, &op_data,
+                        *rlockh, NULL, 0, ldlm_completion_ast, mds_blocking_ast,
+                        NULL);
+cleanup:
+        OBD_FREE(mea, mea_size);
+        RETURN(rc);
+}
+
+/*
+ * Release the slave locks of a split directory.
+ *
+ * lockh is an array with one handle per slave listed in the directory's mea.
+ * The LCK_EX reference is dropped for every handle with a non-zero cookie,
+ * then the array itself is freed.  A NULL lockh means that nothing was
+ * locked and is ignored.  If the mea can't be read the number of handles is
+ * unknown, so the locks are reported as leaked and nothing is released.
+ */
+void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
+                        struct lustre_handle *lockh)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lustre_handle *cur;
+        struct mea *mea = NULL;
+        int mea_size;
+
+        if (!lockh)
+                return;
+
+        LASSERT(mds->mds_lmv_obd != NULL);
+        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+
+        if (mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size)) {
+                CERROR("locks are leaked\n");
+                return;
+        }
+        LASSERT(mea_size != 0);
+        LASSERT(mea != NULL);
+        LASSERT(mea->mea_count != 0);
+
+        CDEBUG(D_OTHER, "%s: unlock slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        /* a zero cookie marks a slot for which no lock is held */
+        for (cur = lockh; cur < lockh + mea->mea_count; cur++) {
+                if (cur->cookie == 0)
+                        continue;
+                ldlm_lock_decref(cur, LCK_EX);
+        }
+
+        OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
+        OBD_FREE(mea, mea_size);
+}
+
+/*
+ * Unlink the slave objects of a split directory.
+ *
+ * Nothing is done, and 0 is returned, when there is no lmv device, when
+ * dentry is not a directory, when it has no mea, or when the mea lists no
+ * slaves.  Otherwise the mea is handed to md_unlink() on the lmv export.
+ *
+ * Returns 0, the error of mds_get_lmv_attr() or the result of md_unlink().
+ */
+int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
+{
+        struct ptlrpc_request *req = NULL;
+        struct mdc_op_data slaves;
+        struct mea *mea = NULL;
+        int mea_size, rc;
+
+        /* no lmv device: metadata is not clustered */
+        if (obd->u.mds.mds_lmv_obd == NULL)
+                return 0;
+
+        /* only a directory can be split */
+        if (!S_ISDIR(dentry->d_inode->i_mode))
+                RETURN(0);
+
+        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        if (rc != 0)
+                RETURN(rc);
+
+        if (!mea)
+                return 0;
+        /* no slaves are listed, nothing to unlink */
+        if (!mea->mea_count)
+                GOTO(cleanup, rc = 0);
+
+        CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
+               (unsigned long) dentry->d_inode->i_ino,
+               (unsigned long) dentry->d_inode->i_generation);
+
+        /* only mea1 is set: no name and no fid are passed along */
+        memset(&slaves, 0, sizeof(slaves));
+        slaves.mea1 = mea;
+        rc = md_unlink(obd->u.mds.mds_lmv_exp, &slaves, &req);
+        LASSERT(req == NULL);
+cleanup:
+        OBD_FREE(mea, mea_size);
+        RETURN(rc);
+}
+
+/* state shared between mds_is_dir_empty() and its filldir callback */
+struct ide_tracking {
+        int entries;    /* entries with a non-zero inode number seen so far */
+        int empty;      /* cleared once something besides "." / ".." is seen */
+};
+
+/*
+ * filldir callback used to find out whether a directory is empty.
+ *
+ * Entries with a zero inode number are skipped.  The first two counted
+ * entries are accepted if they are named "." or ".."; any other entry marks
+ * the directory as not empty and returns -ENOTEMPTY.
+ */
+int mds_ide_filldir(void *__buf, const char *name, int namelen,
+                    loff_t offset, ino_t ino, unsigned int d_type)
+{
+        struct ide_tracking *track = __buf;
+        int dot, dotdot;
+
+        if (ino == 0)
+                return 0;
+
+        track->entries++;
+
+        dot = (namelen == 1 && name[0] == '.');
+        dotdot = (namelen == 2 && name[0] == '.' && name[1] == '.');
+        if (track->entries <= 2 && (dot || dotdot))
+                return 0;
+
+        track->empty = 0;
+        return -ENOTEMPTY;
+}
+
+/*
+ * Check whether the directory behind dentry holds anything besides "." and
+ * "..".  The directory is opened by inode number through "__iopen__" and read
+ * with mds_ide_filldir() as the callback.
+ *
+ * Returns 1 if the directory is empty, 0 if it is not (or if reading it
+ * failed), and a negative errno if the name buffer can't be allocated or the
+ * directory can't be opened.  Note that the negative values are non-zero:
+ * callers must not treat the result as a plain boolean.
+ */
+int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
+{
+        struct ide_tracking it;
+        struct file * file;
+        char *file_name;
+        int nlen, rc;
+
+        it.entries = 0;
+        it.empty = 1;
+
+        /* "__iopen__/0x" + as many hex digits as "%lx" may print + NUL;
+         * 8 digits are not enough where unsigned long is 64 bits wide */
+        nlen = strlen("__iopen__/0x") + 2 * sizeof(unsigned long) + 1;
+        OBD_ALLOC(file_name, nlen);
+        if (!file_name)
+                return -ENOMEM;
+        sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
+
+        file = filp_open(file_name, O_RDONLY, 0);
+        if (IS_ERR(file)) {
+                CERROR("can't open directory %s: %d\n",
+                       file_name, (int) PTR_ERR(file));
+                rc = PTR_ERR(file);
+                goto cleanup;
+        }
+
+        rc = vfs_readdir(file, mds_ide_filldir, &it);
+        filp_close(file, 0);
+
+        if (it.empty && rc == 0)
+                rc = 1;
+        else
+                rc = 0;
+
+cleanup:
+        OBD_FREE(file_name, nlen);
+        return rc;
+}
+
+/*
+ * IT_UNLINK intent handler: lock a directory object and check that it is
+ * empty.
+ *
+ * The mds_body at the given offset of the request names the object (fid1)
+ * and carries the credentials used for the check.  The object is locked
+ * with LCK_EX on MDS_INODELOCK_UPDATE and the lock handle is stored in lockh.
+ *
+ * Returns 0 if the directory is empty; the lock is then left held.  Returns
+ * -ENOTEMPTY if it has entries, or another negative errno if the body can't
+ * be unpacked, the object can't be found or the emptiness check itself
+ * fails; the lock, if it was taken, is dropped in all these cases.
+ */
+int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
+                             struct lustre_handle *lockh)
+{
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct dentry *dentry = NULL;
+        struct lvfs_run_ctxt saved;
+        int cleanup_phase = 0;
+        struct mds_body *body;
+        struct lvfs_ucred uc;
+        int rc, update_mode;
+        ENTRY;
+
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR("Can't swab mds_body\n");
+                GOTO(cleanup, rc = -EFAULT);
+        }
+        CDEBUG(D_OTHER, "%s: check slave %lu/%lu\n", obd->obd_name,
+               (unsigned long) body->fid1.id,
+               (unsigned long) body->fid1.generation);
+        dentry = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_EX, lockh,
+                                       &update_mode, NULL, 0,
+                                       MDS_INODELOCK_UPDATE);
+        if (IS_ERR(dentry)) {
+                CERROR("can't find inode: %d\n", (int) PTR_ERR(dentry));
+                GOTO(cleanup, rc = PTR_ERR(dentry));
+        }
+        cleanup_phase = 1;
+
+        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+
+        uc.luc_fsuid = body->fsuid;
+        uc.luc_fsgid = body->fsgid;
+        uc.luc_cap = body->capability;
+        uc.luc_suppgid1 = body->suppgid;
+        uc.luc_suppgid2 = -1;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+
+        /* mds_is_dir_empty() returns 1 (empty), 0 (not empty) or a negative
+         * errno.  An error must not be taken for "empty": that would allow
+         * the unlink of a directory which was never checked */
+        rc = mds_is_dir_empty(obd, dentry);
+        if (rc > 0)
+                rc = 0;
+        else if (rc == 0)
+                rc = -ENOTEMPTY;
+
+cleanup:
+        switch(cleanup_phase) {
+        case 1:
+                /* the lock is kept only if the check has passed */
+                if (rc)
+                        ldlm_lock_decref(lockh, LCK_EX);
+                l_dput(dentry);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+                /* fallthrough */
+        default:
+                break;
+        }
+        RETURN(rc);
+}
+
index 0b2a370..6b12ab4 100644 (file)
@@ -1368,9 +1368,14 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         /* new, local dentry will be added soon. we need no aliases here */
         d_drop(new_child);
 
-        child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
-                                      lockh, NULL, NULL, 0,
-                                      MDS_INODELOCK_UPDATE);
+        if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
+                child = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        } else {
+                child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
+                                              LCK_EX, lockh, NULL, NULL, 0,
+                                              MDS_INODELOCK_UPDATE);
+        }
+
         if (IS_ERR(child)) {
                 CERROR("can't get victim\n");
                 GOTO(cleanup, rc = PTR_ERR(child));
@@ -1404,7 +1409,8 @@ int mds_create_local_dentry(struct mds_update_record *rec,
 cleanup:
         switch(cleanup_phase) {
                 case 2:
-                        ldlm_lock_decref(lockh, LCK_EX);
+                        if (!(rec->ur_mode & MDS_MODE_DONT_LOCK))
+                                ldlm_lock_decref(lockh, LCK_EX);
                         dput(child);
                 case 1:
                         dput(new_child);
@@ -1479,10 +1485,12 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
                                    struct lustre_handle *child_lockh,
                                    struct dentry *dchild)
 {
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_obd *mds = mds_req2mds(req);
         struct mdc_op_data op_data;
         int rc = 0, cleanup_phase = 0;
         struct ptlrpc_request *request = NULL;
+        void *handle;
         ENTRY;
 
         LASSERT(offset == 0 || offset == 2);
@@ -1490,12 +1498,20 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
         DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)",
                   rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
                   (unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                DEBUG_REQ(D_HA, req, "unlink %*s (remote inode %u/%u/%u)",
+                          rec->ur_namelen - 1, rec->ur_name,
+                          (unsigned)dchild->d_mdsnum,
+                          (unsigned) dchild->d_inum,
+                          (unsigned) dchild->d_generation);
 
         /* time to drop i_nlink on remote MDS */ 
         op_data.fid1.mds = dchild->d_mdsnum;
         op_data.fid1.id = dchild->d_inum;
         op_data.fid1.generation = dchild->d_generation;
         op_data.create_mode = rec->ur_mode;
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                op_data.create_mode |= MDS_MODE_REPLAY;
         op_data.namelen = 0;
         op_data.name = NULL;
         rc = md_unlink(mds->mds_lmv_exp, &op_data, &request);
@@ -1504,8 +1520,16 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
                 mds_copy_unlink_reply(req, request);
                 ptlrpc_req_finished(request);
         }
-        if (rc == 0)
+        if (rc == 0) {
+                handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
+                                      NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
                 rc = fsfilt_del_dir_entry(req->rq_export->exp_obd, dchild);
+                rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
+                                        rc, 0);
+        }
+cleanup:
         req->rq_status = rc;
 
 #ifdef S_PDIROPS
@@ -1534,6 +1558,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct inode *child_inode;
         struct lustre_handle parent_lockh[2] = {{0}, {0}}; 
         struct lustre_handle child_lockh = {0}, child_reuse_lockh = {0};
+        struct lustre_handle * slave_lockh = NULL;
         char fidname[LL_FID_NAMELEN];
         void *handle = NULL;
         int rc = 0, log_unlink = 0, cleanup_phase = 0;
@@ -1556,14 +1581,45 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 unlink_by_fid = 1;
                 rec->ur_name = fidname;
                 rc = mds_create_local_dentry(rec, obd);
-                LASSERT(rc == 0);
-        }
-        rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
-                                         parent_lockh, &dparent, LCK_PW,
-                                         MDS_INODELOCK_UPDATE, &update_mode,
-                                         rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &dchild, LCK_EX,
-                                         MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE);
+                if (rc == -ENOENT || (rec->ur_mode & MDS_MODE_REPLAY)) {
+                        DEBUG_REQ(D_HA, req,
+                                  "drop nlink on inode %u/%u/%u (replay)",
+                                  (unsigned) rec->ur_fid1->mds,
+                                  (unsigned) rec->ur_fid1->id,
+                                  (unsigned) rec->ur_fid1->generation);
+                        req->rq_status = 0;
+                        RETURN(0);
+                }
+        }
+
+        if (rec->ur_mode & MDS_MODE_DONT_LOCK) {
+                /* master mds for directory asks slave removing
+                 * inode is already locked */
+                dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL,
+                                               LCK_PW, parent_lockh,
+                                               &update_mode, rec->ur_name,
+                                               rec->ur_namelen,
+                                               MDS_INODELOCK_UPDATE);
+                if (IS_ERR(dparent))
+                        GOTO(cleanup, rc = PTR_ERR(dparent));
+                dchild = ll_lookup_one_len(rec->ur_name, dparent,
+                                           rec->ur_namelen - 1);
+                if (IS_ERR(dchild))
+                        GOTO(cleanup, rc = PTR_ERR(dchild));
+                child_lockh.cookie = 0;
+                LASSERT(!(dchild->d_flags & DCACHE_CROSS_REF));
+                LASSERT(dchild->d_inode != NULL);
+                LASSERT(S_ISDIR(dchild->d_inode->i_mode));
+        } else {
+                rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
+                                                 parent_lockh, &dparent,
+                                                 LCK_PW, MDS_INODELOCK_UPDATE,
+                                                 &update_mode, rec->ur_name,
+                                                 rec->ur_namelen, &child_lockh,
+                                                 &dchild, LCK_EX,
+                                                 MDS_INODELOCK_LOOKUP |
+                                                        MDS_INODELOCK_UPDATE);
+        }
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1588,6 +1644,25 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         cleanup_phase = 2; /* dchild has a lock */
 
+        /* We have to do these checks ourselves, in case we are making an
+         * orphan.  The client tells us whether rmdir() or unlink() was called,
+         * so we need to return appropriate errors (bug 72).
+         *
+         * We don't have to check permissions, because vfs_rename (called from
+         * mds_open_unlink_rename) also calls may_delete. */
+        if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
+                if (!S_ISDIR(child_inode->i_mode))
+                        GOTO(cleanup, rc = -ENOTDIR);
+        } else {
+                if (S_ISDIR(child_inode->i_mode))
+                        GOTO(cleanup, rc = -EISDIR);
+        }
+
+        /* handle splitted dir */
+        rc = mds_lock_slave_objs(obd, dchild, &slave_lockh);
+        if (rc)
+                GOTO(cleanup, rc);
+
         /* Step 4: Get a lock on the ino to sync with creation WRT inode
          * reuse (see bug 2029). */
         rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
@@ -1624,20 +1699,6 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 }
         }
 
-        /* We have to do these checks ourselves, in case we are making an
-         * orphan.  The client tells us whether rmdir() or unlink() was called,
-         * so we need to return appropriate errors (bug 72).
-         *
-         * We don't have to check permissions, because vfs_rename (called from
-         * mds_open_unlink_rename) also calls may_delete. */
-        if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
-                if (!S_ISDIR(child_inode->i_mode))
-                        GOTO(cleanup, rc = -ENOTDIR);
-        } else {
-                if (S_ISDIR(child_inode->i_mode))
-                        GOTO(cleanup, rc = -EISDIR);
-        }
-
         /* Step 4: Do the unlink: we already verified ur_mode above (bug 72) */
         switch (child_inode->i_mode & S_IFMT) {
         case S_IFDIR:
@@ -1713,10 +1774,13 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(atomic_read(&dchild->d_inode->i_count) > 0);
                 if (rc == 0 && dchild->d_inode->i_nlink == 0 &&
                                 mds_open_orphan_count(dchild->d_inode) > 0) {
+
                         /* filesystem is really going to destroy an inode
                          * we have to delay this till inode is opened -bzzz */
                         mds_open_unlink_rename(rec, obd, dparent, dchild, NULL);
                 }
+                /* handle splitted dir */
+                mds_unlink_slave_objs(obd, dchild);
                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
                                         rc, 0);
                 if (!rc)
@@ -1732,7 +1796,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 else
                         ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
         case 2: /* child lock */
-                ldlm_lock_decref(&child_lockh, LCK_EX);
+                mds_unlock_slave_objs(obd, dchild, slave_lockh);
+                if (child_lockh.cookie)
+                        ldlm_lock_decref(&child_lockh, LCK_EX);
         case 1: /* child and parent dentry, parent lock */
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie != 0)
diff --git a/lustre/tests/sanity-lmv.sh b/lustre/tests/sanity-lmv.sh
new file mode 100644 (file)
index 0000000..77eaa92
--- /dev/null
@@ -0,0 +1,293 @@
+#!/bin/bash
+#
+# Run select tests by setting ONLY, or as arguments to the script.
+# Skip specific tests by setting EXCEPT.
+#
+# e.g. ONLY="22 23" or ONLY="`seq 32 39`" or EXCEPT="31"
+#
+# Every setting below uses the ${VAR:-default} form, so a value already
+# present in the environment takes precedence over the default.
+
+# Abort on the first unchecked command failure.
+set -e
+
+# Tests to run: taken from the environment, else from the script arguments.
+ONLY=${ONLY:-"$*"}
+# bug number for skipped test: 2108
+# NOTE(review): the default ALWAYS_EXCEPT below is empty, so no test is
+# actually skipped for that bug here -- confirm the comment is still wanted.
+ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-""}
+# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
+#case `uname -r` in
+#2.6.*) ALWAYS_EXCEPT="$ALWAYS_EXCEPT 54c 55" # bug 3117
+#esac
+
+# Only announce skipped tests when at least one exclusion list is non-empty.
+[ "$ALWAYS_EXCEPT$EXCEPT" ] && echo "Skipping tests: $ALWAYS_EXCEPT $EXCEPT"
+
+# Put the script's own directory and ../utils ahead of the existing PATH.
+SRCDIR=`dirname $0`
+export PATH=$PWD/$SRCDIR:$SRCDIR:$SRCDIR/../utils:$PATH
+
+TMP=${TMP:-/tmp}
+FSTYPE=${FSTYPE:-ext3}
+
+# Helper programs, each overridable from the environment.
+CHECKSTAT=${CHECKSTAT:-"checkstat -v"}
+CREATETEST=${CREATETEST:-createtest}
+LFS=${LFS:-lfs}
+LSTRIPE=${LSTRIPE:-"$LFS setstripe"}
+LFIND=${LFIND:-"$LFS find"}
+LVERIFY=${LVERIFY:-ll_dirstripe_verify}
+LCTL=${LCTL:-lctl}
+MCREATE=${MCREATE:-mcreate}
+OPENFILE=${OPENFILE:-openfile}
+OPENUNLINK=${OPENUNLINK:-openunlink}
+TOEXCL=${TOEXCL:-toexcl}
+TRUNCATE=${TRUNCATE:-truncate}
+MUNLINK=${MUNLINK:-munlink}
+SOCKETSERVER=${SOCKETSERVER:-socketserver}
+SOCKETCLIENT=${SOCKETCLIENT:-socketclient}
+IOPENTEST1=${IOPENTEST1:-iopentest1}
+IOPENTEST2=${IOPENTEST2:-iopentest2}
+
+# When not running as root, RUNAS is left empty and RUNAS_ID is the current
+# uid; as root, RUNAS defaults to "runas -u <RUNAS_ID>" with uid 500.
+if [ $UID -ne 0 ]; then
+       RUNAS_ID="$UID"
+       RUNAS=""
+else
+       RUNAS_ID=${RUNAS_ID:-500}
+       RUNAS=${RUNAS:-"runas -u $RUNAS_ID"}
+fi
+
+# NAME is matched against the first column of `mount` output further down.
+export NAME=${NAME:-lmv}
+
+# Remembered so run_one can cd back here after each test.
+SAVE_PWD=$PWD
+
+# Run llmountcleanup.sh (output discarded) and note that this script no
+# longer holds a mount.  If the cleanup script fails the whole run exits
+# with status 20.
+clean() {
+       echo -n "cln.."
+       if ! sh llmountcleanup.sh > /dev/null; then
+               exit 20
+       fi
+       I_MOUNTED=no
+}
+CLEAN=${CLEAN:-clean}
+
+# Run llrmount.sh (output discarded) and note that this script now holds a
+# mount.  If the mount script fails the whole run exits with status 10.
+start() {
+       echo -n "mnt.."
+       if ! sh llrmount.sh > /dev/null; then
+               exit 10
+       fi
+       I_MOUNTED=yes
+       echo "done"
+}
+START=${START:-start}
+
+# Print all arguments as one line on stdout and also hand the same text to
+# "lctl mark"; lctl's stderr is discarded and its failure is ignored, so
+# this function always returns 0.
+log() {
+       local msg="$*"
+
+       echo "$msg"
+       lctl mark "$msg" 2> /dev/null || true
+}
+
+# trace <command> [args...]
+# Run the command under strace, writing the trace to $TMP/<command>.strace,
+# and log a STARTING/FINISHED pair around it.
+#
+# Returns the exit status of the traced command (also left in the global RC).
+# The status is captured with "|| RC=$?" so that a failing command does not
+# trip "set -e" before the FINISHED line is logged.
+trace() {
+       log "STARTING: $*"
+       RC=0
+       strace -o $TMP/$1.strace -ttt $* || RC=$?
+       log "FINISHED: $*: rc $RC"
+       return $RC
+}
+TRACE=${TRACE:-""}
+
+# check_kernel_version <wanted>
+# Compare the number stored in /proc/fs/lustre/kernel_version with <wanted>.
+# Returns 0 when the running version is >= <wanted>; returns 1 (after
+# printing or logging the reason) when the file is missing or the version
+# is too old.  Sets the globals VERSION_FILE, WANT_VER and GOT_VER.
+check_kernel_version() {
+       VERSION_FILE=/proc/fs/lustre/kernel_version
+       WANT_VER=$1
+
+       if [ ! -f $VERSION_FILE ]; then
+               echo "can't find kernel version"
+               return 1
+       fi
+
+       GOT_VER=$(cat $VERSION_FILE)
+       if [ $GOT_VER -ge $WANT_VER ]; then
+               return 0
+       fi
+
+       log "test needs at least kernel version $WANT_VER, running $GOT_VER"
+       return 1
+}
+
+# run_one <name> <description>
+# Run the single test function test_<name>: mount first if $DIR does not
+# show up in `mount`, enable full portals debugging, log a banner, call the
+# test, report a failure through error() with the test's exit status, then
+# print PASS, return to the starting directory and run $CLEAN.
+run_one() {
+       mount | grep -q $DIR || $START
+       echo -1 >/proc/sys/portals/debug
+       log "== test $1: $2"
+       export TESTNAME=test_$1
+       if test_$1; then
+               :
+       else
+               # $? here is still the exit status of test_$1
+               error "test_$1: exit with rc=$?"
+       fi
+       unset TESTNAME
+       pass
+       cd $SAVE_PWD
+       $CLEAN
+}
+
+# Turn the ONLY / EXCEPT / ALWAYS_EXCEPT word lists into marker variables:
+# each listed test <t> gets ONLY_<t>=true or EXCEPT_<t>=true, which run_test
+# later looks up by name.
+build_test_filter() {
+        for O in $ONLY; do eval "ONLY_${O}=true"; done
+        for E in $EXCEPT $ALWAYS_EXCEPT; do eval "EXCEPT_${E}=true"; done
+}
+
+# Helper for basetest: echo the arguments through an unquoted $*, so the
+# text is split again using whatever IFS is in effect while this runs.
+_basetest() {
+    echo $*
+}
+
+# basetest <name>
+# Print <name> as re-split by _basetest with IFS set to the lowercase
+# letters for that one call.  run_test stores the result in $base and uses
+# it to look up ONLY_<base> / EXCEPT_<base>.
+# NOTE(review): presumably this turns a sub-test name such as "1a" into its
+# numeric base "1" -- confirm against the bash version in use.
+basetest() {
+    IFS=abcdefghijklmnopqrstuvwxyz _basetest $1
+}
+
+# run_test <name> <description>
+# Decide whether test <name> should run and, if so, hand it to run_one.
+#  - With ONLY set: run the test only if ONLY_<name> or ONLY_<base> is set,
+#    otherwise print a "." and return 0.
+#  - Without ONLY: skip (return 0) if EXCEPT_<name> or EXCEPT_<base> is set,
+#    otherwise run the test.
+# Sets the globals base and testname as a side effect.
+run_test() {
+       base=`basetest $1`
+
+       if [ "$ONLY" ]; then
+               for testname in "ONLY_$1" "ONLY_$base"; do
+                       if [ ${!testname}x != x ]; then
+                               run_one $1 "$2"
+                               return $?
+                       fi
+               done
+               echo -n "."
+               return 0
+       fi
+
+       testname=EXCEPT_$1
+       if [ ${!testname}x != x ]; then
+               echo "skipping excluded test $1"
+               return 0
+       fi
+
+       testname=EXCEPT_$base
+       if [ ${!testname}x != x ]; then
+               echo "skipping excluded test $1 (base $base)"
+               return 0
+       fi
+
+       run_one $1 "$2"
+       return $?
+}
+
+[ "$SANITYLOG" ] && rm -f $SANITYLOG || true
+
+# error [message...]
+# Log a FAIL line.  Without SANITYLOG the script exits with status 1 at
+# once; with SANITYLOG set the failure is appended to that file and the
+# run continues.
+error() {
+       log "FAIL: $@"
+       if [ -z "$SANITYLOG" ]; then
+               exit 1
+       fi
+       echo "FAIL: $TESTNAME $@" >> $SANITYLOG
+}
+
+# Print the PASS marker that run_one emits after each test.
+pass() {
+       echo "PASS"
+}
+
+# Find the mount point (3rd column of `mount`) of the lustre_lite filesystem
+# whose device column starts with $NAME.  If there is none, run llmount.sh
+# and look again; I_MOUNTED=yes records that this script did the mounting so
+# the final cleanup knows to undo it.
+MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`"
+if [ -z "$MOUNT" ]; then
+       sh llmount.sh
+       MOUNT="`mount | awk '/^'$NAME' .* lustre_lite / { print $3 }'`"
+       [ -z "$MOUNT" ] && error "NAME=$NAME not mounted"
+       I_MOUNTED=yes
+fi
+
+# More than one word in $MOUNT means awk matched several mount lines.
+[ `echo $MOUNT | wc -w` -gt 1 ] && error "NAME=$NAME mounted more than once"
+
+# Tests run in $DIR, which must contain the mount point as a substring.
+DIR=${DIR:-$MOUNT}
+[ -z "`echo $DIR | grep $MOUNT`" ] && echo "$DIR not in $MOUNT" && exit 99
+
+# LOV parameters read from /proc.
+# NOTE(review): none of these four values is referenced by the tests in this
+# script, yet a failing `cat` here aborts the run under `set -e` -- confirm
+# they are still needed.
+LOVNAME=`cat /proc/fs/lustre/llite/fs0/lov/common_name`
+OSTCOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/numobd`
+STRIPECOUNT=`cat /proc/fs/lustre/lov/$LOVNAME/stripecount`
+STRIPESIZE=`cat /proc/fs/lustre/lov/$LOVNAME/stripesize`
+
+# Clear append-only / immutable flags on leftover files, then remove
+# leftover test entries.
+# NOTE(review): the glob only matches names starting with R, d, f or s
+# followed by a digit 1-9; the directories created by the tests below
+# (1a0, 1b0, 1b1, 2a0) do not match it.
+[ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo
+[ -f $DIR/d52b/foo ] && chattr -i $DIR/d52b/foo
+rm -rf $DIR/[Rdfs][1-9]*
+
+build_test_filter
+
+# Create a small journalled ext2 image file.
+# NOTE(review): no test in this script uses $EXT2_DEV -- confirm it is needed.
+echo preparing for tests involving mounts
+EXT2_DEV=${EXT2_DEV:-/tmp/SANITY.LOOP}
+touch $EXT2_DEV
+mke2fs -j -F $EXT2_DEV 8000 > /dev/null
+
+# Fill a directory with 4000 files, check that rmdir refuses to remove it
+# while it is non-empty, then remove it recursively.
+test_1a() {
+       local dir=$DIR/1a0
+
+       mkdir $dir || error
+       createmany -o $dir/f 4000
+       if rmdir $dir; then
+               error
+       fi
+       rm -rf $dir || error
+}
+run_test 1a " remove splitted dir ============================="
+
+# Create 4000 files in a directory, delete them all and verify the listing
+# is empty.  Then add three files and remove them one at a time, checking
+# that rmdir keeps failing until the last file is gone and succeeds
+# afterwards.
+test_1b() {
+       local dir=$DIR/1b0
+
+       mkdir $dir || error
+       createmany -o $dir/f 4000
+       find $dir -type f | xargs rm -f
+       NUM=$(ls $dir | wc -l)
+       if [ $NUM -ne 0 ] ; then
+               echo "dir must be empty"
+               error
+       fi
+       touch $dir/file0
+       touch $dir/file1
+       touch $dir/file2
+
+       echo "3 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file0
+
+       echo "2 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file1
+
+       echo "1 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file2
+
+       echo "0 files left"
+       rmdir $dir || error
+}
+run_test 1b " remove splitted dir ============================="
+
+# Same sequence as test_1b, run against $DIR/1b1: populate with 4000 files,
+# empty the directory, then verify rmdir fails while any of three newly
+# created files remains and succeeds once the directory is empty.
+test_1c() {
+       local dir=$DIR/1b1
+
+       mkdir $dir || error
+       createmany -o $dir/f 4000
+       find $dir -type f | xargs rm -f
+       NUM=$(ls $dir | wc -l)
+       if [ $NUM -ne 0 ] ; then
+               echo "dir must be empty"
+               error
+       fi
+       touch $dir/file0
+       touch $dir/file1
+       touch $dir/file2
+
+       echo "3 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file0
+
+       echo "2 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file1
+
+       echo "1 files left"
+       if rmdir $dir; then
+               error
+       fi
+       rm -f $dir/file2
+
+       echo "0 files left"
+       rmdir $dir || error
+}
+run_test 1c " remove splitted cross-node dir ============================="
+
+# Create 5000 files in one directory and verify that listing it reports
+# exactly 5000 entries, then remove the directory recursively.
+test_2a() {
+       local dir=$DIR/2a0
+
+       mkdir $dir || error
+       createmany -o $dir/f 5000
+       NUM=$(ls $dir | wc -l)
+       echo "found $NUM files"
+       if [ $NUM -ne 5000 ]; then
+               echo "wrong number of files: $NUM"
+               error
+       fi
+       rm -rf $dir || error
+}
+run_test 2a " list splitted dir ============================="
+
+# Restore TMPDIR/TMP/HOME from their saved copies, if any.  Nothing in this
+# script assigns OLDTMPDIR, OLDTMP or OLDHOME, so an unconditional restore
+# would overwrite all three with empty strings; fall back to the current
+# value when no saved copy exists.
+TMPDIR=${OLDTMPDIR:-$TMPDIR}
+TMP=${OLDTMP:-$TMP}
+HOME=${OLDHOME:-$HOME}
+
+log "cleanup: ======================================================"
+# Only touch the filesystem if something whose mount line starts with $NAME
+# is still mounted; unmount only when this script did the mounting.
+if [ "`mount | grep ^$NAME`" ]; then
+       rm -rf $DIR/[Rdfs][1-9]*
+       if [ "$I_MOUNTED" = "yes" ]; then
+               sh llmountcleanup.sh || error
+       fi
+fi
+
+echo '=========================== finished ==============================='
+# A SANITYLOG file at this point holds the failures recorded by error():
+# print it and exit non-zero.
+[ -f "$SANITYLOG" ] && cat $SANITYLOG && exit 1 || true