Whamcloud - gitweb
b=8080
[fs/lustre-release.git] / lustre / mds / mds_lmv.c
index c675654..332d43e 100644 (file)
@@ -41,6 +41,8 @@
 #include <linux/lustre_lib.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lustre_lite.h>
+#include <linux/lustre_sec.h>
+#include <asm/div64.h>
 
 #include "mds_internal.h"
 
@@ -52,7 +54,9 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0};
-        int rc, valsize, value;
+        unsigned long sec_flags = PTLRPC_SEC_FL_MDS;
+        int rc, value;
+        __u32 valsize;
         ENTRY;
 
         if (IS_ERR(mds->mds_md_obd))
@@ -123,6 +127,11 @@ int mds_md_connect(struct obd_device *obd, char *md_name)
                         GOTO(err_reg, rc);
         }
 
+        rc = obd_set_info(mds->mds_md_exp, strlen("sec_flags"), "sec_flags",
+                          sizeof(sec_flags), &sec_flags);
+        if (rc)
+                GOTO(err_reg, rc);
+
         mds->mds_md_connected = 1;
         up(&mds->mds_md_sem);
        RETURN(0);
@@ -165,7 +174,7 @@ int mds_md_disconnect(struct obd_device *obd, int flags)
         down(&mds->mds_md_sem);
         if (!IS_ERR(mds->mds_md_obd) && mds->mds_md_exp != NULL) {
                 LASSERT(mds->mds_md_connected);
-                
+
                 obd_register_observer(mds->mds_md_obd, NULL);
 
                 if (flags & OBD_OPT_FORCE) {
@@ -468,6 +477,11 @@ static int filldir(void * __buf, const char * name, int namlen,
         }
         
         OBD_ALLOC(n, namlen + 1);
+
+        /* XXX: should be memory allocation checked here in some more smart
+         * manner? */
+        LASSERT(n != NULL);
+
         memcpy(n, name, namlen);
         n[namlen] = (char) 0;
         
@@ -634,7 +648,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         struct obdo *oa = NULL;
        int rc, mea_size = 0;
         struct lustre_id id;
-       void *handle;
+        void *handle;
        ENTRY;
 
         if (update_mode != LCK_EX)
@@ -697,9 +711,8 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
          */
        oa->o_id = dir->i_ino;
 
-        down(&dir->i_sem);
+        /* get parent id: ldlm lock on the parent protects ea */
         rc = mds_read_inode_sid(obd, dir, &id);
-        up(&dir->i_sem);
         if (rc) {
                 CERROR("Can't read inode self id, inode %lu, "
                        "rc %d.\n", dir->i_ino, rc);
@@ -712,7 +725,7 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry,
         CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
                obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
                         
-        rc = obd_create(mds->mds_md_exp, oa,
+        rc = obd_create(mds->mds_md_exp, oa, NULL, 0,
                         (struct lov_stripe_md **)mea, NULL);
         if (rc) {
                 CERROR("Can't create remote inode, rc = %d\n", rc);
@@ -779,15 +792,11 @@ static int filter_start_page_write(struct inode *inode,
         return 0;
 }
 
-struct dentry *filter_id2dentry(struct obd_device *obd,
-                                struct dentry *dir_dentry,
-                                obd_gr group, obd_id id);
-
 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj,
                 int niocount, struct niobuf_remote *nb,
                 struct niobuf_local *res,
-                struct obd_trans_info *oti)
+                struct obd_trans_info *oti, struct lustre_capa *capa)
 {
         struct niobuf_remote *rnb;
         struct niobuf_local *lnb = NULL;
@@ -915,14 +924,34 @@ int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags)
+int mds_choose_mdsnum(struct obd_device *obd, const char *name, int len, int flags,
+                      struct ptlrpc_peer *peer, struct inode *parent, int local)
 {
-        struct lmv_obd *lmv;
         struct mds_obd *mds = &obd->u.mds;
+        char peer_str[PTL_NALFMT_SIZE];
         int i = mds->mds_num;
+        struct lmv_obd *lmv;
+        ENTRY;
+
+        if (local)
+                RETURN(mds->mds_num);
 
-        if (flags & REC_REINT_CREATE) { 
+        if (flags & REC_REINT_CREATE) {
                 i = mds->mds_num;
+        } else if (mds->mds_md_exp != NULL && peer != NULL) {
+                LASSERT(parent != NULL);
+                /* distribute only at root level */
+                lmv = &mds->mds_md_exp->exp_obd->u.lmv;
+                if (parent->i_ino != id_ino(&mds->mds_rootid)) {
+                        i = mds->mds_num;
+                } else {
+                        __u64 nid = peer->peer_id.nid;
+                        __u64 count = lmv->desc.ld_tgt_count;
+                        i = do_div(nid, count);
+                        CDEBUG(D_OTHER, "client from %s creates top dir %*s "
+                               "on mds #%d\n",
+                               ptlrpc_peernid2str(peer,peer_str), len, name, i+1);
+                }
         } else if (mds->mds_md_exp) {
                 lmv = &mds->mds_md_exp->exp_obd->u.lmv;
                 i = raw_name2idx(MEA_MAGIC_LAST_CHAR, lmv->desc.ld_tgt_count, name, len);
@@ -988,10 +1017,16 @@ int mds_lock_slave_objs(struct obd_device *obd, struct dentry *dentry,
         it.it_op = IT_UNLINK;
 
         OBD_ALLOC(it.d.fs_data, sizeof(struct lustre_intent_data));
+        if (!it.d.fs_data) {
+                OBD_FREE(*rlockh, handle_size);
+                OBD_FREE(op_data, sizeof(*op_data));
+                RETURN(-ENOMEM);
+        }
 
         rc = md_enqueue(mds->mds_md_exp, LDLM_IBITS, &it, LCK_EX,
                         op_data, *rlockh, NULL, 0, ldlm_completion_ast,
                         mds_blocking_ast, NULL);
+        
         OBD_FREE(op_data, sizeof(*op_data));
         OBD_FREE(it.d.fs_data, sizeof(struct lustre_intent_data));
         EXIT;