Whamcloud - gitweb
b=8080
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
index e406fd7..332d43e 100644 (file)
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#include <linux/dcache.h>
+#include <linux/namei.h>
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/obd.h>
+#include <linux/lustre_lib.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_idl.h>
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_fsfilt.h>
+#include <linux/lustre_lite.h>
+#include <linux/lustre_sec.h>
+#include <asm/div64.h>
 
 #include "mds_internal.h"
 
-
 /*
  * TODO:
  *   - magic in mea struct
@@ -46,7 +54,9 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0};
-        int rc, valsize, value;
+        unsigned long sec_flags = PTLRPC_SEC_FL_MDS;
+        int rc, value;
+        __u32 valsize;
         ENTRY;
 
         if (IS_ERR(mds->mds_md_obd))
@@ -69,8 +79,8 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
                 GOTO(err_last, rc = -ENOTCONN);
         }
 
-        rc = obd_connect(&conn, mds->mds_md_obd,
-                         &obd->obd_uuid, OBD_OPT_MDS_CONNECTION);
+        rc = obd_connect(&conn, mds->mds_md_obd, &obd->obd_uuid, NULL,
+                         OBD_OPT_MDS_CONNECTION);
         if (rc) {
                 CERROR("MDS cannot connect to MD(LMV) %s (%d)\n",
                        md_name, rc);
@@ -110,6 +120,18 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
         if (rc)
                 GOTO(err_reg, rc);
 
+        if (mds->mds_mds_sec) {
+                rc = obd_set_info(mds->mds_md_exp, strlen("sec"), "sec",
+                                  strlen(mds->mds_mds_sec), mds->mds_mds_sec);
+                if (rc)
+                        GOTO(err_reg, rc);
+        }
+
+        rc = obd_set_info(mds->mds_md_exp, strlen("sec_flags"), "sec_flags",
+                          sizeof(sec_flags), &sec_flags);
+        if (rc)
+                GOTO(err_reg, rc);
+
         mds->mds_md_connected = 1;
         up(&mds->mds_md_sem);
        RETURN(0);
@@ -152,7 +174,7 @@ int mds_md_disconnect(struct obd_device *obd, int flags)
         down(&mds->mds_md_sem);
         if (!IS_ERR(mds->mds_md_obd) && mds->mds_md_exp != NULL) {
                 LASSERT(mds->mds_md_connected);
-                
+
                 obd_register_observer(mds->mds_md_obd, NULL);
 
                 if (flags & OBD_OPT_FORCE) {
@@ -195,6 +217,28 @@ int mds_md_disconnect(struct obd_device *obd, int flags)
         RETURN(rc);
 }
 
+int mds_md_reconnect(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct obd_statfs osfs;
+        int err;
+        ENTRY;
+
+        /* We don't know state of connections to another MDSes
+         * before the failure. If MDS we were connected to before
+         * the failure gets failed, then it will wait for us to
+         * reconnect and will timed recovery out. bug 4920 */
+        if (mds->mds_md_connected == 0)
+                RETURN(0);
+        if (mds->mds_md_obd == NULL)
+                RETURN(0);
+
+        err = obd_statfs(mds->mds_md_obd, &osfs, jiffies - HZ);
+        if (err)
+                CERROR("can't reconnect to MDSes after recovery: %d\n", err);
+        RETURN(0);
+}
+
 int mds_md_get_attr(struct obd_device *obd, struct inode *inode,
                     struct mea **mea, int *mea_size)
 {
@@ -213,7 +257,7 @@ int mds_md_get_attr(struct obd_device *obd, struct inode *inode,
         if (*mea_size < 0 || *mea == NULL)
                 return *mea_size < 0 ? *mea_size : -EINVAL;
 
-        rc = mds_get_md(obd, inode, *mea, mea_size, 1);
+        rc = mds_get_md(obd, inode, *mea, mea_size, 1, 1);
 
        if (rc <= 0) {
                OBD_FREE(*mea, *mea_size);
@@ -433,6 +477,11 @@ static int filldir(void * __buf, const char * name, int namlen,
         }
         
         OBD_ALLOC(n, namlen + 1);
+
+        /* XXX: should be memory allocation checked here in some more smart
+         * manner? */
+        LASSERT(n != NULL);
+
         memcpy(n, name, namlen);
         n[namlen] = (char) 0;
         
@@ -599,7 +648,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         struct obdo *oa = NULL;
        int rc, mea_size = 0;
         struct lustre_id id;
-       void *handle;
+        void *handle;
        ENTRY;
 
         if (update_mode != LCK_EX)
@@ -662,9 +711,8 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
          */
        oa->o_id = dir->i_ino;
 
-        down(&dir->i_sem);
+        /* get parent id: ldlm lock on the parent protects ea */
         rc = mds_read_inode_sid(obd, dir, &id);
-        up(&dir->i_sem);
         if (rc) {
                 CERROR("Can't read inode self id, inode %lu, "
                        "rc %d.\n", dir->i_ino, rc);
@@ -677,7 +725,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
                         
-        rc = obd_create(mds->mds_md_exp, oa,
+        rc = obd_create(mds->mds_md_exp, oa, NULL, 0,
                         (struct lov_stripe_md **)mea, NULL);
         if (rc) {
                 CERROR("Can't create remote inode, rc = %d\n", rc);
@@ -697,7 +745,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
                 GOTO(err_oa, rc = PTR_ERR(handle));
         }
 
-       rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size);
+       rc = fsfilt_set_md(obd, dir, handle, *mea, mea_size, EA_MEA);
         if (rc) {
                 up(&dir->i_sem);
                 CERROR("fsfilt_set_md() failed, error %d.\n", rc);
@@ -744,15 +792,11 @@ static int filter_start_page_write(struct inode *inode,
         return 0;
 }
 
-struct dentry *filter_id2dentry(struct obd_device *obd,
-                                struct dentry *dir_dentry,
-                                obd_gr group, obd_id id);
-
 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj,
                 int niocount, struct niobuf_remote *nb,
                 struct niobuf_local *res,
-                struct obd_trans_info *oti)
+                struct obd_trans_info *oti, struct lustre_capa *capa)
 {
         struct niobuf_remote *rnb;
         struct niobuf_local *lnb = NULL;
@@ -880,14 +924,34 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags,
+                      struct ptlrpc_peer *peer, struct inode *parent, int local)
 {
-        struct lmv_obd *lmv;
         struct mds_obd *mds = &obd->u.mds;
+        char peer_str[PTL_NALFMT_SIZE];
         int i = mds->mds_num;
+        struct lmv_obd *lmv;
+        ENTRY;
 
-        if (flags & REC_REINT_CREATE) { 
+        if (local)
+                RETURN(mds->mds_num);
+
+        if (flags & REC_REINT_CREATE) {
                 i = mds->mds_num;
+        } else if (mds->mds_md_exp != NULL && peer != NULL) {
+                LASSERT(parent != NULL);
+                /* distribute only at root level */
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
+                if (parent->i_ino != id_ino(&mds->mds_rootid)) {
+                        i = mds->mds_num;
+                } else {
+                        __u64 nid = peer->peer_id.nid;
+                        __u64 count = lmv->desc.ld_tgt_count;
+                        i = do_div(nid, count);
+                        CDEBUG(D_OTHER, "client from %s creates top dir %*s "
+                               "on mds #%d\n",
+                               ptlrpc_peernid2str(peer,peer_str), len, name, i+1);
+                }
         } else if (mds->mds_md_exp) {
                 lmv = &mds->mds_md_exp->exp_obd->u.lmv;
                 i = raw_name2idx(MEA_MAGIC_LAST_CHAR, lmv->desc.ld_tgt_count, name, len);
@@ -952,10 +1016,19 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
         op_data->mea1 = mea;
         it.it_op = IT_UNLINK;
 
+        OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+        if (!it.d.fs_data) {
+                OBD_FREE(*rlockh, handle_size);
+                OBD_FREE(op_data, sizeof(*op_data));
+                RETURN(-ENOMEM);
+        }
+
         rc = md_enqueue(mds->mds_md_exp, LDLM_IBITS, &it, LCK_EX,
                         op_data, *rlockh, NULL, 0, ldlm_completion_ast,
                         mds_blocking_ast, NULL);
+        
         OBD_FREE(op_data, sizeof(*op_data));
+        OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         EXIT;
 cleanup:
         OBD_FREE(mea, mea_size);
@@ -1133,7 +1206,6 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
                 CERROR("Can't unpack security desc\n");
                 GOTO(cleanup, rc = -EFAULT);
         }
-        mds_squash_root(&obd->u.mds, rsd, &req->rq_peer.peer_id.nid); 
 
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
@@ -1153,27 +1225,25 @@ int mds_lock_and_check_slave(int offset, struct ptlrpc_request *req,
         }
         cleanup_phase = 1;
 
-        /* 
-         * handling the case when remote MDS checks if dir is empty before
-         * rename. But it also does it for all entries, because inode is stored
-         * here and remote MDS does not know if rename point to dir or to reg
-         * file. So we check it here.
-         */
+       /* 
+        * handling the case when remote MDS checks if dir is empty 
+        * before rename. But it also does it for all entries, because
+        * inode is stored here and remote MDS does not know if rename
+        * point to dir or to reg file. So we check it here. 
+        */
        if (!S_ISDIR(dentry->d_inode->i_mode))
                GOTO(cleanup, rc = 0);
 
-        rc = mds_init_ucred(&uc, rsd);
+        rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
-                CERROR("can't init ucred\n");
                 GOTO(cleanup, rc);
         }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+       rc = mds_is_dir_empty(obd, dentry) ? 0 : -ENOTEMPTY;
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
 
-        rc = 0;
-        if (!mds_is_dir_empty(obd, dentry))
-                rc = -ENOTEMPTY;
-
+        mds_exit_ucred(&uc);
         EXIT;
 cleanup:
         switch(cleanup_phase) {
@@ -1181,8 +1251,6 @@ cleanup:
                 if (rc)
                         ldlm_lock_decref(lockh, LCK_EX);
                 l_dput(dentry);
-                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-                mds_exit_ucred(&uc);
         default:
                 break;
         }
@@ -1190,49 +1258,41 @@ cleanup:
 }
 
 int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
-                       struct lov_mds_md *lmm, int lmmsize)
+                       struct lov_mds_md *lmm, int lmm_size)
 {
-        int i, rc, err, size;
+        struct lov_stripe_md *lsm = NULL;
         struct mea_old *old;
         struct mea *mea;
-        struct mea *new;
         void *handle;
+        int rc, err;
         ENTRY;
 
-        mea = (struct mea *) lmm;
+        mea = (struct mea *)lmm;
+        old = (struct mea_old *)lmm;
+
         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
-                mea->mea_magic == MEA_MAGIC_ALL_CHARS)
+            mea->mea_magic == MEA_MAGIC_ALL_CHARS)
                 RETURN(0);
 
-        old = (struct mea_old *) lmm;
-        
-        rc = sizeof(struct lustre_id) * old->mea_count + 
-                sizeof(struct mea_old);
-        
-        if (old->mea_count > 256 || old->mea_master > 256 || lmmsize < rc
-                        || old->mea_master > old->mea_count) {
-                CWARN("unknown MEA format, dont convert it\n");
-                CWARN("  count %u, master %u, size %u\n",
-                      old->mea_count, old->mea_master, rc);
-                RETURN(0);
-        }
-                
-        CWARN("converting MEA EA on %lu/%u from V0 to V1 (%u/%u)\n",
-              inode->i_ino, inode->i_generation, old->mea_count, 
-              old->mea_master);
+        /*
+         * making MDS try LOV EA converting in the non-LMV configuration
+         * cases.
+         */
+        if (!obd->u.mds.mds_md_exp)
+                RETURN(-EINVAL);
 
-        size = sizeof(struct lustre_id) * old->mea_count + 
-                sizeof(struct mea);
-        
-        OBD_ALLOC(new, size);
-        if (new == NULL)
-                RETURN(-ENOMEM);
+        CDEBUG(D_INODE, "converting MEA EA on %lu/%u from V0 to V1 (%u/%u)\n",
+               inode->i_ino, inode->i_generation, old->mea_count, 
+               old->mea_master);
+
+        rc = obd_unpackmd(obd->u.mds.mds_md_exp, &lsm, lmm, lmm_size);
+        if (rc < 0)
+                GOTO(conv_end, rc);
 
-        new->mea_magic = MEA_MAGIC_LAST_CHAR;
-        new->mea_count = old->mea_count;
-        new->mea_master = old->mea_master;
-        for (i = 0; i < new->mea_count; i++)
-                new->mea_ids[i] = old->mea_ids[i];
+        rc = obd_packmd(obd->u.mds.mds_md_exp, &lmm, lsm);
+        if (rc < 0)
+                GOTO(conv_free, rc);
+        lmm_size = rc;
 
         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
         if (IS_ERR(handle)) {
@@ -1240,17 +1300,14 @@ int mds_convert_mea_ea(struct obd_device *obd, struct inode *inode,
                 GOTO(conv_free, rc);
         }
 
-        rc = fsfilt_set_md(obd, inode, handle, (struct lov_mds_md *) new, size);
-        if (rc > lmmsize)
-                size = lmmsize;
-        memcpy(lmm, new, size);
-
+        rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, EA_MEA);
         err = fsfilt_commit(obd, obd->u.mds.mds_sb, inode, handle, 0);
         if (!rc)
-                rc = err ? err : size;
-        EXIT;
+                rc = err ? err : lmm_size;
+        GOTO(conv_free, rc);
 conv_free:
-        OBD_FREE(new, size);
+        obd_free_memmd(obd->u.mds.mds_md_exp, &lsm);
+conv_end:
         return rc;
 }