Whamcloud - gitweb
- new routine lmv_get_mea_and_update_object() to be called for
authoralex <alex>
Thu, 20 May 2004 13:23:46 +0000 (13:23 +0000)
committeralex <alex>
Thu, 20 May 2004 13:23:46 +0000 (13:23 +0000)
  an MDS reply of -ERESTART, which signals that the directory has been split
- lmv_intent_open() recognizes when a dir is split during a request,
  retrieves the mea and repeats the request using the proper MDS
- lmv_create() recognizes when a dir is split during a request,
  retrieves the mea and repeats the request using the proper MDS
- bug fixed in lmv_getattr_name(): it passed wrong namelen to raw_name2idx()
- lmv_obd_create() sets OBD_MD_FLID to tell MDSs to mark the created object
  unsplittable. we have to distinguish two possible requests: 1) to create
  a remote inode for cross-node mkdir(); 2) to create slave objects. the
  latter must not be split recursively
- mdt_obj_create() has been rewritten to comply with the rules just described.
  it also takes a lock on the newly created inode. this is needed for recovery
- bug fixed in scan_and_distribute(): it tried to open an inode using a decimal
  number, which caused iopen_lookup() to find alias dentries
- mds_get_lmv_attr() should return the right mea size for the given configuration in any case
- minor cleanups

lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/mds/handler.c
lustre/mds/mds_lmv.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c

index 81e4baa..8d18bbb 100644 (file)
@@ -131,8 +131,9 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
         /* IT_OPEN is intended to open (and create, possible) an object.
          * parent (pfid) may be splitted dir */
 
-        mds = pfid->mds;
-        obj = lmv_grab_obj(obd, pfid, 0);
+repeat:
+        mds = rpfid.mds;
+        obj = lmv_grab_obj(obd, &rpfid, 0);
         if (obj) {
                 /* directory is already splitted, so we have to forward
                  * request to the right MDS */
@@ -141,10 +142,20 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
                 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
         }
 
-        rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len,
-                            lmm, lmmsize, cfid, it, flags, reqp, cb_blocking);
-       
+        rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
+                            len, lmm, lmmsize, cfid, it, flags, reqp,
+                            cb_blocking);
         lmv_put_obj(obj);
+        if (rc == -ERESTART) {
+                /* directory got splitted. time to update local object
+                 * and repeat the request with proper MDS */
+                LASSERT(fid_equal(pfid, &rpfid));
+                rc = lmv_get_mea_and_update_object(exp, &rpfid);
+                if (rc == 0) {
+                        ptlrpc_req_finished(*reqp);
+                        goto repeat;
+                }
+        }
         if (rc != 0)
                 RETURN(rc);
 
@@ -498,7 +509,7 @@ repeat:
                 RETURN(rc);
         }
        
-        if (rc == -ESTALE) {
+        if (rc == -ERESTART) {
                 /* directory got splitted since last update. this shouldn't
                  * be becasue splitting causes lock revocation, so revalidate
                  * had to fail and lookup on dir had to return mea */
index e531834..30688f2 100644 (file)
@@ -50,6 +50,7 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
                           struct ll_fid *, struct lookup_intent *, int,
                          ldlm_blocking_callback cb_blocking);
 void lmv_cleanup_objs(struct obd_device *obd);
+int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *);
 
 static inline struct mea * 
 is_body_of_splitted_dir(struct ptlrpc_request *req, int offset)
index ce78065..0441c14 100644 (file)
@@ -450,36 +450,62 @@ int lmv_close(struct obd_export *exp, struct obdo *obdo,
         RETURN(rc);
 }
 
+int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct ptlrpc_request *req = NULL;
+        struct lustre_md md;
+        int mealen, rc;
+
+        md.mea = NULL;
+        mealen = MEA_SIZE_LMV(lmv);
+
+        /* time to update mea of parent fid */
+        rc = md_getattr(lmv->tgts[fid->mds].exp, fid,
+                        OBD_MD_FLEASIZE, mealen, &req);
+        if (rc)
+                GOTO(cleanup, rc);
+        rc = mdc_req2lustre_md(req, 0, NULL, exp, &md);
+        if (rc)
+                GOTO(cleanup, rc);
+        if (md.mea == NULL)
+                GOTO(cleanup, rc = -ENODATA);
+        rc = lmv_create_obj_from_attrs(exp, fid, md.mea);
+        obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea);
+
+cleanup:
+        if (req)
+                ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
                    const void *data, int datalen, int mode, __u32 uid,
                    __u32 gid, __u64 rdev, struct ptlrpc_request **request)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
-        struct mea *mea = op_data->mea1;
         struct mds_body *mds_body;
-        int rc, i, mds, free_mea = 0;
         struct lmv_obj *obj;
+        int rc, mds;
         ENTRY;
+
         lmv_connect(obd);
-        /* TODO: where to create new directories?
-         * current design don't support directory on a slave MDS,
-         * but we lookup by name may forward any request in slave
-         */
 repeat:
         obj = lmv_grab_obj(obd, &op_data->fid1, 0);
         if (obj) {
                 mds = raw_name2idx(obj->objcount, op_data->name,
-                                        op_data->namelen - 1);
+                                        op_data->namelen);
                 op_data->fid1 = obj->objs[mds].fid;
                 lmv_put_obj(obj);
         }
 
-        CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n",
+        CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
                         op_data->namelen, op_data->name,
                         (unsigned long) op_data->fid1.mds,
                         (unsigned long) op_data->fid1.id,
-                        (unsigned long) op_data->fid1.generation, mea);
+                        (unsigned long) op_data->fid1.generation);
         rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data,
                        datalen, mode, uid, gid, rdev, request);
         if (rc == 0) {
@@ -494,33 +520,15 @@ repeat:
                        op_data->fid1.mds);
                 LASSERT(mds_body->valid & OBD_MD_MDS ||
                                 mds_body->mds == op_data->fid1.mds);
-        } else if (rc == -ESTALE) {
-                struct ptlrpc_request *req = NULL;
-                struct lustre_md md;
-                int mealen;
-                
-                LBUG(); /* FIXME ASAP */
-                CDEBUG(D_OTHER, "it seems MDS splitted dir\n");
-                LASSERT(mea == NULL);
-
-                mealen = sizeof(struct ll_fid)*lmv->count + sizeof(struct mea);
-                /* time to update mea of parent fid */
-                i = op_data->fid1.mds;
-                rc = md_getattr(lmv->tgts[i].exp, &op_data->fid1,
-                                        OBD_MD_FLEASIZE, mealen, &req);
-                LASSERT(rc == 0);
-                md.mea = NULL;
-                rc = mdc_req2lustre_md(req, 0, NULL, exp, &md);
-                LASSERT(rc == 0);
-                LASSERT(md.mea != NULL);
-                mea = md.mea;
-                ptlrpc_req_finished(req);
-                free_mea = 1;
-
-                goto repeat;
+        } else if (rc == -ERESTART) {
+                /* directory got splitted. time to update local object
+                 * and repeat the request with proper MDS */
+                rc = lmv_get_mea_and_update_object(exp, &op_data->fid1);
+                if (rc == 0) {
+                        ptlrpc_req_finished(*request);
+                        goto repeat;
+                }
         }
-        if (free_mea)
-                obd_free_memmd(exp, (struct lov_stripe_md**) &mea);
         RETURN(rc);
 }
 
@@ -582,12 +590,12 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid,
         ENTRY;
         lmv_connect(obd);
         CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu\n",
-               namelen - 1, filename, (unsigned long) fid->mds,
+               namelen, filename, (unsigned long) fid->mds,
                (unsigned long) fid->id, (unsigned long) fid->generation);
         obj = lmv_grab_obj(obd, fid, 0);
         if (obj) {
                 /* directory is splitted. look for right mds for this name */
-                mds = raw_name2idx(obj->objcount, filename, namelen - 1);
+                mds = raw_name2idx(obj->objcount, filename, namelen);
                 rfid = obj->objs[mds].fid;
                 lmv_put_obj(obj);
         }
@@ -1014,7 +1022,7 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
                         continue;
 
                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE
-                                | OBD_MD_FLUID | OBD_MD_FLGID;
+                                | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
 
                 rc = obd_create(lmv->tgts[c].exp, oa, &obj_mdp, oti);
                 /* FIXME: error handling here */
index 9d9e64e..761760d 100644 (file)
@@ -1171,6 +1171,8 @@ static char *reint_names[] = {
 
 static int mdt_obj_create(struct ptlrpc_request *req)
 {
+        unsigned int tmpname = ll_insecure_random_int();
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_export *exp = req->rq_export;
         struct obd_device *obd = exp->exp_obd;
         struct mds_obd *mds = &obd->u.mds;
@@ -1178,8 +1180,10 @@ static int mdt_obj_create(struct ptlrpc_request *req)
         int rc, size = sizeof(*repbody);
         char fidname[LL_FID_NAMELEN];
         struct inode *parent_inode;
+        struct lustre_handle lockh;
         struct obd_run_ctxt saved;
-        int err, namelen, mealen;
+        ldlm_policy_data_t policy;
+        int mealen, flags = 0;
         struct obd_ucred uc;
         struct dentry *new;
         struct mea *mea;
@@ -1203,95 +1207,63 @@ static int mdt_obj_create(struct ptlrpc_request *req)
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
 
-        if (!(body->oa.o_valid & OBD_MD_FLID)) {
-                /* this is request from another MDS to create remove dir inode */
-                unsigned int tmpname = ll_insecure_random_int();
+        handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+        LASSERT(!IS_ERR(handle));
 
-                handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+        sprintf(fidname, "%u", tmpname);
+        new = simple_mkdir(mds->mds_objects_dir, fidname,
+                        body->oa.o_mode, 1);
+        LASSERT(!IS_ERR(new));
+        LASSERT(new->d_inode != NULL);
+
+        if (body->oa.o_valid & OBD_MD_FLID) {
+                /* this is new object for splitted dir. we have to
+                 * prevent recursive splitting on it -bzzz */
+                mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+                OBD_ALLOC(mea, mealen);
+                LASSERT(mea != NULL);
+                mea->mea_count = 0;
+                down(&new->d_inode->i_sem);
+                handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
                 LASSERT(!IS_ERR(handle));
-
-                sprintf(fidname, "%u", tmpname);
-                new = simple_mkdir(mds->mds_objects_dir, fidname,
-                                        body->oa.o_mode, 1);
-                LASSERT(!IS_ERR(new));
-                LASSERT(new->d_inode != NULL);
-
-                obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
-                repbody->oa.o_id = new->d_inode->i_ino;
-                repbody->oa.o_generation = new->d_inode->i_generation;
-                repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
-                rc = fsfilt_del_dir_entry(obd, new);
+                rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
                 LASSERT(rc == 0);
-
-                rc = fsfilt_commit(obd, parent_inode, handle, 0);
+                fsfilt_commit(obd, new->d_inode, handle, 0);
                 LASSERT(rc == 0);
-
-                CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
-                       (unsigned long) new->d_inode->i_ino,
-                       (unsigned long) new->d_inode->i_generation,
-                       (unsigned) new->d_inode->i_mode);
-
-                l_dput(new);
-                pop_ctxt(&saved, &obd->obd_ctxt, &uc);
-                RETURN(0);
+                up(&new->d_inode->i_sem);
+                OBD_FREE(mea, mealen);
         }
+        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+        repbody->oa.o_id = new->d_inode->i_ino;
+        repbody->oa.o_generation = new->d_inode->i_generation;
+        repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-       
-        namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation);
-        
         down(&parent_inode->i_sem);
-        new = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
-        if (new->d_inode != NULL) {
-                CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
-                       repbody->oa.o_id, repbody->oa.o_generation);
-                LBUG();
-        }
-        handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
-                              FSFILT_OP_MKDIR, NULL);
-        /* FIXME: error handling here */
-        LASSERT(!IS_ERR(handle));
-
-        rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+        rc = fsfilt_del_dir_entry(obd, new);
         up(&parent_inode->i_sem);
-        /* FIXME: error handling here */
-        if (rc)
-                CERROR("vfs_mkdir() returned %d\n", rc);
         LASSERT(rc == 0);
-        
-       /* mark this object non-splittable */
-        mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
-        OBD_ALLOC(mea, mealen);
-        LASSERT(mea != NULL);
-        mea->mea_count = 0;
-        down(&new->d_inode->i_sem);
-        handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
-        LASSERT(!IS_ERR(handle));
-       rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
-        LASSERT(rc == 0);
-        fsfilt_commit(obd, new->d_inode, handle, 0);
-        LASSERT(rc == 0);
-       up(&new->d_inode->i_sem);
-        OBD_FREE(mea, mealen);
 
-        err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
-                            handle, 0);
-        /* FIXME: error handling here */
-        LASSERT(err == 0);
+        rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
+        LASSERT(rc == 0);
 
-        obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
-        repbody->oa.o_id = new->d_inode->i_ino;
-        repbody->oa.o_generation = new->d_inode->i_generation;
-        CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n",
-                        (unsigned long) repbody->oa.o_id,
+        res_id.name[0] = new->d_inode->i_ino;
+        res_id.name[1] = new->d_inode->i_generation;
+        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                        res_id, LDLM_IBITS, &policy,
+                        LCK_EX, &flags, mds_blocking_ast,
+                        ldlm_completion_ast, NULL, NULL,
+                        NULL, 0, NULL, &lockh);
+        LASSERT(rc == ELDLM_OK);
+
+        CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
                         (unsigned long) new->d_inode->i_ino,
-                        (unsigned) new->d_inode->i_mode,
-                        (unsigned) new->d_inode->i_uid,
-                        (unsigned) new->d_inode->i_gid);
-        dput(new);
+                        (unsigned long) new->d_inode->i_generation,
+                        (unsigned) new->d_inode->i_mode);
+
+        l_dput(new);
         pop_ctxt(&saved, &obd->obd_ctxt, &uc);
+        ptlrpc_save_lock(req, &lockh, LCK_EX);
         RETURN(0);
 }
 
index 3a5c838..c4ea3af 100644 (file)
@@ -169,7 +169,6 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
        if (rc <= 0) {
                OBD_FREE(*mea, *mea_size);
                *mea = NULL;
-                *mea_size = 0;
        }
         if (rc > 0)
                 rc = 0;
@@ -358,8 +357,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
         OBD_ALLOC(file_name, nlen);
         if (!file_name)
                 RETURN(-ENOMEM);
-        i = sprintf(file_name, "__iopen__/%u",
-                        (unsigned) dentry->d_inode->i_ino);
+        i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
 
         file = filp_open(file_name, O_RDONLY, 0);
         if (IS_ERR(file)) {
@@ -421,27 +419,26 @@ int mds_try_to_split_dir(struct obd_device *obd,
         if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
                 RETURN(0);
 
-#if 1
+        /* we want to split only large dirs. this may be already
+         * splitted dir or a slave dir created during splitting */
         if (dir->i_size < MAX_DIR_SIZE)
                 RETURN(0);
-#endif
 
         /* check is directory marked non-splittable */
         if (mea && *mea)
                 RETURN(0);
 
-        CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
-               obd->obd_name, dir->i_ino,
-               (unsigned long) dir->i_generation, mea);
+        CDEBUG(D_OTHER, "%s: split directory %lu/%lu\n",
+               obd->obd_name, dir->i_ino, (unsigned long) dir->i_generation);
 
         if (mea == NULL)
                 mea = &tmea;
         mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
 
         /* FIXME: Actually we may only want to allocate enough space for
-           necessary amount of stripes, but on the other hand with this approach
-           of allocating maximal possible amount of MDS slots, it would be
-           easier to split the dir over more MDSes */
+         * necessary amount of stripes, but on the other hand with this
+         * approach of allocating maximal possible amount of MDS slots,
+         * it would be easier to split the dir over more MDSes */
         rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
         if (!(*mea))
                 RETURN(-ENOMEM);
@@ -460,7 +457,7 @@ int mds_try_to_split_dir(struct obd_device *obd,
                        OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                         OBD_MD_FLUID | OBD_MD_FLGID);
         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
-        oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
+        oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         oa->o_mode = dir->i_mode;
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                         obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
index 1a95d04..2546cc6 100644 (file)
@@ -874,7 +874,7 @@ int mds_open(struct mds_update_record *rec, int offset,
                 if (mea->mea_master != i) {
                         CERROR("inapropriate MDS(%d) for %s. should be %d\n",
                                 mea->mea_master, rec->ur_name, i);
-                        GOTO(cleanup, rc = -ESTALE);
+                        GOTO(cleanup, rc = -ERESTART);
                 }
         }
 
@@ -939,7 +939,7 @@ got_child:
                 if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
                         if (rc > 0) {
                                 /* dir got splitted */
-                                GOTO(cleanup, rc = -ESTALE);
+                                GOTO(cleanup, rc = -ERESTART);
                         } else {
                                 /* error happened during spitting */
                                 GOTO(cleanup, rc);
index d3615d4..bb56334 100644 (file)
@@ -578,7 +578,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if (mea->mea_master != i) {
                         CERROR("inapropriate MDS(%d) for %s. should be %d\n",
                                 mea->mea_master, rec->ur_name, i);
-                        GOTO(cleanup, rc = -ESTALE);
+                        GOTO(cleanup, rc = -ERESTART);
                 }
         }
 
@@ -597,7 +597,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
                         if (rc > 0) {
                                 /* dir got splitted */
-                                GOTO(cleanup, rc = -ESTALE);
+                                GOTO(cleanup, rc = -ERESTART);
                         } else {
                                 /* error happened during spitting */
                                 GOTO(cleanup, rc);
@@ -644,15 +644,10 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                         if (rec->ur_eadata)
                                 nstripes = *(u16 *)rec->ur_eadata;
 
-#if 1
-                        /* this is for current testing yet. after the testing
-                         * directory will split if size reaches some limite -bzzz */
-                        if (rc == 0) {
-#else
                         if (rc == 0 && nstripes) {
-#endif
                                 /* FIXME: error handling here */
-                                mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+                                mds_try_to_split_dir(obd, dchild,
+                                                        NULL, nstripes);
                         }
                 } else if (!DENTRY_VALID(dchild)) {
                         /* inode will be created on another MDS */