Whamcloud - gitweb
- returned back OBD_MD_FID. For many reasons its removal was wrong.
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
index 34a6178..2093a19 100644 (file)
 /*
  * TODO:
  *   - magic in mea struct
- *   - error handling is totally missed
  */
-
-int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
+int mds_md_connect(struct obd_device *obd, char *md_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0};
         int rc, valsize, value;
         ENTRY;
 
-        if (IS_ERR(mds->mds_lmv_obd))
-                RETURN(PTR_ERR(mds->mds_lmv_obd));
+        if (IS_ERR(mds->mds_md_obd))
+                RETURN(PTR_ERR(mds->mds_md_obd));
 
-        if (mds->mds_lmv_connected)
+        if (mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (mds->mds_lmv_connected) {
-                up(&mds->mds_lmv_sem);
+        down(&mds->mds_md_sem);
+        if (mds->mds_md_connected) {
+                up(&mds->mds_md_sem);
                 RETURN(0);
         }
 
-        mds->mds_lmv_obd = class_name2obd(lmv_name);
-        if (!mds->mds_lmv_obd) {
-                CERROR("MDS cannot locate LMV %s\n",
-                       lmv_name);
-                mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
+        mds->mds_md_obd = class_name2obd(md_name);
+        if (!mds->mds_md_obd) {
+                CERROR("MDS cannot locate MD(LMV) %s\n",
+                       md_name);
+                mds->mds_md_obd = ERR_PTR(-ENOTCONN);
                 GOTO(err_last, rc = -ENOTCONN);
         }
 
-        rc = obd_connect(&conn, mds->mds_lmv_obd,
+        rc = obd_connect(&conn, mds->mds_md_obd,
                          &obd->obd_uuid, OBD_OPT_MDS_CONNECTION);
         if (rc) {
-                CERROR("MDS cannot connect to LMV %s (%d)\n",
-                       lmv_name, rc);
-                mds->mds_lmv_obd = ERR_PTR(rc);
+                CERROR("MDS cannot connect to MD(LMV) %s (%d)\n",
+                       md_name, rc);
+                mds->mds_md_obd = ERR_PTR(rc);
                 GOTO(err_last, rc);
         }
-        mds->mds_lmv_exp = class_conn2export(&conn);
-        if (mds->mds_lmv_exp == NULL)
+        mds->mds_md_exp = class_conn2export(&conn);
+        if (mds->mds_md_exp == NULL)
                 CERROR("can't get export!\n");
 
-        rc = obd_register_observer(mds->mds_lmv_obd, obd);
+        rc = obd_register_observer(mds->mds_md_obd, obd);
         if (rc) {
-                CERROR("MDS cannot register as observer of LMV %s, "
-                       "rc = %d\n", lmv_name, rc);
+                CERROR("MDS cannot register as observer of MD(LMV) %s, "
+                       "rc = %d\n", md_name, rc);
                 GOTO(err_discon, rc);
         }
 
         /* retrieve size of EA */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsize"),
                           "mdsize", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
@@ -100,41 +98,41 @@ int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
                 mds->mds_max_mdsize = value;
 
         /* find our number in LMV cluster */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsnum"),
                           "mdsnum", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
         
         mds->mds_num = value;
 
-        rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
+        rc = obd_set_info(mds->mds_md_exp, strlen("inter_mds"),
                           "inter_mds", 0, NULL);
         if (rc)
                 GOTO(err_reg, rc);
 
-        mds->mds_lmv_connected = 1;
-        up(&mds->mds_lmv_sem);
+        mds->mds_md_connected = 1;
+        up(&mds->mds_md_sem);
        RETURN(0);
 
 err_reg:
-        obd_register_observer(mds->mds_lmv_obd, NULL);
+        obd_register_observer(mds->mds_md_obd, NULL);
 err_discon:
-        obd_disconnect(mds->mds_lmv_exp, 0);
-        mds->mds_lmv_exp = NULL;
-        mds->mds_lmv_obd = ERR_PTR(rc);
+        obd_disconnect(mds->mds_md_exp, 0);
+        mds->mds_md_exp = NULL;
+        mds->mds_md_obd = ERR_PTR(rc);
 err_last:
-        up(&mds->mds_lmv_sem);
-        RETURN(rc);
+        up(&mds->mds_md_sem);
+        return rc;
 }
 
-int mds_lmv_postsetup(struct obd_device *obd)
+int mds_md_postsetup(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (mds->mds_lmv_exp) {
-                rc = obd_init_ea_size(mds->mds_lmv_exp,
+        if (mds->mds_md_exp) {
+                rc = obd_init_ea_size(mds->mds_md_exp,
                                       mds->mds_max_mdsize,
                                       mds->mds_max_cookiesize);
         }
@@ -142,26 +140,26 @@ int mds_lmv_postsetup(struct obd_device *obd)
         RETURN(rc);
 }
 
-int mds_lmv_disconnect(struct obd_device *obd, int flags)
+int mds_md_disconnect(struct obd_device *obd, int flags)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (!mds->mds_lmv_connected)
+        if (!mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
-                LASSERT(mds->mds_lmv_connected);
+        down(&mds->mds_md_sem);
+        if (!IS_ERR(mds->mds_md_obd) && mds->mds_md_exp != NULL) {
+                LASSERT(mds->mds_md_connected);
                 
-                obd_register_observer(mds->mds_lmv_obd, NULL);
+                obd_register_observer(mds->mds_md_obd, NULL);
 
                 if (flags & OBD_OPT_FORCE) {
                         struct obd_device *lmv_obd;
                         struct obd_ioctl_data ioc_data = { 0 };
                         
-                        lmv_obd = class_exp2obd(mds->mds_lmv_exp);
+                        lmv_obd = class_exp2obd(mds->mds_md_exp);
                         if (lmv_obd == NULL)
                                 GOTO(out, rc = 0);
 
@@ -175,7 +173,7 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                          * it here. --umka.
                          */
                         lmv_obd->obd_no_recov = 1;
-                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_lmv_exp,
+                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_md_exp,
                                       sizeof(ioc_data), &ioc_data, NULL);
                 }
 
@@ -184,33 +182,33 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                  * disconnected by class_disconnect_exports()) then we just need
                  * to drop our ref.
                  */
-                mds->mds_lmv_connected = 0;
-                rc = obd_disconnect(mds->mds_lmv_exp, flags);
+                mds->mds_md_connected = 0;
+                rc = obd_disconnect(mds->mds_md_exp, flags);
                 if (rc)
-                        class_export_put(mds->mds_lmv_exp);
+                        class_export_put(mds->mds_md_exp);
 
         out:
-                mds->mds_lmv_exp = NULL;
-                mds->mds_lmv_obd = NULL;
+                mds->mds_md_exp = NULL;
+                mds->mds_md_obd = NULL;
         }
-        up(&mds->mds_lmv_sem);
+        up(&mds->mds_md_sem);
         RETURN(rc);
 }
 
-int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
-                     struct mea **mea, int *mea_size)
+int mds_md_get_attr(struct obd_device *obd, struct inode *inode,
+                    struct mea **mea, int *mea_size)
 {
         struct mds_obd *mds = &obd->u.mds;
        int rc;
         ENTRY;
 
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
         if (!S_ISDIR(inode->i_mode))
                 RETURN(0);
 
        /* first calculate mea size */
-        *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
+        *mea_size = obd_alloc_diskmd(mds->mds_md_exp,
                                      (struct lov_mds_md **)mea);
         if (*mea_size < 0 || *mea == NULL)
                 return *mea_size < 0 ? *mea_size : -EINVAL;
@@ -360,7 +358,7 @@ static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
                 ca->brwc.count = PAGE_SIZE;
                 ca->brwc.flag = 0;
                 ca->oa.o_mds = mdsnum;
-                rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
+                rc = obd_brw(OBD_BRW_WRITE, mds->mds_md_exp, &ca->oa,
                              (struct lov_stripe_md *) dc->mea,
                              1, &ca->brwc, NULL);
                 if (rc)
@@ -522,6 +520,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry,
                         GOTO(cleanup, err);
         }
 
+        EXIT;
 cleanup:
         for (i = 0; i < mea->mea_count; i++) {
                 struct list_head *cur, *tmp;
@@ -537,7 +536,7 @@ cleanup:
         OBD_FREE(dc.cache, sizeof(struct dir_cache) * mea->mea_count);
         OBD_FREE(file_name, nlen);
 
-        RETURN(err);
+        return err;
 }
 
 #define MAX_DIR_SIZE      (64 * 1024)
@@ -550,7 +549,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         int rc, size;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                return MDS_NO_SPLITTABLE;
 
         /* inode exist? */
@@ -573,7 +572,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
                 return MDS_NO_SPLIT_EXPECTED;
 
-        mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
+        mds_md_get_attr(obd, dentry->d_inode, &mea, &size);
         if (mea) {
                 /* already splitted or slave object: shouldn't be splitted */
                 rc = MDS_NO_SPLITTABLE;
@@ -623,13 +622,13 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
 
         if (mea == NULL)
                 mea = &tmea;
-        mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+        mea_size = obd_size_diskmd(mds->mds_md_exp, NULL);
 
         /* FIXME: Actually we may only want to allocate enough space for
          * necessary amount of stripes, but on the other hand with this
          * approach of allocating maximal possible amount of MDS slots,
          * it would be easier to split the dir over more MDSes */
-        rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
+        rc = obd_alloc_diskmd(mds->mds_md_exp, (void *)mea);
         if (rc < 0) {
                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -645,7 +644,6 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         if (!oa)
                 RETURN(-ENOMEM);
 
-       oa->o_id = dir->i_ino;
        oa->o_generation = dir->i_generation;
 
         obdo_from_inode(oa, dir, OBD_MD_FLTYPE | OBD_MD_FLATIME |
@@ -656,6 +654,14 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         oa->o_mode = dir->i_mode;
 
+        /* 
+         * until lmv_obd_create() properly rewritten, it is important to have
+         * here oa->o_id = dir->i_ino, as otherwise master object will have
+         * invalid store cookie (zero inode num), what will lead to -ESTALE in
+         * mds_open() or somewhere else.
+         */
+       oa->o_id = dir->i_ino;
+
         down(&dir->i_sem);
         rc = mds_read_inode_sid(obd, dir, &id);
         up(&dir->i_sem);
@@ -665,11 +671,13 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
                 GOTO(err_oa, rc);
         }
         oa->o_fid = id_fid(&id);
+        oa->o_mds = mds->mds_num;
+        LASSERT(oa->o_fid != 0);
 
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
                         
-        rc = obd_create(mds->mds_lmv_exp, oa,
+        rc = obd_create(mds->mds_md_exp, oa,
                         (struct lov_stripe_md **)mea, NULL);
         if (rc) {
                 CERROR("Can't create remote inode, rc = %d\n", rc);
@@ -709,7 +717,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
        /* 3) read through the dir and distribute it over objects */
         rc = scan_and_distribute(obd, dentry, *mea);
        if (mea == &tmea)
-                obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
+                obd_free_diskmd(mds->mds_md_exp, (struct lov_mds_md **)mea);
         if (rc) {
                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -719,7 +727,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
 
 err_oa:
        obdo_free(oa);
-        RETURN(rc);
+        return rc;
 }
 
 static int filter_start_page_write(struct inode *inode,
@@ -728,12 +736,11 @@ static int filter_start_page_write(struct inode *inode,
         struct page *page = alloc_pages(GFP_HIGHUSER, 0);
         if (page == NULL) {
                 CERROR("no memory for a temp page\n");
-                RETURN(lnb->rc = -ENOMEM);
+                return lnb->rc = -ENOMEM;
         }
         POISON_PAGE(page, 0xf1);
         page->index = lnb->offset >> PAGE_SHIFT;
         lnb->page = page;
-
         return 0;
 }
 
@@ -754,6 +761,7 @@ int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
         struct dentry *dentry;
         struct lustre_id id;
         ENTRY;
+
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
@@ -880,8 +888,8 @@ int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int fla
 
         if (flags & REC_REINT_CREATE) { 
                 i = mds->mds_num;
-        } else if (mds->mds_lmv_exp) {
-                lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+        } else if (mds->mds_md_exp) {
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
                 i = raw_name2idx(MEA_MAGIC_LAST_CHAR, lmv->desc.ld_tgt_count, name, len);
         }
         RETURN(i);
@@ -891,7 +899,7 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                         struct lustre_handle **rlockh)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct lookup_intent it;
         struct mea *mea = NULL;
         int mea_size, rc;
@@ -903,15 +911,15 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
         LASSERT(dentry->d_inode != NULL);
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode,
-                              &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode,
+                             &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
@@ -934,35 +942,46 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                 GOTO(cleanup, rc = -ENOMEM);
 
         memset(*rlockh, 0, handle_size);
-        memset(&op_data, 0, sizeof(op_data));
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL) {
+                OBD_FREE(*rlockh, handle_size);
+                RETURN(-ENOMEM);
+        }
+        memset(op_data, 0, sizeof(*op_data));
 
-        op_data.mea1 = mea;
+        op_data->mea1 = mea;
         it.it_op = IT_UNLINK;
 
-        rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX,
-                        &op_data, *rlockh, NULL, 0, ldlm_completion_ast,
+        rc = md_enqueue(mds->mds_md_exp, LDLM_IBITS, &it, LCK_EX,
+                        op_data, *rlockh, NULL, 0, ldlm_completion_ast,
                         mds_blocking_ast, NULL);
+        OBD_FREE(op_data, sizeof(*op_data));
+        EXIT;
 cleanup:
         OBD_FREE(mea, mea_size);
-        RETURN(rc);
+        return rc;
 }
 
 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
-                        struct lustre_handle *lockh)
+                           struct lustre_handle *lockh)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct mea *mea = NULL;
         int mea_size, rc, i;
+        ENTRY;
 
-        if (lockh == NULL)
+        if (lockh == NULL) {
+                EXIT;
                 return;
+        }
 
-       LASSERT(mds->mds_lmv_obd != NULL);
+       LASSERT(mds->mds_md_obd != NULL);
         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc) {
                 CERROR("locks are leaked\n");
+                EXIT;
                 return;
         }
         LASSERT(mea_size != 0);
@@ -980,45 +999,53 @@ void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
 
         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
         OBD_FREE(mea, mea_size);
-        return;
+        EXIT;
 }
 
 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct ptlrpc_request *req = NULL;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct mea *mea = NULL;
         int mea_size, rc;
+        ENTRY;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
-               return 0;
+       if (!mds->mds_md_obd)
+               RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
         if (mea == NULL)
-                return 0;
+                RETURN(0);
+                       
         if (mea->mea_count == 0)
                 GOTO(cleanup, rc = 0);
 
         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
-               (unsigned long) dentry->d_inode->i_ino,
-               (unsigned long) dentry->d_inode->i_generation);
+               (unsigned long)dentry->d_inode->i_ino,
+               (unsigned long)dentry->d_inode->i_generation);
 
-        memset(&op_data, 0, sizeof(op_data));
-        op_data.mea1 = mea;
-        rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL)
+                RETURN(-ENOMEM);
+        
+        memset(op_data, 0, sizeof(*op_data));
+        op_data->mea1 = mea;
+        rc = md_unlink(mds->mds_md_exp, op_data, &req);
+        OBD_FREE(op_data, sizeof(*op_data));
         LASSERT(req == NULL);
+        EXIT;
 cleanup:
         OBD_FREE(mea, mea_size);
-        RETURN(rc);
+        return rc;
 }
 
 struct ide_tracking {
@@ -1048,6 +1075,7 @@ noempty:
         return -ENOTEMPTY;
 }
 
+/* checks if passed dentry points to empty dir. */
 int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
 {
         struct ide_tracking it;
@@ -1062,7 +1090,10 @@ int mds_is_dir_empty(struct obd_device *obd, struct dentry *dentry)
         OBD_ALLOC(file_name, nlen);
         if (!file_name)
                 RETURN(-ENOMEM);
-        i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
+        
+        LASSERT(dentry->d_inode != NULL);
+        i = sprintf(file_name, "__iopen__/0x%lx",
+                    dentry->d_inode->i_ino);
 
         file = filp_open(file_name, O_RDONLY, 0);
         if (IS_ERR(file)) {
@@ -1136,6 +1167,7 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
         if (!mds_is_dir_empty(obd, dentry))
                 rc = -ENOTEMPTY;
 
+        EXIT;
 cleanup:
         switch(cleanup_phase) {
         case 1:
@@ -1147,7 +1179,7 @@ cleanup:
         default:
                 break;
         }
-        RETURN(rc);
+        return rc;
 }
 
 int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
@@ -1209,7 +1241,7 @@ int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
         err = fsfilt_commit(obd, obd->u.mds.mds_sb, inode, handle, 0);
         if (!rc)
                 rc = err ? err : size;
-        GOTO(conv_free, rc);
+        EXIT;
 conv_free:
         OBD_FREE(new, size);
         return rc;