Whamcloud - gitweb
- fixed using not initialized ctxt with pop_ctxt() in mds_lock_and_check_slave()
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
index 6c76ff3..0287b11 100644 (file)
 /*
  * TODO:
  *   - magic in mea struct
- *   - error handling is totally missed
  */
-
-int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
+int mds_md_connect(struct obd_device *obd, char *md_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0};
         int rc, valsize, value;
         ENTRY;
 
-        if (IS_ERR(mds->mds_lmv_obd))
-                RETURN(PTR_ERR(mds->mds_lmv_obd));
+        if (IS_ERR(mds->mds_md_obd))
+                RETURN(PTR_ERR(mds->mds_md_obd));
 
-        if (mds->mds_lmv_connected)
+        if (mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (mds->mds_lmv_connected) {
-                up(&mds->mds_lmv_sem);
+        down(&mds->mds_md_sem);
+        if (mds->mds_md_connected) {
+                up(&mds->mds_md_sem);
                 RETURN(0);
         }
 
-        mds->mds_lmv_obd = class_name2obd(lmv_name);
-        if (!mds->mds_lmv_obd) {
-                CERROR("MDS cannot locate LMV %s\n",
-                       lmv_name);
-                mds->mds_lmv_obd = ERR_PTR(-ENOTCONN);
+        mds->mds_md_obd = class_name2obd(md_name);
+        if (!mds->mds_md_obd) {
+                CERROR("MDS cannot locate MD(LMV) %s\n",
+                       md_name);
+                mds->mds_md_obd = ERR_PTR(-ENOTCONN);
                 GOTO(err_last, rc = -ENOTCONN);
         }
 
-        rc = obd_connect(&conn, mds->mds_lmv_obd,
+        rc = obd_connect(&conn, mds->mds_md_obd,
                          &obd->obd_uuid, OBD_OPT_MDS_CONNECTION);
         if (rc) {
-                CERROR("MDS cannot connect to LMV %s (%d)\n",
-                       lmv_name, rc);
-                mds->mds_lmv_obd = ERR_PTR(rc);
+                CERROR("MDS cannot connect to MD(LMV) %s (%d)\n",
+                       md_name, rc);
+                mds->mds_md_obd = ERR_PTR(rc);
                 GOTO(err_last, rc);
         }
-        mds->mds_lmv_exp = class_conn2export(&conn);
-        if (mds->mds_lmv_exp == NULL)
+        mds->mds_md_exp = class_conn2export(&conn);
+        if (mds->mds_md_exp == NULL)
                 CERROR("can't get export!\n");
 
-        rc = obd_register_observer(mds->mds_lmv_obd, obd);
+        rc = obd_register_observer(mds->mds_md_obd, obd);
         if (rc) {
-                CERROR("MDS cannot register as observer of LMV %s, "
-                       "rc = %d\n", lmv_name, rc);
+                CERROR("MDS cannot register as observer of MD(LMV) %s, "
+                       "rc = %d\n", md_name, rc);
                 GOTO(err_discon, rc);
         }
 
         /* retrieve size of EA */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsize"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsize"),
                           "mdsize", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
@@ -100,41 +98,41 @@ int mds_lmv_connect(struct obd_device *obd, char *lmv_name)
                 mds->mds_max_mdsize = value;
 
         /* find our number in LMV cluster */
-        rc = obd_get_info(mds->mds_lmv_exp, strlen("mdsnum"),
+        rc = obd_get_info(mds->mds_md_exp, strlen("mdsnum"),
                           "mdsnum", &valsize, &value);
         if (rc) 
                 GOTO(err_reg, rc);
         
         mds->mds_num = value;
 
-        rc = obd_set_info(mds->mds_lmv_exp, strlen("inter_mds"),
+        rc = obd_set_info(mds->mds_md_exp, strlen("inter_mds"),
                           "inter_mds", 0, NULL);
         if (rc)
                 GOTO(err_reg, rc);
 
-        mds->mds_lmv_connected = 1;
-        up(&mds->mds_lmv_sem);
+        mds->mds_md_connected = 1;
+        up(&mds->mds_md_sem);
        RETURN(0);
 
 err_reg:
-        obd_register_observer(mds->mds_lmv_obd, NULL);
+        obd_register_observer(mds->mds_md_obd, NULL);
 err_discon:
-        obd_disconnect(mds->mds_lmv_exp, 0);
-        mds->mds_lmv_exp = NULL;
-        mds->mds_lmv_obd = ERR_PTR(rc);
+        obd_disconnect(mds->mds_md_exp, 0);
+        mds->mds_md_exp = NULL;
+        mds->mds_md_obd = ERR_PTR(rc);
 err_last:
-        up(&mds->mds_lmv_sem);
+        up(&mds->mds_md_sem);
         return rc;
 }
 
-int mds_lmv_postsetup(struct obd_device *obd)
+int mds_md_postsetup(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (mds->mds_lmv_exp) {
-                rc = obd_init_ea_size(mds->mds_lmv_exp,
+        if (mds->mds_md_exp) {
+                rc = obd_init_ea_size(mds->mds_md_exp,
                                       mds->mds_max_mdsize,
                                       mds->mds_max_cookiesize);
         }
@@ -142,26 +140,26 @@ int mds_lmv_postsetup(struct obd_device *obd)
         RETURN(rc);
 }
 
-int mds_lmv_disconnect(struct obd_device *obd, int flags)
+int mds_md_disconnect(struct obd_device *obd, int flags)
 {
         struct mds_obd *mds = &obd->u.mds;
         int rc = 0;
         ENTRY;
 
-        if (!mds->mds_lmv_connected)
+        if (!mds->mds_md_connected)
                 RETURN(0);
 
-        down(&mds->mds_lmv_sem);
-        if (!IS_ERR(mds->mds_lmv_obd) && mds->mds_lmv_exp != NULL) {
-                LASSERT(mds->mds_lmv_connected);
+        down(&mds->mds_md_sem);
+        if (!IS_ERR(mds->mds_md_obd) && mds->mds_md_exp != NULL) {
+                LASSERT(mds->mds_md_connected);
                 
-                obd_register_observer(mds->mds_lmv_obd, NULL);
+                obd_register_observer(mds->mds_md_obd, NULL);
 
                 if (flags & OBD_OPT_FORCE) {
                         struct obd_device *lmv_obd;
                         struct obd_ioctl_data ioc_data = { 0 };
                         
-                        lmv_obd = class_exp2obd(mds->mds_lmv_exp);
+                        lmv_obd = class_exp2obd(mds->mds_md_exp);
                         if (lmv_obd == NULL)
                                 GOTO(out, rc = 0);
 
@@ -175,7 +173,7 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                          * it here. --umka.
                          */
                         lmv_obd->obd_no_recov = 1;
-                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_lmv_exp,
+                        obd_iocontrol(IOC_OSC_SET_ACTIVE, mds->mds_md_exp,
                                       sizeof(ioc_data), &ioc_data, NULL);
                 }
 
@@ -184,33 +182,33 @@ int mds_lmv_disconnect(struct obd_device *obd, int flags)
                  * disconnected by class_disconnect_exports()) then we just need
                  * to drop our ref.
                  */
-                mds->mds_lmv_connected = 0;
-                rc = obd_disconnect(mds->mds_lmv_exp, flags);
+                mds->mds_md_connected = 0;
+                rc = obd_disconnect(mds->mds_md_exp, flags);
                 if (rc)
-                        class_export_put(mds->mds_lmv_exp);
+                        class_export_put(mds->mds_md_exp);
 
         out:
-                mds->mds_lmv_exp = NULL;
-                mds->mds_lmv_obd = NULL;
+                mds->mds_md_exp = NULL;
+                mds->mds_md_obd = NULL;
         }
-        up(&mds->mds_lmv_sem);
+        up(&mds->mds_md_sem);
         RETURN(rc);
 }
 
-int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode,
-                     struct mea **mea, int *mea_size)
+int mds_md_get_attr(struct obd_device *obd, struct inode *inode,
+                    struct mea **mea, int *mea_size)
 {
         struct mds_obd *mds = &obd->u.mds;
        int rc;
         ENTRY;
 
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
         if (!S_ISDIR(inode->i_mode))
                 RETURN(0);
 
        /* first calculate mea size */
-        *mea_size = obd_alloc_diskmd(mds->mds_lmv_exp,
+        *mea_size = obd_alloc_diskmd(mds->mds_md_exp,
                                      (struct lov_mds_md **)mea);
         if (*mea_size < 0 || *mea == NULL)
                 return *mea_size < 0 ? *mea_size : -EINVAL;
@@ -360,7 +358,7 @@ static int flush_buffer_onto_mds(struct dirsplit_control *dc, int mdsnum)
                 ca->brwc.count = PAGE_SIZE;
                 ca->brwc.flag = 0;
                 ca->oa.o_mds = mdsnum;
-                rc = obd_brw(OBD_BRW_WRITE, mds->mds_lmv_exp, &ca->oa,
+                rc = obd_brw(OBD_BRW_WRITE, mds->mds_md_exp, &ca->oa,
                              (struct lov_stripe_md *) dc->mea,
                              1, &ca->brwc, NULL);
                 if (rc)
@@ -551,7 +549,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         int rc, size;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                return MDS_NO_SPLITTABLE;
 
         /* inode exist? */
@@ -574,7 +572,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry)
         if (dentry->d_inode->i_size < MAX_DIR_SIZE)
                 return MDS_NO_SPLIT_EXPECTED;
 
-        mds_get_lmv_attr(obd, dentry->d_inode, &mea, &size);
+        mds_md_get_attr(obd, dentry->d_inode, &mea, &size);
         if (mea) {
                 /* already splitted or slave object: shouldn't be splitted */
                 rc = MDS_NO_SPLITTABLE;
@@ -624,13 +622,13 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
 
         if (mea == NULL)
                 mea = &tmea;
-        mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+        mea_size = obd_size_diskmd(mds->mds_md_exp, NULL);
 
         /* FIXME: Actually we may only want to allocate enough space for
          * necessary amount of stripes, but on the other hand with this
          * approach of allocating maximal possible amount of MDS slots,
          * it would be easier to split the dir over more MDSes */
-        rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *)mea);
+        rc = obd_alloc_diskmd(mds->mds_md_exp, (void *)mea);
         if (rc < 0) {
                 CERROR("obd_alloc_diskmd() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -679,7 +677,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
                         
-        rc = obd_create(mds->mds_lmv_exp, oa,
+        rc = obd_create(mds->mds_md_exp, oa,
                         (struct lov_stripe_md **)mea, NULL);
         if (rc) {
                 CERROR("Can't create remote inode, rc = %d\n", rc);
@@ -719,7 +717,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
        /* 3) read through the dir and distribute it over objects */
         rc = scan_and_distribute(obd, dentry, *mea);
        if (mea == &tmea)
-                obd_free_diskmd(mds->mds_lmv_exp, (struct lov_mds_md **)mea);
+                obd_free_diskmd(mds->mds_md_exp, (struct lov_mds_md **)mea);
         if (rc) {
                 CERROR("scan_and_distribute() failed, error %d.\n", rc);
                 RETURN(rc);
@@ -890,8 +888,8 @@ int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int fla
 
         if (flags & REC_REINT_CREATE) { 
                 i = mds->mds_num;
-        } else if (mds->mds_lmv_exp) {
-                lmv = &mds->mds_lmv_exp->exp_obd->u.lmv;
+        } else if (mds->mds_md_exp) {
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
                 i = raw_name2idx(MEA_MAGIC_LAST_CHAR, lmv->desc.ld_tgt_count, name, len);
         }
         RETURN(i);
@@ -901,7 +899,7 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                         struct lustre_handle **rlockh)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct lookup_intent it;
         struct mea *mea = NULL;
         int mea_size, rc;
@@ -913,15 +911,15 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
         LASSERT(dentry->d_inode != NULL);
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode,
-                              &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode,
+                             &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
@@ -944,35 +942,46 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
                 GOTO(cleanup, rc = -ENOMEM);
 
         memset(*rlockh, 0, handle_size);
-        memset(&op_data, 0, sizeof(op_data));
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL) {
+                OBD_FREE(*rlockh, handle_size);
+                RETURN(-ENOMEM);
+        }
+        memset(op_data, 0, sizeof(*op_data));
 
-        op_data.mea1 = mea;
+        op_data->mea1 = mea;
         it.it_op = IT_UNLINK;
 
-        rc = md_enqueue(mds->mds_lmv_exp, LDLM_IBITS, &it, LCK_EX,
-                        &op_data, *rlockh, NULL, 0, ldlm_completion_ast,
+        rc = md_enqueue(mds->mds_md_exp, LDLM_IBITS, &it, LCK_EX,
+                        op_data, *rlockh, NULL, 0, ldlm_completion_ast,
                         mds_blocking_ast, NULL);
+        OBD_FREE(op_data, sizeof(*op_data));
+        EXIT;
 cleanup:
         OBD_FREE(mea, mea_size);
-        RETURN(rc);
+        return rc;
 }
 
 void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
-                        struct lustre_handle *lockh)
+                           struct lustre_handle *lockh)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct mea *mea = NULL;
         int mea_size, rc, i;
+        ENTRY;
 
-        if (lockh == NULL)
+        if (lockh == NULL) {
+                EXIT;
                 return;
+        }
 
-       LASSERT(mds->mds_lmv_obd != NULL);
+       LASSERT(mds->mds_md_obd != NULL);
         LASSERT(S_ISDIR(dentry->d_inode->i_mode));
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc) {
                 CERROR("locks are leaked\n");
+                EXIT;
                 return;
         }
         LASSERT(mea_size != 0);
@@ -990,26 +999,27 @@ void mds_unlock_slave_objs(struct obd_device *obd, struct dentry *dentry,
 
         OBD_FREE(lockh, sizeof(struct lustre_handle) * mea->mea_count);
         OBD_FREE(mea, mea_size);
+        EXIT;
 }
 
 int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct ptlrpc_request *req = NULL;
-        struct mdc_op_data op_data;
+        struct mdc_op_data *op_data;
         struct mea *mea = NULL;
         int mea_size, rc;
         ENTRY;
 
        /* clustered MD ? */
-       if (!mds->mds_lmv_obd)
+       if (!mds->mds_md_obd)
                RETURN(0);
 
         /* a dir can be splitted only */
         if (!S_ISDIR(dentry->d_inode->i_mode))
                 RETURN(0);
 
-        rc = mds_get_lmv_attr(obd, dentry->d_inode, &mea, &mea_size);
+        rc = mds_md_get_attr(obd, dentry->d_inode, &mea, &mea_size);
         if (rc)
                 RETURN(rc);
 
@@ -1020,12 +1030,17 @@ int mds_unlink_slave_objs(struct obd_device *obd, struct dentry *dentry)
                 GOTO(cleanup, rc = 0);
 
         CDEBUG(D_OTHER, "%s: unlink slaves for %lu/%lu\n", obd->obd_name,
-               (unsigned long) dentry->d_inode->i_ino,
-               (unsigned long) dentry->d_inode->i_generation);
+               (unsigned long)dentry->d_inode->i_ino,
+               (unsigned long)dentry->d_inode->i_generation);
 
-        memset(&op_data, 0, sizeof(op_data));
-        op_data.mea1 = mea;
-        rc = md_unlink(mds->mds_lmv_exp, &op_data, &req);
+        OBD_ALLOC(op_data, sizeof(*op_data));
+        if (op_data == NULL)
+                RETURN(-ENOMEM);
+        
+        memset(op_data, 0, sizeof(*op_data));
+        op_data->mea1 = mea;
+        rc = md_unlink(mds->mds_md_exp, op_data, &req);
+        OBD_FREE(op_data, sizeof(*op_data));
         LASSERT(req == NULL);
         EXIT;
 cleanup:
@@ -1138,7 +1153,14 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
         }
         cleanup_phase = 1;
 
-        LASSERT(S_ISDIR(dentry->d_inode->i_mode));
+        /* 
+         * handling the case when remote MDS checks if dir is empty before
+         * rename. But it also does it for all entries, because inode is stored
+         * here and remote MDS does not know if rename point to dir or to reg
+         * file. So we check it here.
+         */
+       if (!S_ISDIR(dentry->d_inode->i_mode))
+               GOTO(cleanup, rc = 0);
 
         rc = mds_init_ucred(&uc, rsd);
         if (rc) {
@@ -1147,11 +1169,10 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
         }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+       rc = mds_is_dir_empty(obd, dentry) ? 0 : -ENOTEMPTY;
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
 
-        rc = 0;
-        if (!mds_is_dir_empty(obd, dentry))
-                rc = -ENOTEMPTY;
-
+        mds_exit_ucred(&uc);
         EXIT;
 cleanup:
         switch(cleanup_phase) {
@@ -1159,8 +1180,6 @@ cleanup:
                 if (rc)
                         ldlm_lock_decref(lockh, LCK_EX);
                 l_dput(dentry);
-                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-                mds_exit_ucred(&uc);
         default:
                 break;
         }