Whamcloud - gitweb
- raw_name2idx declaration to avoid warnings
authoralex <alex>
Wed, 19 May 2004 09:00:24 +0000 (09:00 +0000)
committeralex <alex>
Wed, 19 May 2004 09:00:24 +0000 (09:00 +0000)
- ldlm_cli_enqueue() takes 2nd number from resource name into account
  to recognize that returned lock changed. we need this because MDS
  may return LOOKUP lock with the same ino, but another generation.
  the case is very simple mkdir a; then touch a/b; remount; lookup a/b;
  a and b may live on different MDSs and have the same ino
- lmv_handle_remote_inode() changes intent from IT_LOOKUP to IT_GETATTR:
  the caller expects to find attributes in the reply
- lmv_intent_lookup() chooses right MDS to revalidate the name
- lmv_create() chooses right MDS where to forward request to
- lmv_link() chooses right mds where to forward request to
- lmv_unlink() chooses right mds where to forward request to
- lmv_readpage() removes . and .. from all the pages except the ones that
  come from the master MDS for the given directory
- lmv_obd_create_single() requests creation of single directory on given MDS
- mdt_obj_create() creates a directory for the new mkdir() semantics
- each time a new llite connects to an MDS, the latter tries to connect to its own LMV
- mds_pack_inode2body() returns nlink=1 for directories
- mds_lmv_postsetup() passes a valid cookiesize to lmv and down the stack
- mds_reint_create() distributes new dir's inodes over the cluster
- bug in mds_create_local_dentry() fixed: rmdir may cause FIDS/ removal
- mds_get_parents_children_locked() recognizes cross-ref dentries

12 files changed:
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_request.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/lvfs/fsfilt_ext3.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lib.c
lustre/mds/mds_lmv.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdclass/mea.c

index 98bb2d3..5384a68 100644 (file)
@@ -1350,6 +1350,7 @@ void class_init_uuidlist(void);
 void class_exit_uuidlist(void);
 
 /* mea.c */
-int mea_name2idx(struct mea *mea, char *name, int namelen);
+int mea_name2idx(struct mea *, char *, int);
+int raw_name2idx(int, const char *, int);
 
 #endif /* __LINUX_OBD_CLASS_H */
index 37868ca..914a06a 100644 (file)
@@ -354,7 +354,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 }
 
                 if (reply->lock_desc.l_resource.lr_name.name[0] !=
-                    lock->l_resource->lr_name.name[0]) {
+                    lock->l_resource->lr_name.name[0] ||
+                   reply->lock_desc.l_resource.lr_name.name[1] !=
+                    lock->l_resource->lr_name.name[1]) {
                         CDEBUG(D_INFO, "remote intent success, locking %ld "
                                "instead of %ld\n",
                               (long)reply->lock_desc.l_resource.lr_name.name[0],
index 03b8f3b..81e4baa 100644 (file)
@@ -72,6 +72,12 @@ int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
                 struct lustre_handle plock;
                 int pmode;
 
+                if (it->it_op == IT_LOOKUP) {
+                        /* unfortunately, we have to lie to MDC/MDS to
+                         * retrieve attributes llite needs */
+                        it->it_op = IT_GETATTR;
+                }
+
                 /* we got LOOKUP lock, but we really need attrs */
                 pmode = it->d.lustre.it_lock_mode;
                 if (pmode) {
@@ -171,10 +177,10 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
                                 it, 1, cb_blocking);
         } else if (S_ISDIR(body->mode)) {
-                CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
+                /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
                                 (unsigned long) cfid->mds,
                                 (unsigned long) cfid->id,
-                                (unsigned long) cfid->generation);
+                                (unsigned long) cfid->generation);*/
         }
         lmv_put_obj(obj);
         RETURN(rc);
@@ -433,17 +439,25 @@ int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
          * cfid != NULL specifies revalidation */
 
         if (cfid) {
-                /* this is revalidation  during revalidation it's
-                 * enough to return 1 if we think attrs are uptodate
-                 * it may return updated attrs, though */
-                mds = cfid->mds;
+                /* this is revalidation: we have to check is LOOKUP
+                 * lock still valid for given fid. very important
+                 * part is that we have to choose right mds because
+                 * namespace is per mds */
+                rpfid = *pfid;
+                obj = lmv_grab_obj(obd, pfid, 0);
+                if (obj) {
+                        mds = raw_name2idx(obj->objcount, (char *) name, len);
+                        rpfid = obj->objs[mds].fid;
+                        lmv_put_obj(obj);
+                }
+                mds = rpfid.mds;
+                CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
+                       (unsigned long) cfid->mds,
+                       (unsigned long) cfid->id,
+                       (unsigned long) cfid->generation, mds);
                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
                                     len, lmm, lmmsize, cfid, it, flags,
                                     reqp, cb_blocking);
-                CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu = %d\n",
-                       (unsigned long) cfid->mds,
-                       (unsigned long) cfid->id,
-                       (unsigned long) cfid->generation, rc);
                 RETURN(rc);
         }
 
index 89bd1c2..4932ea8 100644 (file)
@@ -33,6 +33,7 @@
 #else
 #include <liblustre.h>
 #endif
+#include <linux/ext2_fs.h>
 
 #include <linux/obd_support.h>
 #include <linux/lustre_lib.h>
@@ -457,7 +458,8 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
         struct lmv_obd *lmv = &obd->u.lmv;
         struct mea *mea = op_data->mea1;
         struct mds_body *mds_body;
-        int rc, i, free_mea = 0;
+        int rc, i, mds, free_mea = 0;
+        struct lmv_obj *obj;
         ENTRY;
         lmv_connect(obd);
         /* TODO: where to create new directories?
@@ -465,17 +467,21 @@ int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
          * but we lookup by name may forward any request in slave
          */
 repeat:
-        i = mea_name2idx(mea, (char *) op_data->name, op_data->namelen);
-        if (mea)
-                op_data->fid1 = mea->mea_fids[i];
+        obj = lmv_grab_obj(obd, &op_data->fid1, 0);
+        if (obj) {
+                mds = raw_name2idx(obj->objcount, op_data->name,
+                                        op_data->namelen - 1);
+                op_data->fid1 = obj->objs[mds].fid;
+                lmv_put_obj(obj);
+        }
 
         CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n",
                         op_data->namelen, op_data->name,
                         (unsigned long) op_data->fid1.mds,
                         (unsigned long) op_data->fid1.id,
                         (unsigned long) op_data->fid1.generation, mea);
-        rc = md_create(lmv->tgts[i].exp, op_data, data, datalen,
-                            mode, uid, gid, rdev, request);
+        rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data,
+                       datalen, mode, uid, gid, rdev, request);
         if (rc == 0) {
                 if (*request == NULL)
                      RETURN(rc);
@@ -484,13 +490,16 @@ repeat:
                 LASSERT(mds_body != NULL);
                 CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
                        (unsigned long) mds_body->fid1.id,
-                       (unsigned long) mds_body->fid1.generation, i);
-                LASSERT(mds_body->mds == i);
+                       (unsigned long) mds_body->fid1.generation,
+                       op_data->fid1.mds);
+                LASSERT(mds_body->valid & OBD_MD_MDS ||
+                                mds_body->mds == op_data->fid1.mds);
         } else if (rc == -ESTALE) {
                 struct ptlrpc_request *req = NULL;
                 struct lustre_md md;
                 int mealen;
                 
+                LBUG(); /* FIXME ASAP */
                 CDEBUG(D_OTHER, "it seems MDS splitted dir\n");
                 LASSERT(mea == NULL);
 
@@ -597,30 +606,33 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct mea *mea = data->mea2;
-        int rc, i;
+        struct lmv_obj *obj;
+        int rc;
         ENTRY;
         lmv_connect(obd);
         if (data->namelen != 0) {
                 /* usual link request */
-                i = mea_name2idx(mea, (char *) data->name, data->namelen);
-                if (mea)
-                        data->fid2 = mea->mea_fids[i];
-                CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d mea %p\n",
+                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                if (obj) {
+                        rc = raw_name2idx(obj->objcount, data->name,
+                                         data->namelen);
+                        data->fid1 = obj->objs[rc].fid;
+                        lmv_put_obj(obj);
+                }
+                CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
                        (unsigned) data->fid2.mds, (unsigned) data->fid2.id,
                        (unsigned) data->fid2.generation, data->namelen,
                        data->name, (unsigned) data->fid1.mds,
                        (unsigned) data->fid1.id,
-                       (unsigned) data->fid1.generation, i, mea);
+                       (unsigned) data->fid1.generation, data->fid1.mds);
         } else {
                 /* request from MDS to acquire i_links for inode by fid1 */
-                i = data->fid1.mds;
                 CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
                        (unsigned) data->fid1.mds, (unsigned) data->fid1.id,
                        (unsigned) data->fid1.generation);
         }
                         
-        rc = md_link(lmv->tgts[i].exp, data, request);
+        rc = md_link(lmv->tgts[data->fid1.mds].exp, data, request);
         RETURN(rc);
 }
 
@@ -791,6 +803,23 @@ int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
         RETURN(0);
 }
 
+void lmv_remove_dots(struct page *page)
+{
+        char *kaddr = page_address(page);
+        unsigned limit = PAGE_CACHE_SIZE;
+        unsigned offs, rec_len;
+        struct ext2_dir_entry_2 *p;
+
+        for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
+                p = (struct ext2_dir_entry_2 *)(kaddr + offs);
+                rec_len = le16_to_cpu(p->rec_len);
+
+                if ((p->name_len == 1 && p->name[0] == '.') ||
+                    (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
+                        p->inode = 0;
+        }
+}
+
 int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                  __u64 offset, struct page *page,
                  struct ptlrpc_request **request)
@@ -826,11 +855,15 @@ int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
                        (unsigned long) offset);
         }
         rc = md_readpage(lmv->tgts[rfid.mds].exp, &rfid, offset, page, request);
+        if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
+                /* this page isn't from master object. to avoid
+                 * ./.. duplication in directory, we have to remove them
+                 * from all slave objects */
+                lmv_remove_dots(page);
+        }
       
         lmv_put_obj(obj);
 
-#warning "we need fix for duplicate . and .. from slaves"
-
         RETURN(rc);
 }
 
@@ -839,27 +872,30 @@ int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct mea *mea = data->mea1;
         int rc, i = 0;
         ENTRY;
         lmv_connect(obd);
         if (data->namelen != 0) {
-                i = mea_name2idx(mea, (char *) data->name, data->namelen);
-                if (mea)
-                        data->fid1 = mea->mea_fids[i];
+                struct lmv_obj *obj;
+                obj = lmv_grab_obj(obd, &data->fid1, 0);
+                if (obj) {
+                        i = raw_name2idx(obj->objcount, data->name,
+                                         data->namelen);
+                        data->fid1 = obj->objs[i].fid;
+                        lmv_put_obj(obj);
+                }
                 CDEBUG(D_OTHER, "unlink '%*s' in %lu/%lu/%lu -> %u\n",
                        data->namelen, data->name,
                        (unsigned long) data->fid1.mds,
                        (unsigned long) data->fid1.id,
                        (unsigned long) data->fid1.generation, i);
         } else {
-                i = data->fid1.mds;
                 CDEBUG(D_OTHER, "drop i_nlink on %lu/%lu/%lu\n",
                        (unsigned long) data->fid1.mds,
                        (unsigned long) data->fid1.id,
                        (unsigned long) data->fid1.generation);
         }
-        rc = md_unlink(lmv->tgts[i].exp, data, request); 
+        rc = md_unlink(lmv->tgts[data->fid1.mds].exp, data, request); 
         RETURN(rc);
 }
 
@@ -902,6 +938,26 @@ int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
         RETURN(rc);
 }
 
+int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
+                          struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct lov_stripe_md obj_md;
+        struct lov_stripe_md *obj_mdp = &obj_md;
+        int rc = 0;
+        ENTRY;
+        lmv_connect(obd);
+
+        LASSERT(ea == NULL);
+        LASSERT(oa->o_mds < lmv->count);
+
+        rc = obd_create(lmv->tgts[oa->o_mds].exp, oa, &obj_mdp, oti);
+        LASSERT(rc == 0);
+
+        RETURN(rc);
+}
+
 /*
  * to be called from MDS only
  */
@@ -916,8 +972,12 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
         ENTRY;
         lmv_connect(obd);
 
-        LASSERT(ea != NULL);
         LASSERT(oa != NULL);
+        
+        if (ea == NULL) {
+                rc = lmv_obd_create_single(exp, oa, NULL, oti);
+                RETURN(rc);
+        }
 
         if (*ea == NULL) {
                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **) ea);
index b154e8a..46eb191 100644 (file)
@@ -925,7 +925,7 @@ static int fsfilt_ext3_add_dir_entry(struct obd_device *obd,
         
         l_dput(dentry);
 
-        return err;
+        RETURN(err);
 #else
 #error "rebuild kernel and lustre with ext3-mds-num patch!"
         LASSERT(0);
index fd7846d..9d9e64e 100644 (file)
@@ -723,9 +723,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         return(rc);
 }
 
-#define DENTRY_VALID(dentry)    \
-        ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
-
 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                             struct lustre_handle *child_lockh, int child_part)
 {
@@ -1182,9 +1179,9 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         char fidname[LL_FID_NAMELEN];
         struct inode *parent_inode;
         struct obd_run_ctxt saved;
-        struct dentry *new_child;
         int err, namelen, mealen;
         struct obd_ucred uc;
+        struct dentry *new;
         struct mea *mea;
         void *handle;
         ENTRY;
@@ -1205,13 +1202,49 @@ static int mdt_obj_create(struct ptlrpc_request *req)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+
+        if (!(body->oa.o_valid & OBD_MD_FLID)) {
+                /* this is request from another MDS to create remove dir inode */
+                unsigned int tmpname = ll_insecure_random_int();
+
+                handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+                LASSERT(!IS_ERR(handle));
+
+                sprintf(fidname, "%u", tmpname);
+                new = simple_mkdir(mds->mds_objects_dir, fidname,
+                                        body->oa.o_mode, 1);
+                LASSERT(!IS_ERR(new));
+                LASSERT(new->d_inode != NULL);
+
+                obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+                repbody->oa.o_id = new->d_inode->i_ino;
+                repbody->oa.o_generation = new->d_inode->i_generation;
+                repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+                rc = fsfilt_del_dir_entry(obd, new);
+                LASSERT(rc == 0);
+
+                rc = fsfilt_commit(obd, parent_inode, handle, 0);
+                LASSERT(rc == 0);
+
+                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+                       (unsigned long) new->d_inode->i_ino,
+                       (unsigned long) new->d_inode->i_generation,
+                       (unsigned) new->d_inode->i_mode);
+
+                l_dput(new);
+                pop_ctxt(&saved, &obd->obd_ctxt, &uc);
+                RETURN(0);
+        }
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
        
         namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation);
         
         down(&parent_inode->i_sem);
-        new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
-        if (new_child->d_inode != NULL) {
+        new = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+        if (new->d_inode != NULL) {
                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
                        repbody->oa.o_id, repbody->oa.o_generation);
                 LBUG();
@@ -1221,7 +1254,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         /* FIXME: error handling here */
         LASSERT(!IS_ERR(handle));
 
-        rc = vfs_mkdir(parent_inode, new_child, body->oa.o_mode);
+        rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
         up(&parent_inode->i_sem);
         /* FIXME: error handling here */
         if (rc)
@@ -1233,14 +1266,14 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         OBD_ALLOC(mea, mealen);
         LASSERT(mea != NULL);
         mea->mea_count = 0;
-        down(&new_child->d_inode->i_sem);
-        handle = fsfilt_start(obd, new_child->d_inode, FSFILT_OP_SETATTR, NULL);
+        down(&new->d_inode->i_sem);
+        handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
         LASSERT(!IS_ERR(handle));
-       rc = fsfilt_set_md(obd, new_child->d_inode, handle, mea, mealen);
+       rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
         LASSERT(rc == 0);
-        fsfilt_commit(obd, new_child->d_inode, handle, 0);
+        fsfilt_commit(obd, new->d_inode, handle, 0);
         LASSERT(rc == 0);
-       up(&new_child->d_inode->i_sem);
+       up(&new->d_inode->i_sem);
         OBD_FREE(mea, mealen);
 
         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
@@ -1248,16 +1281,16 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         /* FIXME: error handling here */
         LASSERT(err == 0);
 
-        obdo_from_inode(&repbody->oa, new_child->d_inode, FILTER_VALID_FLAGS);
-        repbody->oa.o_id = new_child->d_inode->i_ino;
-        repbody->oa.o_generation = new_child->d_inode->i_generation;
+        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+        repbody->oa.o_id = new->d_inode->i_ino;
+        repbody->oa.o_generation = new->d_inode->i_generation;
         CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n",
                         (unsigned long) repbody->oa.o_id,
-                        (unsigned long) new_child->d_inode->i_ino,
-                        (unsigned) new_child->d_inode->i_mode,
-                        (unsigned) new_child->d_inode->i_uid,
-                        (unsigned) new_child->d_inode->i_gid);
-        dput(new_child);
+                        (unsigned long) new->d_inode->i_ino,
+                        (unsigned) new->d_inode->i_mode,
+                        (unsigned) new->d_inode->i_uid,
+                        (unsigned) new->d_inode->i_gid);
+        dput(new);
         pop_ctxt(&saved, &obd->obd_ctxt, &uc);
         RETURN(0);
 }
@@ -1322,6 +1355,7 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obd;
         struct mds_obd *mds;
+        int rc;
         ENTRY;
 
         obd = class_exp2obd(exp);
@@ -1342,6 +1376,8 @@ static int mds_set_info(struct obd_export *exp, __u32 keylen,
                                         atomic_read(&mds->mds_real_clients));
                         exp->exp_flags |= OBD_OPT_REAL_CLIENT;
                 }
+                rc = mds_lmv_connect(obd, mds->mds_lmv_name);
+                LASSERT(rc == 0);
                 RETURN(0);
         }
 
index cb41877..c07a5c6 100644 (file)
@@ -12,6 +12,8 @@ struct mds_filter_data {
 };
 
 #define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata)
+#define DENTRY_VALID(dentry)    \
+        ((dentry)->d_inode || ((dentry)->d_flags & DCACHE_CROSS_REF))
 
 static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 {
@@ -116,5 +118,6 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags);
 int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **,
                          int);
 int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *);
+int mds_choose_mdsnum(struct obd_device *, const char *, int);
 
 #endif /* _MDS_INTERNAL_H */
index a407176..327fd70 100644 (file)
@@ -83,7 +83,13 @@ void mds_pack_inode2body(struct obd_device *obd, struct mds_body *b,
         b->flags = inode->i_flags;
         b->rdev = inode->i_rdev;
         /* Return the correct link count for orphan inodes */
-        b->nlink = mds_inode_is_orphan(inode) ? 0 : inode->i_nlink;
+        if (mds_inode_is_orphan(inode)) {
+                b->nlink = 0;
+        } else if (S_ISDIR(inode->i_mode)) {
+                b->nlink = 1;
+        } else {
+                b->nlink = inode->i_nlink;
+        }
         b->generation = inode->i_generation;
         b->suppgid = -1;
         b->mds = obd->u.mds.mds_num;
index 8afb801..3a5c838 100644 (file)
@@ -117,7 +117,8 @@ int mds_lmv_postsetup(struct obd_device *obd)
         struct mds_obd *mds = &obd->u.mds;
         ENTRY;
         if (mds->mds_lmv_exp)
-                obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize, 0);
+                obd_init_ea_size(mds->mds_lmv_exp, mds->mds_max_mdsize,
+                                 mds->mds_max_cookiesize);
         RETURN(0);
 }
 
@@ -396,7 +397,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
         return 0;
 }
 
-#define MAX_DIR_SIZE    (32 * 1024)
+#define MAX_DIR_SIZE    (64 * 1024)
 
 /*
  * must not be called on already splitted directories
@@ -404,15 +405,11 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
 int mds_try_to_split_dir(struct obd_device *obd,
                          struct dentry *dentry, struct mea **mea, int nstripes)
 {
-        ldlm_policy_data_t policy = { .l_inodebits = {MDS_INODELOCK_UPDATE}};
-        struct ldlm_res_id res_id = { .name = {0} };
         struct inode *dir = dentry->d_inode;
         struct mds_obd *mds = &obd->u.mds;
-        struct lustre_handle lockh;
         struct mea *tmea = NULL;
         struct obdo *oa = NULL;
-       int rc, flags = 0;
-       int mea_size = 0;
+       int rc, mea_size = 0;
        void *handle;
        ENTRY;
 
@@ -424,7 +421,7 @@ int mds_try_to_split_dir(struct obd_device *obd,
         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
                 RETURN(0);
 
-#if 0
+#if 1
         if (dir->i_size < MAX_DIR_SIZE)
                 RETURN(0);
 #endif
@@ -445,23 +442,13 @@ int mds_try_to_split_dir(struct obd_device *obd,
            necessary amount of stripes, but on the other hand with this approach
            of allocating maximal possible amount of MDS slots, it would be
            easier to split the dir over more MDSes */
-        rc = obd_alloc_diskmd(mds->mds_lmv_exp, mea);
+        rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
         if (!(*mea))
                 RETURN(-ENOMEM);
         (*mea)->mea_count = nstripes;
+       
+#warning "we have to take EX lock on a dir for splitting"
         
-        /* convert lock on the dir in order tox
-         * invalidate client's attributes -bzzz */
-        res_id.name[0] = dir->i_ino;
-        res_id.name[1] = dir->i_generation;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
-                              LDLM_IBITS, &policy, LCK_PW, &flags,
-                              mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, &lockh);
-        if (rc != ELDLM_OK) {
-                CERROR("error: rc = %d\n", rc);
-        }
-
        /* 1) create directory objects on slave MDS'es */
        /* FIXME: should this be OBD method? */
         oa = obdo_alloc();
@@ -496,8 +483,6 @@ int mds_try_to_split_dir(struct obd_device *obd,
        up(&dir->i_sem);
        obdo_free(oa);
 
-        ldlm_lock_decref(&lockh, LCK_PW);
-
        /* 3) read through the dir and distribute it over objects */
         scan_and_distribute(obd, dentry, *mea);
 
@@ -641,3 +626,13 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lmv_obd *lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+        int i;
+
+        i = raw_name2idx(lmv->count, name, len);
+        RETURN(i);
+}
+
index 9f7dd3a..d3615d4 100644 (file)
@@ -625,24 +625,74 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         }
         case S_IFDIR:{
                 int nstripes = 0;
-                handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
+                int i;
+                
+                /* as Peter asked, mkdir() should distribute new directories
+                 * over the whole cluster in order to distribute namespace
+                 * processing load. first, we calculate which MDS to use to
+                 * put new directory's inode in */
+                i = mds_choose_mdsnum(obd, rec->ur_name, rec->ur_namelen - 1);
+                if (i == mds->mds_num) {
+                        /* inode will be created locally */
 
-                rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+                        handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+                        if (IS_ERR(handle))
+                                GOTO(cleanup, rc = PTR_ERR(handle));
 
-                if (rec->ur_eadata)
-                        nstripes = *(u16 *)rec->ur_eadata;
+                        rc = vfs_mkdir(dir, dchild, rec->ur_mode);
+
+                        if (rec->ur_eadata)
+                                nstripes = *(u16 *)rec->ur_eadata;
 
 #if 1
-                /* this is for current testing yet. after the testing
-                 * directory will split if size reaches some limite -bzzz */
-                if (rc == 0) {
+                        /* this is for current testing yet. after the testing
+                         * directory will split if size reaches some limite -bzzz */
+                        if (rc == 0) {
 #else
-                if (rc == 0 && nstripes) {
+                        if (rc == 0 && nstripes) {
 #endif
-                        /* FIXME: error handling here */
-                        mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+                                /* FIXME: error handling here */
+                                mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+                        }
+                } else if (!DENTRY_VALID(dchild)) {
+                        /* inode will be created on another MDS */
+                        struct obdo *oa = NULL;
+                        struct mds_body *body;
+                        
+                        /* first, create that inode */
+                        oa = obdo_alloc();
+                        LASSERT(oa != NULL);
+                        oa->o_mds = i;
+                        obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                        OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                                        OBD_MD_FLUID | OBD_MD_FLGID);
+                        oa->o_mode = dir->i_mode;
+                        CDEBUG(D_OTHER, "%s: create dir on MDS %u\n",
+                                        obd->obd_name, i);
+                        rc = obd_create(mds->mds_lmv_exp, oa, NULL, NULL);
+                       LASSERT(rc == 0);
+                        
+                        /* now, add new dir entry for it */
+                        handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
+                        if (IS_ERR(handle))
+                                GOTO(cleanup, rc = PTR_ERR(handle));
+                        rc = fsfilt_add_dir_entry(obd, dparent, rec->ur_name,
+                                                  rec->ur_namelen - 1,
+                                                  oa->o_id, oa->o_generation,
+                                                  i);
+                        LASSERT(rc == 0);
+
+                        /* fill reply */
+                        body = lustre_msg_buf(req->rq_repmsg,
+                                              offset, sizeof (*body));
+                        body->valid |= OBD_MD_FLID | OBD_MD_MDS;
+                        body->fid1.id = oa->o_id;
+                        body->fid1.mds = i;
+                        body->fid1.generation = oa->o_generation;
+                       obdo_free(oa);
+                } else {
+                        /* requested name exists in the directory */
+                        rc = -EEXIST;
                 }
                 EXIT;
                 break;
@@ -683,7 +733,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         if (rc) {
                 CDEBUG(D_INODE, "error during create: %d\n", rc);
                 GOTO(cleanup, rc);
-        } else {
+        } else if (dchild->d_inode) {
                 struct iattr iattr;
                 struct inode *inode = dchild->d_inode;
                 struct mds_body *body;
@@ -1249,7 +1299,6 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         char *fidname = rec->ur_name;
         struct dentry *child = NULL;
         struct lustre_handle lockh;
-        unsigned mode;
         void *handle;
         ENTRY;
 
@@ -1301,8 +1350,13 @@ int mds_create_local_dentry(struct mds_update_record *rec,
                 CERROR("error linking orphan %lu/%lu to FIDS: rc = %d\n",
                        (unsigned long) child->d_inode->i_ino,
                        (unsigned long) child->d_inode->i_generation, rc);
-        else
+        else {
+                if (S_ISDIR(child->d_inode->i_mode)) {
+                        fids_dir->i_nlink++;
+                        mark_inode_dirty(fids_dir);
+                }
                 mark_inode_dirty(child->d_inode);
+        }
         fsfilt_commit(obd, fids_dir, handle, 0);
 
         rec->ur_fid1->id = fids_dir->i_ino;
@@ -1325,7 +1379,6 @@ cleanup:
 static int mds_copy_unlink_reply(struct ptlrpc_request *master,
                                         struct ptlrpc_request *slave)
 {
-        struct lov_mds_md *eadata;
         void *cookie, *cookie2;
         struct mds_body *body2;
         struct mds_body *body;
@@ -1339,7 +1392,6 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
         LASSERT(body2 != NULL);
 
         if (!(body->valid & (OBD_MD_FLID | OBD_MD_FLGENER))) {
-                CWARN("empty reply\n");
                 RETURN(0);
         }
 
@@ -1395,8 +1447,6 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
 
         LASSERT(offset == 0 || offset == 2);
 
-        DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
-                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
         DEBUG_REQ(D_INODE, req, "unlink %*s (remote inode %u/%u/%u)\n",
                   rec->ur_namelen - 1, rec->ur_name, (unsigned)dchild->d_mdsnum,
                   (unsigned) dchild->d_inum, (unsigned) dchild->d_generation);
@@ -1480,7 +1530,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(unlink_by_fid == 0);
                 LASSERT(dchild->d_mdsnum != mds->mds_num);
                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
-                                        dparent, &child_lockh, dchild);
+                                             dparent, &child_lockh, dchild);
                 RETURN(0);
         }
 
@@ -2143,14 +2193,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 3; /* original name dentry */
 
         inode = (*de_oldp)->d_inode;
-        if (inode != NULL)
+        if (inode != NULL) {
                 inode = igrab(inode);
-        if (inode == NULL)
-                GOTO(cleanup, rc = -ENOENT);
+                if (inode == NULL)
+                        GOTO(cleanup, rc = -ENOENT);
 
-        c1_res_id.name[0] = inode->i_ino;
-        c1_res_id.name[1] = inode->i_generation;
-        iput(inode);
+                c1_res_id.name[0] = inode->i_ino;
+                c1_res_id.name[1] = inode->i_generation;
+                iput(inode);
+        } else if ((*de_oldp)->d_flags & DCACHE_CROSS_REF) {
+                c1_res_id.name[0] = (*de_oldp)->d_inum;
+                c1_res_id.name[1] = (*de_oldp)->d_generation;
+        }
 
         /* Step 4: Lookup the target child entry */
         *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
@@ -2164,15 +2218,18 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         cleanup_phase = 4; /* target dentry */
 
         inode = (*de_newp)->d_inode;
-        if (inode != NULL)
+        if (inode != NULL) {
                 inode = igrab(inode);
-        if (inode == NULL)
-                goto retry_locks;
-
-        c2_res_id.name[0] = inode->i_ino;
-        c2_res_id.name[1] = inode->i_generation;
+                if (inode == NULL)
+                        goto retry_locks;
 
-        iput(inode);
+                c2_res_id.name[0] = inode->i_ino;
+                c2_res_id.name[1] = inode->i_generation;
+                iput(inode);
+        } else if ((*de_newp)->d_flags & DCACHE_CROSS_REF) {
+                c2_res_id.name[0] = (*de_newp)->d_inum;
+                c2_res_id.name[1] = (*de_newp)->d_generation;
+        }
 
 retry_locks:
         /* Step 5: Take locks on the parents and child(ren) */
@@ -2213,7 +2270,7 @@ retry_locks:
                 GOTO(cleanup, rc);
         }
 
-        if ((*de_oldp)->d_inode == NULL)
+        if (!DENTRY_VALID(*de_oldp))
                 GOTO(cleanup, rc = -ENOENT);
 
         /* Step 6b: Re-lookup target child to verify it hasn't changed */
index b967b92..20f9d94 100644 (file)
@@ -439,7 +439,6 @@ EXPORT_SYMBOL(class_detach);
 
 /* mea.c */
 EXPORT_SYMBOL(mea_name2idx);
-int raw_name2idx(int count, char *name, int namelen);
 EXPORT_SYMBOL(raw_name2idx);
 
 #ifdef LPROCFS
index ceba64e..56fc149 100644 (file)
@@ -54,7 +54,7 @@ int mea_name2idx(struct mea *mea, char *name, int namelen)
         return c;
 }
 
-int raw_name2idx(int count, char *name, int namelen)
+int raw_name2idx(int count, const char *name, int namelen)
 {
         unsigned int c;