Whamcloud - gitweb
land lustre part of b_hd_sec on HEAD.
[fs/lustre-release.git] / lustre / mds / handler.c
index d328fd1..2b2c223 100644 (file)
@@ -39,6 +39,7 @@
 #include <linux/random.h>
 #include <linux/fs.h>
 #include <linux/jbd.h>
+#include <linux/namei.h>
 #include <linux/ext3_fs.h>
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
 # include <linux/smp_lock.h>
@@ -55,6 +56,7 @@
 #include <linux/lprocfs_status.h>
 #include <linux/lustre_commit_confd.h>
 
+#include <linux/lustre_acl.h>
 #include "mds_internal.h"
 
 static int mds_intent_policy(struct ldlm_namespace *ns,
@@ -720,33 +722,150 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
 
         RETURN(rc);
 }
+int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req,
+                  struct mds_body *repbody, int reply_off)
+{
+        struct inode *inode = dentry->d_inode;
+        char *symname;
+        int len, rc;
+        ENTRY;
+
+        symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
+        LASSERT(symname != NULL);
+        len = req->rq_repmsg->buflens[reply_off + 1];
+        
+        rc = inode->i_op->readlink(dentry, symname, len);
+        if (rc < 0) {
+                CERROR("readlink failed: %d\n", rc);
+        } else if (rc != len - 1) {
+                CERROR ("Unexpected readlink rc %d: expecting %d\n",
+                        rc, len - 1);
+                rc = -EINVAL;
+        } else {
+                CDEBUG(D_INODE, "read symlink dest %s\n", symname);
+                repbody->valid |= OBD_MD_LINKNAME;
+                repbody->eadatasize = rc + 1;
+                symname[rc] = 0;        /* NULL terminate */
+                rc = 0;
+        }
+
+        RETURN(rc);
+}
+
+int mds_pack_ea(struct dentry *dentry, struct ptlrpc_request *req,
+                struct mds_body *repbody, int req_off, int reply_off)
+{
+        struct inode *inode = dentry->d_inode;
+        char *ea_name;
+        void *value = NULL;
+        int len, rc;
+        ENTRY;
+
+        ea_name = lustre_msg_string(req->rq_reqmsg, req_off + 1, 0);
+        len = req->rq_repmsg->buflens[reply_off + 1];
+        if (len != 0)
+                value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len);
+
+        rc = -EOPNOTSUPP;
+        if (inode->i_op && inode->i_op->getxattr) 
+                rc = inode->i_op->getxattr(dentry, ea_name, value, len);
 
+        if (rc < 0) {
+                if (rc != -ENODATA && rc != -EOPNOTSUPP)
+                        CERROR("getxattr failed: %d", rc);
+        } else {
+                repbody->valid |= OBD_MD_FLEA;
+                repbody->eadatasize = rc;
+                rc = 0;
+        }
+
+        RETURN(rc);        
+}
+
+int mds_pack_ealist(struct dentry *dentry, struct ptlrpc_request *req,
+                    struct mds_body *repbody, int reply_off)
+{
+        struct inode *inode = dentry->d_inode;        
+        void *value = NULL;
+        int len, rc;
+        ENTRY;
+
+        len = req->rq_repmsg->buflens[reply_off + 1];
+        if (len != 0)
+                value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len);
+
+        rc = -EOPNOTSUPP;
+        if (inode->i_op && inode->i_op->getxattr) 
+                rc = inode->i_op->listxattr(dentry, value, len);
+
+        if (rc < 0) {
+                CERROR("listxattr failed: %d", rc);
+        } else {
+                repbody->valid |= OBD_MD_FLEALIST;
+                repbody->eadatasize = rc;
+                rc = 0;
+        }
+        RETURN(rc);
+}
+
+int mds_pack_acl(struct obd_device *obd, struct lustre_msg *repmsg, int offset,
+                 struct mds_body *body, struct inode *inode)
+{
+        struct dentry de = { .d_inode = inode };
+        void *buf;
+        __u32 buflen, *sizep, size;
+        ENTRY;
+
+        if (!inode->i_op->getxattr)
+                RETURN(0);
+
+        buflen = repmsg->buflens[offset + 1];
+        buf = lustre_msg_buf(repmsg, offset + 1, buflen);
+
+        size = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, buf, buflen);
+        if (size == -ENODATA)
+                RETURN(0);
+        if (size < 0)
+                RETURN(size);
+        LASSERT(size);
+
+        sizep = lustre_msg_buf(repmsg, offset, 4);
+        if (!sizep) {
+                CERROR("can't locate returned acl size buf\n");
+                RETURN(-EPROTO);
+        }
+
+        *sizep = cpu_to_le32(size);
+        body->valid |= OBD_MD_FLACL_ACCESS;
+
+        RETURN(0);
+}
+
+/* 
+ * we only take care of fsuid/fsgid.
+ */
 void mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
                      ptl_nid_t *peernid)
 {
-        if (!mds->mds_squash_uid ||
-            (rsd->rsd_uid && rsd->rsd_fsuid))
+        if (!mds->mds_squash_uid || rsd->rsd_fsuid)
                 return;
 
         if (*peernid == mds->mds_nosquash_nid)
                 return;
 
-        CDEBUG(D_OTHER, "squash req from 0x%llx, (%d:%d/%x)=>(%d:%d/%x)\n",
+        CDEBUG(D_SEC, "squash req from 0x%llx, (%d:%d/%x)=>(%d:%d/%x)\n",
                 *peernid, rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
                 mds->mds_squash_uid, mds->mds_squash_gid,
                 (rsd->rsd_cap & ~CAP_FS_MASK));
 
-        rsd->rsd_uid = mds->mds_squash_uid;
         rsd->rsd_fsuid = mds->mds_squash_uid;
         rsd->rsd_fsgid = mds->mds_squash_gid;
-
-        /* XXX should we remove all capabilities? */
         rsd->rsd_cap &= ~CAP_FS_MASK;
 }
 
 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
-                                struct ptlrpc_request *req, struct mds_body *reqbody,
-                                int reply_off)
+                                struct ptlrpc_request *req, int req_off,
+                                struct mds_body *reqbody, int reply_off)
 {
         struct inode *inode = dentry->d_inode;
         struct mds_body *body;
@@ -782,30 +901,22 @@ static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
         } else if (S_ISLNK(inode->i_mode) &&
                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
-                int len = req->rq_repmsg->buflens[reply_off + 1];
-                char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1, 0);
-
-                LASSERT(symname != NULL);       /* caller prepped reply */
-
-                if (!inode->i_op->readlink) {
-                        rc = -ENOSYS;
-                } else {
-                        rc = inode->i_op->readlink(dentry, symname, len);
-                        if (rc < 0) {
-                                CERROR("readlink failed: %d\n", rc);
-                        } else if (rc != len - 1) {
-                                CERROR("Unexpected readlink rc %d: expecting %d\n",
-                                        rc, len - 1);
-                                rc = -EINVAL;
-                        } else {
-                                CDEBUG(D_INODE, "read symlink dest %s\n", symname);
-                                body->valid |= OBD_MD_LINKNAME;
-                                body->eadatasize = rc + 1;
-                                symname[rc] = 0;
-                                rc = 0;
-                        }
-                }
+                rc = mds_pack_link(dentry, req, body, reply_off);
+        } else if (reqbody->valid & OBD_MD_FLEA) {
+                rc = mds_pack_ea(dentry, req, body, req_off, reply_off);
+        } else if (reqbody->valid & OBD_MD_FLEALIST) {
+                rc = mds_pack_ealist(dentry, req, body, reply_off);
         }
+        
+        if (reqbody->valid & OBD_MD_FLACL_ACCESS) {
+                int inc = (reqbody->valid & OBD_MD_FLEASIZE) ? 2 : 1;
+                rc = mds_pack_acl(obd, req->rq_repmsg, reply_off + inc, 
+                                  body, inode);
+        }                
+
+        /* do reverse uid/gid mapping if needed */
+        if (rc == 0 && req->rq_remote)
+                mds_reverse_map_ugid(req, body);
 
         RETURN(rc);
 }
@@ -834,13 +945,13 @@ out:
         return rc;
 }
 
-static int mds_getattr_pack_msg(struct ptlrpc_request *req, 
-                               struct inode *inode,
+static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct dentry *de,
                                 int offset)
 {
+        struct inode *inode = de->d_inode;
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_body *body;
-        int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
+        int rc = 0, size[4] = {sizeof(*body)}, bufcount = 1;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
@@ -853,8 +964,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req,
                 down(&inode->i_sem);
                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
                 up(&inode->i_sem);
-                CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
-                       rc, inode->i_ino);
                 if (rc < 0) {
                         if (rc != -ENODATA)
                                 CERROR("error getting inode %lu MD: rc = %d\n",
@@ -876,6 +985,42 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req,
                 bufcount++;
                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
                        inode->i_size + 1, body->eadatasize);
+        } else if ((body->valid & OBD_MD_FLEA)) {
+                char *ea_name = lustre_msg_string(req->rq_reqmsg, 
+                                                  offset + 1, 0);
+                rc = -EOPNOTSUPP;
+                if (inode->i_op && inode->i_op->getxattr) 
+                        rc = inode->i_op->getxattr(de, ea_name, NULL, 0);
+                
+                if (rc < 0) {
+                        if (rc != -ENODATA)
+                                CERROR("error getting inode %lu EA: rc = %d\n",
+                                       inode->i_ino, rc);
+                        size[bufcount] = 0;
+                } else {
+                        size[bufcount] = min_t(int, body->eadatasize, rc);
+                }
+                bufcount++;
+        } else if (body->valid & OBD_MD_FLEALIST) {
+                rc = -EOPNOTSUPP;
+                if (inode->i_op && inode->i_op->getxattr) 
+                        rc = inode->i_op->listxattr(de, NULL, 0);
+
+                if (rc < 0) {
+                        if (rc != -ENODATA)
+                                CERROR("error getting inode %lu EA: rc = %d\n",
+                                       inode->i_ino, rc);
+                        size[bufcount] = 0;
+                } else {
+                        size[bufcount] = min_t(int, body->eadatasize, rc);
+                }
+                bufcount++;
+        }
+        
+        /* may co-exist with OBD_MD_FLEASIZE */
+        if (body->valid & OBD_MD_FLACL_ACCESS) {
+                size[bufcount++] = 4;
+                size[bufcount++] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
         }
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
@@ -935,7 +1080,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         struct mds_req_sec_desc *rsd;
         struct mds_body *body;
         struct dentry *dparent = NULL, *dchild = NULL;
-        struct lvfs_ucred uc;
+        struct lvfs_ucred uc = {NULL, NULL,};
         struct lustre_handle parent_lockh[2] = {{0}, {0}};
         unsigned int namesize;
         int rc = 0, cleanup_phase = 0, resent_req = 0, update_mode, reply_offset;
@@ -950,7 +1095,6 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 CERROR("Can't unpack security desc\n");
                 RETURN(-EFAULT);
         }
-        mds_squash_root(mds, rsd, &req->rq_peer.peer_id.nid); 
 
         /* swab now, before anyone looks inside the request. */
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
@@ -981,7 +1125,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 reply_offset = 0;
         }
 
-        rc = mds_init_ucred(&uc, rsd);
+        rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
                 CERROR("can't init ucred\n");
                 GOTO(cleanup, rc);
@@ -1084,18 +1228,30 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                          id_fid(&body->id1), (unsigned long)id_group(&body->id1),
                          child_lockh->cookie);
 
-                dparent = mds_id2dentry(obd, &body->id1, NULL);
-                LASSERT(dparent);
-
-                dchild = ll_lookup_one_len(name, dparent, namesize - 1);
-                if (IS_ERR(dchild)) {
-                        DEBUG_REQ(D_ERROR, req, "resent, not enqueuing new locks");
-                        CDEBUG(D_ERROR, "lock against [%lu:%lu]/%*s\n",
-                               (unsigned long) id_ino(&body->id1),
-                               (unsigned long) id_gen(&body->id1),
-                               namesize - 1, name);
+                if (name) {
+                        /* usual named request */
+                        dparent = mds_id2dentry(obd, &body->id1, NULL);
+                        LASSERT(!IS_ERR(dparent));
+                        dchild = ll_lookup_one_len(name, dparent, namesize - 1);
+                        if (IS_ERR(dchild)) {
+                                DEBUG_REQ(D_ERROR, req, "resent, not enqueuing new locks");
+                                CDEBUG(D_ERROR, "lock against [%lu:%lu]/%*s\n",
+                                                (unsigned long) id_ino(&body->id1),
+                                                (unsigned long) id_gen(&body->id1),
+                                                namesize - 1, name);
+                        }
+                        LASSERT(!IS_ERR(dchild));
+                } else {
+                        /* client wants to get attr. by id */
+                        dchild = mds_id2dentry(obd, &body->id1, NULL);
+                        if (IS_ERR(dchild)) {
+                                DEBUG_REQ(D_ERROR, req, "resent, not enqueuing new locks");
+                                CDEBUG(D_ERROR, "lock against [%lu:%lu]\n",
+                                                (unsigned long) id_ino(&body->id1),
+                                                (unsigned long) id_gen(&body->id1));
+                        }
+                        LASSERT(!IS_ERR(dchild));
                 }
-                LASSERT(!IS_ERR(dchild));
                 LDLM_LOCK_PUT(granted_lock);
         }
 
@@ -1117,14 +1273,14 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 if (dchild->d_flags & DCACHE_CROSS_REF)
                         rc = mds_getattr_pack_msg_cf(req, dchild, offset);
                 else
-                        rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
+                        rc = mds_getattr_pack_msg(req, dchild, offset);
                 if (rc != 0) {
                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
                         GOTO (cleanup, rc);
                 }
         }
 
-        rc = mds_getattr_internal(obd, dchild, req, body, reply_offset);
+        rc = mds_getattr_internal(obd, dchild, req, offset, body, reply_offset);        
         GOTO(cleanup, rc); /* returns the lock to the client */
 
  cleanup:
@@ -1145,6 +1301,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 l_dput(dchild);
         case 1:
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+        default:
                 mds_exit_ucred(&uc);
         }
         return rc;
@@ -1157,7 +1314,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         struct dentry *de;
         struct mds_req_sec_desc *rsd;
         struct mds_body *body;
-        struct lvfs_ucred uc;
+        struct lvfs_ucred uc = {NULL, NULL,};
         int rc = 0;
         ENTRY;
 
@@ -1176,8 +1333,9 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
 
         MD_COUNTER_INCREMENT(obd, getattr);
 
-        rc = mds_init_ucred(&uc, rsd);
+        rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
+                mds_exit_ucred(&uc);
                 CERROR("can't init ucred\n");
                 RETURN(rc);
         }
@@ -1189,14 +1347,13 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
                 GOTO(out_pop, rc);
         }
 
-        rc = mds_getattr_pack_msg(req, de->d_inode, offset);
+        rc = mds_getattr_pack_msg(req, de, offset);
         if (rc != 0) {
                 CERROR("mds_getattr_pack_msg: %d\n", rc);
                 GOTO(out_pop, rc);
         }
 
-        req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
-
+        req->rq_status = mds_getattr_internal(obd, de, req, offset, body, 0);
         l_dput(de);
 
         EXIT;
@@ -1306,7 +1463,6 @@ out:
 static int mds_readpage(struct ptlrpc_request *req, int offset)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
-        struct mds_obd *mds = &obd->u.mds;
         struct vfsmount *mnt;
         struct dentry *de;
         struct file *file;
@@ -1314,7 +1470,7 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
         struct mds_body *body, *repbody;
         struct lvfs_run_ctxt saved;
         int rc, size = sizeof(*repbody);
-        struct lvfs_ucred uc;
+        struct lvfs_ucred uc = {NULL, NULL,};
         ENTRY;
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
@@ -1328,7 +1484,6 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
                 CERROR("Can't unpack security desc\n");
                 GOTO (out, rc = -EFAULT);
         }
-        mds_squash_root(mds, rsd, &req->rq_peer.peer_id.nid); 
 
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
@@ -1337,7 +1492,7 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
                 GOTO (out, rc = -EFAULT);
         }
 
-        rc = mds_init_ucred(&uc, rsd);
+        rc = mds_init_ucred(&uc, req, rsd);
         if (rc) {
                 CERROR("can't init ucred\n");
                 GOTO(out, rc);
@@ -1384,8 +1539,8 @@ out_file:
         filp_close(file, 0);
 out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-        mds_exit_ucred(&uc);
 out:
+        mds_exit_ucred(&uc);
         req->rq_status = rc;
         return 0;
 }
@@ -1479,7 +1634,6 @@ EXPORT_SYMBOL(mds_read_mid);
 int mds_reint(struct ptlrpc_request *req, int offset,
               struct lustre_handle *lockh)
 {
-        struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
         struct mds_update_record *rec;
         struct mds_req_sec_desc *rsd;
         int rc;
@@ -1494,7 +1648,6 @@ int mds_reint(struct ptlrpc_request *req, int offset,
                 CERROR("Can't unpack security desc\n");
                 GOTO(out, rc = -EFAULT);
         }
-        mds_squash_root(mds, rsd, &req->rq_peer.peer_id.nid); 
 
         rc = mds_update_unpack(req, offset, rec);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
@@ -1502,7 +1655,7 @@ int mds_reint(struct ptlrpc_request *req, int offset,
                 GOTO(out, req->rq_status = -EINVAL);
         }
 
-        rc = mds_init_ucred(&rec->ur_uc, rsd);
+        rc = mds_init_ucred(&rec->ur_uc, req, rsd);
         if (rc) {
                 CERROR("can't init ucred\n");
                 GOTO(out, rc);
@@ -1510,11 +1663,27 @@ int mds_reint(struct ptlrpc_request *req, int offset,
 
         /* rc will be used to interrupt a for loop over multiple records */
         rc = mds_reint_rec(rec, offset, req, lockh);
-        mds_exit_ucred(&rec->ur_uc);
-        EXIT;
+
+        /* do reverse uid/gid mapping if needed */
+        if (rc == 0 && req->rq_remote &&
+            (rec->ur_opcode == REINT_SETATTR ||
+             rec->ur_opcode == REINT_OPEN)) {
+                struct mds_body *body;
+                int bodyoff;
+
+                if (rec->ur_opcode == REINT_SETATTR)
+                        bodyoff = 0;
+                else /* open */
+                        bodyoff = (offset == 3 ? 1 : 0);
+                body = lustre_msg_buf(req->rq_repmsg, bodyoff, sizeof(*body));
+                LASSERT(body);
+
+                mds_reverse_map_ugid(req, body);
+        }
  out:
+        mds_exit_ucred(&rec->ur_uc);
         OBD_FREE(rec, sizeof(*rec));
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_filter_recovery_request(struct ptlrpc_request *req,
@@ -1655,7 +1824,7 @@ static int mdt_obj_create(struct ptlrpc_request *req)
          * this only serve to inter-mds request, don't need check group database
          * here. --ericm.
          */
-        uc.luc_ghash = NULL;
+        uc.luc_lsd = NULL;
         uc.luc_ginfo = NULL;
         uc.luc_uid = body->oa.o_uid;
         uc.luc_fsuid = body->oa.o_uid;
@@ -1891,7 +2060,6 @@ cleanup:
 
         l_dput(new);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-        mds_put_group_entry(mds, uc.luc_ghash);
         return rc;
 }
 
@@ -2008,6 +2176,38 @@ static int mdt_set_info(struct ptlrpc_request *req)
         RETURN(-EINVAL);
 }
 
+static int mds_init_export_data(struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
+        __u32 *nllu;
+
+        nllu = lustre_msg_buf(req->rq_reqmsg, 4, sizeof(__u32) * 2);
+        if (nllu == NULL) {
+                CERROR("failed to extract nllu, use 99:99\n");
+                med->med_nllu = 99;
+                med->med_nllg = 99;
+        } else {
+                if (lustre_msg_swabbed(req->rq_reqmsg)) {
+                        __swab32s(&nllu[0]);
+                        __swab32s(&nllu[1]);
+                }
+                med->med_nllu = nllu[0];
+                med->med_nllg = nllu[1];
+        }
+
+        if (req->rq_remote) {
+                CWARN("exp %p, peer "LPX64": set as remote\n",
+                       req->rq_export, req->rq_peer.peer_id.nid);
+                med->med_local = 0;
+        } else
+                med->med_local = 1;
+
+        LASSERT(med->med_idmap == NULL);
+        spin_lock_init(&med->med_idmap_lock);
+
+        return 0;
+}
+
 static int mds_msg_check_version(struct lustre_msg *msg)
 {
         int rc;
@@ -2066,6 +2266,11 @@ static int mds_msg_check_version(struct lustre_msg *msg)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
                 break;
+        case SEC_INIT:
+        case SEC_INIT_CONTINUE:
+        case SEC_FINI:
+                rc = 0;
+                break;
         default:
                 CERROR("MDS unknown opcode %d\n", msg->opc);
                 rc = -ENOTSUPP;
@@ -2093,6 +2298,13 @@ int mds_handle(struct ptlrpc_request *req)
                 RETURN(rc);
         }
 
+        /* Security opc should NOT trigger any recovery events */
+        if (req->rq_reqmsg->opc == SEC_INIT ||
+            req->rq_reqmsg->opc == SEC_INIT_CONTINUE ||
+            req->rq_reqmsg->opc == SEC_FINI) {
+                GOTO(out, rc = 0);
+        }
+
         LASSERT(current->journal_info == NULL);
         /* XXX identical to OST */
         if (req->rq_reqmsg->opc != MDS_CONNECT) {
@@ -2148,9 +2360,11 @@ int mds_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
                 rc = target_handle_connect(req);
-                if (!rc)
+                if (!rc) {
                         /* Now that we have an export, set mds. */
                         mds = mds_req2mds(req);
+                        mds_init_export_data(req);
+                }
                 break;
 
         case MDS_DISCONNECT:
@@ -2690,10 +2904,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         /*
          * here we use "iopen_nopriv" hardcoded, because it affects MDS utility
          * and the rest of options are passed by mount options. Probably this
-         * should be moved to somewhere else like startup scripts or lconf.
-         */
-        sprintf(options, "iopen_nopriv");
-
+         * should be moved to somewhere else like startup scripts or lconf. */
+        sprintf(options, "iopen_nopriv,acl,user_xattr");
         if (lcfg->lcfg_inllen4 > 0 && lcfg->lcfg_inlbuf4)
                 sprintf(options + strlen(options), ",%s",
                         lcfg->lcfg_inlbuf4);
@@ -3002,6 +3214,7 @@ static int mds_precleanup(struct obd_device *obd, int flags)
         RETURN(rc);
 }
 
+extern void lgss_svc_cache_purge_all(void);
 static int mds_cleanup(struct obd_device *obd, int flags)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -3046,9 +3259,65 @@ static int mds_cleanup(struct obd_device *obd, int flags)
         dev_clear_rdonly(2);
         fsfilt_put_ops(obd->obd_fsops);
 
+#ifdef ENABLE_GSS
+        /* XXX */
+        lgss_svc_cache_purge_all();
+#endif
         RETURN(0);
 }
 
+static int set_security(const char *value, char **sec)
+{
+        int rc = 0;
+
+        if (!strcmp(value, "null"))
+                *sec = "null";
+        else if (!strcmp(value, "krb5i"))
+                *sec = "krb5i";
+        else if (!strcmp(value, "krb5p"))
+                *sec = "krb5p";
+        else {
+                CERROR("Unrecognized value, force use NULL\n");
+                rc = -EINVAL;
+        }
+
+        return rc;
+}
+
+static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct lustre_cfg *lcfg = buf;
+        struct mds_obd *mds = &obd->u.mds;
+        int rc = 0;
+        ENTRY;
+
+        switch(lcfg->lcfg_command) {
+        case LCFG_SET_SECURITY: {
+                if (!lcfg->lcfg_inllen1 || !lcfg->lcfg_inllen2)
+                        GOTO(out, rc = -EINVAL);
+
+                if (!strcmp(lcfg->lcfg_inlbuf1, "mds_mds_sec"))
+                        rc = set_security(lcfg->lcfg_inlbuf2,
+                                          &mds->mds_mds_sec);
+                else if (!strcmp(lcfg->lcfg_inlbuf1, "mds_ost_sec"))
+                        rc = set_security(lcfg->lcfg_inlbuf2,
+                                          &mds->mds_ost_sec);
+                else {
+                        CERROR("Unrecognized key\n");
+                        rc = -EINVAL;
+                }
+                break;
+        }
+        default: {
+                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+                GOTO(out, rc = -EINVAL);
+
+        }
+        }
+out:
+        RETURN(rc);
+}
+
 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
                                         int offset,
                                         struct ldlm_lock *new_lock,
@@ -3126,10 +3395,11 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         struct lustre_handle lockh[2] = {{0}, {0}};
         struct ldlm_lock *new_lock = NULL;
         int getattr_part = MDS_INODELOCK_UPDATE;
-        int rc, repsize[4] = { sizeof(struct ldlm_reply),
-                               sizeof(struct mds_body),
-                               mds->mds_max_mdsize,
-                               mds->mds_max_cookiesize };
+        int rc, reply_buffers;
+        int repsize[5] = {sizeof(struct ldlm_reply),
+                          sizeof(struct mds_body),
+                          mds->mds_max_mdsize};
+
         int offset = MDS_REQ_INTENT_REC_OFF; 
         ENTRY;
 
@@ -3153,7 +3423,14 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
 
-        rc = lustre_pack_reply(req, 3, repsize, NULL);
+        reply_buffers = 3;
+        if (it->opc & ( IT_OPEN | IT_GETATTR | IT_LOOKUP | IT_CHDIR )) {
+                reply_buffers = 5;
+                repsize[3] = 4;
+                repsize[4] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
+        }
+
+        rc = lustre_pack_reply(req, reply_buffers, repsize, NULL);
         if (rc)
                 RETURN(req->rq_status = rc);
 
@@ -3488,6 +3765,7 @@ static struct obd_ops mds_obd_ops = {
         .o_setup           = mds_setup,
         .o_precleanup      = mds_precleanup,
         .o_cleanup         = mds_cleanup,
+        .o_process_config  = mds_process_config,
         .o_postrecov       = mds_postrecov,
         .o_statfs          = mds_obd_statfs,
         .o_iocontrol       = mds_iocontrol,
@@ -3514,7 +3792,7 @@ static int __init mds_init(void)
 {
         struct lprocfs_static_vars lvars;
 
-        mds_group_hash_init();
+        mds_init_lsd_cache();
 
         lprocfs_init_multi_vars(0, &lvars);
         class_register_type(&mds_obd_ops, NULL, lvars.module_vars,
@@ -3528,7 +3806,7 @@ static int __init mds_init(void)
 
 static void /*__exit*/ mds_exit(void)
 {
-        mds_group_hash_cleanup();
+        mds_cleanup_lsd_cache();
 
         class_unregister_type(LUSTRE_MDS_NAME);
         class_unregister_type(LUSTRE_MDT_NAME);